Add progress to apply options

Allows the apply process to return progress on the tar apply operation.

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2025-06-02 17:33:08 -07:00
parent 1a8d84b826
commit 09e531b88e
3 changed files with 72 additions and 3 deletions

View File

@@ -71,10 +71,16 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
if err != nil {
return emptyDesc, fmt.Errorf("failed to get reader from content store: %w", err)
}
defer ra.Close()
var r io.ReadCloser
if config.Progress != nil {
r = newProgressReader(ra, config.Progress)
} else {
r = newReadCloser(ra)
}
defer r.Close()
var processors []diff.StreamProcessor
processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra))
processor := diff.NewProcessorChain(desc.MediaType, r)
processors = append(processors, processor)
for {
if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil {
@@ -110,6 +116,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
}
}
}
return ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageLayer,
Size: rc.c,
@@ -129,3 +136,46 @@ func (rc *readCounter) Read(p []byte) (n int, err error) {
}
return
}
type progressReader struct {
rc *readCounter
c io.Closer
p func(int64)
}
func newProgressReader(ra content.ReaderAt, p func(int64)) io.ReadCloser {
return &progressReader{
rc: &readCounter{
r: content.NewReader(ra),
c: 0,
},
c: ra,
p: p,
}
}
func (pr *progressReader) Read(p []byte) (n int, err error) {
// Call the progress function with the current count, indicating
// the previously read content has been processed. Initial
// progress of 0 indicates start of processing.
pr.p(pr.rc.c)
n, err = pr.rc.Read(p)
return
}
func (pr *progressReader) Close() error {
pr.p(pr.rc.c)
return pr.c.Close()
}
type readCloser struct {
io.Reader
io.Closer
}
func newReadCloser(ra content.ReaderAt) io.ReadCloser {
return &readCloser{
Reader: content.NewReader(ra),
Closer: ra,
}
}

View File

@@ -69,6 +69,8 @@ type ApplyConfig struct {
ProcessorPayloads map[string]typeurl.Any
// SyncFs is to synchronize the underlying filesystem containing files
SyncFs bool
// Progress is a function which reports status of processed read data
Progress func(int64)
}
// ApplyOpt is used to configure an Apply operation
@@ -135,6 +137,18 @@ func WithSyncFs(sync bool) ApplyOpt {
}
}
// WithProgress is used to indicate process of the apply operation, should
// atleast expect a progress of 0 and of the final size. It is up to the applier
// how much progress it reports in between.
func WithProgress(f func(ocispec.Descriptor, int64)) ApplyOpt {
return func(_ context.Context, desc ocispec.Descriptor, c *ApplyConfig) error {
c.Progress = func(state int64) {
f(desc, state)
}
return nil
}
}
// WithSourceDateEpoch specifies the timestamp used to provide control for reproducibility.
// See also https://reproducible-builds.org/docs/source-date-epoch/ .
//

View File

@@ -56,7 +56,9 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts
for k, v := range config.ProcessorPayloads {
payloads[k] = typeurl.MarshalProto(v)
}
if config.Progress != nil {
config.Progress(0)
}
req := &diffapi.ApplyRequest{
Diff: oci.DescriptorToProto(desc),
Mounts: mount.ToProto(mounts),
@@ -67,6 +69,9 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts
if err != nil {
return ocispec.Descriptor{}, errgrpc.ToNative(err)
}
if config.Progress != nil {
config.Progress(desc.Size)
}
return oci.DescriptorFromProto(resp.Applied), nil
}