move flightcontrol to use generics

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi
2023-06-13 15:51:03 -07:00
parent 62bdf9689b
commit 8ffc03b8f0
19 changed files with 180 additions and 214 deletions

57
cache/blobs.go vendored
View File

@@ -22,7 +22,8 @@ import (
"golang.org/x/sync/errgroup"
)
var g flightcontrol.Group
var g flightcontrol.Group[struct{}]
var gFileList flightcontrol.Group[[]string]
const containerdUncompressed = "containerd.io/uncompressed"
@@ -86,12 +87,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if _, ok := filter[sr.ID()]; ok {
eg.Go(func() error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) {
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (struct{}, error) {
if sr.getBlob() != "" {
return nil, nil
return struct{}{}, nil
}
if !createIfNeeded {
return nil, errors.WithStack(ErrNoBlobs)
return struct{}{}, errors.WithStack(ErrNoBlobs)
}
compressorFunc, finalize := comp.Type.Compress(ctx, comp)
@@ -108,12 +109,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if lowerRef != nil {
m, err := lowerRef.Mount(ctx, true, s)
if err != nil {
return nil, err
return struct{}{}, err
}
var release func() error
lower, release, err = m.Mount()
if err != nil {
return nil, err
return struct{}{}, err
}
if release != nil {
defer release()
@@ -131,12 +132,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if upperRef != nil {
m, err := upperRef.Mount(ctx, true, s)
if err != nil {
return nil, err
return struct{}{}, err
}
var release func() error
upper, release, err = m.Mount()
if err != nil {
return nil, err
return struct{}{}, err
}
if release != nil {
defer release()
@@ -151,7 +152,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if forceOvlStr := os.Getenv("BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF"); forceOvlStr != "" && sr.kind() != Diff {
enableOverlay, err = strconv.ParseBool(forceOvlStr)
if err != nil {
return nil, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
return struct{}{}, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
}
fallback = false // prohibit fallback on debug
} else if !isTypeWindows(sr) {
@@ -173,10 +174,10 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if !ok || err != nil {
if !fallback {
if !ok {
return nil, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
return struct{}{}, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
}
if err != nil {
return nil, errors.Wrapf(err, "failed to compute overlay diff")
return struct{}{}, errors.Wrapf(err, "failed to compute overlay diff")
}
}
if logWarnOnErr {
@@ -209,7 +210,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
diff.WithCompressor(compressorFunc),
)
if err != nil {
return nil, err
return struct{}{}, err
}
}
@@ -219,7 +220,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if finalize != nil {
a, err := finalize(ctx, sr.cm.ContentStore)
if err != nil {
return nil, errors.Wrapf(err, "failed to finalize compression")
return struct{}{}, errors.Wrapf(err, "failed to finalize compression")
}
for k, v := range a {
desc.Annotations[k] = v
@@ -227,7 +228,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
}
info, err := sr.cm.ContentStore.Info(ctx, desc.Digest)
if err != nil {
return nil, err
return struct{}{}, err
}
if diffID, ok := info.Labels[containerdUncompressed]; ok {
@@ -235,13 +236,13 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
} else if mediaType == ocispecs.MediaTypeImageLayer {
desc.Annotations[containerdUncompressed] = desc.Digest.String()
} else {
return nil, errors.Errorf("unknown layer compression type")
return struct{}{}, errors.Errorf("unknown layer compression type")
}
if err := sr.setBlob(ctx, desc); err != nil {
return nil, err
return struct{}{}, err
}
return nil, nil
return struct{}{}, nil
})
if err != nil {
return err
@@ -415,29 +416,29 @@ func isTypeWindows(sr *immutableRef) bool {
// ensureCompression ensures the specified ref has the blob of the specified compression Type.
func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (interface{}, error) {
_, err := g.Do(ctx, fmt.Sprintf("ensureComp-%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (struct{}, error) {
desc, err := ref.ociDesc(ctx, ref.descHandlers, true)
if err != nil {
return nil, err
return struct{}{}, err
}
// Resolve converters
layerConvertFunc, err := getConverter(ctx, ref.cm.ContentStore, desc, comp)
if err != nil {
return nil, err
return struct{}{}, err
} else if layerConvertFunc == nil {
if isLazy, err := ref.isLazy(ctx); err != nil {
return nil, err
return struct{}{}, err
} else if isLazy {
// This ref can be used as the specified compressionType. Keep it lazy.
return nil, nil
return struct{}{}, nil
}
return nil, ref.linkBlob(ctx, desc)
return struct{}{}, ref.linkBlob(ctx, desc)
}
// First, lookup local content store
if _, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil {
return nil, nil // found the compression variant. no need to convert.
return struct{}{}, nil // found the compression variant. no need to convert.
}
// Convert layer compression type
@@ -447,18 +448,18 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.
dh: ref.descHandlers[desc.Digest],
session: s,
}).Unlazy(ctx); err != nil {
return nil, err
return struct{}{}, err
}
newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc)
if err != nil {
return nil, errors.Wrapf(err, "failed to convert")
return struct{}{}, errors.Wrapf(err, "failed to convert")
}
// Start to track converted layer
if err := ref.linkBlob(ctx, *newDesc); err != nil {
return nil, errors.Wrapf(err, "failed to add compression blob")
return struct{}{}, errors.Wrapf(err, "failed to add compression blob")
}
return nil, nil
return struct{}{}, nil
})
return err
}

9
cache/filelist.go vendored
View File

@@ -20,7 +20,7 @@ const keyFileList = "filelist"
// are in the tar stream (AUFS whiteout format). If the reference does not have a
// a blob associated with it, the list is empty.
func (sr *immutableRef) FileList(ctx context.Context, s session.Group) ([]string, error) {
res, err := g.Do(ctx, fmt.Sprintf("filelist-%s", sr.ID()), func(ctx context.Context) (interface{}, error) {
return gFileList.Do(ctx, fmt.Sprintf("filelist-%s", sr.ID()), func(ctx context.Context) ([]string, error) {
dt, err := sr.GetExternal(keyFileList)
if err == nil && dt != nil {
var files []string
@@ -80,11 +80,4 @@ func (sr *immutableRef) FileList(ctx context.Context, s session.Group) ([]string
}
return files, nil
})
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
return res.([]string), nil
}

2
cache/manager.go vendored
View File

@@ -93,7 +93,7 @@ type cacheManager struct {
mountPool sharableMountPool
muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group
unlazyG flightcontrol.Group[struct{}]
}
func NewManager(opt ManagerOpt) (Manager, error) {

26
cache/refs.go vendored
View File

@@ -90,7 +90,7 @@ type cacheRecord struct {
mountCache snapshot.Mountable
sizeG flightcontrol.Group
sizeG flightcontrol.Group[int64]
// these are filled if multiple refs point to same data
equalMutable *mutableRef
@@ -325,7 +325,7 @@ func (cr *cacheRecord) viewSnapshotID() string {
func (cr *cacheRecord) size(ctx context.Context) (int64, error) {
// this expects that usage() is implemented lazily
s, err := cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (interface{}, error) {
return cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (int64, error) {
cr.mu.Lock()
s := cr.getSize()
if s != sizeUnknown {
@@ -346,7 +346,7 @@ func (cr *cacheRecord) size(ctx context.Context) (int64, error) {
isDead := cr.isDead()
cr.mu.Unlock()
if isDead {
return int64(0), nil
return 0, nil
}
if !errors.Is(err, errdefs.ErrNotFound) {
return s, errors.Wrapf(err, "failed to get usage for %s", cr.ID())
@@ -379,10 +379,6 @@ func (cr *cacheRecord) size(ctx context.Context) (int64, error) {
cr.mu.Unlock()
return usage.Size, nil
})
if err != nil {
return 0, err
}
return s.(int64), nil
}
// caller must hold cr.mu
@@ -1057,7 +1053,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context,
}
func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s session.Group) error {
_, err := sr.sizeG.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ interface{}, rerr error) {
_, err := g.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ struct{}, rerr error) {
dhs := sr.descHandlers
for _, r := range sr.layerChain() {
r := r
@@ -1069,7 +1065,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s
dh := dhs[digest.Digest(r.getBlob())]
if dh == nil {
// We cannot prepare remote snapshots without descHandler.
return nil, nil
return struct{}{}, nil
}
// tmpLabels contains dh.SnapshotLabels + session IDs. All keys contain
@@ -1121,7 +1117,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s
break
}
return nil, nil
return struct{}{}, nil
})
return err
}
@@ -1144,18 +1140,18 @@ func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields
}
func (sr *immutableRef) unlazy(ctx context.Context, dhs DescHandlers, pg progress.Controller, s session.Group, topLevel bool) error {
_, err := sr.sizeG.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ interface{}, rerr error) {
_, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ struct{}, rerr error) {
if _, err := sr.cm.Snapshotter.Stat(ctx, sr.getSnapshotID()); err == nil {
return nil, nil
return struct{}{}, nil
}
switch sr.kind() {
case Merge, Diff:
return nil, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel)
return struct{}{}, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel)
case Layer, BaseLayer:
return nil, sr.unlazyLayer(ctx, dhs, pg, s)
return struct{}{}, sr.unlazyLayer(ctx, dhs, pg, s)
}
return nil, nil
return struct{}{}, nil
})
return err
}

14
cache/remote.go vendored
View File

@@ -305,11 +305,11 @@ func (p lazyRefProvider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor)
}
func (p lazyRefProvider) Unlazy(ctx context.Context) error {
_, err := p.ref.cm.unlazyG.Do(ctx, string(p.desc.Digest), func(ctx context.Context) (_ interface{}, rerr error) {
_, err := p.ref.cm.unlazyG.Do(ctx, string(p.desc.Digest), func(ctx context.Context) (_ struct{}, rerr error) {
if isLazy, err := p.ref.isLazy(ctx); err != nil {
return nil, err
return struct{}{}, err
} else if !isLazy {
return nil, nil
return struct{}{}, nil
}
defer func() {
if rerr == nil {
@@ -320,7 +320,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
if p.dh == nil {
// shouldn't happen, if you have a lazy immutable ref it already should be validated
// that descriptor handlers exist for it
return nil, errors.New("unexpected nil descriptor handler")
return struct{}{}, errors.New("unexpected nil descriptor handler")
}
if p.dh.Progress != nil {
@@ -337,7 +337,7 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
Manager: p.ref.cm.ContentStore,
}, p.desc, p.dh.Ref, logs.LoggerFromContext(ctx))
if err != nil {
return nil, err
return struct{}{}, err
}
if imageRefs := p.ref.getImageRefs(); len(imageRefs) > 0 {
@@ -345,12 +345,12 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error {
imageRef := imageRefs[0]
if p.ref.GetDescription() == "" {
if err := p.ref.SetDescription("pulled from " + imageRef); err != nil {
return nil, err
return struct{}{}, err
}
}
}
return nil, nil
return struct{}{}, nil
})
return err
}