Add server plugins for grpc and ttrpc

Migrate configuration and move server initialization

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2025-11-21 23:39:54 -08:00
parent 06c23567c0
commit c15ec24857
19 changed files with 777 additions and 291 deletions

View File

@@ -30,6 +30,8 @@ import (
_ "github.com/containerd/containerd/v2/plugins/restart"
_ "github.com/containerd/containerd/v2/plugins/sandbox"
_ "github.com/containerd/containerd/v2/plugins/server/debug"
_ "github.com/containerd/containerd/v2/plugins/server/grpc"
_ "github.com/containerd/containerd/v2/plugins/server/ttrpc"
_ "github.com/containerd/containerd/v2/plugins/services/containers"
_ "github.com/containerd/containerd/v2/plugins/services/content"
_ "github.com/containerd/containerd/v2/plugins/services/diff"

View File

@@ -125,18 +125,20 @@ func dumpConfig(cliContext *cli.Context) error {
func platformAgnosticDefaultConfig() *srvconfig.Config {
return &srvconfig.Config{
Version: version.ConfigVersion,
Root: defaults.DefaultRootDir,
State: defaults.DefaultStateDir,
GRPC: srvconfig.GRPCConfig{
Address: defaults.DefaultAddress,
MaxRecvMsgSize: defaults.DefaultMaxRecvMsgSize,
MaxSendMsgSize: defaults.DefaultMaxSendMsgSize,
},
Version: version.ConfigVersion,
Root: defaults.DefaultRootDir,
State: defaults.DefaultStateDir,
DisabledPlugins: []string{},
RequiredPlugins: []string{},
StreamProcessors: streamProcessors(),
Imports: []string{defaults.DefaultConfigIncludePattern},
Plugins: map[string]any{
"io.containerd.server.v1.grpc": map[string]any{
"address": defaults.DefaultAddress,
"max_recv_message_size": defaults.DefaultMaxRecvMsgSize,
"max_send_message_size": defaults.DefaultMaxSendMsgSize,
},
},
}
}

View File

@@ -34,7 +34,6 @@ import (
_ "github.com/containerd/containerd/v2/core/metrics" // import containerd build info
"github.com/containerd/containerd/v2/core/mount"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/pkg/sys"
"github.com/containerd/containerd/v2/pkg/tracing"
"github.com/containerd/containerd/v2/version"
"github.com/containerd/errdefs"
@@ -165,16 +164,6 @@ can be used and modified as necessary as a custom configuration.`
return err
}
if config.GRPC.Address == "" {
return fmt.Errorf("grpc address cannot be empty: %w", errdefs.ErrInvalidArgument)
}
if config.TTRPC.Address == "" {
// If TTRPC was not explicitly configured, use defaults based on GRPC.
config.TTRPC.Address = config.GRPC.Address + ".ttrpc"
config.TTRPC.UID = config.GRPC.UID
config.TTRPC.GID = config.GRPC.GID
}
// Make sure top-level directories are created early.
if err := server.CreateTopLevelDirectories(config); err != nil {
return err
@@ -229,6 +218,7 @@ can be used and modified as necessary as a custom configuration.`
go func() {
defer close(chsrv)
// TODO: When to set grpc address from flag? Migration should be done first
server, err := server.New(ctx, config)
if err != nil {
select {
@@ -270,6 +260,8 @@ can be used and modified as necessary as a custom configuration.`
if err := server.Start(ctx); err != nil {
return err
}
// TODO: Move to server.Start(ctx) error
if config.Metrics.Address != "" {
l, err := net.Listen("tcp", config.Metrics.Address)
if err != nil {
@@ -277,26 +269,8 @@ can be used and modified as necessary as a custom configuration.`
}
serve(ctx, l, server.ServeMetrics)
}
// setup the ttrpc endpoint
tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)
if err != nil {
return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err)
}
serve(ctx, tl, server.ServeTTRPC)
if config.GRPC.TCPAddress != "" {
l, err := net.Listen("tcp", config.GRPC.TCPAddress)
if err != nil {
return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err)
}
serve(ctx, l, server.ServeTCP)
}
// setup the main grpc endpoint
l, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID)
if err != nil {
return fmt.Errorf("failed to get listener for main endpoint: %w", err)
}
serve(ctx, l, server.ServeGRPC)
// end s.Start
readyC := make(chan struct{})
go func() {
@@ -351,10 +325,6 @@ func applyFlags(cliContext *cli.Context, config *srvconfig.Config) error {
name: "state",
d: &config.State,
},
{
name: "address",
d: &config.GRPC.Address,
},
} {
if s := cliContext.String(v.name); s != "" {
*v.d = s
@@ -368,6 +338,35 @@ func applyFlags(cliContext *cli.Context, config *srvconfig.Config) error {
}
}
if s := cliContext.String("address"); s != "" {
var (
grpcConfig map[string]any
ttrpcConfig map[string]any
)
v, ok := config.Plugins["io.containerd.server.v1.grpc"]
if !ok {
grpcConfig = make(map[string]any)
} else if grpcConfig, ok = v.(map[string]any); !ok {
return fmt.Errorf("grpc plugin has invalid configuration: %w", errdefs.ErrInvalidArgument)
}
grpcConfig["address"] = s
config.Plugins["io.containerd.server.v1.grpc"] = grpcConfig
_, ok = config.Plugins["io.containerd.server.v1.ttrpc"]
if !ok {
ttrpcConfig = map[string]any{
"address": s + ".ttrpc",
}
if uid, ok := grpcConfig["uid"]; ok {
ttrpcConfig["uid"] = uid
}
if gid, ok := grpcConfig["gid"]; ok {
ttrpcConfig["gid"] = gid
}
config.Plugins["io.containerd.server.v1.ttrpc"] = ttrpcConfig
}
}
applyPlatformFlags(cliContext)
return nil

View File

@@ -65,10 +65,6 @@ type Config struct {
State string `toml:"state"`
// TempDir is the path to a directory where to place containerd temporary files
TempDir string `toml:"temp"`
// GRPC configuration settings
GRPC GRPCConfig `toml:"grpc"`
// TTRPC configuration settings
TTRPC TTRPCConfig `toml:"ttrpc"`
// Debug log settings
Debug Debug `toml:"debug"`
// Metrics and monitoring settings
@@ -95,6 +91,19 @@ type Config struct {
Imports []string `toml:"imports"`
// StreamProcessors configuration
StreamProcessors map[string]StreamProcessor `toml:"stream_processors"`
// Deprecated fields must remain but should not be output when generating default or migrated configs.
// These fields are automatically migrated to the corresponding server plugin
// configuration blocks on startup (see serviceMigrate). In version 4 configs,
// server settings are configured directly under [plugins."<plugin-id>"].
// Deprecated: use server plugins io.containerd.server.v1.grpc and io.containerd.server.v1.grpc-tcp
GRPC GRPCConfig `toml:"grpc,omitempty"`
// Deprecated: use server plugin io.containerd.server.v1.ttrpc.
// In configs prior to version 4, an unset TTRPC address is derived from
// the GRPC address (grpcAddress + ".ttrpc") and inherits GRPC's UID/GID.
// In version 4, the TTRPC plugin uses its own defaults independently.
TTRPC TTRPCConfig `toml:"ttrpc,omitempty"`
}
// StreamProcessor provides configuration for diff content processors
@@ -206,6 +215,15 @@ func v1Migrate(ctx context.Context, c *Config) error {
return nil
}
// serviceMigrate moves server properties (GRPC, TTRPC, Debug, Metrics) from
// top-level config fields into their corresponding plugin configuration blocks
// for version 4.
//
// For configs prior to version 4, if the TTRPC address is not explicitly set
// it is derived from the GRPC address (grpcAddress + ".ttrpc") and inherits
// GRPC's UID/GID. In version 4, each server plugin is independently
// configured; the TTRPC plugin will use its own default address if its
// plugin block is omitted, regardless of the GRPC plugin's address.
func serviceMigrate(ctx context.Context, c *Config) error {
if c.Debug.Address != "" && c.Plugins["io.containerd.server.v1.debug"] == nil {
c.Plugins["io.containerd.server.v1.debug"] = map[string]any{
@@ -217,28 +235,94 @@ func serviceMigrate(ctx context.Context, c *Config) error {
c.Debug.UID = 0
c.Debug.GID = 0
}
// Capture legacy GRPC values up front so both grpc and grpc-tcp
// migrations see the same values, and so migrations that don't key on
// an address (uid/gid/max message sizes) are not dropped.
grpcAddress := c.GRPC.Address
grpcUID := c.GRPC.UID
grpcGID := c.GRPC.GID
grpcMaxRecv := c.GRPC.MaxRecvMsgSize
grpcMaxSend := c.GRPC.MaxSendMsgSize
grpcHasLegacy := grpcAddress != "" || grpcUID != 0 || grpcGID != 0 || grpcMaxRecv != 0 || grpcMaxSend != 0
if grpcHasLegacy && c.Plugins["io.containerd.server.v1.grpc"] == nil {
c.Plugins["io.containerd.server.v1.grpc"] = map[string]any{
"address": grpcAddress,
"uid": grpcUID,
"gid": grpcGID,
"max_recv_message_size": grpcMaxRecv,
"max_send_message_size": grpcMaxSend,
}
}
if c.GRPC.TCPAddress != "" && c.Plugins["io.containerd.server.v1.grpc-tcp"] == nil {
c.Plugins["io.containerd.server.v1.grpc-tcp"] = map[string]any{
"address": c.GRPC.TCPAddress,
"tls_ca": c.GRPC.TCPTLSCA,
"tls_cert": c.GRPC.TCPTLSCert,
"tls_key": c.GRPC.TCPTLSKey,
"tls_common_name": c.GRPC.TCPTLSCName,
"max_recv_message_size": grpcMaxRecv,
"max_send_message_size": grpcMaxSend,
}
}
if grpcHasLegacy || c.GRPC.TCPAddress != "" {
c.GRPC = GRPCConfig{}
}
if c.Plugins["io.containerd.server.v1.ttrpc"] == nil {
ttrpcAddress := c.TTRPC.Address
ttrpcUID := c.TTRPC.UID
ttrpcGID := c.TTRPC.GID
if ttrpcAddress == "" && grpcAddress != "" {
ttrpcAddress = grpcAddress + ".ttrpc"
if ttrpcUID == 0 {
ttrpcUID = grpcUID
}
if ttrpcGID == 0 {
ttrpcGID = grpcGID
}
}
if ttrpcAddress != "" || ttrpcUID != 0 || ttrpcGID != 0 {
c.Plugins["io.containerd.server.v1.ttrpc"] = map[string]any{
"address": ttrpcAddress,
"uid": ttrpcUID,
"gid": ttrpcGID,
}
c.TTRPC.Address = ""
c.TTRPC.UID = 0
c.TTRPC.GID = 0
}
}
return nil
}
// GRPCConfig provides GRPC configuration for the socket
type GRPCConfig struct {
Address string `toml:"address"`
TCPAddress string `toml:"tcp_address"`
TCPTLSCA string `toml:"tcp_tls_ca"`
TCPTLSCert string `toml:"tcp_tls_cert"`
TCPTLSKey string `toml:"tcp_tls_key"`
UID int `toml:"uid"`
GID int `toml:"gid"`
MaxRecvMsgSize int `toml:"max_recv_message_size"`
MaxSendMsgSize int `toml:"max_send_message_size"`
TCPTLSCName string `toml:"tcp_tls_common_name"`
// Deprecated: use server plugin io.containerd.server.v1.grpc
Address string `toml:"address,omitempty"`
// Deprecated: use server plugin io.containerd.server.v1.grpc-tcp
TCPAddress string `toml:"tcp_address,omitempty"`
// Deprecated: use server plugin io.containerd.server.v1.grpc-tcp
TCPTLSCA string `toml:"tcp_tls_ca,omitempty"`
// Deprecated: use server plugin io.containerd.server.v1.grpc-tcp
TCPTLSCert string `toml:"tcp_tls_cert,omitempty"`
// Deprecated: use server plugin io.containerd.server.v1.grpc-tcp
TCPTLSKey string `toml:"tcp_tls_key,omitempty"`
// Deprecated: use server plugin io.containerd.server.v1.grpc
UID int `toml:"uid,omitempty"`
// Deprecated: use server plugin io.containerd.server.v1.grpc
GID int `toml:"gid,omitempty"`
// Deprecated: use server plugin io.containerd.server.v1.grpc
MaxRecvMsgSize int `toml:"max_recv_message_size,omitempty"`
// Deprecated: use server plugin io.containerd.server.v1.grpc
MaxSendMsgSize int `toml:"max_send_message_size,omitempty"`
// Deprecated: use server plugin io.containerd.server.v1.grpc-tcp
TCPTLSCName string `toml:"tcp_tls_common_name,omitempty"`
}
// TTRPCConfig provides TTRPC configuration for the socket
type TTRPCConfig struct {
Address string `toml:"address"`
UID int `toml:"uid"`
GID int `toml:"gid"`
Address string `toml:"address,omitempty"`
UID int `toml:"uid,omitempty"`
GID int `toml:"gid,omitempty"`
}
// Debug provides debug configuration
@@ -258,8 +342,9 @@ type Debug struct {
// MetricsConfig provides metrics configuration
type MetricsConfig struct {
Address string `toml:"address"`
GRPCHistogram bool `toml:"grpc_histogram"`
Address string `toml:"address"`
// Deprecated: use metrics plugin io.containerd.metrics.v1.grpc-prometheus
GRPCHistogram bool `toml:"grpc_histogram,omitempty"`
}
// CgroupConfig provides cgroup configuration

View File

@@ -18,8 +18,6 @@ package server
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
@@ -33,15 +31,11 @@ import (
"time"
"github.com/containerd/log"
"github.com/containerd/ttrpc"
"github.com/docker/go-metrics"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
diffapi "github.com/containerd/containerd/api/services/diff/v1"
@@ -58,7 +52,6 @@ import (
sbproxy "github.com/containerd/containerd/v2/core/sandbox/proxy"
ssproxy "github.com/containerd/containerd/v2/core/snapshots/proxy"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/internal/wintls"
"github.com/containerd/containerd/v2/pkg/dialer"
"github.com/containerd/containerd/v2/pkg/sys"
"github.com/containerd/containerd/v2/pkg/timeout"
@@ -146,105 +139,21 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
for id, p := range config.StreamProcessors {
diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args, p.Env))
}
var prometheusServerMetricsOpts []grpc_prometheus.ServerMetricsOption
if config.Metrics.GRPCHistogram {
// Enable grpc time histograms to measure rpc latencies
prometheusServerMetricsOpts = append(prometheusServerMetricsOpts, grpc_prometheus.WithServerHandlingTimeHistogram())
}
prometheusServerMetrics := grpc_prometheus.NewServerMetrics(prometheusServerMetricsOpts...)
prometheus.MustRegister(prometheusServerMetrics)
serverOpts := []grpc.ServerOption{
grpc.StatsHandler(otelgrpc.NewServerHandler()),
grpc.ChainStreamInterceptor(
streamNamespaceInterceptor,
prometheusServerMetrics.StreamServerInterceptor(),
),
grpc.ChainUnaryInterceptor(
unaryNamespaceInterceptor,
prometheusServerMetrics.UnaryServerInterceptor(),
),
}
if config.GRPC.MaxRecvMsgSize > 0 {
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize))
}
if config.GRPC.MaxSendMsgSize > 0 {
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize))
}
ttrpcServer, err := newTTRPCServer()
if err != nil {
return nil, err
}
tcpServerOpts := serverOpts
if config.GRPC.TCPTLSCert != "" {
log.G(ctx).Info("setting up tls on tcp GRPC services...")
tlsCert, err := tls.LoadX509KeyPair(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey)
if err != nil {
return nil, err
}
tlsConfig := &tls.Config{Certificates: []tls.Certificate{tlsCert}}
if config.GRPC.TCPTLSCA != "" {
caCertPool := x509.NewCertPool()
caCert, err := os.ReadFile(config.GRPC.TCPTLSCA)
if err != nil {
return nil, fmt.Errorf("failed to load CA file: %w", err)
}
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.ClientCAs = caCertPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
}
tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
} else if config.GRPC.TCPTLSCName != "" {
tlsConfig, CA, res, err :=
wintls.SetupTLSFromWindowsCertStore(ctx, config.GRPC.TCPTLSCName)
if err != nil {
return nil, fmt.Errorf("failed to setup TLS from Windows cert store: %w", err)
}
// Cache resource for cleanup (Windows only)
setTLSResource(res)
if CA != nil {
tlsConfig.ClientCAs = CA
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
}
tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
// grpcService allows GRPC services to be registered with the underlying server
type grpcService interface {
Register(*grpc.Server) error
}
// tcpService allows GRPC services to be registered with the underlying tcp server
type tcpService interface {
RegisterTCP(*grpc.Server) error
}
// ttrpcService allows TTRPC services to be registered with the underlying server
type ttrpcService interface {
RegisterTTRPC(*ttrpc.Server) error
}
var (
grpcServer = grpc.NewServer(serverOpts...)
tcpServer = grpc.NewServer(tcpServerOpts...)
grpcServices []grpcService
tcpServices []tcpService
ttrpcServices []ttrpcService
s = &Server{
prometheusServerMetrics: prometheusServerMetrics,
grpcServer: grpcServer,
tcpServer: tcpServer,
ttrpcServer: ttrpcServer,
config: config,
s = &Server{
config: config,
}
initialized = plugin.NewPluginSet()
required = make(map[string]struct{})
initialized = plugin.NewPluginSet()
required = make(map[string]struct{})
grpcAddress = readString(config.Plugins, "io.containerd.server.v1.grpc", "address")
ttrpcAddress = readString(config.Plugins, "io.containerd.server.v1.ttrpc", "address")
)
if grpcAddress == "" {
grpcAddress = defaults.DefaultAddress
}
if ttrpcAddress == "" {
ttrpcAddress = defaults.DefaultAddress + ".ttrpc"
}
for _, r := range config.RequiredPlugins {
required[r] = struct{}{}
}
@@ -260,8 +169,8 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
map[string]string{
plugins.PropertyRootDir: filepath.Join(config.Root, id),
plugins.PropertyStateDir: filepath.Join(config.State, id),
plugins.PropertyGRPCAddress: config.GRPC.Address,
plugins.PropertyTTRPCAddress: config.TTRPC.Address,
plugins.PropertyGRPCAddress: grpcAddress,
plugins.PropertyTTRPCAddress: ttrpcAddress,
},
)
initContext.RegisterReadiness = func() func() {
@@ -309,17 +218,6 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
}
}
// check for grpc services that should be registered with the server
if src, ok := instance.(grpcService); ok {
grpcServices = append(grpcServices, src)
}
if src, ok := instance.(ttrpcService); ok {
ttrpcServices = append(ttrpcServices, src)
}
if service, ok := instance.(tcpService); ok {
tcpServices = append(tcpServices, service)
}
s.plugins = append(s.plugins, result)
}
if len(required) != 0 {
@@ -330,23 +228,6 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
return nil, fmt.Errorf("required plugin %s not included", missing)
}
// register services after all plugins have been initialized
for _, service := range grpcServices {
if err := service.Register(grpcServer); err != nil {
return nil, err
}
}
for _, service := range ttrpcServices {
if err := service.RegisterTTRPC(ttrpcServer); err != nil {
return nil, err
}
}
for _, service := range tcpServices {
if err := service.RegisterTCP(tcpServer); err != nil {
return nil, err
}
}
recordConfigDeprecations(ctx, config, initialized)
return s, nil
}
@@ -380,25 +261,10 @@ type server interface {
// Server is the containerd main daemon
type Server struct {
prometheusServerMetrics *grpc_prometheus.ServerMetrics
grpcServer *grpc.Server
ttrpcServer *ttrpc.Server
tcpServer *grpc.Server
servers []server
config *srvconfig.Config
plugins []*plugin.Plugin
ready sync.WaitGroup
}
// ServeGRPC provides the containerd grpc APIs on the provided listener
func (s *Server) ServeGRPC(l net.Listener) error {
s.prometheusServerMetrics.InitializeMetrics(s.grpcServer)
return trapClosedConnErr(s.grpcServer.Serve(l))
}
// ServeTTRPC provides the containerd ttrpc APIs on the provided listener
func (s *Server) ServeTTRPC(l net.Listener) error {
return trapClosedConnErr(s.ttrpcServer.Serve(context.Background(), l))
servers []server
config *srvconfig.Config
plugins []*plugin.Plugin
ready sync.WaitGroup
}
// ServeMetrics provides a prometheus endpoint for exposing metrics
@@ -412,12 +278,6 @@ func (s *Server) ServeMetrics(l net.Listener) error {
return trapClosedConnErr(srv.Serve(l))
}
// ServeTCP allows services to serve over tcp
func (s *Server) ServeTCP(l net.Listener) error {
s.prometheusServerMetrics.InitializeMetrics(s.tcpServer)
return trapClosedConnErr(s.tcpServer.Serve(l))
}
// Start the services, this will normally start listening on sockets
// and serving requests.
func (s *Server) Start(ctx context.Context) error {
@@ -431,9 +291,6 @@ func (s *Server) Start(ctx context.Context) error {
// Stop the containerd server canceling any open connections
func (s *Server) Stop() {
s.grpcServer.Stop()
// Clean up TLS resources (Windows only)
cleanupTLSResources()
for i := len(s.plugins) - 1; i >= 0; i-- {
p := s.plugins[i]
instance, err := p.Instance()
@@ -580,6 +437,31 @@ func (pc *proxyClients) getClient(address string) (*grpc.ClientConn, error) {
return conn, nil
}
func readString(properties map[string]any, key ...string) string {
for i, k := range key {
v, ok := properties[k]
if !ok {
return ""
}
if i < len(key)-1 {
properties, ok = v.(map[string]any)
if !ok {
return ""
}
continue
}
switch v := v.(type) {
case string:
return v
case fmt.Stringer:
return v.String()
case []byte:
return string(v)
}
}
return ""
}
func trapClosedConnErr(err error) error {
if err == nil || errors.Is(err, net.ErrClosed) {
return nil

View File

@@ -24,11 +24,8 @@ import (
cgroup1 "github.com/containerd/cgroups/v3/cgroup1"
cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
"github.com/containerd/containerd/v2/internal/wintls"
"github.com/containerd/containerd/v2/pkg/sys"
"github.com/containerd/log"
"github.com/containerd/otelttrpc"
"github.com/containerd/ttrpc"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
@@ -66,14 +63,3 @@ func apply(ctx context.Context, config *srvconfig.Config) error {
}
return nil
}
func newTTRPCServer() (*ttrpc.Server, error) {
return ttrpc.NewServer(
ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()),
ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()),
)
}
// TLS resource helpers are no-ops on Linux.
func setTLSResource(r wintls.CertResource) {}
func cleanupTLSResources() {}

View File

@@ -1,4 +1,4 @@
//go:build !linux && !windows && !solaris
//go:build !linux
/*
Copyright The containerd Authors.
@@ -22,18 +22,8 @@ import (
"context"
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
"github.com/containerd/containerd/v2/internal/wintls"
"github.com/containerd/ttrpc"
)
func apply(_ context.Context, _ *srvconfig.Config) error {
return nil
}
func newTTRPCServer() (*ttrpc.Server, error) {
return ttrpc.NewServer()
}
// TLS resource helpers are no-ops on other unsupported platforms.
func setTLSResource(r wintls.CertResource) {}
func cleanupTLSResources() {}

View File

@@ -24,9 +24,7 @@ import (
"github.com/containerd/containerd/v2/cmd/containerd/server"
"github.com/containerd/containerd/v2/cmd/containerd/server/config"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/pkg/sys"
"github.com/containerd/containerd/v2/version"
"github.com/containerd/log"
)
const (
@@ -71,19 +69,12 @@ func startDaemon() {
return
}
l, err := sys.GetLocalListener(srvconfig.GRPC.Address, srvconfig.GRPC.UID, srvconfig.GRPC.GID)
if err != nil {
defer server.Stop()
if err := server.Start(ctx); err != nil {
errC <- err
return
}
go func() {
defer l.Close()
if err := server.ServeGRPC(l); err != nil {
log.G(ctx).WithError(err).WithField("address", srvconfig.GRPC.Address).Fatal("serve failure")
}
}()
server.Wait()
}()

View File

@@ -0,0 +1,67 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grpc
import (
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"github.com/containerd/containerd/v2/plugins"
)
type metricsConfig struct {
GRPCHistogram bool `toml:"grpc_histogram"`
}
func init() {
registry.Register(&plugin.Registration{
Type: plugins.MetricsPlugin,
ID: "grpc-prometheus",
Requires: []plugin.Type{
plugins.GRPCPlugin,
},
Config: &metricsConfig{
GRPCHistogram: false,
},
InitFn: func(ic *plugin.InitContext) (any, error) {
c := ic.Config.(*metricsConfig)
var prometheusServerMetricsOpts []grpc_prometheus.ServerMetricsOption
if c.GRPCHistogram {
// Enable grpc time histograms to measure rpc latencies
prometheusServerMetricsOpts = append(prometheusServerMetricsOpts, grpc_prometheus.WithServerHandlingTimeHistogram())
}
prometheusServerMetrics := grpc_prometheus.NewServerMetrics(prometheusServerMetricsOpts...)
prometheus.MustRegister(prometheusServerMetrics)
return prometheusServerMetrics, nil
},
})
registry.Register(&plugin.Registration{
Type: plugins.MetricsPlugin,
ID: "grpc-otel",
Requires: []plugin.Type{
plugins.GRPCPlugin,
},
InitFn: func(ic *plugin.InitContext) (any, error) {
return otelgrpc.NewServerHandler(), nil
},
})
}

View File

@@ -14,7 +14,7 @@
limitations under the License.
*/
package server
package grpc
import (
"context"

View File

@@ -0,0 +1,292 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grpc
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net"
"os"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/stats"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/internal/wintls"
"github.com/containerd/containerd/v2/pkg/sys"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/plugins/server/internal"
)
type config struct {
Address string `toml:"address"`
UID int `toml:"uid"`
GID int `toml:"gid"`
MaxRecvMsgSize int `toml:"max_recv_message_size"`
MaxSendMsgSize int `toml:"max_send_message_size"`
}
type tcpConfig struct {
Address string `toml:"address"`
TLSCA string `toml:"tls_ca"`
TLSCert string `toml:"tls_cert"`
TLSKey string `toml:"tls_key"`
TLSCName string `toml:"tls_common_name"`
MaxRecvMsgSize int `toml:"max_recv_message_size"`
MaxSendMsgSize int `toml:"max_send_message_size"`
}
func init() {
registry.Register(&plugin.Registration{
Type: plugins.ServerPlugin,
ID: "grpc",
Requires: []plugin.Type{
plugins.GRPCPlugin,
plugins.MetricsPlugin,
},
Config: &config{
Address: defaults.DefaultAddress,
MaxRecvMsgSize: defaults.DefaultMaxRecvMsgSize,
MaxSendMsgSize: defaults.DefaultMaxSendMsgSize,
},
InitFn: func(ic *plugin.InitContext) (any, error) {
c := ic.Config.(*config)
if c.Address == "" {
return nil, fmt.Errorf("grpc address cannot be empty: %w", errdefs.ErrInvalidArgument)
}
var (
streamOpts = []grpc.StreamServerInterceptor{streamNamespaceInterceptor}
unaryOpts = []grpc.UnaryServerInterceptor{unaryNamespaceInterceptor}
serverOpts = []grpc.ServerOption{}
prometheusServerMetrics *grpc_prometheus.ServerMetrics // This should be grpc handler
)
if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-prometheus"); err == nil {
prometheusServerMetrics = p.(*grpc_prometheus.ServerMetrics)
streamOpts = append(streamOpts, prometheusServerMetrics.StreamServerInterceptor())
unaryOpts = append(unaryOpts, prometheusServerMetrics.UnaryServerInterceptor())
}
if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-otel"); err == nil {
serverOpts = append(serverOpts, grpc.StatsHandler(p.(stats.Handler)))
}
serverOpts = append(serverOpts, grpc.ChainStreamInterceptor(streamOpts...))
serverOpts = append(serverOpts, grpc.ChainUnaryInterceptor(unaryOpts...))
if c.MaxRecvMsgSize > 0 {
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(c.MaxRecvMsgSize))
}
if c.MaxSendMsgSize > 0 {
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(c.MaxSendMsgSize))
}
// grpcService allows GRPC services to be registered with the underlying server
type grpcService interface {
Register(*grpc.Server) error
}
s := grpc.NewServer(serverOpts...)
ps, err := ic.GetByType(plugins.GRPCPlugin) // ensure grpc plugin is initialized
if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) {
return nil, err
}
for _, p := range ps {
if gs, ok := p.(grpcService); ok {
if err := gs.Register(s); err != nil {
return nil, fmt.Errorf("failed to register grpc service: %w", err)
}
}
}
if prometheusServerMetrics != nil {
prometheusServerMetrics.InitializeMetrics(s)
}
return grpcServer{
Server: s,
config: *c,
}, nil
},
})
registry.Register(&plugin.Registration{
Type: plugins.ServerPlugin,
ID: "grpc-tcp",
Requires: []plugin.Type{
plugins.GRPCPlugin,
plugins.MetricsPlugin,
},
Config: &tcpConfig{
MaxRecvMsgSize: defaults.DefaultMaxRecvMsgSize,
MaxSendMsgSize: defaults.DefaultMaxSendMsgSize,
},
InitFn: func(ic *plugin.InitContext) (any, error) {
c := ic.Config.(*tcpConfig)
if c.Address == "" {
return nil, plugin.ErrSkipPlugin
}
var (
streamOpts = []grpc.StreamServerInterceptor{streamNamespaceInterceptor}
unaryOpts = []grpc.UnaryServerInterceptor{unaryNamespaceInterceptor}
serverOpts = []grpc.ServerOption{}
prometheusServerMetrics *grpc_prometheus.ServerMetrics // This should be grpc handler
)
if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-prometheus"); err == nil {
prometheusServerMetrics = p.(*grpc_prometheus.ServerMetrics)
streamOpts = append(streamOpts, prometheusServerMetrics.StreamServerInterceptor())
unaryOpts = append(unaryOpts, prometheusServerMetrics.UnaryServerInterceptor())
prometheusServerMetrics.InitializeMetrics(nil)
}
if p, err := ic.GetByID(plugins.MetricsPlugin, "grpc-otel"); err == nil {
serverOpts = append(serverOpts, grpc.StatsHandler(p.(stats.Handler)))
}
serverOpts = append(serverOpts, grpc.ChainStreamInterceptor(streamOpts...))
serverOpts = append(serverOpts, grpc.ChainUnaryInterceptor(unaryOpts...))
if c.MaxRecvMsgSize > 0 {
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(c.MaxRecvMsgSize))
}
if c.MaxSendMsgSize > 0 {
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(c.MaxSendMsgSize))
}
if c.TLSCert != "" {
log.G(ic.Context).Info("setting up tls on tcp GRPC services...")
tlsCert, err := tls.LoadX509KeyPair(c.TLSCert, c.TLSKey)
if err != nil {
return nil, err
}
tlsConfig := &tls.Config{Certificates: []tls.Certificate{tlsCert}}
if c.TLSCA != "" {
caCertPool := x509.NewCertPool()
caCert, err := os.ReadFile(c.TLSCA)
if err != nil {
return nil, fmt.Errorf("failed to load CA file: %w", err)
}
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.ClientCAs = caCertPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
}
serverOpts = append(serverOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
} else if c.TLSCName != "" {
tlsConfig, CA, res, err :=
wintls.SetupTLSFromWindowsCertStore(ic.Context, c.TLSCName)
if err != nil {
return nil, fmt.Errorf("failed to setup TLS from Windows cert store: %w", err)
}
// Cache resource for cleanup (Windows only)
setTLSResource(res)
if CA != nil {
tlsConfig.ClientCAs = CA
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
}
serverOpts = append(serverOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
// tcpService allows GRPC services to be registered with the underlying tcp server
type tcpService interface {
RegisterTCP(*grpc.Server) error
}
s := grpc.NewServer(serverOpts...)
ps, err := ic.GetByType(plugins.GRPCPlugin) // ensure grpc plugin is initialized
if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) {
return nil, err
}
var hasService bool
for _, p := range ps {
if gs, ok := p.(tcpService); ok {
if err := gs.RegisterTCP(s); err != nil {
return nil, fmt.Errorf("failed to register grpc service: %w", err)
}
hasService = true
}
}
if !hasService {
return nil, fmt.Errorf("no tcp grpc services configured: %w", plugin.ErrSkipPlugin)
}
if prometheusServerMetrics != nil {
prometheusServerMetrics.InitializeMetrics(s)
}
return tcpServer{
Server: s,
config: *c,
}, nil
},
})
}
type grpcServer struct {
*grpc.Server
config config
}
func (s grpcServer) Start(ctx context.Context) error {
log.G(ctx).WithField("address", s.config.Address).WithField("uid", s.config.UID).WithField("gid", s.config.GID).Info("starting GRPC server")
l, err := sys.GetLocalListener(s.config.Address, s.config.UID, s.config.GID)
if err != nil {
return fmt.Errorf("failed to get listener for main endpoint: %w", err)
}
internal.Serve(ctx, l, s.Serve)
return nil
}
func (s grpcServer) Close() error {
s.Stop()
return nil
}
type tcpServer struct {
*grpc.Server
config tcpConfig
}
func (s tcpServer) Start(ctx context.Context) error {
l, err := net.Listen("tcp", s.config.Address)
if err != nil {
return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err)
}
internal.Serve(ctx, l, s.Serve)
return nil
}
func (s tcpServer) Close() error {
s.Stop()
// Clean up TLS resources (Windows only)
cleanupTLSResources()
return nil
}

View File

@@ -0,0 +1,28 @@
//go:build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grpc
import (
"github.com/containerd/containerd/v2/internal/wintls"
)
// TLS resource helpers are no-ops on other unsupported platforms.
func setTLSResource(r wintls.CertResource) {}
func cleanupTLSResources() {}

View File

@@ -14,16 +14,11 @@
limitations under the License.
*/
package server
package grpc
import (
"context"
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
"github.com/containerd/containerd/v2/internal/wintls"
"github.com/containerd/log"
"github.com/containerd/otelttrpc"
"github.com/containerd/ttrpc"
)
// tlsResource holds Windows-specific TLS resources for cleanup on Stop.
@@ -41,14 +36,3 @@ func cleanupTLSResources() {
tlsResource = nil
}
}
// Windows-specific apply and TTRPC server constructor
func apply(_ context.Context, _ *srvconfig.Config) error {
return nil
}
func newTTRPCServer() (*ttrpc.Server, error) {
return ttrpc.NewServer(
ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()),
)
}

View File

@@ -20,8 +20,10 @@ import (
"context"
"errors"
"net"
"net/http"
"github.com/containerd/log"
"github.com/containerd/ttrpc"
)
func Serve(ctx context.Context, l net.Listener, serveFunc func(net.Listener) error) {
@@ -30,7 +32,7 @@ func Serve(ctx context.Context, l net.Listener, serveFunc func(net.Listener) err
go func() {
defer l.Close()
if err := serveFunc(l); err != nil && !errors.Is(err, net.ErrClosed) {
if err := serveFunc(l); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, ttrpc.ErrServerClosed) {
log.G(ctx).WithError(err).WithField("address", path).Fatal("serve failure")
}
}()

View File

@@ -0,0 +1,128 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttrpc
import (
"context"
"errors"
"fmt"
"net"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/pkg/sys"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/plugins/server/internal"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
"github.com/containerd/ttrpc"
)
type config struct {
Address string `toml:"address"`
UID int `toml:"uid"`
GID int `toml:"gid"`
}
func (c *config) GetAddress() string {
return c.Address
}
func (c *config) SetAddress(s string) {
c.Address = s
}
func init() {
// Keep the default the same for compatibility
defaultAddress := defaults.DefaultAddress + ".ttrpc"
registry.Register(&plugin.Registration{
Type: plugins.ServerPlugin,
ID: "ttrpc",
Requires: []plugin.Type{
plugins.TTRPCPlugin,
plugins.GRPCPlugin,
},
Config: &config{
Address: defaultAddress,
},
InitFn: func(ic *plugin.InitContext) (any, error) {
c := ic.Config.(*config)
if c.Address == "" {
return nil, plugin.ErrSkipPlugin
}
s, err := newTTRPCServer()
if err != nil {
return nil, err
}
// ttrpcService allows TTRPC services to be registered with the underlying server
type ttrpcService interface {
RegisterTTRPC(*ttrpc.Server) error
}
var hasService bool
ps, err := ic.GetByType(plugins.TTRPCPlugin) // ensure grpc plugin is initialized
if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) {
return nil, err
}
for _, p := range ps {
if gs, ok := p.(ttrpcService); ok {
if err := gs.RegisterTTRPC(s); err != nil {
return nil, fmt.Errorf("failed to register ttrpc service: %w", err)
}
hasService = true
}
}
ps, err = ic.GetByType(plugins.GRPCPlugin) // ensure grpc plugin is initialized
if err != nil && !errors.Is(err, plugin.ErrPluginNotFound) {
return nil, err
}
for _, p := range ps {
if gs, ok := p.(ttrpcService); ok {
if err := gs.RegisterTTRPC(s); err != nil {
return nil, fmt.Errorf("failed to register ttrpc service: %w", err)
}
hasService = true
}
}
if !hasService {
return nil, fmt.Errorf("no ttrpc services configured: %w", plugin.ErrSkipPlugin)
}
return server{
Server: s,
config: *c,
}, nil
},
})
}
type server struct {
*ttrpc.Server
config config
}
func (s server) Start(ctx context.Context) error {
// setup the ttrpc endpoint
tl, err := sys.GetLocalListener(s.config.Address, s.config.UID, s.config.GID)
if err != nil {
return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err)
}
internal.Serve(ctx, tl, func(l net.Listener) error {
return s.Serve(context.WithoutCancel(ctx), l)
})
return nil
}

View File

@@ -14,28 +14,16 @@
limitations under the License.
*/
package server
package ttrpc
import (
"context"
srvconfig "github.com/containerd/containerd/v2/cmd/containerd/server/config"
"github.com/containerd/containerd/v2/internal/wintls"
"github.com/containerd/otelttrpc"
"github.com/containerd/ttrpc"
)
func apply(_ context.Context, _ *srvconfig.Config) error {
return nil
}
// TLS resource helpers are no-ops on Solaris.
func setTLSResource(r wintls.CertResource) {}
func cleanupTLSResources() {}
// newTTRPCServer provides the ttrpc server for Solaris builds.
func newTTRPCServer() (*ttrpc.Server, error) {
return ttrpc.NewServer(
ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()),
ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()),
)
}

View File

@@ -0,0 +1,31 @@
//go:build windows || solaris
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttrpc
import (
"github.com/containerd/otelttrpc"
"github.com/containerd/ttrpc"
)
// ttrpc server with otel interceptor
func newTTRPCServer() (*ttrpc.Server, error) {
return ttrpc.NewServer(
ttrpc.WithUnaryServerInterceptor(otelttrpc.UnaryServerInterceptor()),
)
}

View File

@@ -0,0 +1,27 @@
//go:build !linux && !windows && !solaris
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttrpc
import (
"github.com/containerd/ttrpc"
)
func newTTRPCServer() (*ttrpc.Server, error) {
return ttrpc.NewServer()
}

View File

@@ -57,6 +57,8 @@ const (
StreamingPlugin plugin.Type = "io.containerd.streaming.v1"
// TracingProcessorPlugin implements an open telemetry span processor
TracingProcessorPlugin plugin.Type = "io.containerd.tracing.processor.v1"
// MetricsPlugin implements a metrics handler
MetricsPlugin plugin.Type = "io.containerd.metrics.v1"
// NRIApiPlugin implements the NRI adaptation interface for containerd.
NRIApiPlugin plugin.Type = "io.containerd.nri.v1"
// TransferPlugin implements a transfer service