Merge pull request #52285 from thaJeztah/stream_funcopts

client/pkg/jsonmessage: use functional options for display funcs
This commit is contained in:
Paweł Gronowski
2026-04-01 18:49:28 +02:00
committed by GitHub
7 changed files with 107 additions and 31 deletions

View File

@@ -20,6 +20,25 @@ var timeNow = time.Now // For overriding in tests.
// ensure the formatted time is always the same number of characters.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
// DisplayOpt configures behavior for [DisplayStream] and [DisplayMessages].
// Options are applied in order; if an option returns an error, processing
// stops and the error is returned to the caller.
type DisplayOpt func(*displayOpts) error
type displayOpts struct {
auxCallback func(jsonstream.Message)
}
// WithAuxCallback registers a callback that is invoked for auxiliary
// jsonstream messages as they are processed. The callback is optional;
// if not provided, auxiliary messages are ignored.
func WithAuxCallback(fn func(jsonstream.Message)) DisplayOpt {
return func(opts *displayOpts) error {
opts.auxCallback = fn
return nil
}
}
func RenderTUIProgress(p jsonstream.Progress, width uint16) string {
var (
pbBox string
@@ -143,17 +162,21 @@ type JSONMessagesStream = iter.Seq2[jsonstream.Message, error]
// DisplayJSONMessagesStream is like [DisplayStream], but allows the caller to
// explicitly provide the terminal file descriptor and whether out is a terminal.
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
return displayJSONMessagesStream(in, out, terminalFd, isTerminal, auxCallback)
var opts []DisplayOpt
if auxCallback != nil {
opts = append(opts, WithAuxCallback(auxCallback))
}
return displayJSONMessagesStream(in, out, terminalFd, isTerminal, opts...)
}
// DisplayStream reads a JSON message stream from in, and writes each
// [jsonstream.Message] to out. See [DisplayMessages] for details.
func DisplayStream(in io.Reader, out io.Writer, auxCallback func(jsonstream.Message)) error {
func DisplayStream(in io.Reader, out io.Writer, opts ...DisplayOpt) error {
terminalFd, isTerminal := term.GetFdInfo(out)
return displayJSONMessagesStream(in, out, terminalFd, isTerminal, auxCallback)
return displayJSONMessagesStream(in, out, terminalFd, isTerminal, opts...)
}
func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, opts ...DisplayOpt) error {
dec := json.NewDecoder(in)
f := func(yield func(jsonstream.Message, error) bool) {
for {
@@ -168,13 +191,17 @@ func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr,
}
}
return displayJSONMessages(f, out, terminalFd, isTerminal, auxCallback)
return displayJSONMessages(f, out, terminalFd, isTerminal, opts...)
}
// DisplayJSONMessages is like [DisplayMessages], but allows the caller to
// explicitly provide the terminal file descriptor and whether out is a terminal.
func DisplayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
return displayJSONMessages(messages, out, terminalFd, isTerminal, auxCallback)
var opts []DisplayOpt
if auxCallback != nil {
opts = append(opts, WithAuxCallback(auxCallback))
}
return displayJSONMessages(messages, out, terminalFd, isTerminal, opts...)
}
// DisplayMessages writes each [jsonstream.Message] from stream to out.
@@ -189,12 +216,23 @@ func DisplayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.W
// auxCallback allows handling the [jsonstream.Message.Aux] field. It is called
// if a message contains an Aux field, in which case DisplayMessages does not
// present the message.
func DisplayMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, auxCallback func(jsonstream.Message)) error {
func DisplayMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, opts ...DisplayOpt) error {
terminalFd, isTerminal := term.GetFdInfo(out)
return displayJSONMessages(messages, out, terminalFd, isTerminal, auxCallback)
return displayJSONMessages(messages, out, terminalFd, isTerminal, opts...)
}
func displayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
func displayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, opts ...DisplayOpt) error {
var cfg displayOpts
for _, opt := range opts {
if opt == nil {
continue
}
if err := opt(&cfg); err != nil {
return err
}
}
auxCallback := cfg.auxCallback
ids := make(map[string]uint)
var width uint16 = 200
if isTerminal {

View File

@@ -189,7 +189,7 @@ func TestDisplayStreamInvalidJSON(t *testing.T) {
data := bytes.NewBuffer([]byte{})
reader := strings.NewReader("This is not a 'valid' JSON []")
exp := "invalid character "
if err := DisplayStream(reader, data, nil); err == nil || !strings.HasPrefix(err.Error(), exp) {
if err := DisplayStream(reader, data); err == nil || !strings.HasPrefix(err.Error(), exp) {
t.Fatalf("Expected error (%s...), got %q", exp, err)
}
}
@@ -227,7 +227,7 @@ func TestDisplayJSONMessagesStream(t *testing.T) {
reader := strings.NewReader(jsonMessage)
// Without terminal
if err := DisplayStream(reader, data, nil); err != nil {
if err := DisplayStream(reader, data); err != nil {
t.Fatal(err)
}
if data.String() != expectedMessages[0] {

View File

@@ -70,7 +70,7 @@ func TestBuildUserNamespaceValidateCapabilitiesAreV2(t *testing.T) {
defer resp.Body.Close()
buf := bytes.NewBuffer(nil)
err = jsonmessage.DisplayStream(resp.Body, buf, nil)
err = jsonmessage.DisplayStream(resp.Body, buf)
assert.NilError(t, err)
reader, err := clientUserRemap.ImageSave(ctx, []string{imageTag})
@@ -107,7 +107,7 @@ func TestBuildUserNamespaceValidateCapabilitiesAreV2(t *testing.T) {
assert.NilError(t, err, "failed to load image tar file")
defer loadResp.Close()
var buf2 bytes.Buffer
err = jsonmessage.DisplayStream(loadResp, &buf2, nil)
err = jsonmessage.DisplayStream(loadResp, &buf2)
assert.NilError(t, err)
cid := container.Run(ctx, t, clientNoUserRemap,

View File

@@ -215,11 +215,11 @@ func makeTestImage(ctx context.Context, t *testing.T) (imageID string) {
assert.NilError(t, err)
defer resp.Body.Close()
err = jsonmessage.DisplayStream(resp.Body, io.Discard, func(msg jsonstream.Message) {
err = jsonmessage.DisplayStream(resp.Body, io.Discard, jsonmessage.WithAuxCallback(func(msg jsonstream.Message) {
var r build.Result
assert.NilError(t, json.Unmarshal(*msg.Aux, &r))
imageID = r.ID
})
}))
assert.NilError(t, err)
assert.Assert(t, imageID != "")
return imageID
@@ -290,11 +290,11 @@ func TestCopyFromContainer(t *testing.T) {
defer resp.Body.Close()
var imageID string
err = jsonmessage.DisplayStream(resp.Body, io.Discard, func(msg jsonstream.Message) {
err = jsonmessage.DisplayStream(resp.Body, io.Discard, jsonmessage.WithAuxCallback(func(msg jsonstream.Message) {
var r build.Result
assert.NilError(t, json.Unmarshal(*msg.Aux, &r))
imageID = r.ID
})
}))
assert.NilError(t, err)
assert.Assert(t, imageID != "")

View File

@@ -152,13 +152,13 @@ func TestPluginInstall(t *testing.T) {
Size int
}
var buf strings.Builder
assert.NilError(t, jsonmessage.DisplayStream(res, &buf, func(j jsonstream.Message) {
assert.NilError(t, jsonmessage.DisplayStream(res, &buf, jsonmessage.WithAuxCallback(func(j jsonstream.Message) {
if j.Aux != nil {
var r pushResult
assert.NilError(t, json.Unmarshal(*j.Aux, &r))
digest = r.Digest
}
}), buf)
})), buf)
_, err = apiclient.PluginRemove(ctx, repo, client.PluginRemoveOptions{Force: true})
assert.NilError(t, err)
@@ -349,7 +349,7 @@ func TestPluginBackCompatMediaTypes(t *testing.T) {
defer res.Close()
var buf strings.Builder
assert.NilError(t, jsonmessage.DisplayStream(res, &buf, nil), buf)
assert.NilError(t, jsonmessage.DisplayStream(res, &buf), buf)
// Use custom header here because older versions of the registry do not
// parse the accept header correctly and does not like the accept header

View File

@@ -114,7 +114,7 @@ func loadFrozenImages(ctx context.Context, apiClient client.APIClient) error {
return errors.Wrap(err, "failed to load frozen images")
}
defer resp.Close()
return jsonmessage.DisplayStream(resp, os.Stdout, nil)
return jsonmessage.DisplayStream(resp, os.Stdout)
}
func pullImages(ctx context.Context, client client.APIClient, images []string) error {
@@ -164,7 +164,7 @@ func pullTagAndRemove(ctx context.Context, apiClient client.APIClient, ref strin
return errors.Wrapf(err, "failed to pull %s", ref)
}
defer resp.Close()
if err := jsonmessage.DisplayStream(resp, os.Stdout, nil); err != nil {
if err := jsonmessage.DisplayStream(resp, os.Stdout); err != nil {
return err
}

View File

@@ -20,6 +20,25 @@ var timeNow = time.Now // For overriding in tests.
// ensure the formatted time is always the same number of characters.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
// DisplayOpt configures behavior for [DisplayStream] and [DisplayMessages].
// Options are applied in order; if an option returns an error, processing
// stops and the error is returned to the caller.
type DisplayOpt func(*displayOpts) error
type displayOpts struct {
auxCallback func(jsonstream.Message)
}
// WithAuxCallback registers a callback that is invoked for auxiliary
// jsonstream messages as they are processed. The callback is optional;
// if not provided, auxiliary messages are ignored.
func WithAuxCallback(fn func(jsonstream.Message)) DisplayOpt {
return func(opts *displayOpts) error {
opts.auxCallback = fn
return nil
}
}
func RenderTUIProgress(p jsonstream.Progress, width uint16) string {
var (
pbBox string
@@ -143,17 +162,21 @@ type JSONMessagesStream = iter.Seq2[jsonstream.Message, error]
// DisplayJSONMessagesStream is like [DisplayStream], but allows the caller to
// explicitly provide the terminal file descriptor and whether out is a terminal.
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
return displayJSONMessagesStream(in, out, terminalFd, isTerminal, auxCallback)
var opts []DisplayOpt
if auxCallback != nil {
opts = append(opts, WithAuxCallback(auxCallback))
}
return displayJSONMessagesStream(in, out, terminalFd, isTerminal, opts...)
}
// DisplayStream reads a JSON message stream from in, and writes each
// [jsonstream.Message] to out. See [DisplayMessages] for details.
func DisplayStream(in io.Reader, out io.Writer, auxCallback func(jsonstream.Message)) error {
func DisplayStream(in io.Reader, out io.Writer, opts ...DisplayOpt) error {
terminalFd, isTerminal := term.GetFdInfo(out)
return displayJSONMessagesStream(in, out, terminalFd, isTerminal, auxCallback)
return displayJSONMessagesStream(in, out, terminalFd, isTerminal, opts...)
}
func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, opts ...DisplayOpt) error {
dec := json.NewDecoder(in)
f := func(yield func(jsonstream.Message, error) bool) {
for {
@@ -168,13 +191,17 @@ func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr,
}
}
return displayJSONMessages(f, out, terminalFd, isTerminal, auxCallback)
return displayJSONMessages(f, out, terminalFd, isTerminal, opts...)
}
// DisplayJSONMessages is like [DisplayMessages], but allows the caller to
// explicitly provide the terminal file descriptor and whether out is a terminal.
func DisplayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
return displayJSONMessages(messages, out, terminalFd, isTerminal, auxCallback)
var opts []DisplayOpt
if auxCallback != nil {
opts = append(opts, WithAuxCallback(auxCallback))
}
return displayJSONMessages(messages, out, terminalFd, isTerminal, opts...)
}
// DisplayMessages writes each [jsonstream.Message] from stream to out.
@@ -189,12 +216,23 @@ func DisplayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.W
// auxCallback allows handling the [jsonstream.Message.Aux] field. It is called
// if a message contains an Aux field, in which case DisplayMessages does not
// present the message.
func DisplayMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, auxCallback func(jsonstream.Message)) error {
func DisplayMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, opts ...DisplayOpt) error {
terminalFd, isTerminal := term.GetFdInfo(out)
return displayJSONMessages(messages, out, terminalFd, isTerminal, auxCallback)
return displayJSONMessages(messages, out, terminalFd, isTerminal, opts...)
}
func displayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
func displayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, opts ...DisplayOpt) error {
var cfg displayOpts
for _, opt := range opts {
if opt == nil {
continue
}
if err := opt(&cfg); err != nil {
return err
}
}
auxCallback := cfg.auxCallback
ids := make(map[string]uint)
var width uint16 = 200
if isTerminal {