diff --git a/cmd/containerd/builtins/builtins.go b/cmd/containerd/builtins/builtins.go index ec7ae1ff39..ce85c5f9a8 100644 --- a/cmd/containerd/builtins/builtins.go +++ b/cmd/containerd/builtins/builtins.go @@ -30,6 +30,8 @@ import ( _ "github.com/containerd/containerd/v2/plugins/restart" _ "github.com/containerd/containerd/v2/plugins/sandbox" _ "github.com/containerd/containerd/v2/plugins/server/debug" + _ "github.com/containerd/containerd/v2/plugins/server/grpc" + _ "github.com/containerd/containerd/v2/plugins/server/ttrpc" _ "github.com/containerd/containerd/v2/plugins/services/containers" _ "github.com/containerd/containerd/v2/plugins/services/content" _ "github.com/containerd/containerd/v2/plugins/services/diff" diff --git a/cmd/containerd/command/config.go b/cmd/containerd/command/config.go index 28a2dfb9b3..7bbb988b34 100644 --- a/cmd/containerd/command/config.go +++ b/cmd/containerd/command/config.go @@ -125,18 +125,20 @@ func dumpConfig(cliContext *cli.Context) error { func platformAgnosticDefaultConfig() *srvconfig.Config { return &srvconfig.Config{ - Version: version.ConfigVersion, - Root: defaults.DefaultRootDir, - State: defaults.DefaultStateDir, - GRPC: srvconfig.GRPCConfig{ - Address: defaults.DefaultAddress, - MaxRecvMsgSize: defaults.DefaultMaxRecvMsgSize, - MaxSendMsgSize: defaults.DefaultMaxSendMsgSize, - }, + Version: version.ConfigVersion, + Root: defaults.DefaultRootDir, + State: defaults.DefaultStateDir, DisabledPlugins: []string{}, RequiredPlugins: []string{}, StreamProcessors: streamProcessors(), Imports: []string{defaults.DefaultConfigIncludePattern}, + Plugins: map[string]any{ + "io.containerd.server.v1.grpc": map[string]any{ + "address": defaults.DefaultAddress, + "max_recv_message_size": defaults.DefaultMaxRecvMsgSize, + "max_send_message_size": defaults.DefaultMaxSendMsgSize, + }, + }, } } diff --git a/cmd/containerd/command/main.go b/cmd/containerd/command/main.go index 5837b602a8..586dc14ea0 100644 --- a/cmd/containerd/command/main.go +++ b/cmd/containerd/command/main.go @@ -34,7 +34,6 @@ import ( _ "github.com/containerd/containerd/v2/core/metrics" // import containerd build info "github.com/containerd/containerd/v2/core/mount" "github.com/containerd/containerd/v2/defaults" - "github.com/containerd/containerd/v2/pkg/sys" "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/containerd/v2/version" "github.com/containerd/errdefs" @@ -165,16 +164,6 @@ can be used and modified as necessary as a custom configuration.` return err } - if config.GRPC.Address == "" { - return fmt.Errorf("grpc address cannot be empty: %w", errdefs.ErrInvalidArgument) - } - if config.TTRPC.Address == "" { - // If TTRPC was not explicitly configured, use defaults based on GRPC. - config.TTRPC.Address = config.GRPC.Address + ".ttrpc" - config.TTRPC.UID = config.GRPC.UID - config.TTRPC.GID = config.GRPC.GID - } - // Make sure top-level directories are created early. if err := server.CreateTopLevelDirectories(config); err != nil { return err @@ -229,6 +218,7 @@ can be used and modified as necessary as a custom configuration.` go func() { defer close(chsrv) + // TODO: When to set grpc address from flag? Migration should be done first server, err := server.New(ctx, config) if err != nil { select { @@ -270,6 +260,8 @@ can be used and modified as necessary as a custom configuration.` if err := server.Start(ctx); err != nil { return err } + + // TODO: Move to server.Start(ctx) error if config.Metrics.Address != "" { l, err := net.Listen("tcp", config.Metrics.Address) if err != nil { @@ -277,26 +269,8 @@ can be used and modified as necessary as a custom configuration.` } serve(ctx, l, server.ServeMetrics) } - // setup the ttrpc endpoint - tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID) - if err != nil { - return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err) - } - serve(ctx, tl, server.ServeTTRPC) - if config.GRPC.TCPAddress != "" { - l, err := net.Listen("tcp", config.GRPC.TCPAddress) - if err != nil { - return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err) - } - serve(ctx, l, server.ServeTCP) - } - // setup the main grpc endpoint - l, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID) - if err != nil { - return fmt.Errorf("failed to get listener for main endpoint: %w", err) - } - serve(ctx, l, server.ServeGRPC) + // end s.Start readyC := make(chan struct{}) go func() { @@ -351,10 +325,6 @@ func applyFlags(cliContext *cli.Context, config *srvconfig.Config) error { name: "state", d: &config.State, }, - { - name: "address", - d: &config.GRPC.Address, - }, } { if s := cliContext.String(v.name); s != "" { *v.d = s @@ -368,6 +338,35 @@ func applyFlags(cliContext *cli.Context, config *srvconfig.Config) error { } } + if s := cliContext.String("address"); s != "" { + var ( + grpcConfig map[string]any + ttrpcConfig map[string]any + ) + v, ok := config.Plugins["io.containerd.server.v1.grpc"] + if !ok { + grpcConfig = make(map[string]any) + } else if grpcConfig, ok = v.(map[string]any); !ok { + return fmt.Errorf("grpc plugin has invalid configuration: %w", errdefs.ErrInvalidArgument) + } + grpcConfig["address"] = s + config.Plugins["io.containerd.server.v1.grpc"] = grpcConfig + + _, ok = config.Plugins["io.containerd.server.v1.ttrpc"] + if !ok { + ttrpcConfig = map[string]any{ + "address": s + ".ttrpc", + } + if uid, ok := grpcConfig["uid"]; ok { + ttrpcConfig["uid"] = uid + } + if gid, ok := grpcConfig["gid"]; ok { + ttrpcConfig["gid"] = gid + } + config.Plugins["io.containerd.server.v1.ttrpc"] = ttrpcConfig + } + } + applyPlatformFlags(cliContext) return nil diff --git a/cmd/containerd/server/config/config.go b/cmd/containerd/server/config/config.go index 17b061903d..0fef148119 100644 --- a/cmd/containerd/server/config/config.go +++ b/cmd/containerd/server/config/config.go @@ -65,10 +65,6 @@ type Config struct { State string `toml:"state"` // TempDir is the path to a directory where to place containerd temporary files TempDir string `toml:"temp"` - // GRPC configuration settings - GRPC GRPCConfig `toml:"grpc"` - // TTRPC configuration settings - TTRPC TTRPCConfig `toml:"ttrpc"` // Debug log settings Debug Debug `toml:"debug"` // Metrics and monitoring settings @@ -95,6 +91,19 @@ type Config struct { Imports []string `toml:"imports"` // StreamProcessors configuration StreamProcessors map[string]StreamProcessor `toml:"stream_processors"` + + // Deprecated fields must remain but should not be output when generating default or migrated configs. + // These fields are automatically migrated to the corresponding server plugin + // configuration blocks on startup (see serviceMigrate). In version 4 configs, + // server settings are configured directly under [plugins.""]. + + // Deprecated: use server plugins io.containerd.server.v1.grpc and io.containerd.server.v1.grpc-tcp + GRPC GRPCConfig `toml:"grpc,omitempty"` + // Deprecated: use server plugin io.containerd.server.v1.ttrpc. + // In configs prior to version 4, an unset TTRPC address is derived from + // the GRPC address (grpcAddress + ".ttrpc") and inherits GRPC's UID/GID. + // In version 4, the TTRPC plugin uses its own defaults independently. + TTRPC TTRPCConfig `toml:"ttrpc,omitempty"` } // StreamProcessor provides configuration for diff content processors @@ -206,6 +215,15 @@ func v1Migrate(ctx context.Context, c *Config) error { return nil } +// serviceMigrate moves server properties (GRPC, TTRPC, Debug, Metrics) from +// top-level config fields into their corresponding plugin configuration blocks +// for version 4. +// +// For configs prior to version 4, if the TTRPC address is not explicitly set +// it is derived from the GRPC address (grpcAddress + ".ttrpc") and inherits +// GRPC's UID/GID. In version 4, each server plugin is independently +// configured; the TTRPC plugin will use its own default address if its +// plugin block is omitted, regardless of the GRPC plugin's address. func serviceMigrate(ctx context.Context, c *Config) error { if c.Debug.Address != "" && c.Plugins["io.containerd.server.v1.debug"] == nil { c.Plugins["io.containerd.server.v1.debug"] = map[string]any{ @@ -217,28 +235,94 @@ func serviceMigrate(ctx context.Context, c *Config) error { c.Debug.UID = 0 c.Debug.GID = 0 } + // Capture legacy GRPC values up front so both grpc and grpc-tcp + // migrations see the same values, and so migrations that don't key on + // an address (uid/gid/max message sizes) are not dropped. + grpcAddress := c.GRPC.Address + grpcUID := c.GRPC.UID + grpcGID := c.GRPC.GID + grpcMaxRecv := c.GRPC.MaxRecvMsgSize + grpcMaxSend := c.GRPC.MaxSendMsgSize + grpcHasLegacy := grpcAddress != "" || grpcUID != 0 || grpcGID != 0 || grpcMaxRecv != 0 || grpcMaxSend != 0 + if grpcHasLegacy && c.Plugins["io.containerd.server.v1.grpc"] == nil { + c.Plugins["io.containerd.server.v1.grpc"] = map[string]any{ + "address": grpcAddress, + "uid": grpcUID, + "gid": grpcGID, + "max_recv_message_size": grpcMaxRecv, + "max_send_message_size": grpcMaxSend, + } + } + if c.GRPC.TCPAddress != "" && c.Plugins["io.containerd.server.v1.grpc-tcp"] == nil { + c.Plugins["io.containerd.server.v1.grpc-tcp"] = map[string]any{ + "address": c.GRPC.TCPAddress, + "tls_ca": c.GRPC.TCPTLSCA, + "tls_cert": c.GRPC.TCPTLSCert, + "tls_key": c.GRPC.TCPTLSKey, + "tls_common_name": c.GRPC.TCPTLSCName, + "max_recv_message_size": grpcMaxRecv, + "max_send_message_size": grpcMaxSend, + } + } + if grpcHasLegacy || c.GRPC.TCPAddress != "" { + c.GRPC = GRPCConfig{} + } + if c.Plugins["io.containerd.server.v1.ttrpc"] == nil { + ttrpcAddress := c.TTRPC.Address + ttrpcUID := c.TTRPC.UID + ttrpcGID := c.TTRPC.GID + if ttrpcAddress == "" && grpcAddress != "" { + ttrpcAddress = grpcAddress + ".ttrpc" + if ttrpcUID == 0 { + ttrpcUID = grpcUID + } + if ttrpcGID == 0 { + ttrpcGID = grpcGID + } + } + if ttrpcAddress != "" || ttrpcUID != 0 || ttrpcGID != 0 { + c.Plugins["io.containerd.server.v1.ttrpc"] = map[string]any{ + "address": ttrpcAddress, + "uid": ttrpcUID, + "gid": ttrpcGID, + } + c.TTRPC.Address = "" + c.TTRPC.UID = 0 + c.TTRPC.GID = 0 + } + } return nil } // GRPCConfig provides GRPC configuration for the socket type GRPCConfig struct { - Address string `toml:"address"` - TCPAddress string `toml:"tcp_address"` - TCPTLSCA string `toml:"tcp_tls_ca"` - TCPTLSCert string `toml:"tcp_tls_cert"` - TCPTLSKey string `toml:"tcp_tls_key"` - UID int `toml:"uid"` - GID int `toml:"gid"` - MaxRecvMsgSize int `toml:"max_recv_message_size"` - MaxSendMsgSize int `toml:"max_send_message_size"` - TCPTLSCName string `toml:"tcp_tls_common_name"` + // Deprecated: use server plugin io.containerd.server.v1.grpc + Address string `toml:"address,omitempty"` + // Deprecated: use server plugin io.containerd.server.v1.grpc-tcp + TCPAddress string `toml:"tcp_address,omitempty"` + // Deprecated: use server plugin io.containerd.server.v1.grpc-tcp + TCPTLSCA string `toml:"tcp_tls_ca,omitempty"` + // Deprecated: use server plugin io.containerd.server.v1.grpc-tcp + TCPTLSCert string `toml:"tcp_tls_cert,omitempty"` + // Deprecated: use server plugin io.containerd.server.v1.grpc-tcp + TCPTLSKey string `toml:"tcp_tls_key,omitempty"` + // Deprecated: use server plugin io.containerd.server.v1.grpc + UID int `toml:"uid,omitempty"` + // Deprecated: use server plugin io.containerd.server.v1.grpc + GID int `toml:"gid,omitempty"` + // Deprecated: use server plugin io.containerd.server.v1.grpc + MaxRecvMsgSize int `toml:"max_recv_message_size,omitempty"` + // Deprecated: use server plugin io.containerd.server.v1.grpc + MaxSendMsgSize int `toml:"max_send_message_size,omitempty"` + // Deprecated: use server plugin io.containerd.server.v1.grpc-tcp + TCPTLSCName string `toml:"tcp_tls_common_name,omitempty"` } // TTRPCConfig provides TTRPC configuration for the socket type TTRPCConfig struct { - Address string `toml:"address"` - UID int `toml:"uid"` - GID int `toml:"gid"` + Address string `toml:"address,omitempty"` + UID int `toml:"uid,omitempty"` + GID int `toml:"gid,omitempty"` } // Debug provides debug configuration @@ -258,8 +342,9 @@ type Debug struct { // MetricsConfig provides metrics configuration type MetricsConfig struct { - Address string `toml:"address"` - GRPCHistogram bool `toml:"grpc_histogram"` + Address string `toml:"address"` + // Deprecated: use metrics plugin io.containerd.metrics.v1.grpc-prometheus + GRPCHistogram bool `toml:"grpc_histogram,omitempty"` } // CgroupConfig provides cgroup configuration diff --git a/cmd/containerd/server/server.go b/cmd/containerd/server/server.go index 59c1db9d88..6b98c7fad3 100644 --- a/cmd/containerd/server/server.go +++ b/cmd/containerd/server/server.go @@ -18,8 +18,6 @@ package server import ( "context" - "crypto/tls" - "crypto/x509" "errors" "fmt" "io" @@ -33,15 +31,11 @@ import ( "time" "github.com/containerd/log" - "github.com/containerd/ttrpc" "github.com/docker/go-metrics" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" v1 "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/backoff" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" diffapi "github.com/containerd/containerd/api/services/diff/v1" @@ -58,7 +52,6 @@ import ( sbproxy "github.com/containerd/containerd/v2/core/sandbox/proxy" ssproxy "github.com/containerd/containerd/v2/core/snapshots/proxy" "github.com/containerd/containerd/v2/defaults" - "github.com/containerd/containerd/v2/internal/wintls" "github.com/containerd/containerd/v2/pkg/dialer" "github.com/containerd/containerd/v2/pkg/sys" "github.com/containerd/containerd/v2/pkg/timeout" @@ -146,105 +139,21 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { for id, p := range config.StreamProcessors { diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args, p.Env)) } - - var prometheusServerMetricsOpts []grpc_prometheus.ServerMetricsOption - if config.Metrics.GRPCHistogram { - // Enable grpc time histograms to measure rpc latencies - prometheusServerMetricsOpts = append(prometheusServerMetricsOpts, grpc_prometheus.WithServerHandlingTimeHistogram()) - } - - prometheusServerMetrics := grpc_prometheus.NewServerMetrics(prometheusServerMetricsOpts...) - prometheus.MustRegister(prometheusServerMetrics) - - serverOpts := []grpc.ServerOption{ - grpc.StatsHandler(otelgrpc.NewServerHandler()), - grpc.ChainStreamInterceptor( - streamNamespaceInterceptor, - prometheusServerMetrics.StreamServerInterceptor(), - ), - grpc.ChainUnaryInterceptor( - unaryNamespaceInterceptor, - prometheusServerMetrics.UnaryServerInterceptor(), - ), - } - if config.GRPC.MaxRecvMsgSize > 0 { - serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize)) - } - if config.GRPC.MaxSendMsgSize > 0 { - serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize)) - } - ttrpcServer, err := newTTRPCServer() - if err != nil { - return nil, err - } - tcpServerOpts := serverOpts - if config.GRPC.TCPTLSCert != "" { - log.G(ctx).Info("setting up tls on tcp GRPC services...") - - tlsCert, err := tls.LoadX509KeyPair(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey) - if err != nil { - return nil, err - } - tlsConfig := &tls.Config{Certificates: []tls.Certificate{tlsCert}} - - if config.GRPC.TCPTLSCA != "" { - caCertPool := x509.NewCertPool() - caCert, err := os.ReadFile(config.GRPC.TCPTLSCA) - if err != nil { - return nil, fmt.Errorf("failed to load CA file: %w", err) - } - caCertPool.AppendCertsFromPEM(caCert) - tlsConfig.ClientCAs = caCertPool - tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert - } - tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig))) - } else if config.GRPC.TCPTLSCName != "" { - tlsConfig, CA, res, err := - wintls.SetupTLSFromWindowsCertStore(ctx, config.GRPC.TCPTLSCName) - if err != nil { - return nil, fmt.Errorf("failed to setup TLS from Windows cert store: %w", err) - } - // Cache resource for cleanup (Windows only) - setTLSResource(res) - if CA != nil { - tlsConfig.ClientCAs = CA - tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert - } - tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig))) - } - - // grpcService allows GRPC services to be registered with the underlying server - type grpcService interface { - Register(*grpc.Server) error - } - - // tcpService allows GRPC services to be registered with the underlying tcp server - type tcpService interface { - RegisterTCP(*grpc.Server) error - } - - // ttrpcService allows TTRPC services to be registered with the underlying server - type ttrpcService interface { - RegisterTTRPC(*ttrpc.Server) error - } - var ( - grpcServer = grpc.NewServer(serverOpts...) - tcpServer = grpc.NewServer(tcpServerOpts...) - - grpcServices []grpcService - tcpServices []tcpService - ttrpcServices []ttrpcService - s = &Server{ - prometheusServerMetrics: prometheusServerMetrics, - grpcServer: grpcServer, - tcpServer: tcpServer, - ttrpcServer: ttrpcServer, - config: config, + s = &Server{ + config: config, } - initialized = plugin.NewPluginSet() - required = make(map[string]struct{}) + initialized = plugin.NewPluginSet() + required = make(map[string]struct{}) + grpcAddress = readString(config.Plugins, "io.containerd.server.v1.grpc", "address") + ttrpcAddress = readString(config.Plugins, "io.containerd.server.v1.ttrpc", "address") ) + if grpcAddress == "" { + grpcAddress = defaults.DefaultAddress + } + if ttrpcAddress == "" { + ttrpcAddress = defaults.DefaultAddress + ".ttrpc" + } for _, r := range config.RequiredPlugins { required[r] = struct{}{} } @@ -260,8 +169,8 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { map[string]string{ plugins.PropertyRootDir: filepath.Join(config.Root, id), plugins.PropertyStateDir: filepath.Join(config.State, id), - plugins.PropertyGRPCAddress: config.GRPC.Address, - plugins.PropertyTTRPCAddress: config.TTRPC.Address, + plugins.PropertyGRPCAddress: grpcAddress, + plugins.PropertyTTRPCAddress: ttrpcAddress, }, ) initContext.RegisterReadiness = func() func() { @@ -309,17 +218,6 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { } } - // check for grpc services that should be registered with the server - if src, ok := instance.(grpcService); ok { - grpcServices = append(grpcServices, src) - } - if src, ok := instance.(ttrpcService); ok { - ttrpcServices = append(ttrpcServices, src) - } - if service, ok := instance.(tcpService); ok { - tcpServices = append(tcpServices, service) - } - s.plugins = append(s.plugins, result) } if len(required) != 0 { @@ -330,23 +228,6 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { return nil, fmt.Errorf("required plugin %s not included", missing) } - // register services after all plugins have been initialized - for _, service := range grpcServices { - if err := service.Register(grpcServer); err != nil { - return nil, err - } - } - for _, service := range ttrpcServices { - if err := service.RegisterTTRPC(ttrpcServer); err != nil { - return nil, err - } - } - for _, service := range tcpServices { - if err := service.RegisterTCP(tcpServer); err != nil { - return nil, err - } - } - recordConfigDeprecations(ctx, config, initialized) return s, nil } @@ -380,25 +261,10 @@ type server interface { // Server is the containerd main daemon type Server struct { - prometheusServerMetrics *grpc_prometheus.ServerMetrics - grpcServer *grpc.Server - ttrpcServer *ttrpc.Server - tcpServer *grpc.Server - servers []server - config *srvconfig.Config - plugins []*plugin.Plugin - ready sync.WaitGroup -} - -// ServeGRPC provides the containerd grpc APIs on the provided listener -func (s *Server) ServeGRPC(l net.Listener) error { - s.prometheusServerMetrics.InitializeMetrics(s.grpcServer) - return trapClosedConnErr(s.grpcServer.Serve(l)) -} - -// ServeTTRPC provides the containerd ttrpc APIs on the provided listener -func (s *Server) ServeTTRPC(l net.Listener) error { - return trapClosedConnErr(s.ttrpcServer.Serve(context.Background(), l)) + servers []server + config *srvconfig.Config + plugins []*plugin.Plugin + ready sync.WaitGroup } // ServeMetrics provides a prometheus endpoint for exposing metrics @@ -412,12 +278,6 @@ func (s *Server) ServeMetrics(l net.Listener) error { return trapClosedConnErr(srv.Serve(l)) } -// ServeTCP allows services to serve over tcp -func (s *Server) ServeTCP(l net.Listener) error { - s.prometheusServerMetrics.InitializeMetrics(s.tcpServer) - return trapClosedConnErr(s.tcpServer.Serve(l)) -} - // Start the services, this will normally start listening on sockets // and serving requests. func (s *Server) Start(ctx context.Context) error { @@ -431,9 +291,6 @@ func (s *Server) Start(ctx context.Context) error { // Stop the containerd server canceling any open connections func (s *Server) Stop() { - s.grpcServer.Stop() - // Clean up TLS resources (Windows only) - cleanupTLSResources() for i := len(s.plugins) - 1; i >= 0; i-- { p := s.plugins[i] instance, err := p.Instance() @@ -580,6 +437,31 @@ func (pc *proxyClients) getClient(address string) (*grpc.ClientConn, error) { return conn, nil } +func readString(properties map[string]any, key ...string) string { + for i, k := range key { + v, ok := properties[k] + if !ok { + return "" + } + if i < len(key)-1 { + properties, ok = v.(map[string]any) + if !ok { + return "" + } + continue + } + switch v := v.(type) { + case string: + return v + case fmt.Stringer: + return v.String() + case []byte: + return string(v) + } + } + return "" +} + func trapClosedConnErr(err error) error { if err == nil || errors.Is(err, net.ErrClosed) { return nil diff --git a/cmd/containerd/server/server_linux.go b/cmd/containerd/server/server_linux.go index 3e1cc46d95..7356d965fb 100644 --- a/cmd/containerd/server/server_linux.go +++ b/cmd/containerd/server/server_linux.go @@ -24,11 +24,8 @@ import ( cgroup1 "github.com/containerd/cgroups/v3/cgroup1" cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2" srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config" - "github.com/containerd/containerd/v2/internal/wintls" "github.com/containerd/containerd/v2/pkg/sys" "github.com/containerd/log" - "github.com/containerd/otelttrpc" - "github.com/containerd/ttrpc" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -66,14 +63,3 @@ func apply(ctx context.Context, config *srvconfig.Config) error { } return nil } - -func newTTRPCServer() (*ttrpc.Server, error) { - return ttrpc.NewServer( - ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()), - ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()), - ) -} - -// TLS resource helpers are no-ops on Linux. -func setTLSResource(r wintls.CertResource) {} -func cleanupTLSResources() {} diff --git a/cmd/containerd/server/server_unsupported.go b/cmd/containerd/server/server_other.go similarity index 68% rename from cmd/containerd/server/server_unsupported.go rename to cmd/containerd/server/server_other.go index b9d9d115a4..2a69439c28 100644 --- a/cmd/containerd/server/server_unsupported.go +++ b/cmd/containerd/server/server_other.go @@ -1,4 +1,4 @@ -//go:build !linux && !windows && !solaris +//go:build !linux /* Copyright The containerd Authors. @@ -22,18 +22,8 @@ import ( "context" srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config" - "github.com/containerd/containerd/v2/internal/wintls" - "github.com/containerd/ttrpc" ) func apply(_ context.Context, _ *srvconfig.Config) error { return nil } - -func newTTRPCServer() (*ttrpc.Server, error) { - return ttrpc.NewServer() -} - -// TLS resource helpers are no-ops on other unsupported platforms. -func setTLSResource(r wintls.CertResource) {} -func cleanupTLSResources() {} diff --git a/contrib/fuzz/daemon.go b/contrib/fuzz/daemon.go index 59e0ee0360..8b3c31bb84 100644 --- a/contrib/fuzz/daemon.go +++ b/contrib/fuzz/daemon.go @@ -24,9 +24,7 @@ import ( "github.com/containerd/containerd/v2/cmd/containerd/server" "github.com/containerd/containerd/v2/cmd/containerd/server/config" "github.com/containerd/containerd/v2/defaults" - "github.com/containerd/containerd/v2/pkg/sys" "github.com/containerd/containerd/v2/version" - "github.com/containerd/log" ) const ( @@ -71,19 +69,12 @@ func startDaemon() { return } - l, err := sys.GetLocalListener(srvconfig.GRPC.Address, srvconfig.GRPC.UID, srvconfig.GRPC.GID) - if err != nil { + defer server.Stop() + if err := server.Start(ctx); err != nil { errC <- err return } - go func() { - defer l.Close() - if err := server.ServeGRPC(l); err != nil { - log.G(ctx).WithError(err).WithField("address", srvconfig.GRPC.Address).Fatal("serve failure") - } - }() - server.Wait() }() diff --git a/plugins/server/grpc/metrics.go b/plugins/server/grpc/metrics.go new file mode 100644 index 0000000000..83cf87de8d --- /dev/null +++ b/plugins/server/grpc/metrics.go @@ -0,0 +1,67 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package grpc + +import ( + "github.com/containerd/plugin" + "github.com/containerd/plugin/registry" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + + "github.com/containerd/containerd/v2/plugins" +) + +type metricsConfig struct { + GRPCHistogram bool `toml:"grpc_histogram"` +} + +func init() { + registry.Register(&plugin.Registration{ + Type: plugins.MetricsPlugin, + ID: "grpc-prometheus", + Requires: []plugin.Type{ + plugins.GRPCPlugin, + }, + Config: &metricsConfig{ + GRPCHistogram: false, + }, + InitFn: func(ic *plugin.InitContext) (any, error) { + c := ic.Config.(*metricsConfig) + var prometheusServerMetricsOpts []grpc_prometheus.ServerMetricsOption + if c.GRPCHistogram { + // Enable grpc time histograms to measure rpc latencies + prometheusServerMetricsOpts = append(prometheusServerMetricsOpts, grpc_prometheus.WithServerHandlingTimeHistogram()) + } + + prometheusServerMetrics := grpc_prometheus.NewServerMetrics(prometheusServerMetricsOpts...) + prometheus.MustRegister(prometheusServerMetrics) + + return prometheusServerMetrics, nil + }, + }) + registry.Register(&plugin.Registration{ + Type: plugins.MetricsPlugin, + ID: "grpc-otel", + Requires: []plugin.Type{ + plugins.GRPCPlugin, + }, + InitFn: func(ic *plugin.InitContext) (any, error) { + return otelgrpc.NewServerHandler(), nil + }, + }) +} diff --git a/cmd/containerd/server/namespace.go b/plugins/server/grpc/namespace.go similarity index 99% rename from cmd/containerd/server/namespace.go rename to plugins/server/grpc/namespace.go index 654e7ea1e9..d264fc080b 100644 --- a/cmd/containerd/server/namespace.go +++ b/plugins/server/grpc/namespace.go @@ -14,7 +14,7 @@ limitations under the License. */ -package server +package grpc import ( "context" diff --git a/plugins/server/grpc/plugin.go b/plugins/server/grpc/plugin.go new file mode 100644 index 0000000000..7d4e83b8d2 --- /dev/null +++ b/plugins/server/grpc/plugin.go @@ -0,0 +1,292 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package grpc + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "net" + "os" + + "github.com/containerd/errdefs" + "github.com/containerd/log" + "github.com/containerd/plugin" + "github.com/containerd/plugin/registry" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/stats" + + "github.com/containerd/containerd/v2/defaults" + "github.com/containerd/containerd/v2/internal/wintls" + "github.com/containerd/containerd/v2/pkg/sys" + "github.com/containerd/containerd/v2/plugins" + "github.com/containerd/containerd/v2/plugins/server/internal" +) + +type config struct { + Address string `toml:"address"` + UID int `toml:"uid"` + GID int `toml:"gid"` + MaxRecvMsgSize int `toml:"max_recv_message_size"` + MaxSendMsgSize int `toml:"max_send_message_size"` +} + +type tcpConfig struct { + Address string `toml:"address"` + TLSCA string `toml:"tls_ca"` + TLSCert string `toml:"tls_cert"` + TLSKey string `toml:"tls_key"` + TLSCName string `toml:"tls_common_name"` + MaxRecvMsgSize int `toml:"max_recv_message_size"` + MaxSendMsgSize int `toml:"max_send_message_size"` +} + +func init() { + registry.Register(&plugin.Registration{ + Type: plugins.ServerPlugin, + ID: "grpc", + Requires: []plugin.Type{ + plugins.GRPCPlugin, + plugins.MetricsPlugin, + }, + Config: &config{ + Address: defaults.DefaultAddress, + MaxRecvMsgSize: defaults.DefaultMaxRecvMsgSize, + MaxSendMsgSize: defaults.DefaultMaxSendMsgSize, + }, + InitFn: func(ic *plugin.InitContext) (any, error) { + c := ic.Config.(*config) + if c.Address == "" { + return nil, fmt.Errorf("grpc address cannot be empty: %w", errdefs.ErrInvalidArgument) + } + + var ( + streamOpts = []grpc.StreamServerInterceptor{streamNamespaceInterceptor} + unaryOpts = []grpc.UnaryServerInterceptor{unaryNamespaceInterceptor} + serverOpts = []grpc.ServerOption{} + prometheusServerMetrics *grpc_prometheus.ServerMetrics // This should be grpc handler + ) + + if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-prometheus"); err == nil { + prometheusServerMetrics = p.(*grpc_prometheus.ServerMetrics) + streamOpts = append(streamOpts, prometheusServerMetrics.StreamServerInterceptor()) + unaryOpts = append(unaryOpts, prometheusServerMetrics.UnaryServerInterceptor()) + } + + if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-otel"); err == nil { + serverOpts = append(serverOpts, grpc.StatsHandler(p.(stats.Handler))) + } + + serverOpts = append(serverOpts, grpc.ChainStreamInterceptor(streamOpts...)) + serverOpts = append(serverOpts, grpc.ChainUnaryInterceptor(unaryOpts...)) + if c.MaxRecvMsgSize > 0 { + serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(c.MaxRecvMsgSize)) + } + if c.MaxSendMsgSize > 0 { + serverOpts = append(serverOpts, grpc.MaxSendMsgSize(c.MaxSendMsgSize)) + } + + // grpcService allows GRPC services to be registered with the underlying server + type grpcService interface { + Register(*grpc.Server) error + } + + s := grpc.NewServer(serverOpts...) + ps, err := ic.GetByType(plugins.GRPCPlugin) // ensure grpc plugin is initialized + if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) { + return nil, err + } + for _, p := range ps { + if gs, ok := p.(grpcService); ok { + if err := gs.Register(s); err != nil { + return nil, fmt.Errorf("failed to register grpc service: %w", err) + } + } + } + if prometheusServerMetrics != nil { + prometheusServerMetrics.InitializeMetrics(s) + } + return grpcServer{ + Server: s, + config: *c, + }, nil + }, + }) + + registry.Register(&plugin.Registration{ + Type: plugins.ServerPlugin, + ID: "grpc-tcp", + Requires: []plugin.Type{ + plugins.GRPCPlugin, + plugins.MetricsPlugin, + }, + Config: &tcpConfig{ + MaxRecvMsgSize: defaults.DefaultMaxRecvMsgSize, + MaxSendMsgSize: defaults.DefaultMaxSendMsgSize, + }, + InitFn: func(ic *plugin.InitContext) (any, error) { + c := ic.Config.(*tcpConfig) + if c.Address == "" { + return nil, plugin.ErrSkipPlugin + } + + var ( + streamOpts = []grpc.StreamServerInterceptor{streamNamespaceInterceptor} + unaryOpts = []grpc.UnaryServerInterceptor{unaryNamespaceInterceptor} + serverOpts = []grpc.ServerOption{} + prometheusServerMetrics *grpc_prometheus.ServerMetrics // This should be grpc handler + ) + + if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-prometheus"); err == nil { + prometheusServerMetrics = p.(*grpc_prometheus.ServerMetrics) + streamOpts = append(streamOpts, prometheusServerMetrics.StreamServerInterceptor()) + unaryOpts = append(unaryOpts, prometheusServerMetrics.UnaryServerInterceptor()) + prometheusServerMetrics.InitializeMetrics(nil) + } + + if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-otel"); err == nil { + serverOpts = append(serverOpts, grpc.StatsHandler(p.(stats.Handler))) + } + + serverOpts = append(serverOpts, grpc.ChainStreamInterceptor(streamOpts...)) + serverOpts = append(serverOpts, grpc.ChainUnaryInterceptor(unaryOpts...)) + + if c.MaxRecvMsgSize > 0 { + serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(c.MaxRecvMsgSize)) + } + if c.MaxSendMsgSize > 0 { + serverOpts = append(serverOpts, grpc.MaxSendMsgSize(c.MaxSendMsgSize)) + } + + if c.TLSCert != "" { + log.G(ic.Context).Info("setting up tls on tcp GRPC services...") + + tlsCert, err := tls.LoadX509KeyPair(c.TLSCert, c.TLSKey) + if err != nil { + return nil, err + } + tlsConfig := &tls.Config{Certificates: []tls.Certificate{tlsCert}} + + if c.TLSCA != "" { + caCertPool := x509.NewCertPool() + caCert, err := os.ReadFile(c.TLSCA) + if err != nil { + return nil, fmt.Errorf("failed to load CA file: %w", err) + } + caCertPool.AppendCertsFromPEM(caCert) + tlsConfig.ClientCAs = caCertPool + tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert + } + serverOpts = append(serverOpts, grpc.Creds(credentials.NewTLS(tlsConfig))) + } else if c.TLSCName != "" { + tlsConfig, CA, res, err := + wintls.SetupTLSFromWindowsCertStore(ic.Context, c.TLSCName) + if err != nil { + return nil, fmt.Errorf("failed to setup TLS from Windows cert store: %w", err) + } + // Cache resource for cleanup (Windows only) + setTLSResource(res) + if CA != nil { + tlsConfig.ClientCAs = CA + tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert + } + serverOpts = append(serverOpts, grpc.Creds(credentials.NewTLS(tlsConfig))) + } + + // tcpService allows GRPC services to be registered with the underlying tcp server + type tcpService interface { + RegisterTCP(*grpc.Server) error + } + + s := grpc.NewServer(serverOpts...) + ps, err := ic.GetByType(plugins.GRPCPlugin) // ensure grpc plugin is initialized + if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) { + return nil, err + } + var hasService bool + for _, p := range ps { + if gs, ok := p.(tcpService); ok { + if err := gs.RegisterTCP(s); err != nil { + return nil, fmt.Errorf("failed to register grpc service: %w", err) + } + hasService = true + } + } + if !hasService { + return nil, fmt.Errorf("no tcp grpc services configured: %w", plugin.ErrSkipPlugin) + } + if prometheusServerMetrics != nil { + prometheusServerMetrics.InitializeMetrics(s) + } + return tcpServer{ + Server: s, + config: *c, + }, nil + }, + }) +} + +type grpcServer struct { + *grpc.Server + config config +} + +func (s grpcServer) Start(ctx context.Context) error { + log.G(ctx).WithField("address", s.config.Address).WithField("uid", s.config.UID).WithField("gid", s.config.GID).Info("starting GRPC server") + l, err := sys.GetLocalListener(s.config.Address, s.config.UID, s.config.GID) + if err != nil { + return fmt.Errorf("failed to get listener for main endpoint: %w", err) + } + + internal.Serve(ctx, l, s.Serve) + + return nil +} +func (s grpcServer) Close() error { + s.Stop() + + return nil +} + +type tcpServer struct { + *grpc.Server + config tcpConfig +} + +func (s tcpServer) Start(ctx context.Context) error { + l, err := net.Listen("tcp", s.config.Address) + if err != nil { + return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err) + } + + internal.Serve(ctx, l, s.Serve) + + return nil +} + +func (s tcpServer) Close() error { + s.Stop() + + // Clean up TLS resources (Windows only) + cleanupTLSResources() + + return nil +} diff --git a/plugins/server/grpc/tls_other.go b/plugins/server/grpc/tls_other.go new file mode 100644 index 0000000000..0221f2ca1a --- /dev/null +++ b/plugins/server/grpc/tls_other.go @@ -0,0 +1,28 @@ +//go:build !windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package grpc + +import ( + "github.com/containerd/containerd/v2/internal/wintls" +) + +// TLS resource helpers are no-ops on other unsupported platforms. +func setTLSResource(r wintls.CertResource) {} + +func cleanupTLSResources() {} diff --git a/cmd/containerd/server/server_windows.go b/plugins/server/grpc/tls_windows.go similarity index 72% rename from cmd/containerd/server/server_windows.go rename to plugins/server/grpc/tls_windows.go index 6a415650b8..f9e8c3cd1c 100644 --- a/cmd/containerd/server/server_windows.go +++ b/plugins/server/grpc/tls_windows.go @@ -14,16 +14,11 @@ limitations under the License. */ -package server +package grpc import ( - "context" - - srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config" "github.com/containerd/containerd/v2/internal/wintls" "github.com/containerd/log" - "github.com/containerd/otelttrpc" - "github.com/containerd/ttrpc" ) // tlsResource holds Windows-specific TLS resources for cleanup on Stop. @@ -41,14 +36,3 @@ func cleanupTLSResources() { tlsResource = nil } } - -// Windows-specific apply and TTRPC server constructor -func apply(_ context.Context, _ *srvconfig.Config) error { - return nil -} - -func newTTRPCServer() (*ttrpc.Server, error) { - return ttrpc.NewServer( - ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()), - ) -} diff --git a/plugins/server/internal/serve.go b/plugins/server/internal/serve.go index 1ab7de7e22..444f2e55b7 100644 --- a/plugins/server/internal/serve.go +++ b/plugins/server/internal/serve.go @@ -20,8 +20,10 @@ import ( "context" "errors" "net" + "net/http" "github.com/containerd/log" + "github.com/containerd/ttrpc" ) func Serve(ctx context.Context, l net.Listener, serveFunc func(net.Listener) error) { @@ -30,7 +32,7 @@ func Serve(ctx context.Context, l net.Listener, serveFunc func(net.Listener) err go func() { defer l.Close() - if err := serveFunc(l); err != nil && !errors.Is(err, net.ErrClosed) { + if err := serveFunc(l); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, ttrpc.ErrServerClosed) { log.G(ctx).WithError(err).WithField("address", path).Fatal("serve failure") } }() diff --git a/plugins/server/ttrpc/plugin.go b/plugins/server/ttrpc/plugin.go new file mode 100644 index 0000000000..8ae566fc67 --- /dev/null +++ b/plugins/server/ttrpc/plugin.go @@ -0,0 +1,128 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package ttrpc + +import ( + "context" + "errors" + "fmt" + "net" + + "github.com/containerd/containerd/v2/defaults" + "github.com/containerd/containerd/v2/pkg/sys" + "github.com/containerd/containerd/v2/plugins" + "github.com/containerd/containerd/v2/plugins/server/internal" + "github.com/containerd/plugin" + "github.com/containerd/plugin/registry" + "github.com/containerd/ttrpc" +) + +type config struct { + Address string `toml:"address"` + UID int `toml:"uid"` + GID int `toml:"gid"` +} + +func (c *config) GetAddress() string { + return c.Address +} + +func (c *config) SetAddress(s string) { + c.Address = s +} + +func init() { + // Keep the default the same for compatibility + defaultAddress := defaults.DefaultAddress + ".ttrpc" + registry.Register(&plugin.Registration{ + Type: plugins.ServerPlugin, + ID: "ttrpc", + Requires: []plugin.Type{ + plugins.TTRPCPlugin, + plugins.GRPCPlugin, + }, + Config: &config{ + Address: defaultAddress, + }, + InitFn: func(ic *plugin.InitContext) (any, error) { + c := ic.Config.(*config) + if c.Address == "" { + return nil, plugin.ErrSkipPlugin + } + + s, err := newTTRPCServer() + if err != nil { + return nil, err + } + // ttrpcService allows TTRPC services to be registered with the underlying server + type ttrpcService interface { + RegisterTTRPC(*ttrpc.Server) error + } + var hasService bool + ps, err := ic.GetByType(plugins.TTRPCPlugin) // ensure grpc plugin is initialized + if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) { + return nil, err + } + for _, p := range ps { + if gs, ok := p.(ttrpcService); ok { + if err := gs.RegisterTTRPC(s); err != nil { + return nil, fmt.Errorf("failed to register ttrpc service: %w", err) + } + hasService = true + } + } + ps, err = ic.GetByType(plugins.GRPCPlugin) // ensure grpc plugin is initialized + if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) { + return nil, err + } + for _, p := range ps { + if gs, ok := p.(ttrpcService); ok { + if err := gs.RegisterTTRPC(s); err != nil { + return nil, fmt.Errorf("failed to register ttrpc service: %w", err) + } + hasService = true + } + } + if !hasService { + return nil, fmt.Errorf("no ttrpc services configured: %w", plugin.ErrSkipPlugin) + } + return server{ + Server: s, + config: *c, + }, nil + }, + }) +} + +type server struct { + *ttrpc.Server + config config +} + +func (s server) Start(ctx context.Context) error { + // setup the ttrpc endpoint + tl, err := sys.GetLocalListener(s.config.Address, s.config.UID, s.config.GID) + if err != nil { + return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err) + } + + internal.Serve(ctx, tl, func(l net.Listener) error { + return s.Serve(context.WithoutCancel(ctx), l) + }) + + return nil +} diff --git a/cmd/containerd/server/server_solaris.go b/plugins/server/ttrpc/server_linux.go similarity index 65% rename from cmd/containerd/server/server_solaris.go rename to plugins/server/ttrpc/server_linux.go index 416d3cdeed..6f4f05cc75 100644 --- a/cmd/containerd/server/server_solaris.go +++ b/plugins/server/ttrpc/server_linux.go @@ -14,28 +14,16 @@ limitations under the License. */ -package server +package ttrpc import ( - "context" - - srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config" - "github.com/containerd/containerd/v2/internal/wintls" "github.com/containerd/otelttrpc" "github.com/containerd/ttrpc" ) -func apply(_ context.Context, _ *srvconfig.Config) error { - return nil -} - -// TLS resource helpers are no-ops on Solaris. -func setTLSResource(r wintls.CertResource) {} -func cleanupTLSResources() {} - -// newTTRPCServer provides the ttrpc server for Solaris builds. func newTTRPCServer() (*ttrpc.Server, error) { return ttrpc.NewServer( + ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()), ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()), ) } diff --git a/plugins/server/ttrpc/server_otel.go b/plugins/server/ttrpc/server_otel.go new file mode 100644 index 0000000000..56351961c7 --- /dev/null +++ b/plugins/server/ttrpc/server_otel.go @@ -0,0 +1,31 @@ +//go:build windows || solaris + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package ttrpc + +import ( + "github.com/containerd/otelttrpc" + "github.com/containerd/ttrpc" +) + +// ttrpc server with otel interceptor +func newTTRPCServer() (*ttrpc.Server, error) { + return ttrpc.NewServer( + ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()), + ) +} diff --git a/plugins/server/ttrpc/server_other.go b/plugins/server/ttrpc/server_other.go new file mode 100644 index 0000000000..ada8f69777 --- /dev/null +++ b/plugins/server/ttrpc/server_other.go @@ -0,0 +1,27 @@ +//go:build !linux && !windows && !solaris + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package ttrpc + +import ( + "github.com/containerd/ttrpc" +) + +func newTTRPCServer() (*ttrpc.Server, error) { + return ttrpc.NewServer() +} diff --git a/plugins/types.go b/plugins/types.go index 76e2de10d4..1f67d00074 100644 --- a/plugins/types.go +++ b/plugins/types.go @@ -57,6 +57,8 @@ const ( StreamingPlugin plugin.Type = "io.containerd.streaming.v1" // TracingProcessorPlugin implements an open telemetry span processor TracingProcessorPlugin plugin.Type = "io.containerd.tracing.processor.v1" + // MetricsPlugin implements a metrics handler + MetricsPlugin plugin.Type = "io.containerd.metrics.v1" // NRIApiPlugin implements the NRI adaptation interface for containerd. NRIApiPlugin plugin.Type = "io.containerd.nri.v1" // TransferPlugin implements a transfer service