From 09e531b88e94cf27baf0b1eab31f005098fb37a6 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Mon, 2 Jun 2025 17:33:08 -0700 Subject: [PATCH] Add progress to apply options Allows the apply process to return progress on the tar apply operation. Signed-off-by: Derek McGowan --- core/diff/apply/apply.go | 54 +++++++++++++++++++++++++++++++++++++-- core/diff/diff.go | 14 ++++++++++ core/diff/proxy/differ.go | 7 ++++- 3 files changed, 72 insertions(+), 3 deletions(-) diff --git a/core/diff/apply/apply.go b/core/diff/apply/apply.go index 62e7a36233..9acecea35c 100644 --- a/core/diff/apply/apply.go +++ b/core/diff/apply/apply.go @@ -71,10 +71,16 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ if err != nil { return emptyDesc, fmt.Errorf("failed to get reader from content store: %w", err) } - defer ra.Close() + var r io.ReadCloser + if config.Progress != nil { + r = newProgressReader(ra, config.Progress) + } else { + r = newReadCloser(ra) + } + defer r.Close() var processors []diff.StreamProcessor - processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) + processor := diff.NewProcessorChain(desc.MediaType, r) processors = append(processors, processor) for { if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil { @@ -110,6 +116,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ } } } + return ocispec.Descriptor{ MediaType: ocispec.MediaTypeImageLayer, Size: rc.c, @@ -129,3 +136,46 @@ func (rc *readCounter) Read(p []byte) (n int, err error) { } return } + +type progressReader struct { + rc *readCounter + c io.Closer + p func(int64) +} + +func newProgressReader(ra content.ReaderAt, p func(int64)) io.ReadCloser { + return &progressReader{ + rc: &readCounter{ + r: content.NewReader(ra), + c: 0, + }, + c: ra, + p: p, + } +} + +func (pr *progressReader) Read(p []byte) (n int, err error) { + // Call the progress function with the current count, indicating + // the previously read content has been processed. Initial + // progress of 0 indicates start of processing. + pr.p(pr.rc.c) + n, err = pr.rc.Read(p) + return +} + +func (pr *progressReader) Close() error { + pr.p(pr.rc.c) + return pr.c.Close() +} + +type readCloser struct { + io.Reader + io.Closer +} + +func newReadCloser(ra content.ReaderAt) io.ReadCloser { + return &readCloser{ + Reader: content.NewReader(ra), + Closer: ra, + } +} diff --git a/core/diff/diff.go b/core/diff/diff.go index 4838a1c37e..4d20044ca7 100644 --- a/core/diff/diff.go +++ b/core/diff/diff.go @@ -69,6 +69,8 @@ type ApplyConfig struct { ProcessorPayloads map[string]typeurl.Any // SyncFs is to synchronize the underlying filesystem containing files SyncFs bool + // Progress is a function which reports status of processed read data + Progress func(int64) } // ApplyOpt is used to configure an Apply operation @@ -135,6 +137,18 @@ func WithSyncFs(sync bool) ApplyOpt { } } +// WithProgress is used to indicate process of the apply operation, should +// atleast expect a progress of 0 and of the final size. It is up to the applier +// how much progress it reports in between. +func WithProgress(f func(ocispec.Descriptor, int64)) ApplyOpt { + return func(_ context.Context, desc ocispec.Descriptor, c *ApplyConfig) error { + c.Progress = func(state int64) { + f(desc, state) + } + return nil + } +} + // WithSourceDateEpoch specifies the timestamp used to provide control for reproducibility. // See also https://reproducible-builds.org/docs/source-date-epoch/ . // diff --git a/core/diff/proxy/differ.go b/core/diff/proxy/differ.go index 8a5a4abd1e..f34e0d65d9 100644 --- a/core/diff/proxy/differ.go +++ b/core/diff/proxy/differ.go @@ -56,7 +56,9 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts for k, v := range config.ProcessorPayloads { payloads[k] = typeurl.MarshalProto(v) } - + if config.Progress != nil { + config.Progress(0) + } req := &diffapi.ApplyRequest{ Diff: oci.DescriptorToProto(desc), Mounts: mount.ToProto(mounts), @@ -67,6 +69,9 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts if err != nil { return ocispec.Descriptor{}, errgrpc.ToNative(err) } + if config.Progress != nil { + config.Progress(desc.Size) + } return oci.DescriptorFromProto(resp.Applied), nil }