From 80ec03fe7ebbf100cdf979cbb87037626a010dbd Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Wed, 25 Mar 2026 16:37:14 -0700 Subject: [PATCH] core/mount: Fix mount manager activation error when already exists Correctly handle cases where the mount activation still exists: - If activation is fully activate, then just return already exists and allow the caller to return error or call Info to continue. - If activation is stale or incomplete due to crash during activation, overwrite the identifier and cleanup the incomplete activation during activate. Signed-off-by: Derek McGowan --- core/mount/manager/buckets.go | 4 + core/mount/manager/manager.go | 118 ++++++++++++- core/mount/manager/manager_test.go | 268 ++++++++++++++++++++++++++++- core/runtime/v2/task_manager.go | 12 +- 4 files changed, 395 insertions(+), 7 deletions(-) diff --git a/core/mount/manager/buckets.go b/core/mount/manager/buckets.go index 5b9a78938..e1d8011a0 100644 --- a/core/mount/manager/buckets.go +++ b/core/mount/manager/buckets.go @@ -69,7 +69,11 @@ var ( bucketKeyLeases = []byte("leases") bucketKeyLease = []byte("lease") bucketKeyActive = []byte("active") + bucketKeySystem = []byte("system") bucketKeyType = []byte("type") + bucketKeySource = []byte("source") + bucketKeyTarget = []byte("target") + bucketKeyOptions = []byte("options") bucketKeyMountedAt = []byte("mat") bucketKeyMountPoint = []byte("mp") bucketKeyLabels = []byte("labels") diff --git a/core/mount/manager/manager.go b/core/mount/manager/manager.go index 5163c17db..67a0c3191 100644 --- a/core/mount/manager/manager.go +++ b/core/mount/manager/manager.go @@ -39,6 +39,7 @@ import ( "github.com/containerd/containerd/v2/core/metadata" "github.com/containerd/containerd/v2/core/metadata/boltutil" "github.com/containerd/containerd/v2/core/mount" + "github.com/containerd/containerd/v2/internal/kmutex" "github.com/containerd/containerd/v2/pkg/gc" "github.com/containerd/containerd/v2/pkg/namespaces" ) @@ -103,6 +104,7 @@ func NewManager(db *bolt.DB, targetDir string, opts ...Opt) (mount.Manager, erro targets: tr, handlers: options.handlers, rootMap: rootMap, + activate: kmutex.New(), }, nil } @@ -112,7 +114,8 @@ type mountManager struct { handlers map[string]mount.Handler rootMap map[string]*os.Root - rwlock sync.RWMutex + rwlock sync.RWMutex + activate kmutex.KeyedLocker } func (mm *mountManager) Close() error { @@ -131,6 +134,14 @@ func (mm *mountManager) Activate(ctx context.Context, name string, mounts []moun return mount.ActivationInfo{}, err } + // Serialize concurrent activations of the same name to prevent a + // racing Activate from misidentifying an in-progress activation as + // a stale record and destroying it. + if err := mm.activate.Lock(ctx, name); err != nil { + return mount.ActivationInfo{}, err + } + defer mm.activate.Unlock(name) + log.G(ctx).WithField("name", name).WithField("mounts", mounts).Debugf("activating mount") lid, leased := leases.FromContext(ctx) @@ -229,6 +240,7 @@ func (mm *mountManager) Activate(ctx context.Context, name string, mounts []moun defer mm.rwlock.RUnlock() var mid uint64 + var staleMID uint64 if err := mm.db.Update(func(tx *bolt.Tx) error { v1bkt, err := tx.CreateBucketIfNotExists([]byte("v1")) @@ -246,8 +258,38 @@ func (mm *mountManager) Activate(ctx context.Context, name string, mounts []moun } bkt, err := mbkt.CreateBucket([]byte(name)) if err != nil { - // If already exists, return already exists - return err + existing := mbkt.Bucket([]byte(name)) + if existing == nil { + return err + } + // If the mount is fully activated, return already exists + // so the caller can reuse the existing mount. + if existing.Bucket(bucketKeyActive) != nil { + return fmt.Errorf("mount %q: %w", name, errdefs.ErrAlreadyExists) + } + // The mount bucket exists but was never fully activated + // (e.g., process crashed between creating the bucket and + // completing activation). Clean up the stale entry and + // proceed with a fresh activation. Save the old mount ID + // so the target directory can be cleaned up after the + // transaction commits. + staleMID = readID(existing) + if lid := existing.Get(bucketKeyLease); len(lid) > 0 { + if lsbkt := nsbkt.Bucket(bucketKeyLeases); lsbkt != nil { + if lbkt := lsbkt.Bucket(lid); lbkt != nil { + if err := lbkt.Delete([]byte(name)); err != nil { + return err + } + } + } + } + if err := mbkt.DeleteBucket([]byte(name)); err != nil { + return err + } + bkt, err = mbkt.CreateBucket([]byte(name)) + if err != nil { + return err + } } mid, err = v1bkt.NextSequence() @@ -297,6 +339,19 @@ func (mm *mountManager) Activate(ctx context.Context, name string, mounts []moun return mount.ActivationInfo{}, err } + // If a stale incomplete activation was found, clean up its target + // directory which may contain leftover mounts from before a crash. + if staleMID != 0 { + staleTarget := filepath.Join(mm.targets.Name(), strconv.FormatUint(staleMID, 10)) + if err := unmountAll(ctx, staleTarget, mm.handlers); err != nil { + if os.IsNotExist(err) { + log.G(ctx).WithError(err).WithField("mountid", staleMID).Debug("stale activation target does not exist, skipping cleanup") + } else { + log.G(ctx).WithError(err).WithField("mountid", staleMID).Warn("failed to unmount stale activation target") + } + } + } + defer func() { // If error, rollback and remove by name if retErr != nil { @@ -483,7 +538,24 @@ func (mm *mountManager) Activate(ctx context.Context, name string, mounts []moun return err } - // TODO: Save all system mounts + if len(info.System) > 0 { + if len(info.System) > 255 { + return fmt.Errorf("too many system mounts (%d): maximum 255", len(info.System)) + } + sbkt, err := bkt.CreateBucket(bucketKeySystem) + if err != nil { + return err + } + for i, sm := range info.System { + cur, err := sbkt.CreateBucket([]byte{byte(i)}) + if err != nil { + return err + } + if err = putSystemMount(cur, sm); err != nil { + return err + } + } + } return nil }); err != nil { @@ -552,6 +624,36 @@ func readActiveMount(bkt *bolt.Bucket) (mount.ActiveMount, error) { return active, nil } +func putSystemMount(bkt *bolt.Bucket, m mount.Mount) error { + if err := bkt.Put(bucketKeyType, []byte(m.Type)); err != nil { + return err + } + if err := bkt.Put(bucketKeySource, []byte(m.Source)); err != nil { + return err + } + if err := bkt.Put(bucketKeyTarget, []byte(m.Target)); err != nil { + return err + } + if len(m.Options) > 0 { + if err := bkt.Put(bucketKeyOptions, []byte(strings.Join(m.Options, "\x00"))); err != nil { + return err + } + } + return nil +} + +func readSystemMount(bkt *bolt.Bucket) mount.Mount { + m := mount.Mount{ + Type: string(bkt.Get(bucketKeyType)), + Source: string(bkt.Get(bucketKeySource)), + Target: string(bkt.Get(bucketKeyTarget)), + } + if v := bkt.Get(bucketKeyOptions); len(v) > 0 { + m.Options = strings.Split(string(v), "\x00") + } + return m +} + func readActivationInfo(name string, bkt *bolt.Bucket) (mount.ActivationInfo, error) { info := mount.ActivationInfo{ Name: name, @@ -568,6 +670,14 @@ func readActivationInfo(name string, bkt *bolt.Bucket) (mount.ActivationInfo, er return mount.ActivationInfo{}, err } } + if sbkt := bkt.Bucket(bucketKeySystem); sbkt != nil { + if err := sbkt.ForEachBucket(func(k []byte) error { + info.System = append(info.System, readSystemMount(sbkt.Bucket(k))) + return nil + }); err != nil { + return mount.ActivationInfo{}, err + } + } lbls, err := boltutil.ReadLabels(bkt) if err != nil { return mount.ActivationInfo{}, err diff --git a/core/mount/manager/manager_test.go b/core/mount/manager/manager_test.go index 2eb7b8181..0a85f1c37 100644 --- a/core/mount/manager/manager_test.go +++ b/core/mount/manager/manager_test.go @@ -334,6 +334,272 @@ func checkGCActive(t *testing.T, i int, cc metadata.CollectionContext, active [] } } -// TODO: Test Info +func TestActivateAlreadyExists(t *testing.T) { + td := t.TempDir() + metadb := filepath.Join(td, "mounts.db") + targetdir := filepath.Join(td, "m") + db, err := bolt.Open(metadb, 0600, nil) + require.NoError(t, err) + t.Cleanup(func() { db.Close() }) + ctx := namespaces.WithNamespace(context.Background(), "test") + + mountC := new(atomic.Int32) + m, err := NewManager(db, targetdir, WithMountHandler("noop", &noopHandler{mounts: mountC})) + require.NoError(t, err) + t.Cleanup(func() { m.(io.Closer).Close() }) + + mounts := []mount.Mount{{Type: "noop"}} + + // First activation should succeed + _, err = m.Activate(ctx, "task1", mounts) + require.NoError(t, err) + + // Second activation with same name should return ErrAlreadyExists + _, err = m.Activate(ctx, "task1", mounts) + assert.True(t, errdefs.IsAlreadyExists(err), "expected ErrAlreadyExists, got: %v", err) + + // Info should return valid info for the existing mount + info, err := m.Info(ctx, "task1") + require.NoError(t, err) + assert.Equal(t, "task1", info.Name) + assert.Equal(t, 1, len(info.Active)) + + // Cleanup + assert.NoError(t, m.Deactivate(ctx, "task1")) +} + +func TestActivateStaleIncomplete(t *testing.T) { + td := t.TempDir() + metadb := filepath.Join(td, "mounts.db") + targetdir := filepath.Join(td, "m") + db, err := bolt.Open(metadb, 0600, nil) + require.NoError(t, err) + t.Cleanup(func() { db.Close() }) + ctx := namespaces.WithNamespace(context.Background(), "test") + + // Simulate a stale incomplete activation by directly writing a bucket + // without the "active" sub-bucket (as if the process crashed mid-activation). + // Use mount ID 42 so we can verify the stale target directory gets cleaned up. + var staleMID uint64 = 42 + err = db.Update(func(tx *bolt.Tx) error { + v1bkt, err := tx.CreateBucketIfNotExists([]byte("v1")) + if err != nil { + return err + } + nsbkt, err := v1bkt.CreateBucketIfNotExists([]byte("test")) + if err != nil { + return err + } + mbkt, err := nsbkt.CreateBucketIfNotExists(bucketKeyMounts) + if err != nil { + return err + } + // Create the mount bucket but don't add the "active" sub-bucket + bkt, err := mbkt.CreateBucket([]byte("task1")) + if err != nil { + return err + } + idb, err := encodeID(staleMID) + if err != nil { + return err + } + return bkt.Put(bucketKeyID, idb) + }) + require.NoError(t, err) + + // Create the stale target directory as if the process crashed after + // mkdir but before the second transaction. + staleTarget := filepath.Join(targetdir, fmt.Sprintf("%d", staleMID)) + require.NoError(t, os.MkdirAll(staleTarget, 0700)) + + mountC := new(atomic.Int32) + m, err := NewManager(db, targetdir, WithMountHandler("noop", &noopHandler{mounts: mountC})) + require.NoError(t, err) + t.Cleanup(func() { m.(io.Closer).Close() }) + + mounts := []mount.Mount{{Type: "noop"}} + + // Activation should succeed by cleaning up the stale entry + ainfo, err := m.Activate(ctx, "task1", mounts) + require.NoError(t, err) + assert.Equal(t, "task1", ainfo.Name) + assert.Equal(t, 1, len(ainfo.Active)) + + // The stale target directory should have been cleaned up + _, err = os.Stat(staleTarget) + assert.True(t, os.IsNotExist(err), "stale target directory should be removed, but still exists") + + // Cleanup + assert.NoError(t, m.Deactivate(ctx, "task1")) +} + +func TestInfo(t *testing.T) { + td := t.TempDir() + metadb := filepath.Join(td, "mounts.db") + targetdir := filepath.Join(td, "m") + db, err := bolt.Open(metadb, 0600, nil) + require.NoError(t, err) + t.Cleanup(func() { db.Close() }) + ctx := namespaces.WithNamespace(context.Background(), "test") + + mountC := new(atomic.Int32) + m, err := NewManager(db, targetdir, WithMountHandler("noop", &noopHandler{mounts: mountC})) + require.NoError(t, err) + t.Cleanup(func() { m.(io.Closer).Close() }) + + // Info on non-existent mount should return ErrNotFound + _, err = m.Info(ctx, "nonexistent") + assert.True(t, errdefs.IsNotFound(err), "expected ErrNotFound, got: %v", err) + + // Activate a mount with labels + labels := map[string]string{ + "containerd.io/gc.bref.container": "ctr1", + "custom-label": "value1", + } + mounts := []mount.Mount{{Type: "noop"}} + ainfo, err := m.Activate(ctx, "task1", mounts, mount.WithLabels(labels)) + require.NoError(t, err) + defer m.Deactivate(ctx, "task1") + + // Info should return the correct activation info + info, err := m.Info(ctx, "task1") + require.NoError(t, err) + assert.Equal(t, "task1", info.Name) + assert.Equal(t, 1, len(info.Active)) + assert.Equal(t, "noop", info.Active[0].Type) + assert.NotNil(t, info.Active[0].MountedAt) + assert.Equal(t, labels, info.Labels) + + // Info active and system mounts should match those returned by Activate + require.Equal(t, len(ainfo.Active), len(info.Active)) + for i := range ainfo.Active { + assert.Equal(t, ainfo.Active[i].Type, info.Active[i].Type) + assert.Equal(t, ainfo.Active[i].MountPoint, info.Active[i].MountPoint) + assert.Equal(t, ainfo.Active[i].MountedAt.Unix(), info.Active[i].MountedAt.Unix()) + } + // No system mounts when all mounts are handled + assert.Empty(t, ainfo.System) + assert.Empty(t, info.System) + + // Activate a second mount and verify Info returns correct data for each + _, err = m.Activate(ctx, "task2", mounts) + require.NoError(t, err) + defer m.Deactivate(ctx, "task2") + + info2, err := m.Info(ctx, "task2") + require.NoError(t, err) + assert.Equal(t, "task2", info2.Name) + assert.Equal(t, 1, len(info2.Active)) + // task2 has no custom labels + assert.Empty(t, info2.Labels) + + // Original task1 info should be unchanged + info1, err := m.Info(ctx, "task1") + require.NoError(t, err) + assert.Equal(t, "task1", info1.Name) + assert.Equal(t, labels, info1.Labels) +} + +func TestInfoSystemMounts(t *testing.T) { + td := t.TempDir() + metadb := filepath.Join(td, "mounts.db") + targetdir := filepath.Join(td, "m") + db, err := bolt.Open(metadb, 0600, nil) + require.NoError(t, err) + t.Cleanup(func() { db.Close() }) + ctx := namespaces.WithNamespace(context.Background(), "test") + + mountC := new(atomic.Int32) + // Only register a handler for "noop"; "bind" will pass through as a system mount + m, err := NewManager(db, targetdir, WithMountHandler("noop", &noopHandler{mounts: mountC})) + require.NoError(t, err) + t.Cleanup(func() { m.(io.Closer).Close() }) + + sourcedir := filepath.Join(td, "source") + require.NoError(t, os.Mkdir(sourcedir, 0700)) + + mounts := []mount.Mount{ + {Type: "noop"}, + { + Type: "bind", + Source: sourcedir, + Options: []string{"rbind", "ro"}, + }, + } + + ainfo, err := m.Activate(ctx, "task1", mounts) + require.NoError(t, err) + defer m.Deactivate(ctx, "task1") + + // Activate should return one active mount and one system mount + require.Equal(t, 1, len(ainfo.Active)) + require.Equal(t, 1, len(ainfo.System)) + assert.Equal(t, "bind", ainfo.System[0].Type) + assert.Equal(t, sourcedir, ainfo.System[0].Source) + assert.Equal(t, []string{"rbind", "ro"}, ainfo.System[0].Options) + + // Info should return the same active and system mounts + info, err := m.Info(ctx, "task1") + require.NoError(t, err) + assert.Equal(t, "task1", info.Name) + + require.Equal(t, len(ainfo.Active), len(info.Active)) + for i := range ainfo.Active { + assert.Equal(t, ainfo.Active[i].Type, info.Active[i].Type) + assert.Equal(t, ainfo.Active[i].MountPoint, info.Active[i].MountPoint) + assert.Equal(t, ainfo.Active[i].MountedAt.Unix(), info.Active[i].MountedAt.Unix()) + } + + require.Equal(t, len(ainfo.System), len(info.System)) + for i := range ainfo.System { + assert.Equal(t, ainfo.System[i].Type, info.System[i].Type) + assert.Equal(t, ainfo.System[i].Source, info.System[i].Source) + assert.Equal(t, ainfo.System[i].Target, info.System[i].Target) + assert.Equal(t, ainfo.System[i].Options, info.System[i].Options) + } +} + +func TestActivateConcurrentSameName(t *testing.T) { + td := t.TempDir() + metadb := filepath.Join(td, "mounts.db") + targetdir := filepath.Join(td, "m") + db, err := bolt.Open(metadb, 0600, nil) + require.NoError(t, err) + t.Cleanup(func() { db.Close() }) + ctx := namespaces.WithNamespace(context.Background(), "test") + + mountC := new(atomic.Int32) + m, err := NewManager(db, targetdir, WithMountHandler("noop", &noopHandler{mounts: mountC})) + require.NoError(t, err) + t.Cleanup(func() { m.(io.Closer).Close() }) + + mounts := []mount.Mount{{Type: "noop"}} + + // Launch two concurrent activations with the same name. + // The per-name lock should serialize them: first succeeds, + // second gets ErrAlreadyExists (not a stale-recovery race). + errs := make(chan error, 2) + for i := 0; i < 2; i++ { + go func() { + _, err := m.Activate(ctx, "task1", mounts) + errs <- err + }() + } + + err1 := <-errs + err2 := <-errs + + // Exactly one should succeed and one should get ErrAlreadyExists + if err1 == nil && errdefs.IsAlreadyExists(err2) { + // ok + } else if err2 == nil && errdefs.IsAlreadyExists(err1) { + // ok + } else { + t.Fatalf("expected one nil and one ErrAlreadyExists, got: %v, %v", err1, err2) + } + + assert.NoError(t, m.Deactivate(ctx, "task1")) +} + // TODO: Test deactivate // TODO: Test Sync diff --git a/core/runtime/v2/task_manager.go b/core/runtime/v2/task_manager.go index 04765e1e3..994110455 100644 --- a/core/runtime/v2/task_manager.go +++ b/core/runtime/v2/task_manager.go @@ -180,8 +180,7 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr } // Add options based on runtime - ai, err := m.mounts.Activate(ctx, taskID, opts.Rootfs, activateOpts...) - if err == nil { + if ai, err := m.mounts.Activate(ctx, taskID, opts.Rootfs, activateOpts...); err == nil { opts.Rootfs = ai.System defer func() { if retErr != nil { @@ -192,6 +191,15 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr } } }() + } else if errdefs.IsAlreadyExists(err) { + // If creation of task with same identifier, use existing mount rather than forcing + // deactivation of the old one. The back reference will prevent racing between + // deactivation and re-use, as the container with the same ID would still exist. + ai, err = m.mounts.Info(ctx, taskID) + if err != nil { + return nil, fmt.Errorf("failed to get info on already active mount: %w", err) + } + opts.Rootfs = ai.System } else if !errdefs.IsNotImplemented(err) { return nil, err }