From 6a01ad3e16c57c631febb92090bbca5c331e2f7d Mon Sep 17 00:00:00 2001 From: Krisztian Litkey Date: Fri, 31 Jan 2025 20:15:12 +0200 Subject: [PATCH] cri,nri: block NRI plugin sync. during event processing. Block the synchronization of registering NRI plugins during CRI events to avoid the plugin ending up in an inconsistent starting state after initial sync (missing pods, containers or missed events for some pods or containers). Signed-off-by: Krisztian Litkey --- internal/cri/nri/nri_api_linux.go | 9 +++++++++ internal/cri/nri/nri_api_other.go | 8 ++++++++ internal/cri/server/container_create.go | 2 ++ internal/cri/server/container_remove.go | 3 +++ internal/cri/server/container_start.go | 2 ++ internal/cri/server/container_stop.go | 3 +++ internal/cri/server/container_update_resources.go | 2 ++ internal/cri/server/sandbox_remove.go | 3 +++ internal/cri/server/sandbox_run.go | 2 ++ internal/cri/server/sandbox_stop.go | 3 +++ internal/nri/nri.go | 12 ++++++++++++ 11 files changed, 49 insertions(+) diff --git a/internal/cri/nri/nri_api_linux.go b/internal/cri/nri/nri_api_linux.go index a3eecbce5f..f23184300d 100644 --- a/internal/cri/nri/nri_api_linux.go +++ b/internal/cri/nri/nri_api_linux.go @@ -358,6 +358,15 @@ func (a *API) WithContainerExit(criCtr *cstore.Container) containerd.ProcessDele } } +type PluginSyncBlock = nri.PluginSyncBlock + +func (a *API) BlockPluginSync() *PluginSyncBlock { + if a.IsDisabled() { + return nil + } + return a.nri.BlockPluginSync() +} + // // NRI-CRI 'domain' interface // diff --git a/internal/cri/nri/nri_api_other.go b/internal/cri/nri/nri_api_other.go index 14bd7ddd5d..808a36048d 100644 --- a/internal/cri/nri/nri_api_other.go +++ b/internal/cri/nri/nri_api_other.go @@ -108,6 +108,14 @@ func (*API) WithContainerExit(*cstore.Container) containerd.ProcessDeleteOpts { } } +type PluginSyncBlock struct{} + +func (*API) BlockPluginSync() *PluginSyncBlock { + return nil +} + +func (*PluginSyncBlock) Unblock() {} + // // NRI-CRI no-op 'domain' interface // diff --git a/internal/cri/server/container_create.go b/internal/cri/server/container_create.go index 97c1c08716..2c884dfae1 100644 --- a/internal/cri/server/container_create.go +++ b/internal/cri/server/container_create.go @@ -312,6 +312,8 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta } }() + defer c.nri.BlockPluginSync().Unblock() + var cntr containerd.Container if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil { return nil, fmt.Errorf("failed to create containerd container: %w", err) diff --git a/internal/cri/server/container_remove.go b/internal/cri/server/container_remove.go index a666456e38..76cfaf6ea5 100644 --- a/internal/cri/server/container_remove.go +++ b/internal/cri/server/container_remove.go @@ -44,6 +44,9 @@ func (c *criService) RemoveContainer(ctx context.Context, r *runtime.RemoveConta log.G(ctx).Tracef("RemoveContainer called for container %q that does not exist", ctrID) return &runtime.RemoveContainerResponse{}, nil } + + defer c.nri.BlockPluginSync().Unblock() + id := container.ID span.SetAttributes(tracing.Attribute("container.id", id)) i, err := container.Container.Info(ctx) diff --git a/internal/cri/server/container_start.go b/internal/cri/server/container_start.go index 36e4d72469..b69d7cc7d7 100644 --- a/internal/cri/server/container_start.go +++ b/internal/cri/server/container_start.go @@ -156,6 +156,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain return nil, fmt.Errorf("failed to wait for containerd task: %w", err) } + defer c.nri.BlockPluginSync().Unblock() + defer func() { if retErr != nil { deferCtx, deferCancel := ctrdutil.DeferContext() diff --git a/internal/cri/server/container_stop.go b/internal/cri/server/container_stop.go index 736911fbd7..408bf9f105 100644 --- a/internal/cri/server/container_stop.go +++ b/internal/cri/server/container_stop.go @@ -51,6 +51,9 @@ func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainer // https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L67-L68 return &runtime.StopContainerResponse{}, nil } + + defer c.nri.BlockPluginSync().Unblock() + span.SetAttributes(tracing.Attribute("container.id", container.ID)) if err := c.stopContainer(ctx, container, time.Duration(r.GetTimeout())*time.Second); err != nil { return nil, err diff --git a/internal/cri/server/container_update_resources.go b/internal/cri/server/container_update_resources.go index a68431e5c4..d2c5212509 100644 --- a/internal/cri/server/container_update_resources.go +++ b/internal/cri/server/container_update_resources.go @@ -47,6 +47,8 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up return nil, err } + defer c.nri.BlockPluginSync().Unblock() + resources := r.GetLinux() updated, err := c.nri.UpdateContainerResources(ctx, &sandbox, &container, resources) if err != nil { diff --git a/internal/cri/server/sandbox_remove.go b/internal/cri/server/sandbox_remove.go index f7e1ed8802..e7c0e0654a 100644 --- a/internal/cri/server/sandbox_remove.go +++ b/internal/cri/server/sandbox_remove.go @@ -44,6 +44,9 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS r.GetPodSandboxId()) return &runtime.RemovePodSandboxResponse{}, nil } + + defer c.nri.BlockPluginSync().Unblock() + // Use the full sandbox id. id := sandbox.ID span.SetAttributes(tracing.Attribute("sandbox.id", id)) diff --git a/internal/cri/server/sandbox_run.go b/internal/cri/server/sandbox_run.go index e895d66b37..8ed15b2c93 100644 --- a/internal/cri/server/sandbox_run.go +++ b/internal/cri/server/sandbox_run.go @@ -300,6 +300,8 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox sandbox.ProcessLabel = labels["selinux_label"] + defer c.nri.BlockPluginSync().Unblock() + err = c.nri.RunPodSandbox(ctx, &sandbox) if err != nil { return nil, fmt.Errorf("NRI RunPodSandbox failed: %w", err) diff --git a/internal/cri/server/sandbox_stop.go b/internal/cri/server/sandbox_stop.go index 95f5780a05..a27beec93b 100644 --- a/internal/cri/server/sandbox_stop.go +++ b/internal/cri/server/sandbox_stop.go @@ -46,6 +46,9 @@ func (c *criService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandb // https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L45-L46 return &runtime.StopPodSandboxResponse{}, nil } + + defer c.nri.BlockPluginSync().Unblock() + span.SetAttributes(tracing.Attribute("sandbox.id", sandbox.ID)) if err := c.stopPodSandbox(ctx, sandbox); err != nil { return nil, err diff --git a/internal/nri/nri.go b/internal/nri/nri.go index a3922bcf35..14fc107dca 100644 --- a/internal/nri/nri.go +++ b/internal/nri/nri.go @@ -81,6 +81,9 @@ type API interface { // RemoveContainer relays container removal events to NRI. RemoveContainer(context.Context, PodSandbox, Container) error + + // BlockPluginSync blocks plugin synchronization until it is Unblock()ed. + BlockPluginSync() *PluginSyncBlock } type State int @@ -435,6 +438,15 @@ func (l *local) RemoveContainer(ctx context.Context, pod PodSandbox, ctr Contain return err } +type PluginSyncBlock = nri.PluginSyncBlock + +func (l *local) BlockPluginSync() *PluginSyncBlock { + if !l.IsEnabled() { + return nil + } + return l.nri.BlockPluginSync() +} + func (l *local) syncPlugin(ctx context.Context, syncFn nri.SyncCB) error { l.Lock() defer l.Unlock()