mirror of
https://github.com/containerd/containerd.git
synced 2026-07-01 04:08:10 +00:00
Add server plugins for grpc and ttrpc
Migrate configuration and move server initialization Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
@@ -30,6 +30,8 @@ import (
|
||||
_ "github.com/containerd/containerd/v2/plugins/restart"
|
||||
_ "github.com/containerd/containerd/v2/plugins/sandbox"
|
||||
_ "github.com/containerd/containerd/v2/plugins/server/debug"
|
||||
_ "github.com/containerd/containerd/v2/plugins/server/grpc"
|
||||
_ "github.com/containerd/containerd/v2/plugins/server/ttrpc"
|
||||
_ "github.com/containerd/containerd/v2/plugins/services/containers"
|
||||
_ "github.com/containerd/containerd/v2/plugins/services/content"
|
||||
_ "github.com/containerd/containerd/v2/plugins/services/diff"
|
||||
|
||||
@@ -125,18 +125,20 @@ func dumpConfig(cliContext *cli.Context) error {
|
||||
|
||||
func platformAgnosticDefaultConfig() *srvconfig.Config {
|
||||
return &srvconfig.Config{
|
||||
Version: version.ConfigVersion,
|
||||
Root: defaults.DefaultRootDir,
|
||||
State: defaults.DefaultStateDir,
|
||||
GRPC: srvconfig.GRPCConfig{
|
||||
Address: defaults.DefaultAddress,
|
||||
MaxRecvMsgSize: defaults.DefaultMaxRecvMsgSize,
|
||||
MaxSendMsgSize: defaults.DefaultMaxSendMsgSize,
|
||||
},
|
||||
Version: version.ConfigVersion,
|
||||
Root: defaults.DefaultRootDir,
|
||||
State: defaults.DefaultStateDir,
|
||||
DisabledPlugins: []string{},
|
||||
RequiredPlugins: []string{},
|
||||
StreamProcessors: streamProcessors(),
|
||||
Imports: []string{defaults.DefaultConfigIncludePattern},
|
||||
Plugins: map[string]any{
|
||||
"io.containerd.server.v1.grpc": map[string]any{
|
||||
"address": defaults.DefaultAddress,
|
||||
"max_recv_message_size": defaults.DefaultMaxRecvMsgSize,
|
||||
"max_send_message_size": defaults.DefaultMaxSendMsgSize,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,6 @@ import (
|
||||
_ "github.com/containerd/containerd/v2/core/metrics" // import containerd build info
|
||||
"github.com/containerd/containerd/v2/core/mount"
|
||||
"github.com/containerd/containerd/v2/defaults"
|
||||
"github.com/containerd/containerd/v2/pkg/sys"
|
||||
"github.com/containerd/containerd/v2/pkg/tracing"
|
||||
"github.com/containerd/containerd/v2/version"
|
||||
"github.com/containerd/errdefs"
|
||||
@@ -165,16 +164,6 @@ can be used and modified as necessary as a custom configuration.`
|
||||
return err
|
||||
}
|
||||
|
||||
if config.GRPC.Address == "" {
|
||||
return fmt.Errorf("grpc address cannot be empty: %w", errdefs.ErrInvalidArgument)
|
||||
}
|
||||
if config.TTRPC.Address == "" {
|
||||
// If TTRPC was not explicitly configured, use defaults based on GRPC.
|
||||
config.TTRPC.Address = config.GRPC.Address + ".ttrpc"
|
||||
config.TTRPC.UID = config.GRPC.UID
|
||||
config.TTRPC.GID = config.GRPC.GID
|
||||
}
|
||||
|
||||
// Make sure top-level directories are created early.
|
||||
if err := server.CreateTopLevelDirectories(config); err != nil {
|
||||
return err
|
||||
@@ -229,6 +218,7 @@ can be used and modified as necessary as a custom configuration.`
|
||||
go func() {
|
||||
defer close(chsrv)
|
||||
|
||||
// TODO: When to set grpc address from flag? Migration should be done first
|
||||
server, err := server.New(ctx, config)
|
||||
if err != nil {
|
||||
select {
|
||||
@@ -270,6 +260,8 @@ can be used and modified as necessary as a custom configuration.`
|
||||
if err := server.Start(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: Move to server.Start(ctx) error
|
||||
if config.Metrics.Address != "" {
|
||||
l, err := net.Listen("tcp", config.Metrics.Address)
|
||||
if err != nil {
|
||||
@@ -277,26 +269,8 @@ can be used and modified as necessary as a custom configuration.`
|
||||
}
|
||||
serve(ctx, l, server.ServeMetrics)
|
||||
}
|
||||
// setup the ttrpc endpoint
|
||||
tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err)
|
||||
}
|
||||
serve(ctx, tl, server.ServeTTRPC)
|
||||
|
||||
if config.GRPC.TCPAddress != "" {
|
||||
l, err := net.Listen("tcp", config.GRPC.TCPAddress)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err)
|
||||
}
|
||||
serve(ctx, l, server.ServeTCP)
|
||||
}
|
||||
// setup the main grpc endpoint
|
||||
l, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get listener for main endpoint: %w", err)
|
||||
}
|
||||
serve(ctx, l, server.ServeGRPC)
|
||||
// end s.Start
|
||||
|
||||
readyC := make(chan struct{})
|
||||
go func() {
|
||||
@@ -351,10 +325,6 @@ func applyFlags(cliContext *cli.Context, config *srvconfig.Config) error {
|
||||
name: "state",
|
||||
d: &config.State,
|
||||
},
|
||||
{
|
||||
name: "address",
|
||||
d: &config.GRPC.Address,
|
||||
},
|
||||
} {
|
||||
if s := cliContext.String(v.name); s != "" {
|
||||
*v.d = s
|
||||
@@ -368,6 +338,35 @@ func applyFlags(cliContext *cli.Context, config *srvconfig.Config) error {
|
||||
}
|
||||
}
|
||||
|
||||
if s := cliContext.String("address"); s != "" {
|
||||
var (
|
||||
grpcConfig map[string]any
|
||||
ttrpcConfig map[string]any
|
||||
)
|
||||
v, ok := config.Plugins["io.containerd.server.v1.grpc"]
|
||||
if !ok {
|
||||
grpcConfig = make(map[string]any)
|
||||
} else if grpcConfig, ok = v.(map[string]any); !ok {
|
||||
return fmt.Errorf("grpc plugin has invalid configuration: %w", errdefs.ErrInvalidArgument)
|
||||
}
|
||||
grpcConfig["address"] = s
|
||||
config.Plugins["io.containerd.server.v1.grpc"] = grpcConfig
|
||||
|
||||
_, ok = config.Plugins["io.containerd.server.v1.ttrpc"]
|
||||
if !ok {
|
||||
ttrpcConfig = map[string]any{
|
||||
"address": s + ".ttrpc",
|
||||
}
|
||||
if uid, ok := grpcConfig["uid"]; ok {
|
||||
ttrpcConfig["uid"] = uid
|
||||
}
|
||||
if gid, ok := grpcConfig["gid"]; ok {
|
||||
ttrpcConfig["gid"] = gid
|
||||
}
|
||||
config.Plugins["io.containerd.server.v1.ttrpc"] = ttrpcConfig
|
||||
}
|
||||
}
|
||||
|
||||
applyPlatformFlags(cliContext)
|
||||
|
||||
return nil
|
||||
|
||||
@@ -65,10 +65,6 @@ type Config struct {
|
||||
State string `toml:"state"`
|
||||
// TempDir is the path to a directory where to place containerd temporary files
|
||||
TempDir string `toml:"temp"`
|
||||
// GRPC configuration settings
|
||||
GRPC GRPCConfig `toml:"grpc"`
|
||||
// TTRPC configuration settings
|
||||
TTRPC TTRPCConfig `toml:"ttrpc"`
|
||||
// Debug log settings
|
||||
Debug Debug `toml:"debug"`
|
||||
// Metrics and monitoring settings
|
||||
@@ -95,6 +91,19 @@ type Config struct {
|
||||
Imports []string `toml:"imports"`
|
||||
// StreamProcessors configuration
|
||||
StreamProcessors map[string]StreamProcessor `toml:"stream_processors"`
|
||||
|
||||
// Deprecated fields must remain but should not be output when generating default or migrated configs.
|
||||
// These fields are automatically migrated to the corresponding server plugin
|
||||
// configuration blocks on startup (see serviceMigrate). In version 4 configs,
|
||||
// server settings are configured directly under [plugins."<plugin-id>"].
|
||||
|
||||
// Deprecated: use server plugins io.containerd.server.v1.grpc and io.containerd.server.v1.grpc-tcp
|
||||
GRPC GRPCConfig `toml:"grpc,omitempty"`
|
||||
// Deprecated: use server plugin io.containerd.server.v1.ttrpc.
|
||||
// In configs prior to version 4, an unset TTRPC address is derived from
|
||||
// the GRPC address (grpcAddress + ".ttrpc") and inherits GRPC's UID/GID.
|
||||
// In version 4, the TTRPC plugin uses its own defaults independently.
|
||||
TTRPC TTRPCConfig `toml:"ttrpc,omitempty"`
|
||||
}
|
||||
|
||||
// StreamProcessor provides configuration for diff content processors
|
||||
@@ -206,6 +215,15 @@ func v1Migrate(ctx context.Context, c *Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// serviceMigrate moves server properties (GRPC, TTRPC, Debug, Metrics) from
|
||||
// top-level config fields into their corresponding plugin configuration blocks
|
||||
// for version 4.
|
||||
//
|
||||
// For configs prior to version 4, if the TTRPC address is not explicitly set
|
||||
// it is derived from the GRPC address (grpcAddress + ".ttrpc") and inherits
|
||||
// GRPC's UID/GID. In version 4, each server plugin is independently
|
||||
// configured; the TTRPC plugin will use its own default address if its
|
||||
// plugin block is omitted, regardless of the GRPC plugin's address.
|
||||
func serviceMigrate(ctx context.Context, c *Config) error {
|
||||
if c.Debug.Address != "" && c.Plugins["io.containerd.server.v1.debug"] == nil {
|
||||
c.Plugins["io.containerd.server.v1.debug"] = map[string]any{
|
||||
@@ -217,28 +235,94 @@ func serviceMigrate(ctx context.Context, c *Config) error {
|
||||
c.Debug.UID = 0
|
||||
c.Debug.GID = 0
|
||||
}
|
||||
// Capture legacy GRPC values up front so both grpc and grpc-tcp
|
||||
// migrations see the same values, and so migrations that don't key on
|
||||
// an address (uid/gid/max message sizes) are not dropped.
|
||||
grpcAddress := c.GRPC.Address
|
||||
grpcUID := c.GRPC.UID
|
||||
grpcGID := c.GRPC.GID
|
||||
grpcMaxRecv := c.GRPC.MaxRecvMsgSize
|
||||
grpcMaxSend := c.GRPC.MaxSendMsgSize
|
||||
grpcHasLegacy := grpcAddress != "" || grpcUID != 0 || grpcGID != 0 || grpcMaxRecv != 0 || grpcMaxSend != 0
|
||||
if grpcHasLegacy && c.Plugins["io.containerd.server.v1.grpc"] == nil {
|
||||
c.Plugins["io.containerd.server.v1.grpc"] = map[string]any{
|
||||
"address": grpcAddress,
|
||||
"uid": grpcUID,
|
||||
"gid": grpcGID,
|
||||
"max_recv_message_size": grpcMaxRecv,
|
||||
"max_send_message_size": grpcMaxSend,
|
||||
}
|
||||
}
|
||||
if c.GRPC.TCPAddress != "" && c.Plugins["io.containerd.server.v1.grpc-tcp"] == nil {
|
||||
c.Plugins["io.containerd.server.v1.grpc-tcp"] = map[string]any{
|
||||
"address": c.GRPC.TCPAddress,
|
||||
"tls_ca": c.GRPC.TCPTLSCA,
|
||||
"tls_cert": c.GRPC.TCPTLSCert,
|
||||
"tls_key": c.GRPC.TCPTLSKey,
|
||||
"tls_common_name": c.GRPC.TCPTLSCName,
|
||||
"max_recv_message_size": grpcMaxRecv,
|
||||
"max_send_message_size": grpcMaxSend,
|
||||
}
|
||||
}
|
||||
if grpcHasLegacy || c.GRPC.TCPAddress != "" {
|
||||
c.GRPC = GRPCConfig{}
|
||||
}
|
||||
if c.Plugins["io.containerd.server.v1.ttrpc"] == nil {
|
||||
ttrpcAddress := c.TTRPC.Address
|
||||
ttrpcUID := c.TTRPC.UID
|
||||
ttrpcGID := c.TTRPC.GID
|
||||
if ttrpcAddress == "" && grpcAddress != "" {
|
||||
ttrpcAddress = grpcAddress + ".ttrpc"
|
||||
if ttrpcUID == 0 {
|
||||
ttrpcUID = grpcUID
|
||||
}
|
||||
if ttrpcGID == 0 {
|
||||
ttrpcGID = grpcGID
|
||||
}
|
||||
}
|
||||
if ttrpcAddress != "" || ttrpcUID != 0 || ttrpcGID != 0 {
|
||||
c.Plugins["io.containerd.server.v1.ttrpc"] = map[string]any{
|
||||
"address": ttrpcAddress,
|
||||
"uid": ttrpcUID,
|
||||
"gid": ttrpcGID,
|
||||
}
|
||||
c.TTRPC.Address = ""
|
||||
c.TTRPC.UID = 0
|
||||
c.TTRPC.GID = 0
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GRPCConfig provides GRPC configuration for the socket
|
||||
type GRPCConfig struct {
|
||||
Address string `toml:"address"`
|
||||
TCPAddress string `toml:"tcp_address"`
|
||||
TCPTLSCA string `toml:"tcp_tls_ca"`
|
||||
TCPTLSCert string `toml:"tcp_tls_cert"`
|
||||
TCPTLSKey string `toml:"tcp_tls_key"`
|
||||
UID int `toml:"uid"`
|
||||
GID int `toml:"gid"`
|
||||
MaxRecvMsgSize int `toml:"max_recv_message_size"`
|
||||
MaxSendMsgSize int `toml:"max_send_message_size"`
|
||||
TCPTLSCName string `toml:"tcp_tls_common_name"`
|
||||
// Deprecated: use server plugin io.containerd.server.v1.grpc
|
||||
Address string `toml:"address,omitempty"`
|
||||
// Deprecated: use server plugin io.containerd.server.v1.grpc-tcp
|
||||
TCPAddress string `toml:"tcp_address,omitempty"`
|
||||
// Deprecated: use server plugin io.containerd.server.v1.grpc-tcp
|
||||
TCPTLSCA string `toml:"tcp_tls_ca,omitempty"`
|
||||
// Deprecated: use server plugin io.containerd.server.v1.grpc-tcp
|
||||
TCPTLSCert string `toml:"tcp_tls_cert,omitempty"`
|
||||
// Deprecated: use server plugin io.containerd.server.v1.grpc-tcp
|
||||
TCPTLSKey string `toml:"tcp_tls_key,omitempty"`
|
||||
// Deprecated: use server plugin io.containerd.server.v1.grpc
|
||||
UID int `toml:"uid,omitempty"`
|
||||
// Deprecated: use server plugin io.containerd.server.v1.grpc
|
||||
GID int `toml:"gid,omitempty"`
|
||||
// Deprecated: use server plugin io.containerd.server.v1.grpc
|
||||
MaxRecvMsgSize int `toml:"max_recv_message_size,omitempty"`
|
||||
// Deprecated: use server plugin io.containerd.server.v1.grpc
|
||||
MaxSendMsgSize int `toml:"max_send_message_size,omitempty"`
|
||||
// Deprecated: use server plugin io.containerd.server.v1.grpc-tcp
|
||||
TCPTLSCName string `toml:"tcp_tls_common_name,omitempty"`
|
||||
}
|
||||
|
||||
// TTRPCConfig provides TTRPC configuration for the socket
|
||||
type TTRPCConfig struct {
|
||||
Address string `toml:"address"`
|
||||
UID int `toml:"uid"`
|
||||
GID int `toml:"gid"`
|
||||
Address string `toml:"address,omitempty"`
|
||||
UID int `toml:"uid,omitempty"`
|
||||
GID int `toml:"gid,omitempty"`
|
||||
}
|
||||
|
||||
// Debug provides debug configuration
|
||||
@@ -258,8 +342,9 @@ type Debug struct {
|
||||
|
||||
// MetricsConfig provides metrics configuration
|
||||
type MetricsConfig struct {
|
||||
Address string `toml:"address"`
|
||||
GRPCHistogram bool `toml:"grpc_histogram"`
|
||||
Address string `toml:"address"`
|
||||
// Deprecated: use metrics plugin io.containerd.metrics.v1.grpc-prometheus
|
||||
GRPCHistogram bool `toml:"grpc_histogram,omitempty"`
|
||||
}
|
||||
|
||||
// CgroupConfig provides cgroup configuration
|
||||
|
||||
@@ -18,8 +18,6 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -33,15 +31,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/ttrpc"
|
||||
"github.com/docker/go-metrics"
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
|
||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
diffapi "github.com/containerd/containerd/api/services/diff/v1"
|
||||
@@ -58,7 +52,6 @@ import (
|
||||
sbproxy "github.com/containerd/containerd/v2/core/sandbox/proxy"
|
||||
ssproxy "github.com/containerd/containerd/v2/core/snapshots/proxy"
|
||||
"github.com/containerd/containerd/v2/defaults"
|
||||
"github.com/containerd/containerd/v2/internal/wintls"
|
||||
"github.com/containerd/containerd/v2/pkg/dialer"
|
||||
"github.com/containerd/containerd/v2/pkg/sys"
|
||||
"github.com/containerd/containerd/v2/pkg/timeout"
|
||||
@@ -146,105 +139,21 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
||||
for id, p := range config.StreamProcessors {
|
||||
diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args, p.Env))
|
||||
}
|
||||
|
||||
var prometheusServerMetricsOpts []grpc_prometheus.ServerMetricsOption
|
||||
if config.Metrics.GRPCHistogram {
|
||||
// Enable grpc time histograms to measure rpc latencies
|
||||
prometheusServerMetricsOpts = append(prometheusServerMetricsOpts, grpc_prometheus.WithServerHandlingTimeHistogram())
|
||||
}
|
||||
|
||||
prometheusServerMetrics := grpc_prometheus.NewServerMetrics(prometheusServerMetricsOpts...)
|
||||
prometheus.MustRegister(prometheusServerMetrics)
|
||||
|
||||
serverOpts := []grpc.ServerOption{
|
||||
grpc.StatsHandler(otelgrpc.NewServerHandler()),
|
||||
grpc.ChainStreamInterceptor(
|
||||
streamNamespaceInterceptor,
|
||||
prometheusServerMetrics.StreamServerInterceptor(),
|
||||
),
|
||||
grpc.ChainUnaryInterceptor(
|
||||
unaryNamespaceInterceptor,
|
||||
prometheusServerMetrics.UnaryServerInterceptor(),
|
||||
),
|
||||
}
|
||||
if config.GRPC.MaxRecvMsgSize > 0 {
|
||||
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize))
|
||||
}
|
||||
if config.GRPC.MaxSendMsgSize > 0 {
|
||||
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize))
|
||||
}
|
||||
ttrpcServer, err := newTTRPCServer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tcpServerOpts := serverOpts
|
||||
if config.GRPC.TCPTLSCert != "" {
|
||||
log.G(ctx).Info("setting up tls on tcp GRPC services...")
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tlsConfig := &tls.Config{Certificates: []tls.Certificate{tlsCert}}
|
||||
|
||||
if config.GRPC.TCPTLSCA != "" {
|
||||
caCertPool := x509.NewCertPool()
|
||||
caCert, err := os.ReadFile(config.GRPC.TCPTLSCA)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load CA file: %w", err)
|
||||
}
|
||||
caCertPool.AppendCertsFromPEM(caCert)
|
||||
tlsConfig.ClientCAs = caCertPool
|
||||
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
|
||||
}
|
||||
tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
|
||||
} else if config.GRPC.TCPTLSCName != "" {
|
||||
tlsConfig, CA, res, err :=
|
||||
wintls.SetupTLSFromWindowsCertStore(ctx, config.GRPC.TCPTLSCName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to setup TLS from Windows cert store: %w", err)
|
||||
}
|
||||
// Cache resource for cleanup (Windows only)
|
||||
setTLSResource(res)
|
||||
if CA != nil {
|
||||
tlsConfig.ClientCAs = CA
|
||||
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
|
||||
}
|
||||
tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
|
||||
}
|
||||
|
||||
// grpcService allows GRPC services to be registered with the underlying server
|
||||
type grpcService interface {
|
||||
Register(*grpc.Server) error
|
||||
}
|
||||
|
||||
// tcpService allows GRPC services to be registered with the underlying tcp server
|
||||
type tcpService interface {
|
||||
RegisterTCP(*grpc.Server) error
|
||||
}
|
||||
|
||||
// ttrpcService allows TTRPC services to be registered with the underlying server
|
||||
type ttrpcService interface {
|
||||
RegisterTTRPC(*ttrpc.Server) error
|
||||
}
|
||||
|
||||
var (
|
||||
grpcServer = grpc.NewServer(serverOpts...)
|
||||
tcpServer = grpc.NewServer(tcpServerOpts...)
|
||||
|
||||
grpcServices []grpcService
|
||||
tcpServices []tcpService
|
||||
ttrpcServices []ttrpcService
|
||||
s = &Server{
|
||||
prometheusServerMetrics: prometheusServerMetrics,
|
||||
grpcServer: grpcServer,
|
||||
tcpServer: tcpServer,
|
||||
ttrpcServer: ttrpcServer,
|
||||
config: config,
|
||||
s = &Server{
|
||||
config: config,
|
||||
}
|
||||
initialized = plugin.NewPluginSet()
|
||||
required = make(map[string]struct{})
|
||||
initialized = plugin.NewPluginSet()
|
||||
required = make(map[string]struct{})
|
||||
grpcAddress = readString(config.Plugins, "io.containerd.server.v1.grpc", "address")
|
||||
ttrpcAddress = readString(config.Plugins, "io.containerd.server.v1.ttrpc", "address")
|
||||
)
|
||||
if grpcAddress == "" {
|
||||
grpcAddress = defaults.DefaultAddress
|
||||
}
|
||||
if ttrpcAddress == "" {
|
||||
ttrpcAddress = defaults.DefaultAddress + ".ttrpc"
|
||||
}
|
||||
for _, r := range config.RequiredPlugins {
|
||||
required[r] = struct{}{}
|
||||
}
|
||||
@@ -260,8 +169,8 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
||||
map[string]string{
|
||||
plugins.PropertyRootDir: filepath.Join(config.Root, id),
|
||||
plugins.PropertyStateDir: filepath.Join(config.State, id),
|
||||
plugins.PropertyGRPCAddress: config.GRPC.Address,
|
||||
plugins.PropertyTTRPCAddress: config.TTRPC.Address,
|
||||
plugins.PropertyGRPCAddress: grpcAddress,
|
||||
plugins.PropertyTTRPCAddress: ttrpcAddress,
|
||||
},
|
||||
)
|
||||
initContext.RegisterReadiness = func() func() {
|
||||
@@ -309,17 +218,6 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// check for grpc services that should be registered with the server
|
||||
if src, ok := instance.(grpcService); ok {
|
||||
grpcServices = append(grpcServices, src)
|
||||
}
|
||||
if src, ok := instance.(ttrpcService); ok {
|
||||
ttrpcServices = append(ttrpcServices, src)
|
||||
}
|
||||
if service, ok := instance.(tcpService); ok {
|
||||
tcpServices = append(tcpServices, service)
|
||||
}
|
||||
|
||||
s.plugins = append(s.plugins, result)
|
||||
}
|
||||
if len(required) != 0 {
|
||||
@@ -330,23 +228,6 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
||||
return nil, fmt.Errorf("required plugin %s not included", missing)
|
||||
}
|
||||
|
||||
// register services after all plugins have been initialized
|
||||
for _, service := range grpcServices {
|
||||
if err := service.Register(grpcServer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for _, service := range ttrpcServices {
|
||||
if err := service.RegisterTTRPC(ttrpcServer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for _, service := range tcpServices {
|
||||
if err := service.RegisterTCP(tcpServer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
recordConfigDeprecations(ctx, config, initialized)
|
||||
return s, nil
|
||||
}
|
||||
@@ -380,25 +261,10 @@ type server interface {
|
||||
|
||||
// Server is the containerd main daemon
|
||||
type Server struct {
|
||||
prometheusServerMetrics *grpc_prometheus.ServerMetrics
|
||||
grpcServer *grpc.Server
|
||||
ttrpcServer *ttrpc.Server
|
||||
tcpServer *grpc.Server
|
||||
servers []server
|
||||
config *srvconfig.Config
|
||||
plugins []*plugin.Plugin
|
||||
ready sync.WaitGroup
|
||||
}
|
||||
|
||||
// ServeGRPC provides the containerd grpc APIs on the provided listener
|
||||
func (s *Server) ServeGRPC(l net.Listener) error {
|
||||
s.prometheusServerMetrics.InitializeMetrics(s.grpcServer)
|
||||
return trapClosedConnErr(s.grpcServer.Serve(l))
|
||||
}
|
||||
|
||||
// ServeTTRPC provides the containerd ttrpc APIs on the provided listener
|
||||
func (s *Server) ServeTTRPC(l net.Listener) error {
|
||||
return trapClosedConnErr(s.ttrpcServer.Serve(context.Background(), l))
|
||||
servers []server
|
||||
config *srvconfig.Config
|
||||
plugins []*plugin.Plugin
|
||||
ready sync.WaitGroup
|
||||
}
|
||||
|
||||
// ServeMetrics provides a prometheus endpoint for exposing metrics
|
||||
@@ -412,12 +278,6 @@ func (s *Server) ServeMetrics(l net.Listener) error {
|
||||
return trapClosedConnErr(srv.Serve(l))
|
||||
}
|
||||
|
||||
// ServeTCP allows services to serve over tcp
|
||||
func (s *Server) ServeTCP(l net.Listener) error {
|
||||
s.prometheusServerMetrics.InitializeMetrics(s.tcpServer)
|
||||
return trapClosedConnErr(s.tcpServer.Serve(l))
|
||||
}
|
||||
|
||||
// Start the services, this will normally start listening on sockets
|
||||
// and serving requests.
|
||||
func (s *Server) Start(ctx context.Context) error {
|
||||
@@ -431,9 +291,6 @@ func (s *Server) Start(ctx context.Context) error {
|
||||
|
||||
// Stop the containerd server canceling any open connections
|
||||
func (s *Server) Stop() {
|
||||
s.grpcServer.Stop()
|
||||
// Clean up TLS resources (Windows only)
|
||||
cleanupTLSResources()
|
||||
for i := len(s.plugins) - 1; i >= 0; i-- {
|
||||
p := s.plugins[i]
|
||||
instance, err := p.Instance()
|
||||
@@ -580,6 +437,31 @@ func (pc *proxyClients) getClient(address string) (*grpc.ClientConn, error) {
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func readString(properties map[string]any, key ...string) string {
|
||||
for i, k := range key {
|
||||
v, ok := properties[k]
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
if i < len(key)-1 {
|
||||
properties, ok = v.(map[string]any)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
continue
|
||||
}
|
||||
switch v := v.(type) {
|
||||
case string:
|
||||
return v
|
||||
case fmt.Stringer:
|
||||
return v.String()
|
||||
case []byte:
|
||||
return string(v)
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func trapClosedConnErr(err error) error {
|
||||
if err == nil || errors.Is(err, net.ErrClosed) {
|
||||
return nil
|
||||
|
||||
@@ -24,11 +24,8 @@ import (
|
||||
cgroup1 "github.com/containerd/cgroups/v3/cgroup1"
|
||||
cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
|
||||
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
|
||||
"github.com/containerd/containerd/v2/internal/wintls"
|
||||
"github.com/containerd/containerd/v2/pkg/sys"
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/otelttrpc"
|
||||
"github.com/containerd/ttrpc"
|
||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||
)
|
||||
|
||||
@@ -66,14 +63,3 @@ func apply(ctx context.Context, config *srvconfig.Config) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func newTTRPCServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer(
|
||||
ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()),
|
||||
ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()),
|
||||
)
|
||||
}
|
||||
|
||||
// TLS resource helpers are no-ops on Linux.
|
||||
func setTLSResource(r wintls.CertResource) {}
|
||||
func cleanupTLSResources() {}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
//go:build !linux && !windows && !solaris
|
||||
//go:build !linux
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
@@ -22,18 +22,8 @@ import (
|
||||
"context"
|
||||
|
||||
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
|
||||
"github.com/containerd/containerd/v2/internal/wintls"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
func apply(_ context.Context, _ *srvconfig.Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newTTRPCServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer()
|
||||
}
|
||||
|
||||
// TLS resource helpers are no-ops on other unsupported platforms.
|
||||
func setTLSResource(r wintls.CertResource) {}
|
||||
func cleanupTLSResources() {}
|
||||
@@ -24,9 +24,7 @@ import (
|
||||
"github.com/containerd/containerd/v2/cmd/containerd/server"
|
||||
"github.com/containerd/containerd/v2/cmd/containerd/server/config"
|
||||
"github.com/containerd/containerd/v2/defaults"
|
||||
"github.com/containerd/containerd/v2/pkg/sys"
|
||||
"github.com/containerd/containerd/v2/version"
|
||||
"github.com/containerd/log"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -71,19 +69,12 @@ func startDaemon() {
|
||||
return
|
||||
}
|
||||
|
||||
l, err := sys.GetLocalListener(srvconfig.GRPC.Address, srvconfig.GRPC.UID, srvconfig.GRPC.GID)
|
||||
if err != nil {
|
||||
defer server.Stop()
|
||||
if err := server.Start(ctx); err != nil {
|
||||
errC <- err
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer l.Close()
|
||||
if err := server.ServeGRPC(l); err != nil {
|
||||
log.G(ctx).WithError(err).WithField("address", srvconfig.GRPC.Address).Fatal("serve failure")
|
||||
}
|
||||
}()
|
||||
|
||||
server.Wait()
|
||||
}()
|
||||
|
||||
|
||||
67
plugins/server/grpc/metrics.go
Normal file
67
plugins/server/grpc/metrics.go
Normal file
@@ -0,0 +1,67 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"github.com/containerd/plugin"
|
||||
"github.com/containerd/plugin/registry"
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
|
||||
"github.com/containerd/containerd/v2/plugins"
|
||||
)
|
||||
|
||||
type metricsConfig struct {
|
||||
GRPCHistogram bool `toml:"grpc_histogram"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
registry.Register(&plugin.Registration{
|
||||
Type: plugins.MetricsPlugin,
|
||||
ID: "grpc-prometheus",
|
||||
Requires: []plugin.Type{
|
||||
plugins.GRPCPlugin,
|
||||
},
|
||||
Config: &metricsConfig{
|
||||
GRPCHistogram: false,
|
||||
},
|
||||
InitFn: func(ic *plugin.InitContext) (any, error) {
|
||||
c := ic.Config.(*metricsConfig)
|
||||
var prometheusServerMetricsOpts []grpc_prometheus.ServerMetricsOption
|
||||
if c.GRPCHistogram {
|
||||
// Enable grpc time histograms to measure rpc latencies
|
||||
prometheusServerMetricsOpts = append(prometheusServerMetricsOpts, grpc_prometheus.WithServerHandlingTimeHistogram())
|
||||
}
|
||||
|
||||
prometheusServerMetrics := grpc_prometheus.NewServerMetrics(prometheusServerMetricsOpts...)
|
||||
prometheus.MustRegister(prometheusServerMetrics)
|
||||
|
||||
return prometheusServerMetrics, nil
|
||||
},
|
||||
})
|
||||
registry.Register(&plugin.Registration{
|
||||
Type: plugins.MetricsPlugin,
|
||||
ID: "grpc-otel",
|
||||
Requires: []plugin.Type{
|
||||
plugins.GRPCPlugin,
|
||||
},
|
||||
InitFn: func(ic *plugin.InitContext) (any, error) {
|
||||
return otelgrpc.NewServerHandler(), nil
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -14,7 +14,7 @@
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
292
plugins/server/grpc/plugin.go
Normal file
292
plugins/server/grpc/plugin.go
Normal file
@@ -0,0 +1,292 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
|
||||
"github.com/containerd/errdefs"
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/plugin"
|
||||
"github.com/containerd/plugin/registry"
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/stats"
|
||||
|
||||
"github.com/containerd/containerd/v2/defaults"
|
||||
"github.com/containerd/containerd/v2/internal/wintls"
|
||||
"github.com/containerd/containerd/v2/pkg/sys"
|
||||
"github.com/containerd/containerd/v2/plugins"
|
||||
"github.com/containerd/containerd/v2/plugins/server/internal"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
Address string `toml:"address"`
|
||||
UID int `toml:"uid"`
|
||||
GID int `toml:"gid"`
|
||||
MaxRecvMsgSize int `toml:"max_recv_message_size"`
|
||||
MaxSendMsgSize int `toml:"max_send_message_size"`
|
||||
}
|
||||
|
||||
type tcpConfig struct {
|
||||
Address string `toml:"address"`
|
||||
TLSCA string `toml:"tls_ca"`
|
||||
TLSCert string `toml:"tls_cert"`
|
||||
TLSKey string `toml:"tls_key"`
|
||||
TLSCName string `toml:"tls_common_name"`
|
||||
MaxRecvMsgSize int `toml:"max_recv_message_size"`
|
||||
MaxSendMsgSize int `toml:"max_send_message_size"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
registry.Register(&plugin.Registration{
|
||||
Type: plugins.ServerPlugin,
|
||||
ID: "grpc",
|
||||
Requires: []plugin.Type{
|
||||
plugins.GRPCPlugin,
|
||||
plugins.MetricsPlugin,
|
||||
},
|
||||
Config: &config{
|
||||
Address: defaults.DefaultAddress,
|
||||
MaxRecvMsgSize: defaults.DefaultMaxRecvMsgSize,
|
||||
MaxSendMsgSize: defaults.DefaultMaxSendMsgSize,
|
||||
},
|
||||
InitFn: func(ic *plugin.InitContext) (any, error) {
|
||||
c := ic.Config.(*config)
|
||||
if c.Address == "" {
|
||||
return nil, fmt.Errorf("grpc address cannot be empty: %w", errdefs.ErrInvalidArgument)
|
||||
}
|
||||
|
||||
var (
|
||||
streamOpts = []grpc.StreamServerInterceptor{streamNamespaceInterceptor}
|
||||
unaryOpts = []grpc.UnaryServerInterceptor{unaryNamespaceInterceptor}
|
||||
serverOpts = []grpc.ServerOption{}
|
||||
prometheusServerMetrics *grpc_prometheus.ServerMetrics // This should be grpc handler
|
||||
)
|
||||
|
||||
if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-prometheus"); err == nil {
|
||||
prometheusServerMetrics = p.(*grpc_prometheus.ServerMetrics)
|
||||
streamOpts = append(streamOpts, prometheusServerMetrics.StreamServerInterceptor())
|
||||
unaryOpts = append(unaryOpts, prometheusServerMetrics.UnaryServerInterceptor())
|
||||
}
|
||||
|
||||
if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-otel"); err == nil {
|
||||
serverOpts = append(serverOpts, grpc.StatsHandler(p.(stats.Handler)))
|
||||
}
|
||||
|
||||
serverOpts = append(serverOpts, grpc.ChainStreamInterceptor(streamOpts...))
|
||||
serverOpts = append(serverOpts, grpc.ChainUnaryInterceptor(unaryOpts...))
|
||||
if c.MaxRecvMsgSize > 0 {
|
||||
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(c.MaxRecvMsgSize))
|
||||
}
|
||||
if c.MaxSendMsgSize > 0 {
|
||||
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(c.MaxSendMsgSize))
|
||||
}
|
||||
|
||||
// grpcService allows GRPC services to be registered with the underlying server
|
||||
type grpcService interface {
|
||||
Register(*grpc.Server) error
|
||||
}
|
||||
|
||||
s := grpc.NewServer(serverOpts...)
|
||||
ps, err := ic.GetByType(plugins.GRPCPlugin) // ensure grpc plugin is initialized
|
||||
if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
for _, p := range ps {
|
||||
if gs, ok := p.(grpcService); ok {
|
||||
if err := gs.Register(s); err != nil {
|
||||
return nil, fmt.Errorf("failed to register grpc service: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if prometheusServerMetrics != nil {
|
||||
prometheusServerMetrics.InitializeMetrics(s)
|
||||
}
|
||||
return grpcServer{
|
||||
Server: s,
|
||||
config: *c,
|
||||
}, nil
|
||||
},
|
||||
})
|
||||
|
||||
registry.Register(&plugin.Registration{
|
||||
Type: plugins.ServerPlugin,
|
||||
ID: "grpc-tcp",
|
||||
Requires: []plugin.Type{
|
||||
plugins.GRPCPlugin,
|
||||
plugins.MetricsPlugin,
|
||||
},
|
||||
Config: &tcpConfig{
|
||||
MaxRecvMsgSize: defaults.DefaultMaxRecvMsgSize,
|
||||
MaxSendMsgSize: defaults.DefaultMaxSendMsgSize,
|
||||
},
|
||||
InitFn: func(ic *plugin.InitContext) (any, error) {
|
||||
c := ic.Config.(*tcpConfig)
|
||||
if c.Address == "" {
|
||||
return nil, plugin.ErrSkipPlugin
|
||||
}
|
||||
|
||||
var (
|
||||
streamOpts = []grpc.StreamServerInterceptor{streamNamespaceInterceptor}
|
||||
unaryOpts = []grpc.UnaryServerInterceptor{unaryNamespaceInterceptor}
|
||||
serverOpts = []grpc.ServerOption{}
|
||||
prometheusServerMetrics *grpc_prometheus.ServerMetrics // This should be grpc handler
|
||||
)
|
||||
|
||||
if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-prometheus"); err == nil {
|
||||
prometheusServerMetrics = p.(*grpc_prometheus.ServerMetrics)
|
||||
streamOpts = append(streamOpts, prometheusServerMetrics.StreamServerInterceptor())
|
||||
unaryOpts = append(unaryOpts, prometheusServerMetrics.UnaryServerInterceptor())
|
||||
prometheusServerMetrics.InitializeMetrics(nil)
|
||||
}
|
||||
|
||||
if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-otel"); err == nil {
|
||||
serverOpts = append(serverOpts, grpc.StatsHandler(p.(stats.Handler)))
|
||||
}
|
||||
|
||||
serverOpts = append(serverOpts, grpc.ChainStreamInterceptor(streamOpts...))
|
||||
serverOpts = append(serverOpts, grpc.ChainUnaryInterceptor(unaryOpts...))
|
||||
|
||||
if c.MaxRecvMsgSize > 0 {
|
||||
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(c.MaxRecvMsgSize))
|
||||
}
|
||||
if c.MaxSendMsgSize > 0 {
|
||||
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(c.MaxSendMsgSize))
|
||||
}
|
||||
|
||||
if c.TLSCert != "" {
|
||||
log.G(ic.Context).Info("setting up tls on tcp GRPC services...")
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(c.TLSCert, c.TLSKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tlsConfig := &tls.Config{Certificates: []tls.Certificate{tlsCert}}
|
||||
|
||||
if c.TLSCA != "" {
|
||||
caCertPool := x509.NewCertPool()
|
||||
caCert, err := os.ReadFile(c.TLSCA)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load CA file: %w", err)
|
||||
}
|
||||
caCertPool.AppendCertsFromPEM(caCert)
|
||||
tlsConfig.ClientCAs = caCertPool
|
||||
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
|
||||
}
|
||||
serverOpts = append(serverOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
|
||||
} else if c.TLSCName != "" {
|
||||
tlsConfig, CA, res, err :=
|
||||
wintls.SetupTLSFromWindowsCertStore(ic.Context, c.TLSCName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to setup TLS from Windows cert store: %w", err)
|
||||
}
|
||||
// Cache resource for cleanup (Windows only)
|
||||
setTLSResource(res)
|
||||
if CA != nil {
|
||||
tlsConfig.ClientCAs = CA
|
||||
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
|
||||
}
|
||||
serverOpts = append(serverOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
|
||||
}
|
||||
|
||||
// tcpService allows GRPC services to be registered with the underlying tcp server
|
||||
type tcpService interface {
|
||||
RegisterTCP(*grpc.Server) error
|
||||
}
|
||||
|
||||
s := grpc.NewServer(serverOpts...)
|
||||
ps, err := ic.GetByType(plugins.GRPCPlugin) // ensure grpc plugin is initialized
|
||||
if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
var hasService bool
|
||||
for _, p := range ps {
|
||||
if gs, ok := p.(tcpService); ok {
|
||||
if err := gs.RegisterTCP(s); err != nil {
|
||||
return nil, fmt.Errorf("failed to register grpc service: %w", err)
|
||||
}
|
||||
hasService = true
|
||||
}
|
||||
}
|
||||
if !hasService {
|
||||
return nil, fmt.Errorf("no tcp grpc services configured: %w", plugin.ErrSkipPlugin)
|
||||
}
|
||||
if prometheusServerMetrics != nil {
|
||||
prometheusServerMetrics.InitializeMetrics(s)
|
||||
}
|
||||
return tcpServer{
|
||||
Server: s,
|
||||
config: *c,
|
||||
}, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
type grpcServer struct {
|
||||
*grpc.Server
|
||||
config config
|
||||
}
|
||||
|
||||
func (s grpcServer) Start(ctx context.Context) error {
|
||||
log.G(ctx).WithField("address", s.config.Address).WithField("uid", s.config.UID).WithField("gid", s.config.GID).Info("starting GRPC server")
|
||||
l, err := sys.GetLocalListener(s.config.Address, s.config.UID, s.config.GID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get listener for main endpoint: %w", err)
|
||||
}
|
||||
|
||||
internal.Serve(ctx, l, s.Serve)
|
||||
|
||||
return nil
|
||||
}
|
||||
func (s grpcServer) Close() error {
|
||||
s.Stop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type tcpServer struct {
|
||||
*grpc.Server
|
||||
config tcpConfig
|
||||
}
|
||||
|
||||
func (s tcpServer) Start(ctx context.Context) error {
|
||||
l, err := net.Listen("tcp", s.config.Address)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err)
|
||||
}
|
||||
|
||||
internal.Serve(ctx, l, s.Serve)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s tcpServer) Close() error {
|
||||
s.Stop()
|
||||
|
||||
// Clean up TLS resources (Windows only)
|
||||
cleanupTLSResources()
|
||||
|
||||
return nil
|
||||
}
|
||||
28
plugins/server/grpc/tls_other.go
Normal file
28
plugins/server/grpc/tls_other.go
Normal file
@@ -0,0 +1,28 @@
|
||||
//go:build !windows
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"github.com/containerd/containerd/v2/internal/wintls"
|
||||
)
|
||||
|
||||
// TLS resource helpers are no-ops on other unsupported platforms.
|
||||
func setTLSResource(r wintls.CertResource) {}
|
||||
|
||||
func cleanupTLSResources() {}
|
||||
@@ -14,16 +14,11 @@
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
|
||||
"github.com/containerd/containerd/v2/internal/wintls"
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/otelttrpc"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
// tlsResource holds Windows-specific TLS resources for cleanup on Stop.
|
||||
@@ -41,14 +36,3 @@ func cleanupTLSResources() {
|
||||
tlsResource = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Windows-specific apply and TTRPC server constructor
|
||||
func apply(_ context.Context, _ *srvconfig.Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newTTRPCServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer(
|
||||
ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()),
|
||||
)
|
||||
}
|
||||
@@ -20,8 +20,10 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
func Serve(ctx context.Context, l net.Listener, serveFunc func(net.Listener) error) {
|
||||
@@ -30,7 +32,7 @@ func Serve(ctx context.Context, l net.Listener, serveFunc func(net.Listener) err
|
||||
go func() {
|
||||
defer l.Close()
|
||||
|
||||
if err := serveFunc(l); err != nil && !errors.Is(err, net.ErrClosed) {
|
||||
if err := serveFunc(l); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, ttrpc.ErrServerClosed) {
|
||||
log.G(ctx).WithError(err).WithField("address", path).Fatal("serve failure")
|
||||
}
|
||||
}()
|
||||
|
||||
128
plugins/server/ttrpc/plugin.go
Normal file
128
plugins/server/ttrpc/plugin.go
Normal file
@@ -0,0 +1,128 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ttrpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/containerd/containerd/v2/defaults"
|
||||
"github.com/containerd/containerd/v2/pkg/sys"
|
||||
"github.com/containerd/containerd/v2/plugins"
|
||||
"github.com/containerd/containerd/v2/plugins/server/internal"
|
||||
"github.com/containerd/plugin"
|
||||
"github.com/containerd/plugin/registry"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
Address string `toml:"address"`
|
||||
UID int `toml:"uid"`
|
||||
GID int `toml:"gid"`
|
||||
}
|
||||
|
||||
func (c *config) GetAddress() string {
|
||||
return c.Address
|
||||
}
|
||||
|
||||
func (c *config) SetAddress(s string) {
|
||||
c.Address = s
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Keep the default the same for compatibility
|
||||
defaultAddress := defaults.DefaultAddress + ".ttrpc"
|
||||
registry.Register(&plugin.Registration{
|
||||
Type: plugins.ServerPlugin,
|
||||
ID: "ttrpc",
|
||||
Requires: []plugin.Type{
|
||||
plugins.TTRPCPlugin,
|
||||
plugins.GRPCPlugin,
|
||||
},
|
||||
Config: &config{
|
||||
Address: defaultAddress,
|
||||
},
|
||||
InitFn: func(ic *plugin.InitContext) (any, error) {
|
||||
c := ic.Config.(*config)
|
||||
if c.Address == "" {
|
||||
return nil, plugin.ErrSkipPlugin
|
||||
}
|
||||
|
||||
s, err := newTTRPCServer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// ttrpcService allows TTRPC services to be registered with the underlying server
|
||||
type ttrpcService interface {
|
||||
RegisterTTRPC(*ttrpc.Server) error
|
||||
}
|
||||
var hasService bool
|
||||
ps, err := ic.GetByType(plugins.TTRPCPlugin) // ensure grpc plugin is initialized
|
||||
if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
for _, p := range ps {
|
||||
if gs, ok := p.(ttrpcService); ok {
|
||||
if err := gs.RegisterTTRPC(s); err != nil {
|
||||
return nil, fmt.Errorf("failed to register ttrpc service: %w", err)
|
||||
}
|
||||
hasService = true
|
||||
}
|
||||
}
|
||||
ps, err = ic.GetByType(plugins.GRPCPlugin) // ensure grpc plugin is initialized
|
||||
if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
for _, p := range ps {
|
||||
if gs, ok := p.(ttrpcService); ok {
|
||||
if err := gs.RegisterTTRPC(s); err != nil {
|
||||
return nil, fmt.Errorf("failed to register ttrpc service: %w", err)
|
||||
}
|
||||
hasService = true
|
||||
}
|
||||
}
|
||||
if !hasService {
|
||||
return nil, fmt.Errorf("no ttrpc services configured: %w", plugin.ErrSkipPlugin)
|
||||
}
|
||||
return server{
|
||||
Server: s,
|
||||
config: *c,
|
||||
}, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
type server struct {
|
||||
*ttrpc.Server
|
||||
config config
|
||||
}
|
||||
|
||||
func (s server) Start(ctx context.Context) error {
|
||||
// setup the ttrpc endpoint
|
||||
tl, err := sys.GetLocalListener(s.config.Address, s.config.UID, s.config.GID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err)
|
||||
}
|
||||
|
||||
internal.Serve(ctx, tl, func(l net.Listener) error {
|
||||
return s.Serve(context.WithoutCancel(ctx), l)
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -14,28 +14,16 @@
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
package ttrpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
|
||||
"github.com/containerd/containerd/v2/internal/wintls"
|
||||
"github.com/containerd/otelttrpc"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
func apply(_ context.Context, _ *srvconfig.Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TLS resource helpers are no-ops on Solaris.
|
||||
func setTLSResource(r wintls.CertResource) {}
|
||||
func cleanupTLSResources() {}
|
||||
|
||||
// newTTRPCServer provides the ttrpc server for Solaris builds.
|
||||
func newTTRPCServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer(
|
||||
ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()),
|
||||
ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()),
|
||||
)
|
||||
}
|
||||
31
plugins/server/ttrpc/server_otel.go
Normal file
31
plugins/server/ttrpc/server_otel.go
Normal file
@@ -0,0 +1,31 @@
|
||||
//go:build windows || solaris
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ttrpc
|
||||
|
||||
import (
|
||||
"github.com/containerd/otelttrpc"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
// ttrpc server with otel interceptor
|
||||
func newTTRPCServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer(
|
||||
ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()),
|
||||
)
|
||||
}
|
||||
27
plugins/server/ttrpc/server_other.go
Normal file
27
plugins/server/ttrpc/server_other.go
Normal file
@@ -0,0 +1,27 @@
|
||||
//go:build !linux && !windows && !solaris
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ttrpc
|
||||
|
||||
import (
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
func newTTRPCServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer()
|
||||
}
|
||||
@@ -57,6 +57,8 @@ const (
|
||||
StreamingPlugin plugin.Type = "io.containerd.streaming.v1"
|
||||
// TracingProcessorPlugin implements an open telemetry span processor
|
||||
TracingProcessorPlugin plugin.Type = "io.containerd.tracing.processor.v1"
|
||||
// MetricsPlugin implements a metrics handler
|
||||
MetricsPlugin plugin.Type = "io.containerd.metrics.v1"
|
||||
// NRIApiPlugin implements the NRI adaptation interface for containerd.
|
||||
NRIApiPlugin plugin.Type = "io.containerd.nri.v1"
|
||||
// TransferPlugin implements a transfer service
|
||||
|
||||
Reference in New Issue
Block a user