diff --git a/cache/refs.go b/cache/refs.go index 3bef999f6..e96836c15 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -65,7 +65,7 @@ func (cr *cacheRecord) mref() *mutableRef { func (cr *cacheRecord) Size(ctx context.Context) (int64, error) { // this expects that usage() is implemented lazily - s, err, _ := cr.sizeG.Do(ctx, cr.id, func(ctx context.Context) (interface{}, error) { + s, err := cr.sizeG.Do(ctx, cr.id, func(ctx context.Context) (interface{}, error) { cr.mu.Lock() s := cr.size cr.mu.Unlock() diff --git a/solver/jobs.go b/solver/jobs.go index 7dca5fb00..2743e648b 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -3,10 +3,10 @@ package solver import ( "context" "io" - "strings" "sync" "time" + "github.com/Sirupsen/logrus" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "github.com/tonistiigi/buildkit_poc/client" @@ -27,7 +27,7 @@ func newJobList() *jobList { return jl } -func (jl *jobList) new(ctx context.Context, id string, g *opVertex, pr progress.ProgressReader) (*job, error) { +func (jl *jobList) new(ctx context.Context, id string, g *opVertex, pr progress.Reader) (*job, error) { jl.mu.Lock() defer jl.mu.Unlock() @@ -106,10 +106,14 @@ func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error { ss.Vertexes = append(ss.Vertexes, &v) case progress.Status: - i := strings.Index(p.ID, ".") + vtx, ok := p.Meta("vertex") + if !ok { + logrus.Warnf("progress %s status without vertex info", p.ID) + continue + } vs := &client.VertexStatus{ - ID: p.ID[i+1:], - Vertex: digest.Digest(p.ID[:i]), // TODO: this needs to be handled better + ID: p.ID, + Vertex: vtx.(digest.Digest), Name: v.Action, Total: int64(v.Total), Current: int64(v.Current), diff --git a/solver/solver.go b/solver/solver.go index fb3e6cfae..e4100e4c6 100644 --- a/solver/solver.go +++ b/solver/solver.go @@ -121,6 +121,9 @@ func (g *opVertex) solve(ctx context.Context, opt Opt) (retErr error) { } }() + pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.dgst)) + defer pw.Close() + if len(g.inputs) > 0 { eg, ctx := errgroup.WithContext(ctx) @@ -140,9 +143,6 @@ func (g *opVertex) solve(ctx context.Context, opt Opt) (retErr error) { } } - pw, _, ctx := progress.FromContext(ctx, g.dgst.String()) - defer pw.Done() - g.notifyStarted(pw) defer g.notifyComplete(pw) @@ -216,16 +216,16 @@ func (g *opVertex) solve(ctx context.Context, opt Opt) (retErr error) { return nil } -func (g *opVertex) notifyStarted(pw progress.ProgressWriter) { +func (g *opVertex) notifyStarted(pw progress.Writer) { now := time.Now() g.vtx.Started = &now - pw.Write(g.vtx) + pw.Write(g.dgst.String(), g.vtx) } -func (g *opVertex) notifyComplete(pw progress.ProgressWriter) { +func (g *opVertex) notifyComplete(pw progress.Writer) { now := time.Now() g.vtx.Completed = &now - pw.Write(g.vtx) + pw.Write(g.dgst.String(), g.vtx) } func (g *opVertex) name() string { diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index 0e3ad63e0..a8af7be2b 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -78,6 +78,7 @@ func (is *imageSource) Pull(ctx context.Context, id source.Identifier) (cache.Im resolveProgressDone(nil) ongoing := newJobs(ref) + pctx, stopProgress := context.WithCancel(ctx) go showProgress(pctx, ongoing, is.ContentStore) @@ -93,7 +94,7 @@ func (is *imageSource) Pull(ctx context.Context, id source.Identifier) (cache.Im // or 2) cachemanager should manage the contentstore handlers := []images.Handler{ images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - ongoing.add(ctx, desc) + ongoing.add(desc) return nil, nil }), remotes.FetchHandler(is.ContentStore, fetcher), @@ -176,22 +177,21 @@ func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Desc func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) { var ( - ticker = time.NewTicker(100 * time.Millisecond) - // fw = progress.NewWriter(out) + ticker = time.NewTicker(100 * time.Millisecond) statuses = map[string]statusInfo{} done bool ) defer ticker.Stop() + pw, _, ctx := progress.FromContext(ctx) + defer pw.Close() + for { select { case <-ticker.C: case <-ctx.Done(): done = true } - // fw.Flush() - - // tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0) resolved := "resolved" if !ongoing.isResolved() { @@ -201,7 +201,6 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) { Ref: ongoing.name, Status: resolved, } - // keys := []string{ongoing.name} actives := make(map[string]statusInfo) @@ -228,7 +227,7 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) { for _, j := range ongoing.jobs() { refKey := remotes.MakeRefKey(ctx, j.Descriptor) if a, ok := actives[refKey]; ok { - j.Write(progress.Status{ + pw.Write(j.Digest.String(), progress.Status{ Action: a.Status, Total: int(a.Total), Current: int(a.Offset), @@ -241,7 +240,7 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) { info, err := cs.Info(ctx, j.Digest) if err != nil { if content.IsNotFound(err) { - j.Write(progress.Status{ + pw.Write(j.Digest.String(), progress.Status{ Action: "waiting", }) continue @@ -251,14 +250,13 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) { } if done || j.done { - j.Write(progress.Status{ + pw.Write(j.Digest.String(), progress.Status{ Action: "done", Current: int(info.Size), Total: int(info.Size), Completed: &info.CommittedAt, Started: &j.started, }) - j.Done() } } } @@ -282,7 +280,6 @@ type jobs struct { type job struct { ocispec.Descriptor - progress.ProgressWriter done bool started time.Time } @@ -294,18 +291,16 @@ func newJobs(name string) *jobs { } } -func (j *jobs) add(ctx context.Context, desc ocispec.Descriptor) { +func (j *jobs) add(desc ocispec.Descriptor) { j.mu.Lock() defer j.mu.Unlock() if _, ok := j.added[desc.Digest]; ok { return } - pw, _, _ := progress.FromContext(ctx, desc.Digest.String()) j.added[desc.Digest] = job{ - Descriptor: desc, - ProgressWriter: pw, - started: time.Now(), + Descriptor: desc, + started: time.Now(), } } @@ -336,18 +331,18 @@ type statusInfo struct { } func oneOffProgress(ctx context.Context, id string) func(err error) error { - pw, _, _ := progress.FromContext(ctx, id) + pw, _, _ := progress.FromContext(ctx) now := time.Now() st := progress.Status{ Started: &now, } - pw.Write(st) + pw.Write(id, st) return func(err error) error { // TODO: set error on status now := time.Now() st.Completed = &now - pw.Write(st) - pw.Done() + pw.Write(id, st) + pw.Close() return err } } diff --git a/util/flightcontrol/flightcontrol.go b/util/flightcontrol/flightcontrol.go index a805970b3..7bfec5b36 100644 --- a/util/flightcontrol/flightcontrol.go +++ b/util/flightcontrol/flightcontrol.go @@ -1,12 +1,14 @@ package flightcontrol import ( + "io" "runtime" + "sort" "sync" "time" "github.com/pkg/errors" - + "github.com/tonistiigi/buildkit_poc/util/progress" "golang.org/x/net/context" ) @@ -20,21 +22,23 @@ type Group struct { m map[string]*call // lazily initialized } -func (g *Group) Do(ctx context.Context, key string, fn func(ctx context.Context) (interface{}, error)) (v interface{}, err error, shared bool) { +func (g *Group) Do(ctx context.Context, key string, fn func(ctx context.Context) (interface{}, error)) (v interface{}, err error) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } + if c, ok := g.m[key]; ok { // register 2nd waiter g.mu.Unlock() - v, err, shared := c.wait(ctx) + v, err := c.wait(ctx) if err == errRetry { runtime.Gosched() return g.Do(ctx, key, fn) } - return v, err, shared + return v, err } - c := &call{fn: fn, ready: make(chan struct{})} + + c := newCall(fn) g.m[key] = c go func() { // cleanup after a caller has returned @@ -52,34 +56,57 @@ type call struct { result interface{} err error ready chan struct{} - ctx *ctx - fn func(ctx context.Context) (interface{}, error) + + ctx *sharedContext + ctxs []context.Context + fn func(ctx context.Context) (interface{}, error) + once sync.Once + + closeProgressWriter func() + progressState *progressState } -func (c *call) wait(ctx context.Context) (v interface{}, err error, shared bool) { +func newCall(fn func(ctx context.Context) (interface{}, error)) *call { + c := &call{ + fn: fn, + ready: make(chan struct{}), + progressState: newProgressState(), + } + ctx := newContext(c) // newSharedContext + pr, _, closeProgressWriter := progress.NewContext(ctx) + + c.ctx = ctx + c.closeProgressWriter = closeProgressWriter + + go c.progressState.run(pr) + + return c +} + +func (c *call) run() { + defer c.closeProgressWriter() + v, err := c.fn(c.ctx) + c.mu.Lock() + c.result = v + c.err = err + c.mu.Unlock() + close(c.ready) +} + +func (c *call) wait(ctx context.Context) (v interface{}, err error) { c.mu.Lock() // detect case where caller has just returned, let it clean up before select { - case <-c.ready: + case <-c.ready: // could return if no error c.mu.Unlock() - return nil, errRetry, false + return nil, errRetry default: } - if c.ctx == nil { // first invocation, register shared context - c.ctx = newContext() - c.ctx.append(ctx) - go func() { - v, err := c.fn(c.ctx) - c.mu.Lock() - c.result = v - c.err = err - c.mu.Unlock() - close(c.ready) - }() - } else { - c.ctx.append(ctx) - } + c.append(ctx) c.mu.Unlock() + + go c.once.Do(c.run) + select { case <-ctx.Done(): select { @@ -87,43 +114,91 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error, shared bool) // if this cancelled the last context, then wait for function to shut down // and don't accept any more callers <-c.ready - return c.result, c.err, false + return c.result, c.err default: - return nil, ctx.Err(), false + return nil, ctx.Err() } case <-c.ready: - return c.result, c.err, false // shared not implemented yet + return c.result, c.err // shared not implemented yet } } -type ctx struct { - mu sync.Mutex - ctxs []context.Context - done chan struct{} - err error -} - -func newContext() *ctx { - return &ctx{done: make(chan struct{})} -} - -func (c *ctx) append(ctx context.Context) { - c.mu.Lock() - defer c.mu.Unlock() +func (c *call) append(ctx context.Context) { + pw, ok, ctx := progress.FromContext(ctx) + if ok { + c.progressState.add(pw) + } c.ctxs = append(c.ctxs, ctx) go func() { select { - case <-c.done: + case <-c.ctx.done: case <-ctx.Done(): c.mu.Lock() - c.signalDone() + c.ctx.signal() c.mu.Unlock() } }() } +func (c *call) Deadline() (deadline time.Time, ok bool) { + c.mu.Lock() + defer c.mu.Unlock() + for _, ctx := range c.ctxs { + select { + case <-ctx.Done(): + default: + dl, ok := ctx.Deadline() + if ok { + return dl, ok + } + } + } + return time.Time{}, false +} + +func (c *call) Done() <-chan struct{} { + c.mu.Lock() + c.ctx.signal() + c.mu.Unlock() + return c.ctx.done +} + +func (c *call) Err() error { + select { + case <-c.ctx.Done(): + return c.ctx.err + default: + return nil + } +} + +func (c *call) Value(key interface{}) interface{} { + c.mu.Lock() + defer c.mu.Unlock() + for _, ctx := range append([]context.Context{}, c.ctxs...) { + select { + case <-ctx.Done(): + default: + if v := ctx.Value(key); v != nil { + return v + } + } + } + return nil +} + +type sharedContext struct { + *call + done chan struct{} + err error +} + +func newContext(c *call) *sharedContext { + return &sharedContext{call: c, done: make(chan struct{})} +} + // call with lock -func (c *ctx) signalDone() { +func (c *sharedContext) signal() { select { case <-c.done: default: @@ -141,47 +216,69 @@ func (c *ctx) signalDone() { } } -func (c *ctx) Deadline() (deadline time.Time, ok bool) { - c.mu.Lock() - defer c.mu.Unlock() - for _, ctx := range c.ctxs { - select { - case <-ctx.Done(): - default: - dl, ok := ctx.Deadline() - if ok { - return dl, ok +type rawProgressWriter interface { + WriteRawProgress(*progress.Progress) error + Close() error +} + +type progressState struct { + mu sync.Mutex + items map[string]*progress.Progress + writers []rawProgressWriter + done bool +} + +func newProgressState() *progressState { + return &progressState{ + items: make(map[string]*progress.Progress), + } +} + +func (ps *progressState) run(pr progress.Reader) { + for { + p, err := pr.Read(context.TODO()) + if err != nil { + if err == io.EOF { + ps.mu.Lock() + ps.done = true + ps.mu.Unlock() + for _, w := range ps.writers { + w.Close() + } } + return } - } - return time.Time{}, false -} - -func (c *ctx) Done() <-chan struct{} { - c.mu.Lock() - c.signalDone() - c.mu.Unlock() - return c.done -} - -func (c *ctx) Err() error { - select { - case <-c.Done(): - return c.err - default: - return nil - } -} - -func (c *ctx) Value(key interface{}) interface{} { - c.mu.Lock() - defer c.mu.Unlock() - for _, ctx := range c.ctxs { - select { - case <-ctx.Done(): - default: - return ctx.Value(key) + ps.mu.Lock() + for _, p := range p { + for _, w := range ps.writers { + w.WriteRawProgress(p) + } + ps.items[p.ID] = p } + ps.mu.Unlock() } - return nil +} + +func (ps *progressState) add(pw progress.Writer) { + rw, ok := pw.(rawProgressWriter) + if !ok { + return + } + ps.mu.Lock() + plist := make([]*progress.Progress, 0, len(ps.items)) + for _, p := range ps.items { + plist = append(plist, p) + } + sort.Slice(plist, func(i, j int) bool { + return plist[i].Timestamp.Before(plist[j].Timestamp) + }) + for _, p := range plist { + rw.WriteRawProgress(p) + } + if ps.done { + rw.Close() + } else { + ps.writers = append(ps.writers, rw) + } + ps.mu.Unlock() } diff --git a/util/flightcontrol/flightcontrol_test.go b/util/flightcontrol/flightcontrol_test.go index 10d014f3f..21e5ee094 100644 --- a/util/flightcontrol/flightcontrol_test.go +++ b/util/flightcontrol/flightcontrol_test.go @@ -18,7 +18,7 @@ func TestNoCancel(t *testing.T) { var counter int64 f := testFunc(100*time.Millisecond, "bar", &counter) eg.Go(func() error { - ret1, err, _ := g.Do(ctx, "foo", f) + ret1, err := g.Do(ctx, "foo", f) if err != nil { return err } @@ -26,7 +26,7 @@ func TestNoCancel(t *testing.T) { return nil }) eg.Go(func() error { - ret2, err, _ := g.Do(ctx, "foo", f) + ret2, err := g.Do(ctx, "foo", f) if err != nil { return err } @@ -48,7 +48,7 @@ func TestCancelOne(t *testing.T) { f := testFunc(100*time.Millisecond, "bar", &counter) ctx2, cancel := context.WithCancel(ctx) eg.Go(func() error { - ret1, err, _ := g.Do(ctx2, "foo", f) + ret1, err := g.Do(ctx2, "foo", f) assert.Error(t, err) assert.Equal(t, errors.Cause(err), context.Canceled) if err == nil { @@ -57,7 +57,7 @@ func TestCancelOne(t *testing.T) { return nil }) eg.Go(func() error { - ret2, err, _ := g.Do(ctx, "foo", f) + ret2, err := g.Do(ctx, "foo", f) if err != nil { return err } @@ -85,11 +85,11 @@ func TestCancelBoth(t *testing.T) { eg, ctx := errgroup.WithContext(context.Background()) var r1, r2 string var counter int64 - f := testFunc(100*time.Millisecond, "bar", &counter) + f := testFunc(200*time.Millisecond, "bar", &counter) ctx2, cancel2 := context.WithCancel(ctx) ctx3, cancel3 := context.WithCancel(ctx) eg.Go(func() error { - ret1, err, _ := g.Do(ctx2, "foo", f) + ret1, err := g.Do(ctx2, "foo", f) assert.Error(t, err) assert.Equal(t, errors.Cause(err), context.Canceled) if err == nil { @@ -98,7 +98,7 @@ func TestCancelBoth(t *testing.T) { return nil }) eg.Go(func() error { - ret2, err, _ := g.Do(ctx3, "foo", f) + ret2, err := g.Do(ctx3, "foo", f) assert.Error(t, err) assert.Equal(t, errors.Cause(err), context.Canceled) if err == nil { @@ -130,15 +130,15 @@ func TestCancelBoth(t *testing.T) { assert.Equal(t, "", r2) assert.Equal(t, counter, int64(1)) - ret1, err, _ := g.Do(context.TODO(), "foo", f) + ret1, err := g.Do(context.TODO(), "foo", f) assert.NoError(t, err) assert.Equal(t, ret1, "bar") f2 := testFunc(100*time.Millisecond, "baz", &counter) - ret1, err, _ = g.Do(context.TODO(), "foo", f2) + ret1, err = g.Do(context.TODO(), "foo", f2) assert.NoError(t, err) assert.Equal(t, ret1, "baz") - ret1, err, _ = g.Do(context.TODO(), "abc", f) + ret1, err = g.Do(context.TODO(), "abc", f) assert.NoError(t, err) assert.Equal(t, ret1, "bar") diff --git a/util/progress/multireader.go b/util/progress/multireader.go index d9dbd4222..2583ed814 100644 --- a/util/progress/multireader.go +++ b/util/progress/multireader.go @@ -8,13 +8,13 @@ import ( type MultiReader struct { mu sync.Mutex - main ProgressReader + main Reader initialized bool done chan struct{} writers map[*progressWriter]func() } -func NewMultiReader(pr ProgressReader) *MultiReader { +func NewMultiReader(pr Reader) *MultiReader { mr := &MultiReader{ main: pr, writers: make(map[*progressWriter]func()), @@ -23,12 +23,12 @@ func NewMultiReader(pr ProgressReader) *MultiReader { return mr } -func (mr *MultiReader) Reader(ctx context.Context) ProgressReader { +func (mr *MultiReader) Reader(ctx context.Context) Reader { mr.mu.Lock() defer mr.mu.Unlock() pr, ctx, closeWriter := NewContext(ctx) - pw, _, _ := FromContext(ctx, "") + pw, _, ctx := FromContext(ctx) w := pw.(*progressWriter) mr.writers[w] = closeWriter @@ -57,8 +57,9 @@ func (mr *MultiReader) handle() error { if err != nil { if err == io.EOF { mr.mu.Lock() - for w := range mr.writers { - w.Done() + for w, c := range mr.writers { + w.Close() + c() } mr.mu.Unlock() return nil @@ -67,12 +68,8 @@ func (mr *MultiReader) handle() error { } mr.mu.Lock() for _, p := range p { - for w, c := range mr.writers { - if p == nil { - c() - } else { - w.write(*p) - } + for w := range mr.writers { + w.WriteRawProgress(p) } } mr.mu.Unlock() diff --git a/util/progress/progress.go b/util/progress/progress.go index 6f884219d..4c9478214 100644 --- a/util/progress/progress.go +++ b/util/progress/progress.go @@ -10,32 +10,53 @@ import ( "github.com/pkg/errors" ) +// Progress package provides utility functions for using the context to capture +// progress of a running function. All progress items written contain an ID +// that is used to collapse unread messages. + type contextKeyT string var contextKey = contextKeyT("buildkit/util/progress") -func FromContext(ctx context.Context, name string) (ProgressWriter, bool, context.Context) { +// FromContext returns a progress writer from a context. +func FromContext(ctx context.Context, opts ...WriterOption) (Writer, bool, context.Context) { pw, ok := ctx.Value(contextKey).(*progressWriter) if !ok { return &noOpWriter{}, false, ctx } - pw = newWriter(pw, name) + pw = newWriter(pw) + for _, o := range opts { + o(pw) + } ctx = context.WithValue(ctx, contextKey, pw) return pw, true, ctx } -func NewContext(ctx context.Context) (ProgressReader, context.Context, func()) { +type WriterOption func(Writer) + +// NewContext returns a new context and a progress reader that captures all +// progress items writtern to this context. Last returned parameter is a closer +// function to signal that no new writes will happen to this context. +func NewContext(ctx context.Context) (Reader, context.Context, func()) { pr, pw, cancel := pipe() ctx = context.WithValue(ctx, contextKey, pw) return pr, ctx, cancel } -type ProgressWriter interface { - Write(interface{}) error - Done() error // Close +func WithMetadata(key string, val interface{}) WriterOption { + return func(w Writer) { + if pw, ok := w.(*progressWriter); ok { + pw.meta[key] = val + } + } } -type ProgressReader interface { +type Writer interface { + Write(id string, value interface{}) error + Close() error +} + +type Reader interface { Read(context.Context) ([]*Progress, error) } @@ -43,10 +64,10 @@ type Progress struct { ID string Timestamp time.Time Sys interface{} + meta map[string]interface{} } type Status struct { - // ...progress of an action Action string Current int Total int @@ -62,32 +83,6 @@ type progressReader struct { dirty map[string]*Progress } -// type progressState struct { -// mu sync.Mutex -// -// } -// -// type streamHandle struct { -// pw *progressWriter -// lastP *Progress -// } -// -// func (sh *streamHandle) next() (*Progress, bool) { -// lasti := sh.pw.lastP.Load() -// if lasti != nil { -// last := lasti.(*Progress) -// if last != sh.lastP { -// sh.lastP = last -// return last, true -// } -// } -// return nil, false -// } -// -// func (pr *progressReader) write(id string, p *Progress) { -// -// } - func (pr *progressReader) Read(ctx context.Context) ([]*Progress, error) { done := make(chan struct{}) defer close(done) @@ -107,11 +102,14 @@ func (pr *progressReader) Read(ctx context.Context) ([]*Progress, error) { default: } dmap := pr.dirty - open := len(pr.writers) > 0 if len(dmap) == 0 { - if !open { - pr.mu.Unlock() - return nil, io.EOF + select { + case <-pr.ctx.Done(): + if len(pr.writers) == 0 { + pr.mu.Unlock() + return nil, io.EOF + } + default: } pr.cond.Wait() continue @@ -162,48 +160,46 @@ func pipe() (*progressReader, *progressWriter, func()) { return pr, pw, cancel } -func newWriter(pw *progressWriter, name string) *progressWriter { - if pw.id != "" { - if name == "" { - name = pw.id - } else { - name = pw.id + "." + name - } +func newWriter(pw *progressWriter) *progressWriter { + meta := make(map[string]interface{}) + for k, v := range pw.meta { + meta[k] = v } pw = &progressWriter{ - id: name, reader: pw.reader, + meta: meta, } pw.reader.append(pw) return pw } type progressWriter struct { - id string done bool reader *progressReader + meta map[string]interface{} } -func (pw *progressWriter) Write(s interface{}) error { +func (pw *progressWriter) Write(id string, v interface{}) error { if pw.done { - return errors.Errorf("writing to closed progresswriter %s", pw.id) + return errors.Errorf("writing %s to closed progress writer", id) } - var p Progress - p.ID = pw.id - p.Timestamp = time.Now() - p.Sys = s - return pw.write(p) + return pw.WriteRawProgress(&Progress{ + ID: id, + Timestamp: time.Now(), + Sys: v, + meta: pw.meta, + }) } -func (pw *progressWriter) write(p Progress) error { +func (pw *progressWriter) WriteRawProgress(p *Progress) error { pw.reader.mu.Lock() - pw.reader.dirty[pw.id+"."+p.ID] = &p + pw.reader.dirty[p.ID] = p pw.reader.mu.Unlock() pw.reader.cond.Broadcast() return nil } -func (pw *progressWriter) Done() error { +func (pw *progressWriter) Close() error { pw.reader.mu.Lock() delete(pw.reader.writers, pw) pw.reader.mu.Unlock() @@ -212,19 +208,17 @@ func (pw *progressWriter) Done() error { return nil } +func (p *Progress) Meta(key string) (interface{}, bool) { + v, ok := p.meta[key] + return v, ok +} + type noOpWriter struct{} -func (pw *noOpWriter) Write(p interface{}) error { +func (pw *noOpWriter) Write(_ string, _ interface{}) error { return nil } -func (pw *noOpWriter) Done() error { +func (pw *noOpWriter) Close() error { return nil } - -// type ProgressRecord struct { -// UUID string -// Parent string -// Done bool -// *Progress -// } diff --git a/util/progress/progress_test.go b/util/progress/progress_test.go index 5a113e1d7..7e3756eb8 100644 --- a/util/progress/progress_test.go +++ b/util/progress/progress_test.go @@ -23,7 +23,10 @@ func TestProgress(t *testing.T) { eg.Go(func() error { return saveProgress(ctx, pr, &trace) }) + + pw, _, ctx := FromContext(ctx, WithMetadata("tag", "foo")) s, err = calc(ctx, 5, "calc") + pw.Close() assert.NoError(t, err) assert.Equal(t, 15, s) @@ -33,6 +36,11 @@ func TestProgress(t *testing.T) { assert.True(t, len(trace.items) > 5) assert.True(t, len(trace.items) <= 7) + for _, p := range trace.items { + v, ok := p.Meta("tag") + assert.True(t, ok) + assert.Equal(t, v.(string), "foo") + } } func TestProgressNested(t *testing.T) { @@ -56,11 +64,11 @@ func TestProgressNested(t *testing.T) { } func calc(ctx context.Context, total int, name string) (int, error) { - pw, _, ctx := FromContext(ctx, name) - defer pw.Done() + pw, _, ctx := FromContext(ctx) + defer pw.Close() sum := 0 - pw.Write(Status{Action: "starting", Total: total}) + pw.Write(name, Status{Action: "starting", Total: total}) for i := 1; i <= total; i++ { select { case <-ctx.Done(): @@ -68,13 +76,12 @@ func calc(ctx context.Context, total int, name string) (int, error) { case <-time.After(10 * time.Millisecond): } if i == total { - pw.Write(Status{Action: "done", Total: total, Current: total}) + pw.Write(name, Status{Action: "done", Total: total, Current: total}) } else { - pw.Write(Status{Action: "calculating", Total: total, Current: i}) + pw.Write(name, Status{Action: "calculating", Total: total, Current: i}) } sum += i } - pw.Done() return sum, nil } @@ -82,10 +89,10 @@ func calc(ctx context.Context, total int, name string) (int, error) { func reduceCalc(ctx context.Context, total int) (int, error) { eg, ctx := errgroup.WithContext(ctx) - pw, _, ctx := FromContext(ctx, "reduce") - defer pw.Done() + pw, _, ctx := FromContext(ctx) + defer pw.Close() - pw.Write(Status{Action: "starting"}) + pw.Write("reduce", Status{Action: "starting"}) // sync step sum, err := calc(ctx, total, "synccalc") @@ -111,7 +118,7 @@ type trace struct { items []*Progress } -func saveProgress(ctx context.Context, pr ProgressReader, t *trace) error { +func saveProgress(ctx context.Context, pr Reader, t *trace) error { for { p, err := pr.Read(ctx) if err != nil { diff --git a/util/progress/progressui/display.go b/util/progress/progressui/display.go index 5081bb2e7..c31e35553 100644 --- a/util/progress/progressui/display.go +++ b/util/progress/progressui/display.go @@ -133,7 +133,7 @@ func (t *trace) displayInfo(maxRows int) (d displayInfo) { name: "=> " + s.ID, } if s.Total != 0 { - j.name += " " + units.HumanSize(float64(s.Current)) + " / " + units.HumanSize(float64(s.Current)) + j.name += " " + units.HumanSize(float64(s.Current)) + " / " + units.HumanSize(float64(s.Total)) } d.jobs = append(d.jobs, j) }