progress: refactor progress IDs to items

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi
2017-06-16 22:12:27 -07:00
parent fa82d4bf5b
commit 4ed7fa3280
10 changed files with 310 additions and 216 deletions

2
cache/refs.go vendored
View File

@@ -65,7 +65,7 @@ func (cr *cacheRecord) mref() *mutableRef {
func (cr *cacheRecord) Size(ctx context.Context) (int64, error) {
// this expects that usage() is implemented lazily
s, err, _ := cr.sizeG.Do(ctx, cr.id, func(ctx context.Context) (interface{}, error) {
s, err := cr.sizeG.Do(ctx, cr.id, func(ctx context.Context) (interface{}, error) {
cr.mu.Lock()
s := cr.size
cr.mu.Unlock()

View File

@@ -3,10 +3,10 @@ package solver
import (
"context"
"io"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/tonistiigi/buildkit_poc/client"
@@ -27,7 +27,7 @@ func newJobList() *jobList {
return jl
}
func (jl *jobList) new(ctx context.Context, id string, g *opVertex, pr progress.ProgressReader) (*job, error) {
func (jl *jobList) new(ctx context.Context, id string, g *opVertex, pr progress.Reader) (*job, error) {
jl.mu.Lock()
defer jl.mu.Unlock()
@@ -106,10 +106,14 @@ func (j *job) pipe(ctx context.Context, ch chan *client.SolveStatus) error {
ss.Vertexes = append(ss.Vertexes, &v)
case progress.Status:
i := strings.Index(p.ID, ".")
vtx, ok := p.Meta("vertex")
if !ok {
logrus.Warnf("progress %s status without vertex info", p.ID)
continue
}
vs := &client.VertexStatus{
ID: p.ID[i+1:],
Vertex: digest.Digest(p.ID[:i]), // TODO: this needs to be handled better
ID: p.ID,
Vertex: vtx.(digest.Digest),
Name: v.Action,
Total: int64(v.Total),
Current: int64(v.Current),

View File

@@ -121,6 +121,9 @@ func (g *opVertex) solve(ctx context.Context, opt Opt) (retErr error) {
}
}()
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.dgst))
defer pw.Close()
if len(g.inputs) > 0 {
eg, ctx := errgroup.WithContext(ctx)
@@ -140,9 +143,6 @@ func (g *opVertex) solve(ctx context.Context, opt Opt) (retErr error) {
}
}
pw, _, ctx := progress.FromContext(ctx, g.dgst.String())
defer pw.Done()
g.notifyStarted(pw)
defer g.notifyComplete(pw)
@@ -216,16 +216,16 @@ func (g *opVertex) solve(ctx context.Context, opt Opt) (retErr error) {
return nil
}
func (g *opVertex) notifyStarted(pw progress.ProgressWriter) {
func (g *opVertex) notifyStarted(pw progress.Writer) {
now := time.Now()
g.vtx.Started = &now
pw.Write(g.vtx)
pw.Write(g.dgst.String(), g.vtx)
}
func (g *opVertex) notifyComplete(pw progress.ProgressWriter) {
func (g *opVertex) notifyComplete(pw progress.Writer) {
now := time.Now()
g.vtx.Completed = &now
pw.Write(g.vtx)
pw.Write(g.dgst.String(), g.vtx)
}
func (g *opVertex) name() string {

View File

@@ -78,6 +78,7 @@ func (is *imageSource) Pull(ctx context.Context, id source.Identifier) (cache.Im
resolveProgressDone(nil)
ongoing := newJobs(ref)
pctx, stopProgress := context.WithCancel(ctx)
go showProgress(pctx, ongoing, is.ContentStore)
@@ -93,7 +94,7 @@ func (is *imageSource) Pull(ctx context.Context, id source.Identifier) (cache.Im
// or 2) cachemanager should manage the contentstore
handlers := []images.Handler{
images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(ctx, desc)
ongoing.add(desc)
return nil, nil
}),
remotes.FetchHandler(is.ContentStore, fetcher),
@@ -176,22 +177,21 @@ func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Desc
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
var (
ticker = time.NewTicker(100 * time.Millisecond)
// fw = progress.NewWriter(out)
ticker = time.NewTicker(100 * time.Millisecond)
statuses = map[string]statusInfo{}
done bool
)
defer ticker.Stop()
pw, _, ctx := progress.FromContext(ctx)
defer pw.Close()
for {
select {
case <-ticker.C:
case <-ctx.Done():
done = true
}
// fw.Flush()
// tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
resolved := "resolved"
if !ongoing.isResolved() {
@@ -201,7 +201,6 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
Ref: ongoing.name,
Status: resolved,
}
// keys := []string{ongoing.name}
actives := make(map[string]statusInfo)
@@ -228,7 +227,7 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
for _, j := range ongoing.jobs() {
refKey := remotes.MakeRefKey(ctx, j.Descriptor)
if a, ok := actives[refKey]; ok {
j.Write(progress.Status{
pw.Write(j.Digest.String(), progress.Status{
Action: a.Status,
Total: int(a.Total),
Current: int(a.Offset),
@@ -241,7 +240,7 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
info, err := cs.Info(ctx, j.Digest)
if err != nil {
if content.IsNotFound(err) {
j.Write(progress.Status{
pw.Write(j.Digest.String(), progress.Status{
Action: "waiting",
})
continue
@@ -251,14 +250,13 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
}
if done || j.done {
j.Write(progress.Status{
pw.Write(j.Digest.String(), progress.Status{
Action: "done",
Current: int(info.Size),
Total: int(info.Size),
Completed: &info.CommittedAt,
Started: &j.started,
})
j.Done()
}
}
}
@@ -282,7 +280,6 @@ type jobs struct {
type job struct {
ocispec.Descriptor
progress.ProgressWriter
done bool
started time.Time
}
@@ -294,18 +291,16 @@ func newJobs(name string) *jobs {
}
}
func (j *jobs) add(ctx context.Context, desc ocispec.Descriptor) {
func (j *jobs) add(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()
if _, ok := j.added[desc.Digest]; ok {
return
}
pw, _, _ := progress.FromContext(ctx, desc.Digest.String())
j.added[desc.Digest] = job{
Descriptor: desc,
ProgressWriter: pw,
started: time.Now(),
Descriptor: desc,
started: time.Now(),
}
}
@@ -336,18 +331,18 @@ type statusInfo struct {
}
func oneOffProgress(ctx context.Context, id string) func(err error) error {
pw, _, _ := progress.FromContext(ctx, id)
pw, _, _ := progress.FromContext(ctx)
now := time.Now()
st := progress.Status{
Started: &now,
}
pw.Write(st)
pw.Write(id, st)
return func(err error) error {
// TODO: set error on status
now := time.Now()
st.Completed = &now
pw.Write(st)
pw.Done()
pw.Write(id, st)
pw.Close()
return err
}
}

View File

@@ -1,12 +1,14 @@
package flightcontrol
import (
"io"
"runtime"
"sort"
"sync"
"time"
"github.com/pkg/errors"
"github.com/tonistiigi/buildkit_poc/util/progress"
"golang.org/x/net/context"
)
@@ -20,21 +22,23 @@ type Group struct {
m map[string]*call // lazily initialized
}
func (g *Group) Do(ctx context.Context, key string, fn func(ctx context.Context) (interface{}, error)) (v interface{}, err error, shared bool) {
func (g *Group) Do(ctx context.Context, key string, fn func(ctx context.Context) (interface{}, error)) (v interface{}, err error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { // register 2nd waiter
g.mu.Unlock()
v, err, shared := c.wait(ctx)
v, err := c.wait(ctx)
if err == errRetry {
runtime.Gosched()
return g.Do(ctx, key, fn)
}
return v, err, shared
return v, err
}
c := &call{fn: fn, ready: make(chan struct{})}
c := newCall(fn)
g.m[key] = c
go func() {
// cleanup after a caller has returned
@@ -52,34 +56,57 @@ type call struct {
result interface{}
err error
ready chan struct{}
ctx *ctx
fn func(ctx context.Context) (interface{}, error)
ctx *sharedContext
ctxs []context.Context
fn func(ctx context.Context) (interface{}, error)
once sync.Once
closeProgressWriter func()
progressState *progressState
}
func (c *call) wait(ctx context.Context) (v interface{}, err error, shared bool) {
func newCall(fn func(ctx context.Context) (interface{}, error)) *call {
c := &call{
fn: fn,
ready: make(chan struct{}),
progressState: newProgressState(),
}
ctx := newContext(c) // newSharedContext
pr, _, closeProgressWriter := progress.NewContext(ctx)
c.ctx = ctx
c.closeProgressWriter = closeProgressWriter
go c.progressState.run(pr)
return c
}
func (c *call) run() {
defer c.closeProgressWriter()
v, err := c.fn(c.ctx)
c.mu.Lock()
c.result = v
c.err = err
c.mu.Unlock()
close(c.ready)
}
func (c *call) wait(ctx context.Context) (v interface{}, err error) {
c.mu.Lock()
// detect case where caller has just returned, let it clean up before
select {
case <-c.ready:
case <-c.ready: // could return if no error
c.mu.Unlock()
return nil, errRetry, false
return nil, errRetry
default:
}
if c.ctx == nil { // first invocation, register shared context
c.ctx = newContext()
c.ctx.append(ctx)
go func() {
v, err := c.fn(c.ctx)
c.mu.Lock()
c.result = v
c.err = err
c.mu.Unlock()
close(c.ready)
}()
} else {
c.ctx.append(ctx)
}
c.append(ctx)
c.mu.Unlock()
go c.once.Do(c.run)
select {
case <-ctx.Done():
select {
@@ -87,43 +114,91 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error, shared bool)
// if this cancelled the last context, then wait for function to shut down
// and don't accept any more callers
<-c.ready
return c.result, c.err, false
return c.result, c.err
default:
return nil, ctx.Err(), false
return nil, ctx.Err()
}
case <-c.ready:
return c.result, c.err, false // shared not implemented yet
return c.result, c.err // shared not implemented yet
}
}
type ctx struct {
mu sync.Mutex
ctxs []context.Context
done chan struct{}
err error
}
func newContext() *ctx {
return &ctx{done: make(chan struct{})}
}
func (c *ctx) append(ctx context.Context) {
c.mu.Lock()
defer c.mu.Unlock()
func (c *call) append(ctx context.Context) {
pw, ok, ctx := progress.FromContext(ctx)
if ok {
c.progressState.add(pw)
}
c.ctxs = append(c.ctxs, ctx)
go func() {
select {
case <-c.done:
case <-c.ctx.done:
case <-ctx.Done():
c.mu.Lock()
c.signalDone()
c.ctx.signal()
c.mu.Unlock()
}
}()
}
func (c *call) Deadline() (deadline time.Time, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
for _, ctx := range c.ctxs {
select {
case <-ctx.Done():
default:
dl, ok := ctx.Deadline()
if ok {
return dl, ok
}
}
}
return time.Time{}, false
}
func (c *call) Done() <-chan struct{} {
c.mu.Lock()
c.ctx.signal()
c.mu.Unlock()
return c.ctx.done
}
func (c *call) Err() error {
select {
case <-c.ctx.Done():
return c.ctx.err
default:
return nil
}
}
func (c *call) Value(key interface{}) interface{} {
c.mu.Lock()
defer c.mu.Unlock()
for _, ctx := range append([]context.Context{}, c.ctxs...) {
select {
case <-ctx.Done():
default:
if v := ctx.Value(key); v != nil {
return v
}
}
}
return nil
}
type sharedContext struct {
*call
done chan struct{}
err error
}
func newContext(c *call) *sharedContext {
return &sharedContext{call: c, done: make(chan struct{})}
}
// call with lock
func (c *ctx) signalDone() {
func (c *sharedContext) signal() {
select {
case <-c.done:
default:
@@ -141,47 +216,69 @@ func (c *ctx) signalDone() {
}
}
func (c *ctx) Deadline() (deadline time.Time, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
for _, ctx := range c.ctxs {
select {
case <-ctx.Done():
default:
dl, ok := ctx.Deadline()
if ok {
return dl, ok
type rawProgressWriter interface {
WriteRawProgress(*progress.Progress) error
Close() error
}
type progressState struct {
mu sync.Mutex
items map[string]*progress.Progress
writers []rawProgressWriter
done bool
}
func newProgressState() *progressState {
return &progressState{
items: make(map[string]*progress.Progress),
}
}
func (ps *progressState) run(pr progress.Reader) {
for {
p, err := pr.Read(context.TODO())
if err != nil {
if err == io.EOF {
ps.mu.Lock()
ps.done = true
ps.mu.Unlock()
for _, w := range ps.writers {
w.Close()
}
}
return
}
}
return time.Time{}, false
}
func (c *ctx) Done() <-chan struct{} {
c.mu.Lock()
c.signalDone()
c.mu.Unlock()
return c.done
}
func (c *ctx) Err() error {
select {
case <-c.Done():
return c.err
default:
return nil
}
}
func (c *ctx) Value(key interface{}) interface{} {
c.mu.Lock()
defer c.mu.Unlock()
for _, ctx := range c.ctxs {
select {
case <-ctx.Done():
default:
return ctx.Value(key)
ps.mu.Lock()
for _, p := range p {
for _, w := range ps.writers {
w.WriteRawProgress(p)
}
ps.items[p.ID] = p
}
ps.mu.Unlock()
}
return nil
}
func (ps *progressState) add(pw progress.Writer) {
rw, ok := pw.(rawProgressWriter)
if !ok {
return
}
ps.mu.Lock()
plist := make([]*progress.Progress, 0, len(ps.items))
for _, p := range ps.items {
plist = append(plist, p)
}
sort.Slice(plist, func(i, j int) bool {
return plist[i].Timestamp.Before(plist[j].Timestamp)
})
for _, p := range plist {
rw.WriteRawProgress(p)
}
if ps.done {
rw.Close()
} else {
ps.writers = append(ps.writers, rw)
}
ps.mu.Unlock()
}

View File

@@ -18,7 +18,7 @@ func TestNoCancel(t *testing.T) {
var counter int64
f := testFunc(100*time.Millisecond, "bar", &counter)
eg.Go(func() error {
ret1, err, _ := g.Do(ctx, "foo", f)
ret1, err := g.Do(ctx, "foo", f)
if err != nil {
return err
}
@@ -26,7 +26,7 @@ func TestNoCancel(t *testing.T) {
return nil
})
eg.Go(func() error {
ret2, err, _ := g.Do(ctx, "foo", f)
ret2, err := g.Do(ctx, "foo", f)
if err != nil {
return err
}
@@ -48,7 +48,7 @@ func TestCancelOne(t *testing.T) {
f := testFunc(100*time.Millisecond, "bar", &counter)
ctx2, cancel := context.WithCancel(ctx)
eg.Go(func() error {
ret1, err, _ := g.Do(ctx2, "foo", f)
ret1, err := g.Do(ctx2, "foo", f)
assert.Error(t, err)
assert.Equal(t, errors.Cause(err), context.Canceled)
if err == nil {
@@ -57,7 +57,7 @@ func TestCancelOne(t *testing.T) {
return nil
})
eg.Go(func() error {
ret2, err, _ := g.Do(ctx, "foo", f)
ret2, err := g.Do(ctx, "foo", f)
if err != nil {
return err
}
@@ -85,11 +85,11 @@ func TestCancelBoth(t *testing.T) {
eg, ctx := errgroup.WithContext(context.Background())
var r1, r2 string
var counter int64
f := testFunc(100*time.Millisecond, "bar", &counter)
f := testFunc(200*time.Millisecond, "bar", &counter)
ctx2, cancel2 := context.WithCancel(ctx)
ctx3, cancel3 := context.WithCancel(ctx)
eg.Go(func() error {
ret1, err, _ := g.Do(ctx2, "foo", f)
ret1, err := g.Do(ctx2, "foo", f)
assert.Error(t, err)
assert.Equal(t, errors.Cause(err), context.Canceled)
if err == nil {
@@ -98,7 +98,7 @@ func TestCancelBoth(t *testing.T) {
return nil
})
eg.Go(func() error {
ret2, err, _ := g.Do(ctx3, "foo", f)
ret2, err := g.Do(ctx3, "foo", f)
assert.Error(t, err)
assert.Equal(t, errors.Cause(err), context.Canceled)
if err == nil {
@@ -130,15 +130,15 @@ func TestCancelBoth(t *testing.T) {
assert.Equal(t, "", r2)
assert.Equal(t, counter, int64(1))
ret1, err, _ := g.Do(context.TODO(), "foo", f)
ret1, err := g.Do(context.TODO(), "foo", f)
assert.NoError(t, err)
assert.Equal(t, ret1, "bar")
f2 := testFunc(100*time.Millisecond, "baz", &counter)
ret1, err, _ = g.Do(context.TODO(), "foo", f2)
ret1, err = g.Do(context.TODO(), "foo", f2)
assert.NoError(t, err)
assert.Equal(t, ret1, "baz")
ret1, err, _ = g.Do(context.TODO(), "abc", f)
ret1, err = g.Do(context.TODO(), "abc", f)
assert.NoError(t, err)
assert.Equal(t, ret1, "bar")

View File

@@ -8,13 +8,13 @@ import (
type MultiReader struct {
mu sync.Mutex
main ProgressReader
main Reader
initialized bool
done chan struct{}
writers map[*progressWriter]func()
}
func NewMultiReader(pr ProgressReader) *MultiReader {
func NewMultiReader(pr Reader) *MultiReader {
mr := &MultiReader{
main: pr,
writers: make(map[*progressWriter]func()),
@@ -23,12 +23,12 @@ func NewMultiReader(pr ProgressReader) *MultiReader {
return mr
}
func (mr *MultiReader) Reader(ctx context.Context) ProgressReader {
func (mr *MultiReader) Reader(ctx context.Context) Reader {
mr.mu.Lock()
defer mr.mu.Unlock()
pr, ctx, closeWriter := NewContext(ctx)
pw, _, _ := FromContext(ctx, "")
pw, _, ctx := FromContext(ctx)
w := pw.(*progressWriter)
mr.writers[w] = closeWriter
@@ -57,8 +57,9 @@ func (mr *MultiReader) handle() error {
if err != nil {
if err == io.EOF {
mr.mu.Lock()
for w := range mr.writers {
w.Done()
for w, c := range mr.writers {
w.Close()
c()
}
mr.mu.Unlock()
return nil
@@ -67,12 +68,8 @@ func (mr *MultiReader) handle() error {
}
mr.mu.Lock()
for _, p := range p {
for w, c := range mr.writers {
if p == nil {
c()
} else {
w.write(*p)
}
for w := range mr.writers {
w.WriteRawProgress(p)
}
}
mr.mu.Unlock()

View File

@@ -10,32 +10,53 @@ import (
"github.com/pkg/errors"
)
// Progress package provides utility functions for using the context to capture
// progress of a running function. All progress items written contain an ID
// that is used to collapse unread messages.
type contextKeyT string
var contextKey = contextKeyT("buildkit/util/progress")
func FromContext(ctx context.Context, name string) (ProgressWriter, bool, context.Context) {
// FromContext returns a progress writer from a context.
func FromContext(ctx context.Context, opts ...WriterOption) (Writer, bool, context.Context) {
pw, ok := ctx.Value(contextKey).(*progressWriter)
if !ok {
return &noOpWriter{}, false, ctx
}
pw = newWriter(pw, name)
pw = newWriter(pw)
for _, o := range opts {
o(pw)
}
ctx = context.WithValue(ctx, contextKey, pw)
return pw, true, ctx
}
func NewContext(ctx context.Context) (ProgressReader, context.Context, func()) {
type WriterOption func(Writer)
// NewContext returns a new context and a progress reader that captures all
// progress items writtern to this context. Last returned parameter is a closer
// function to signal that no new writes will happen to this context.
func NewContext(ctx context.Context) (Reader, context.Context, func()) {
pr, pw, cancel := pipe()
ctx = context.WithValue(ctx, contextKey, pw)
return pr, ctx, cancel
}
type ProgressWriter interface {
Write(interface{}) error
Done() error // Close
func WithMetadata(key string, val interface{}) WriterOption {
return func(w Writer) {
if pw, ok := w.(*progressWriter); ok {
pw.meta[key] = val
}
}
}
type ProgressReader interface {
type Writer interface {
Write(id string, value interface{}) error
Close() error
}
type Reader interface {
Read(context.Context) ([]*Progress, error)
}
@@ -43,10 +64,10 @@ type Progress struct {
ID string
Timestamp time.Time
Sys interface{}
meta map[string]interface{}
}
type Status struct {
// ...progress of an action
Action string
Current int
Total int
@@ -62,32 +83,6 @@ type progressReader struct {
dirty map[string]*Progress
}
// type progressState struct {
// mu sync.Mutex
//
// }
//
// type streamHandle struct {
// pw *progressWriter
// lastP *Progress
// }
//
// func (sh *streamHandle) next() (*Progress, bool) {
// lasti := sh.pw.lastP.Load()
// if lasti != nil {
// last := lasti.(*Progress)
// if last != sh.lastP {
// sh.lastP = last
// return last, true
// }
// }
// return nil, false
// }
//
// func (pr *progressReader) write(id string, p *Progress) {
//
// }
func (pr *progressReader) Read(ctx context.Context) ([]*Progress, error) {
done := make(chan struct{})
defer close(done)
@@ -107,11 +102,14 @@ func (pr *progressReader) Read(ctx context.Context) ([]*Progress, error) {
default:
}
dmap := pr.dirty
open := len(pr.writers) > 0
if len(dmap) == 0 {
if !open {
pr.mu.Unlock()
return nil, io.EOF
select {
case <-pr.ctx.Done():
if len(pr.writers) == 0 {
pr.mu.Unlock()
return nil, io.EOF
}
default:
}
pr.cond.Wait()
continue
@@ -162,48 +160,46 @@ func pipe() (*progressReader, *progressWriter, func()) {
return pr, pw, cancel
}
func newWriter(pw *progressWriter, name string) *progressWriter {
if pw.id != "" {
if name == "" {
name = pw.id
} else {
name = pw.id + "." + name
}
func newWriter(pw *progressWriter) *progressWriter {
meta := make(map[string]interface{})
for k, v := range pw.meta {
meta[k] = v
}
pw = &progressWriter{
id: name,
reader: pw.reader,
meta: meta,
}
pw.reader.append(pw)
return pw
}
type progressWriter struct {
id string
done bool
reader *progressReader
meta map[string]interface{}
}
func (pw *progressWriter) Write(s interface{}) error {
func (pw *progressWriter) Write(id string, v interface{}) error {
if pw.done {
return errors.Errorf("writing to closed progresswriter %s", pw.id)
return errors.Errorf("writing %s to closed progress writer", id)
}
var p Progress
p.ID = pw.id
p.Timestamp = time.Now()
p.Sys = s
return pw.write(p)
return pw.WriteRawProgress(&Progress{
ID: id,
Timestamp: time.Now(),
Sys: v,
meta: pw.meta,
})
}
func (pw *progressWriter) write(p Progress) error {
func (pw *progressWriter) WriteRawProgress(p *Progress) error {
pw.reader.mu.Lock()
pw.reader.dirty[pw.id+"."+p.ID] = &p
pw.reader.dirty[p.ID] = p
pw.reader.mu.Unlock()
pw.reader.cond.Broadcast()
return nil
}
func (pw *progressWriter) Done() error {
func (pw *progressWriter) Close() error {
pw.reader.mu.Lock()
delete(pw.reader.writers, pw)
pw.reader.mu.Unlock()
@@ -212,19 +208,17 @@ func (pw *progressWriter) Done() error {
return nil
}
func (p *Progress) Meta(key string) (interface{}, bool) {
v, ok := p.meta[key]
return v, ok
}
type noOpWriter struct{}
func (pw *noOpWriter) Write(p interface{}) error {
func (pw *noOpWriter) Write(_ string, _ interface{}) error {
return nil
}
func (pw *noOpWriter) Done() error {
func (pw *noOpWriter) Close() error {
return nil
}
// type ProgressRecord struct {
// UUID string
// Parent string
// Done bool
// *Progress
// }

View File

@@ -23,7 +23,10 @@ func TestProgress(t *testing.T) {
eg.Go(func() error {
return saveProgress(ctx, pr, &trace)
})
pw, _, ctx := FromContext(ctx, WithMetadata("tag", "foo"))
s, err = calc(ctx, 5, "calc")
pw.Close()
assert.NoError(t, err)
assert.Equal(t, 15, s)
@@ -33,6 +36,11 @@ func TestProgress(t *testing.T) {
assert.True(t, len(trace.items) > 5)
assert.True(t, len(trace.items) <= 7)
for _, p := range trace.items {
v, ok := p.Meta("tag")
assert.True(t, ok)
assert.Equal(t, v.(string), "foo")
}
}
func TestProgressNested(t *testing.T) {
@@ -56,11 +64,11 @@ func TestProgressNested(t *testing.T) {
}
func calc(ctx context.Context, total int, name string) (int, error) {
pw, _, ctx := FromContext(ctx, name)
defer pw.Done()
pw, _, ctx := FromContext(ctx)
defer pw.Close()
sum := 0
pw.Write(Status{Action: "starting", Total: total})
pw.Write(name, Status{Action: "starting", Total: total})
for i := 1; i <= total; i++ {
select {
case <-ctx.Done():
@@ -68,13 +76,12 @@ func calc(ctx context.Context, total int, name string) (int, error) {
case <-time.After(10 * time.Millisecond):
}
if i == total {
pw.Write(Status{Action: "done", Total: total, Current: total})
pw.Write(name, Status{Action: "done", Total: total, Current: total})
} else {
pw.Write(Status{Action: "calculating", Total: total, Current: i})
pw.Write(name, Status{Action: "calculating", Total: total, Current: i})
}
sum += i
}
pw.Done()
return sum, nil
}
@@ -82,10 +89,10 @@ func calc(ctx context.Context, total int, name string) (int, error) {
func reduceCalc(ctx context.Context, total int) (int, error) {
eg, ctx := errgroup.WithContext(ctx)
pw, _, ctx := FromContext(ctx, "reduce")
defer pw.Done()
pw, _, ctx := FromContext(ctx)
defer pw.Close()
pw.Write(Status{Action: "starting"})
pw.Write("reduce", Status{Action: "starting"})
// sync step
sum, err := calc(ctx, total, "synccalc")
@@ -111,7 +118,7 @@ type trace struct {
items []*Progress
}
func saveProgress(ctx context.Context, pr ProgressReader, t *trace) error {
func saveProgress(ctx context.Context, pr Reader, t *trace) error {
for {
p, err := pr.Read(ctx)
if err != nil {

View File

@@ -133,7 +133,7 @@ func (t *trace) displayInfo(maxRows int) (d displayInfo) {
name: "=> " + s.ID,
}
if s.Total != 0 {
j.name += " " + units.HumanSize(float64(s.Current)) + " / " + units.HumanSize(float64(s.Current))
j.name += " " + units.HumanSize(float64(s.Current)) + " / " + units.HumanSize(float64(s.Total))
}
d.jobs = append(d.jobs, j)
}