From 9aa5ac30343703daa37b92aeeb21bda92b31cb19 Mon Sep 17 00:00:00 2001 From: Sebastiaan van Stijn Date: Wed, 1 Apr 2026 17:27:18 +0200 Subject: [PATCH] client/pkg/jsonmessage: use functional options for display funcs Signed-off-by: Sebastiaan van Stijn --- client/pkg/jsonmessage/jsonmessage.go | 56 ++++++++++++++++--- client/pkg/jsonmessage/jsonmessage_test.go | 4 +- integration/build/build_userns_linux_test.go | 4 +- integration/container/copy_test.go | 8 +-- integration/plugin/common/plugin_test.go | 6 +- internal/testutil/fixtures/load/frozen.go | 4 +- .../client/pkg/jsonmessage/jsonmessage.go | 56 ++++++++++++++++--- 7 files changed, 107 insertions(+), 31 deletions(-) diff --git a/client/pkg/jsonmessage/jsonmessage.go b/client/pkg/jsonmessage/jsonmessage.go index 727cdba516..e3ac43afd4 100644 --- a/client/pkg/jsonmessage/jsonmessage.go +++ b/client/pkg/jsonmessage/jsonmessage.go @@ -20,6 +20,25 @@ var timeNow = time.Now // For overriding in tests. // ensure the formatted time is always the same number of characters. const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" +// DisplayOpt configures behavior for [DisplayStream] and [DisplayMessages]. +// Options are applied in order; if an option returns an error, processing +// stops and the error is returned to the caller. +type DisplayOpt func(*displayOpts) error + +type displayOpts struct { + auxCallback func(jsonstream.Message) +} + +// WithAuxCallback registers a callback that is invoked for auxiliary +// jsonstream messages as they are processed. The callback is optional; +// if not provided, auxiliary messages are ignored. +func WithAuxCallback(fn func(jsonstream.Message)) DisplayOpt { + return func(opts *displayOpts) error { + opts.auxCallback = fn + return nil + } +} + func RenderTUIProgress(p jsonstream.Progress, width uint16) string { var ( pbBox string @@ -143,17 +162,21 @@ type JSONMessagesStream = iter.Seq2[jsonstream.Message, error] // DisplayJSONMessagesStream is like [DisplayStream], but allows the caller to // explicitly provide the terminal file descriptor and whether out is a terminal. func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error { - return displayJSONMessagesStream(in, out, terminalFd, isTerminal, auxCallback) + var opts []DisplayOpt + if auxCallback != nil { + opts = append(opts, WithAuxCallback(auxCallback)) + } + return displayJSONMessagesStream(in, out, terminalFd, isTerminal, opts...) } // DisplayStream reads a JSON message stream from in, and writes each // [jsonstream.Message] to out. See [DisplayMessages] for details. -func DisplayStream(in io.Reader, out io.Writer, auxCallback func(jsonstream.Message)) error { +func DisplayStream(in io.Reader, out io.Writer, opts ...DisplayOpt) error { terminalFd, isTerminal := term.GetFdInfo(out) - return displayJSONMessagesStream(in, out, terminalFd, isTerminal, auxCallback) + return displayJSONMessagesStream(in, out, terminalFd, isTerminal, opts...) } -func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error { +func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, opts ...DisplayOpt) error { dec := json.NewDecoder(in) f := func(yield func(jsonstream.Message, error) bool) { for { @@ -168,13 +191,17 @@ func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, } } - return displayJSONMessages(f, out, terminalFd, isTerminal, auxCallback) + return displayJSONMessages(f, out, terminalFd, isTerminal, opts...) } // DisplayJSONMessages is like [DisplayMessages], but allows the caller to // explicitly provide the terminal file descriptor and whether out is a terminal. func DisplayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error { - return displayJSONMessages(messages, out, terminalFd, isTerminal, auxCallback) + var opts []DisplayOpt + if auxCallback != nil { + opts = append(opts, WithAuxCallback(auxCallback)) + } + return displayJSONMessages(messages, out, terminalFd, isTerminal, opts...) } // DisplayMessages writes each [jsonstream.Message] from stream to out. @@ -189,12 +216,23 @@ func DisplayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.W // auxCallback allows handling the [jsonstream.Message.Aux] field. It is called // if a message contains an Aux field, in which case DisplayMessages does not // present the message. -func DisplayMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, auxCallback func(jsonstream.Message)) error { +func DisplayMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, opts ...DisplayOpt) error { terminalFd, isTerminal := term.GetFdInfo(out) - return displayJSONMessages(messages, out, terminalFd, isTerminal, auxCallback) + return displayJSONMessages(messages, out, terminalFd, isTerminal, opts...) } -func displayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error { +func displayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, opts ...DisplayOpt) error { + var cfg displayOpts + for _, opt := range opts { + if opt == nil { + continue + } + if err := opt(&cfg); err != nil { + return err + } + } + auxCallback := cfg.auxCallback + ids := make(map[string]uint) var width uint16 = 200 if isTerminal { diff --git a/client/pkg/jsonmessage/jsonmessage_test.go b/client/pkg/jsonmessage/jsonmessage_test.go index 30383865ae..5d0a560370 100644 --- a/client/pkg/jsonmessage/jsonmessage_test.go +++ b/client/pkg/jsonmessage/jsonmessage_test.go @@ -189,7 +189,7 @@ func TestDisplayStreamInvalidJSON(t *testing.T) { data := bytes.NewBuffer([]byte{}) reader := strings.NewReader("This is not a 'valid' JSON []") exp := "invalid character " - if err := DisplayStream(reader, data, nil); err == nil || !strings.HasPrefix(err.Error(), exp) { + if err := DisplayStream(reader, data); err == nil || !strings.HasPrefix(err.Error(), exp) { t.Fatalf("Expected error (%s...), got %q", exp, err) } } @@ -227,7 +227,7 @@ func TestDisplayJSONMessagesStream(t *testing.T) { reader := strings.NewReader(jsonMessage) // Without terminal - if err := DisplayStream(reader, data, nil); err != nil { + if err := DisplayStream(reader, data); err != nil { t.Fatal(err) } if data.String() != expectedMessages[0] { diff --git a/integration/build/build_userns_linux_test.go b/integration/build/build_userns_linux_test.go index b757818238..7472e309eb 100644 --- a/integration/build/build_userns_linux_test.go +++ b/integration/build/build_userns_linux_test.go @@ -70,7 +70,7 @@ func TestBuildUserNamespaceValidateCapabilitiesAreV2(t *testing.T) { defer resp.Body.Close() buf := bytes.NewBuffer(nil) - err = jsonmessage.DisplayStream(resp.Body, buf, nil) + err = jsonmessage.DisplayStream(resp.Body, buf) assert.NilError(t, err) reader, err := clientUserRemap.ImageSave(ctx, []string{imageTag}) @@ -107,7 +107,7 @@ func TestBuildUserNamespaceValidateCapabilitiesAreV2(t *testing.T) { assert.NilError(t, err, "failed to load image tar file") defer loadResp.Close() var buf2 bytes.Buffer - err = jsonmessage.DisplayStream(loadResp, &buf2, nil) + err = jsonmessage.DisplayStream(loadResp, &buf2) assert.NilError(t, err) cid := container.Run(ctx, t, clientNoUserRemap, diff --git a/integration/container/copy_test.go b/integration/container/copy_test.go index 06993821a4..6183ce02e0 100644 --- a/integration/container/copy_test.go +++ b/integration/container/copy_test.go @@ -215,11 +215,11 @@ func makeTestImage(ctx context.Context, t *testing.T) (imageID string) { assert.NilError(t, err) defer resp.Body.Close() - err = jsonmessage.DisplayStream(resp.Body, io.Discard, func(msg jsonstream.Message) { + err = jsonmessage.DisplayStream(resp.Body, io.Discard, jsonmessage.WithAuxCallback(func(msg jsonstream.Message) { var r build.Result assert.NilError(t, json.Unmarshal(*msg.Aux, &r)) imageID = r.ID - }) + })) assert.NilError(t, err) assert.Assert(t, imageID != "") return imageID @@ -290,11 +290,11 @@ func TestCopyFromContainer(t *testing.T) { defer resp.Body.Close() var imageID string - err = jsonmessage.DisplayStream(resp.Body, io.Discard, func(msg jsonstream.Message) { + err = jsonmessage.DisplayStream(resp.Body, io.Discard, jsonmessage.WithAuxCallback(func(msg jsonstream.Message) { var r build.Result assert.NilError(t, json.Unmarshal(*msg.Aux, &r)) imageID = r.ID - }) + })) assert.NilError(t, err) assert.Assert(t, imageID != "") diff --git a/integration/plugin/common/plugin_test.go b/integration/plugin/common/plugin_test.go index c062330ed7..869104a166 100644 --- a/integration/plugin/common/plugin_test.go +++ b/integration/plugin/common/plugin_test.go @@ -152,13 +152,13 @@ func TestPluginInstall(t *testing.T) { Size int } var buf strings.Builder - assert.NilError(t, jsonmessage.DisplayStream(res, &buf, func(j jsonstream.Message) { + assert.NilError(t, jsonmessage.DisplayStream(res, &buf, jsonmessage.WithAuxCallback(func(j jsonstream.Message) { if j.Aux != nil { var r pushResult assert.NilError(t, json.Unmarshal(*j.Aux, &r)) digest = r.Digest } - }), buf) + })), buf) _, err = apiclient.PluginRemove(ctx, repo, client.PluginRemoveOptions{Force: true}) assert.NilError(t, err) @@ -349,7 +349,7 @@ func TestPluginBackCompatMediaTypes(t *testing.T) { defer res.Close() var buf strings.Builder - assert.NilError(t, jsonmessage.DisplayStream(res, &buf, nil), buf) + assert.NilError(t, jsonmessage.DisplayStream(res, &buf), buf) // Use custom header here because older versions of the registry do not // parse the accept header correctly and does not like the accept header diff --git a/internal/testutil/fixtures/load/frozen.go b/internal/testutil/fixtures/load/frozen.go index e8ea207495..e3c70ea841 100644 --- a/internal/testutil/fixtures/load/frozen.go +++ b/internal/testutil/fixtures/load/frozen.go @@ -114,7 +114,7 @@ func loadFrozenImages(ctx context.Context, apiClient client.APIClient) error { return errors.Wrap(err, "failed to load frozen images") } defer resp.Close() - return jsonmessage.DisplayStream(resp, os.Stdout, nil) + return jsonmessage.DisplayStream(resp, os.Stdout) } func pullImages(ctx context.Context, client client.APIClient, images []string) error { @@ -164,7 +164,7 @@ func pullTagAndRemove(ctx context.Context, apiClient client.APIClient, ref strin return errors.Wrapf(err, "failed to pull %s", ref) } defer resp.Close() - if err := jsonmessage.DisplayStream(resp, os.Stdout, nil); err != nil { + if err := jsonmessage.DisplayStream(resp, os.Stdout); err != nil { return err } diff --git a/vendor/github.com/moby/moby/client/pkg/jsonmessage/jsonmessage.go b/vendor/github.com/moby/moby/client/pkg/jsonmessage/jsonmessage.go index 727cdba516..e3ac43afd4 100644 --- a/vendor/github.com/moby/moby/client/pkg/jsonmessage/jsonmessage.go +++ b/vendor/github.com/moby/moby/client/pkg/jsonmessage/jsonmessage.go @@ -20,6 +20,25 @@ var timeNow = time.Now // For overriding in tests. // ensure the formatted time is always the same number of characters. const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" +// DisplayOpt configures behavior for [DisplayStream] and [DisplayMessages]. +// Options are applied in order; if an option returns an error, processing +// stops and the error is returned to the caller. +type DisplayOpt func(*displayOpts) error + +type displayOpts struct { + auxCallback func(jsonstream.Message) +} + +// WithAuxCallback registers a callback that is invoked for auxiliary +// jsonstream messages as they are processed. The callback is optional; +// if not provided, auxiliary messages are ignored. +func WithAuxCallback(fn func(jsonstream.Message)) DisplayOpt { + return func(opts *displayOpts) error { + opts.auxCallback = fn + return nil + } +} + func RenderTUIProgress(p jsonstream.Progress, width uint16) string { var ( pbBox string @@ -143,17 +162,21 @@ type JSONMessagesStream = iter.Seq2[jsonstream.Message, error] // DisplayJSONMessagesStream is like [DisplayStream], but allows the caller to // explicitly provide the terminal file descriptor and whether out is a terminal. func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error { - return displayJSONMessagesStream(in, out, terminalFd, isTerminal, auxCallback) + var opts []DisplayOpt + if auxCallback != nil { + opts = append(opts, WithAuxCallback(auxCallback)) + } + return displayJSONMessagesStream(in, out, terminalFd, isTerminal, opts...) } // DisplayStream reads a JSON message stream from in, and writes each // [jsonstream.Message] to out. See [DisplayMessages] for details. -func DisplayStream(in io.Reader, out io.Writer, auxCallback func(jsonstream.Message)) error { +func DisplayStream(in io.Reader, out io.Writer, opts ...DisplayOpt) error { terminalFd, isTerminal := term.GetFdInfo(out) - return displayJSONMessagesStream(in, out, terminalFd, isTerminal, auxCallback) + return displayJSONMessagesStream(in, out, terminalFd, isTerminal, opts...) } -func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error { +func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, opts ...DisplayOpt) error { dec := json.NewDecoder(in) f := func(yield func(jsonstream.Message, error) bool) { for { @@ -168,13 +191,17 @@ func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, } } - return displayJSONMessages(f, out, terminalFd, isTerminal, auxCallback) + return displayJSONMessages(f, out, terminalFd, isTerminal, opts...) } // DisplayJSONMessages is like [DisplayMessages], but allows the caller to // explicitly provide the terminal file descriptor and whether out is a terminal. func DisplayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error { - return displayJSONMessages(messages, out, terminalFd, isTerminal, auxCallback) + var opts []DisplayOpt + if auxCallback != nil { + opts = append(opts, WithAuxCallback(auxCallback)) + } + return displayJSONMessages(messages, out, terminalFd, isTerminal, opts...) } // DisplayMessages writes each [jsonstream.Message] from stream to out. @@ -189,12 +216,23 @@ func DisplayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.W // auxCallback allows handling the [jsonstream.Message.Aux] field. It is called // if a message contains an Aux field, in which case DisplayMessages does not // present the message. -func DisplayMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, auxCallback func(jsonstream.Message)) error { +func DisplayMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, opts ...DisplayOpt) error { terminalFd, isTerminal := term.GetFdInfo(out) - return displayJSONMessages(messages, out, terminalFd, isTerminal, auxCallback) + return displayJSONMessages(messages, out, terminalFd, isTerminal, opts...) } -func displayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error { +func displayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, opts ...DisplayOpt) error { + var cfg displayOpts + for _, opt := range opts { + if opt == nil { + continue + } + if err := opt(&cfg); err != nil { + return err + } + } + auxCallback := cfg.auxCallback + ids := make(map[string]uint) var width uint16 = 200 if isTerminal {