mirror of
https://github.com/containerd/containerd.git
synced 2026-06-24 08:48:48 +00:00
Merge pull request #13126 from dmcgowan/handle-mount-already-exists
Fix mount manager activation error when already exists
This commit is contained in:
@@ -69,7 +69,11 @@ var (
|
||||
bucketKeyLeases = []byte("leases")
|
||||
bucketKeyLease = []byte("lease")
|
||||
bucketKeyActive = []byte("active")
|
||||
bucketKeySystem = []byte("system")
|
||||
bucketKeyType = []byte("type")
|
||||
bucketKeySource = []byte("source")
|
||||
bucketKeyTarget = []byte("target")
|
||||
bucketKeyOptions = []byte("options")
|
||||
bucketKeyMountedAt = []byte("mat")
|
||||
bucketKeyMountPoint = []byte("mp")
|
||||
bucketKeyLabels = []byte("labels")
|
||||
|
||||
@@ -39,6 +39,7 @@ import (
|
||||
"github.com/containerd/containerd/v2/core/metadata"
|
||||
"github.com/containerd/containerd/v2/core/metadata/boltutil"
|
||||
"github.com/containerd/containerd/v2/core/mount"
|
||||
"github.com/containerd/containerd/v2/internal/kmutex"
|
||||
"github.com/containerd/containerd/v2/pkg/gc"
|
||||
"github.com/containerd/containerd/v2/pkg/namespaces"
|
||||
)
|
||||
@@ -103,6 +104,7 @@ func NewManager(db *bolt.DB, targetDir string, opts ...Opt) (mount.Manager, erro
|
||||
targets: tr,
|
||||
handlers: options.handlers,
|
||||
rootMap: rootMap,
|
||||
activate: kmutex.New(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -112,7 +114,8 @@ type mountManager struct {
|
||||
handlers map[string]mount.Handler
|
||||
rootMap map[string]*os.Root
|
||||
|
||||
rwlock sync.RWMutex
|
||||
rwlock sync.RWMutex
|
||||
activate kmutex.KeyedLocker
|
||||
}
|
||||
|
||||
func (mm *mountManager) Close() error {
|
||||
@@ -131,6 +134,14 @@ func (mm *mountManager) Activate(ctx context.Context, name string, mounts []moun
|
||||
return mount.ActivationInfo{}, err
|
||||
}
|
||||
|
||||
// Serialize concurrent activations of the same name to prevent a
|
||||
// racing Activate from misidentifying an in-progress activation as
|
||||
// a stale record and destroying it.
|
||||
if err := mm.activate.Lock(ctx, name); err != nil {
|
||||
return mount.ActivationInfo{}, err
|
||||
}
|
||||
defer mm.activate.Unlock(name)
|
||||
|
||||
log.G(ctx).WithField("name", name).WithField("mounts", mounts).Debugf("activating mount")
|
||||
|
||||
lid, leased := leases.FromContext(ctx)
|
||||
@@ -229,6 +240,7 @@ func (mm *mountManager) Activate(ctx context.Context, name string, mounts []moun
|
||||
defer mm.rwlock.RUnlock()
|
||||
|
||||
var mid uint64
|
||||
var staleMID uint64
|
||||
|
||||
if err := mm.db.Update(func(tx *bolt.Tx) error {
|
||||
v1bkt, err := tx.CreateBucketIfNotExists([]byte("v1"))
|
||||
@@ -246,8 +258,38 @@ func (mm *mountManager) Activate(ctx context.Context, name string, mounts []moun
|
||||
}
|
||||
bkt, err := mbkt.CreateBucket([]byte(name))
|
||||
if err != nil {
|
||||
// If already exists, return already exists
|
||||
return err
|
||||
existing := mbkt.Bucket([]byte(name))
|
||||
if existing == nil {
|
||||
return err
|
||||
}
|
||||
// If the mount is fully activated, return already exists
|
||||
// so the caller can reuse the existing mount.
|
||||
if existing.Bucket(bucketKeyActive) != nil {
|
||||
return fmt.Errorf("mount %q: %w", name, errdefs.ErrAlreadyExists)
|
||||
}
|
||||
// The mount bucket exists but was never fully activated
|
||||
// (e.g., process crashed between creating the bucket and
|
||||
// completing activation). Clean up the stale entry and
|
||||
// proceed with a fresh activation. Save the old mount ID
|
||||
// so the target directory can be cleaned up after the
|
||||
// transaction commits.
|
||||
staleMID = readID(existing)
|
||||
if lid := existing.Get(bucketKeyLease); len(lid) > 0 {
|
||||
if lsbkt := nsbkt.Bucket(bucketKeyLeases); lsbkt != nil {
|
||||
if lbkt := lsbkt.Bucket(lid); lbkt != nil {
|
||||
if err := lbkt.Delete([]byte(name)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := mbkt.DeleteBucket([]byte(name)); err != nil {
|
||||
return err
|
||||
}
|
||||
bkt, err = mbkt.CreateBucket([]byte(name))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
mid, err = v1bkt.NextSequence()
|
||||
@@ -297,6 +339,19 @@ func (mm *mountManager) Activate(ctx context.Context, name string, mounts []moun
|
||||
return mount.ActivationInfo{}, err
|
||||
}
|
||||
|
||||
// If a stale incomplete activation was found, clean up its target
|
||||
// directory which may contain leftover mounts from before a crash.
|
||||
if staleMID != 0 {
|
||||
staleTarget := filepath.Join(mm.targets.Name(), strconv.FormatUint(staleMID, 10))
|
||||
if err := unmountAll(ctx, staleTarget, mm.handlers); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
log.G(ctx).WithError(err).WithField("mountid", staleMID).Debug("stale activation target does not exist, skipping cleanup")
|
||||
} else {
|
||||
log.G(ctx).WithError(err).WithField("mountid", staleMID).Warn("failed to unmount stale activation target")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// If error, rollback and remove by name
|
||||
if retErr != nil {
|
||||
@@ -483,7 +538,24 @@ func (mm *mountManager) Activate(ctx context.Context, name string, mounts []moun
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: Save all system mounts
|
||||
if len(info.System) > 0 {
|
||||
if len(info.System) > 255 {
|
||||
return fmt.Errorf("too many system mounts (%d): maximum 255", len(info.System))
|
||||
}
|
||||
sbkt, err := bkt.CreateBucket(bucketKeySystem)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i, sm := range info.System {
|
||||
cur, err := sbkt.CreateBucket([]byte{byte(i)})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = putSystemMount(cur, sm); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
@@ -552,6 +624,36 @@ func readActiveMount(bkt *bolt.Bucket) (mount.ActiveMount, error) {
|
||||
return active, nil
|
||||
}
|
||||
|
||||
func putSystemMount(bkt *bolt.Bucket, m mount.Mount) error {
|
||||
if err := bkt.Put(bucketKeyType, []byte(m.Type)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := bkt.Put(bucketKeySource, []byte(m.Source)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := bkt.Put(bucketKeyTarget, []byte(m.Target)); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(m.Options) > 0 {
|
||||
if err := bkt.Put(bucketKeyOptions, []byte(strings.Join(m.Options, "\x00"))); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func readSystemMount(bkt *bolt.Bucket) mount.Mount {
|
||||
m := mount.Mount{
|
||||
Type: string(bkt.Get(bucketKeyType)),
|
||||
Source: string(bkt.Get(bucketKeySource)),
|
||||
Target: string(bkt.Get(bucketKeyTarget)),
|
||||
}
|
||||
if v := bkt.Get(bucketKeyOptions); len(v) > 0 {
|
||||
m.Options = strings.Split(string(v), "\x00")
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func readActivationInfo(name string, bkt *bolt.Bucket) (mount.ActivationInfo, error) {
|
||||
info := mount.ActivationInfo{
|
||||
Name: name,
|
||||
@@ -568,6 +670,14 @@ func readActivationInfo(name string, bkt *bolt.Bucket) (mount.ActivationInfo, er
|
||||
return mount.ActivationInfo{}, err
|
||||
}
|
||||
}
|
||||
if sbkt := bkt.Bucket(bucketKeySystem); sbkt != nil {
|
||||
if err := sbkt.ForEachBucket(func(k []byte) error {
|
||||
info.System = append(info.System, readSystemMount(sbkt.Bucket(k)))
|
||||
return nil
|
||||
}); err != nil {
|
||||
return mount.ActivationInfo{}, err
|
||||
}
|
||||
}
|
||||
lbls, err := boltutil.ReadLabels(bkt)
|
||||
if err != nil {
|
||||
return mount.ActivationInfo{}, err
|
||||
|
||||
@@ -334,6 +334,272 @@ func checkGCActive(t *testing.T, i int, cc metadata.CollectionContext, active []
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Test Info
|
||||
func TestActivateAlreadyExists(t *testing.T) {
|
||||
td := t.TempDir()
|
||||
metadb := filepath.Join(td, "mounts.db")
|
||||
targetdir := filepath.Join(td, "m")
|
||||
db, err := bolt.Open(metadb, 0600, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { db.Close() })
|
||||
ctx := namespaces.WithNamespace(context.Background(), "test")
|
||||
|
||||
mountC := new(atomic.Int32)
|
||||
m, err := NewManager(db, targetdir, WithMountHandler("noop", &noopHandler{mounts: mountC}))
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { m.(io.Closer).Close() })
|
||||
|
||||
mounts := []mount.Mount{{Type: "noop"}}
|
||||
|
||||
// First activation should succeed
|
||||
_, err = m.Activate(ctx, "task1", mounts)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Second activation with same name should return ErrAlreadyExists
|
||||
_, err = m.Activate(ctx, "task1", mounts)
|
||||
assert.True(t, errdefs.IsAlreadyExists(err), "expected ErrAlreadyExists, got: %v", err)
|
||||
|
||||
// Info should return valid info for the existing mount
|
||||
info, err := m.Info(ctx, "task1")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "task1", info.Name)
|
||||
assert.Equal(t, 1, len(info.Active))
|
||||
|
||||
// Cleanup
|
||||
assert.NoError(t, m.Deactivate(ctx, "task1"))
|
||||
}
|
||||
|
||||
func TestActivateStaleIncomplete(t *testing.T) {
|
||||
td := t.TempDir()
|
||||
metadb := filepath.Join(td, "mounts.db")
|
||||
targetdir := filepath.Join(td, "m")
|
||||
db, err := bolt.Open(metadb, 0600, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { db.Close() })
|
||||
ctx := namespaces.WithNamespace(context.Background(), "test")
|
||||
|
||||
// Simulate a stale incomplete activation by directly writing a bucket
|
||||
// without the "active" sub-bucket (as if the process crashed mid-activation).
|
||||
// Use mount ID 42 so we can verify the stale target directory gets cleaned up.
|
||||
var staleMID uint64 = 42
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
v1bkt, err := tx.CreateBucketIfNotExists([]byte("v1"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
nsbkt, err := v1bkt.CreateBucketIfNotExists([]byte("test"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mbkt, err := nsbkt.CreateBucketIfNotExists(bucketKeyMounts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Create the mount bucket but don't add the "active" sub-bucket
|
||||
bkt, err := mbkt.CreateBucket([]byte("task1"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
idb, err := encodeID(staleMID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return bkt.Put(bucketKeyID, idb)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create the stale target directory as if the process crashed after
|
||||
// mkdir but before the second transaction.
|
||||
staleTarget := filepath.Join(targetdir, fmt.Sprintf("%d", staleMID))
|
||||
require.NoError(t, os.MkdirAll(staleTarget, 0700))
|
||||
|
||||
mountC := new(atomic.Int32)
|
||||
m, err := NewManager(db, targetdir, WithMountHandler("noop", &noopHandler{mounts: mountC}))
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { m.(io.Closer).Close() })
|
||||
|
||||
mounts := []mount.Mount{{Type: "noop"}}
|
||||
|
||||
// Activation should succeed by cleaning up the stale entry
|
||||
ainfo, err := m.Activate(ctx, "task1", mounts)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "task1", ainfo.Name)
|
||||
assert.Equal(t, 1, len(ainfo.Active))
|
||||
|
||||
// The stale target directory should have been cleaned up
|
||||
_, err = os.Stat(staleTarget)
|
||||
assert.True(t, os.IsNotExist(err), "stale target directory should be removed, but still exists")
|
||||
|
||||
// Cleanup
|
||||
assert.NoError(t, m.Deactivate(ctx, "task1"))
|
||||
}
|
||||
|
||||
func TestInfo(t *testing.T) {
|
||||
td := t.TempDir()
|
||||
metadb := filepath.Join(td, "mounts.db")
|
||||
targetdir := filepath.Join(td, "m")
|
||||
db, err := bolt.Open(metadb, 0600, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { db.Close() })
|
||||
ctx := namespaces.WithNamespace(context.Background(), "test")
|
||||
|
||||
mountC := new(atomic.Int32)
|
||||
m, err := NewManager(db, targetdir, WithMountHandler("noop", &noopHandler{mounts: mountC}))
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { m.(io.Closer).Close() })
|
||||
|
||||
// Info on non-existent mount should return ErrNotFound
|
||||
_, err = m.Info(ctx, "nonexistent")
|
||||
assert.True(t, errdefs.IsNotFound(err), "expected ErrNotFound, got: %v", err)
|
||||
|
||||
// Activate a mount with labels
|
||||
labels := map[string]string{
|
||||
"containerd.io/gc.bref.container": "ctr1",
|
||||
"custom-label": "value1",
|
||||
}
|
||||
mounts := []mount.Mount{{Type: "noop"}}
|
||||
ainfo, err := m.Activate(ctx, "task1", mounts, mount.WithLabels(labels))
|
||||
require.NoError(t, err)
|
||||
defer m.Deactivate(ctx, "task1")
|
||||
|
||||
// Info should return the correct activation info
|
||||
info, err := m.Info(ctx, "task1")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "task1", info.Name)
|
||||
assert.Equal(t, 1, len(info.Active))
|
||||
assert.Equal(t, "noop", info.Active[0].Type)
|
||||
assert.NotNil(t, info.Active[0].MountedAt)
|
||||
assert.Equal(t, labels, info.Labels)
|
||||
|
||||
// Info active and system mounts should match those returned by Activate
|
||||
require.Equal(t, len(ainfo.Active), len(info.Active))
|
||||
for i := range ainfo.Active {
|
||||
assert.Equal(t, ainfo.Active[i].Type, info.Active[i].Type)
|
||||
assert.Equal(t, ainfo.Active[i].MountPoint, info.Active[i].MountPoint)
|
||||
assert.Equal(t, ainfo.Active[i].MountedAt.Unix(), info.Active[i].MountedAt.Unix())
|
||||
}
|
||||
// No system mounts when all mounts are handled
|
||||
assert.Empty(t, ainfo.System)
|
||||
assert.Empty(t, info.System)
|
||||
|
||||
// Activate a second mount and verify Info returns correct data for each
|
||||
_, err = m.Activate(ctx, "task2", mounts)
|
||||
require.NoError(t, err)
|
||||
defer m.Deactivate(ctx, "task2")
|
||||
|
||||
info2, err := m.Info(ctx, "task2")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "task2", info2.Name)
|
||||
assert.Equal(t, 1, len(info2.Active))
|
||||
// task2 has no custom labels
|
||||
assert.Empty(t, info2.Labels)
|
||||
|
||||
// Original task1 info should be unchanged
|
||||
info1, err := m.Info(ctx, "task1")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "task1", info1.Name)
|
||||
assert.Equal(t, labels, info1.Labels)
|
||||
}
|
||||
|
||||
func TestInfoSystemMounts(t *testing.T) {
|
||||
td := t.TempDir()
|
||||
metadb := filepath.Join(td, "mounts.db")
|
||||
targetdir := filepath.Join(td, "m")
|
||||
db, err := bolt.Open(metadb, 0600, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { db.Close() })
|
||||
ctx := namespaces.WithNamespace(context.Background(), "test")
|
||||
|
||||
mountC := new(atomic.Int32)
|
||||
// Only register a handler for "noop"; "bind" will pass through as a system mount
|
||||
m, err := NewManager(db, targetdir, WithMountHandler("noop", &noopHandler{mounts: mountC}))
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { m.(io.Closer).Close() })
|
||||
|
||||
sourcedir := filepath.Join(td, "source")
|
||||
require.NoError(t, os.Mkdir(sourcedir, 0700))
|
||||
|
||||
mounts := []mount.Mount{
|
||||
{Type: "noop"},
|
||||
{
|
||||
Type: "bind",
|
||||
Source: sourcedir,
|
||||
Options: []string{"rbind", "ro"},
|
||||
},
|
||||
}
|
||||
|
||||
ainfo, err := m.Activate(ctx, "task1", mounts)
|
||||
require.NoError(t, err)
|
||||
defer m.Deactivate(ctx, "task1")
|
||||
|
||||
// Activate should return one active mount and one system mount
|
||||
require.Equal(t, 1, len(ainfo.Active))
|
||||
require.Equal(t, 1, len(ainfo.System))
|
||||
assert.Equal(t, "bind", ainfo.System[0].Type)
|
||||
assert.Equal(t, sourcedir, ainfo.System[0].Source)
|
||||
assert.Equal(t, []string{"rbind", "ro"}, ainfo.System[0].Options)
|
||||
|
||||
// Info should return the same active and system mounts
|
||||
info, err := m.Info(ctx, "task1")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "task1", info.Name)
|
||||
|
||||
require.Equal(t, len(ainfo.Active), len(info.Active))
|
||||
for i := range ainfo.Active {
|
||||
assert.Equal(t, ainfo.Active[i].Type, info.Active[i].Type)
|
||||
assert.Equal(t, ainfo.Active[i].MountPoint, info.Active[i].MountPoint)
|
||||
assert.Equal(t, ainfo.Active[i].MountedAt.Unix(), info.Active[i].MountedAt.Unix())
|
||||
}
|
||||
|
||||
require.Equal(t, len(ainfo.System), len(info.System))
|
||||
for i := range ainfo.System {
|
||||
assert.Equal(t, ainfo.System[i].Type, info.System[i].Type)
|
||||
assert.Equal(t, ainfo.System[i].Source, info.System[i].Source)
|
||||
assert.Equal(t, ainfo.System[i].Target, info.System[i].Target)
|
||||
assert.Equal(t, ainfo.System[i].Options, info.System[i].Options)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivateConcurrentSameName(t *testing.T) {
|
||||
td := t.TempDir()
|
||||
metadb := filepath.Join(td, "mounts.db")
|
||||
targetdir := filepath.Join(td, "m")
|
||||
db, err := bolt.Open(metadb, 0600, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { db.Close() })
|
||||
ctx := namespaces.WithNamespace(context.Background(), "test")
|
||||
|
||||
mountC := new(atomic.Int32)
|
||||
m, err := NewManager(db, targetdir, WithMountHandler("noop", &noopHandler{mounts: mountC}))
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { m.(io.Closer).Close() })
|
||||
|
||||
mounts := []mount.Mount{{Type: "noop"}}
|
||||
|
||||
// Launch two concurrent activations with the same name.
|
||||
// The per-name lock should serialize them: first succeeds,
|
||||
// second gets ErrAlreadyExists (not a stale-recovery race).
|
||||
errs := make(chan error, 2)
|
||||
for i := 0; i < 2; i++ {
|
||||
go func() {
|
||||
_, err := m.Activate(ctx, "task1", mounts)
|
||||
errs <- err
|
||||
}()
|
||||
}
|
||||
|
||||
err1 := <-errs
|
||||
err2 := <-errs
|
||||
|
||||
// Exactly one should succeed and one should get ErrAlreadyExists
|
||||
if err1 == nil && errdefs.IsAlreadyExists(err2) {
|
||||
// ok
|
||||
} else if err2 == nil && errdefs.IsAlreadyExists(err1) {
|
||||
// ok
|
||||
} else {
|
||||
t.Fatalf("expected one nil and one ErrAlreadyExists, got: %v, %v", err1, err2)
|
||||
}
|
||||
|
||||
assert.NoError(t, m.Deactivate(ctx, "task1"))
|
||||
}
|
||||
|
||||
// TODO: Test deactivate
|
||||
// TODO: Test Sync
|
||||
|
||||
@@ -186,8 +186,7 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
|
||||
}
|
||||
|
||||
// Add options based on runtime
|
||||
ai, err := m.mounts.Activate(ctx, taskID, opts.Rootfs, activateOpts...)
|
||||
if err == nil {
|
||||
if ai, err := m.mounts.Activate(ctx, taskID, opts.Rootfs, activateOpts...); err == nil {
|
||||
opts.Rootfs = ai.System
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
@@ -198,6 +197,15 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
|
||||
}
|
||||
}
|
||||
}()
|
||||
} else if errdefs.IsAlreadyExists(err) {
|
||||
// If creation of task with same identifier, use existing mount rather than forcing
|
||||
// deactivation of the old one. The back reference will prevent racing between
|
||||
// deactivation and re-use, as the container with the same ID would still exist.
|
||||
ai, err = m.mounts.Info(ctx, taskID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get info on already active mount: %w", err)
|
||||
}
|
||||
opts.Rootfs = ai.System
|
||||
} else if !errdefs.IsNotImplemented(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user