mirror of
https://github.com/moby/buildkit.git
synced 2026-06-30 19:57:39 +00:00
Merge pull request #6810 from tonistiigi/containerd-exec-test-fix
test: stabilize worker exec pid1 lifetime
This commit is contained in:
@@ -225,6 +225,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M
|
||||
}
|
||||
}()
|
||||
|
||||
stdinDone := trackStdinEOF(&process)
|
||||
fixProcessOutput(&process)
|
||||
cioOpts := []cio.Opt{cio.WithStreams(process.Stdin, process.Stdout, process.Stderr)}
|
||||
if meta.Tty {
|
||||
@@ -256,7 +257,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M
|
||||
}
|
||||
|
||||
trace.SpanFromContext(ctx).AddEvent("Container created")
|
||||
err = w.runProcess(ctx, task, process.Resize, process.Signal, process.Meta.ValidExitCodes, func() {
|
||||
err = w.runProcess(ctx, task, process.Resize, process.Signal, stdinDone, process.Meta.ValidExitCodes, func() {
|
||||
startedOnce.Do(func() {
|
||||
trace.SpanFromContext(ctx).AddEvent("Container started")
|
||||
if started != nil {
|
||||
@@ -336,6 +337,7 @@ func (w *containerdExecutor) Exec(ctx context.Context, id string, process execut
|
||||
spec.Process.Env = process.Meta.Env
|
||||
}
|
||||
|
||||
stdinDone := trackStdinEOF(&process)
|
||||
fixProcessOutput(&process)
|
||||
cioOpts := []cio.Opt{cio.WithStreams(process.Stdin, process.Stdout, process.Stderr)}
|
||||
if meta.Tty {
|
||||
@@ -347,10 +349,53 @@ func (w *containerdExecutor) Exec(ctx context.Context, id string, process execut
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
err = w.runProcess(ctx, taskProcess, process.Resize, process.Signal, process.Meta.ValidExitCodes, nil)
|
||||
err = w.runProcess(ctx, taskProcess, process.Resize, process.Signal, stdinDone, process.Meta.ValidExitCodes, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
type stdinEOFTracker struct {
|
||||
io.ReadCloser
|
||||
once sync.Once
|
||||
done chan struct{}
|
||||
err error
|
||||
}
|
||||
|
||||
func trackStdinEOF(process *executor.ProcessInfo) <-chan struct{} {
|
||||
if process.Stdin == nil {
|
||||
return nil
|
||||
}
|
||||
tracker := &stdinEOFTracker{
|
||||
ReadCloser: process.Stdin,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
process.Stdin = tracker
|
||||
return tracker.done
|
||||
}
|
||||
|
||||
func (r *stdinEOFTracker) Read(p []byte) (int, error) {
|
||||
if r.err != nil {
|
||||
err := r.err
|
||||
r.err = nil
|
||||
r.close()
|
||||
return 0, err
|
||||
}
|
||||
n, err := r.ReadCloser.Read(p)
|
||||
if err != nil {
|
||||
if n > 0 {
|
||||
r.err = err
|
||||
return n, nil
|
||||
}
|
||||
r.close()
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (r *stdinEOFTracker) close() {
|
||||
r.once.Do(func() {
|
||||
close(r.done)
|
||||
})
|
||||
}
|
||||
|
||||
func fixProcessOutput(process *executor.ProcessInfo) {
|
||||
// It seems like if containerd has one of stdin, stdout or stderr then the
|
||||
// others need to be present as well otherwise we get this error:
|
||||
@@ -364,7 +409,7 @@ func fixProcessOutput(process *executor.ProcessInfo) {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *containerdExecutor) runProcess(ctx context.Context, p ctd.Process, resize <-chan executor.WinSize, signal <-chan syscall.Signal, validExitCodes []int, started func()) error {
|
||||
func (w *containerdExecutor) runProcess(ctx context.Context, p ctd.Process, resize <-chan executor.WinSize, signal <-chan syscall.Signal, stdinDone <-chan struct{}, validExitCodes []int, started func()) error {
|
||||
// Not using `ctx` here because the context passed only affects the statusCh which we
|
||||
// don't want cancelled when ctx.Done is sent. We want to process statusCh on cancel.
|
||||
statusCh, err := p.Wait(context.Background())
|
||||
@@ -387,12 +432,25 @@ func (w *containerdExecutor) runProcess(ctx context.Context, p ctd.Process, resi
|
||||
started()
|
||||
}
|
||||
|
||||
p.CloseIO(ctx, ctd.WithStdinCloser)
|
||||
|
||||
// handle signals (and resize) in separate go loop so it does not
|
||||
// potentially block the container cancel/exit status loop below.
|
||||
eventCtx, eventCancel := context.WithCancelCause(ctx)
|
||||
defer eventCancel(errors.WithStack(context.Canceled))
|
||||
if stdinDone == nil {
|
||||
p.CloseIO(ctx, ctd.WithStdinCloser)
|
||||
} else {
|
||||
go func() {
|
||||
select {
|
||||
case <-eventCtx.Done():
|
||||
case <-stdinDone:
|
||||
if err := p.CloseIO(eventCtx, ctd.WithStdinCloser); err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(context.Cause(eventCtx), context.Canceled) {
|
||||
bklog.G(eventCtx).Warnf("Failed to close stdin for %s: %s", p.ID(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
|
||||
Reference in New Issue
Block a user