mirror of
https://github.com/moby/buildkit.git
synced 2026-06-30 19:57:39 +00:00
Allow signals to be sent to gateway exec containers
Signed-off-by: Cory Bennett <cbennett@netflix.com>
This commit is contained in:
@@ -315,14 +315,14 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
|
||||
}()
|
||||
|
||||
bklog.G(ctx).Debugf("> creating %s %v", id, meta.Args)
|
||||
// this is a cheat, we have not actually started, but as close as we can get with runc for now
|
||||
if started != nil {
|
||||
startedOnce.Do(func() {
|
||||
close(started)
|
||||
})
|
||||
}
|
||||
|
||||
err = w.run(runCtx, id, bundle, process)
|
||||
err = w.run(runCtx, id, bundle, process, func() {
|
||||
startedOnce.Do(func() {
|
||||
if started != nil {
|
||||
close(started)
|
||||
}
|
||||
})
|
||||
})
|
||||
close(ended)
|
||||
return exitError(ctx, err)
|
||||
}
|
||||
@@ -414,7 +414,7 @@ func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.Pro
|
||||
spec.Process.Env = process.Meta.Env
|
||||
}
|
||||
|
||||
err = w.exec(ctx, id, state.Bundle, spec.Process, process)
|
||||
err = w.exec(ctx, id, state.Bundle, spec.Process, process, nil)
|
||||
return exitError(ctx, err)
|
||||
}
|
||||
|
||||
@@ -444,3 +444,78 @@ func (s *forwardIO) Stdout() io.ReadCloser {
|
||||
func (s *forwardIO) Stderr() io.ReadCloser {
|
||||
return nil
|
||||
}
|
||||
|
||||
// startingProcess is to track the os process so we can send signals to it.
|
||||
type startingProcess struct {
|
||||
Process *os.Process
|
||||
ready chan struct{}
|
||||
}
|
||||
|
||||
// Release will free resources with a startingProcess.
|
||||
func (p *startingProcess) Release() {
|
||||
if p.Process != nil {
|
||||
p.Process.Release()
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForReady will wait until the Process has been populated or the
|
||||
// provided context was cancelled. This should be called before using
|
||||
// the Process field.
|
||||
func (p *startingProcess) WaitForReady(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-p.ready:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForStart will record the pid reported by Runc via the channel.
|
||||
// We wait for up to 10s for the runc process to start. If the started
|
||||
// callback is non-nil it will be called after receiving the pid.
|
||||
func (p *startingProcess) WaitForStart(ctx context.Context, startedCh <-chan int, started func()) error {
|
||||
startedCtx, timeout := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer timeout()
|
||||
var err error
|
||||
select {
|
||||
case <-startedCtx.Done():
|
||||
return errors.New("runc started message never received")
|
||||
case pid, ok := <-startedCh:
|
||||
if !ok {
|
||||
return errors.New("runc process failed to send pid")
|
||||
}
|
||||
if started != nil {
|
||||
started()
|
||||
}
|
||||
p.Process, err = os.FindProcess(pid)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unable to find runc process for pid %d", pid)
|
||||
}
|
||||
close(p.ready)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleSignals will wait until the runcProcess is ready then will
|
||||
// send each signal received on the channel to the process.
|
||||
func handleSignals(ctx context.Context, runcProcess *startingProcess, signals <-chan syscall.Signal) error {
|
||||
if signals == nil {
|
||||
return nil
|
||||
}
|
||||
err := runcProcess.WaitForReady(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case sig := <-signals:
|
||||
err := runcProcess.Process.Signal(sig)
|
||||
if err != nil {
|
||||
bklog.G(ctx).Errorf("failed to signal %s to process: %s", sig, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,28 +10,63 @@ import (
|
||||
"github.com/moby/buildkit/executor"
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var unsupportedConsoleError = errors.New("tty for runc is only supported on linux")
|
||||
|
||||
func updateRuncFieldsForHostOS(runtime *runc.Runc) {}
|
||||
|
||||
func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo) error {
|
||||
func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func()) error {
|
||||
if process.Meta.Tty {
|
||||
return unsupportedConsoleError
|
||||
}
|
||||
_, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
|
||||
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
|
||||
NoPivot: w.noPivot,
|
||||
return w.commonCall(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {
|
||||
_, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
|
||||
NoPivot: w.noPivot,
|
||||
Started: started,
|
||||
IO: io,
|
||||
})
|
||||
return err
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo) error {
|
||||
func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo, started func()) error {
|
||||
if process.Meta.Tty {
|
||||
return unsupportedConsoleError
|
||||
}
|
||||
return w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{
|
||||
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
|
||||
return w.commonCall(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {
|
||||
return w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{
|
||||
Started: started,
|
||||
IO: io,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error
|
||||
|
||||
// commonCall is the common run/exec logic used for non-linux runtimes. A tty
|
||||
// is only supported for linux, so this really just handles signal propagation
|
||||
// to the started runc process.
|
||||
func (w *runcExecutor) commonCall(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error {
|
||||
runcProcess := &startingProcess{
|
||||
ready: make(chan struct{}),
|
||||
}
|
||||
defer runcProcess.Release()
|
||||
|
||||
var eg errgroup.Group
|
||||
egCtx, cancel := context.WithCancel(ctx)
|
||||
defer eg.Wait()
|
||||
defer cancel()
|
||||
|
||||
startedCh := make(chan int, 1)
|
||||
eg.Go(func() error {
|
||||
return runcProcess.WaitForStart(egCtx, startedCh, started)
|
||||
})
|
||||
|
||||
eg.Go(func() error {
|
||||
return handleSignals(egCtx, runcProcess, process.Signal)
|
||||
})
|
||||
|
||||
return call(ctx, startedCh, &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr})
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/console"
|
||||
runc "github.com/containerd/go-runc"
|
||||
@@ -22,8 +21,8 @@ func updateRuncFieldsForHostOS(runtime *runc.Runc) {
|
||||
runtime.PdeathSignal = syscall.SIGKILL // this can still leak the process
|
||||
}
|
||||
|
||||
func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo) error {
|
||||
return w.callWithIO(ctx, id, bundle, process, func(ctx context.Context, started chan<- int, io runc.IO) error {
|
||||
func (w *runcExecutor) run(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func()) error {
|
||||
return w.callWithIO(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {
|
||||
_, err := w.runc.Run(ctx, id, bundle, &runc.CreateOpts{
|
||||
NoPivot: w.noPivot,
|
||||
Started: started,
|
||||
@@ -33,8 +32,8 @@ func (w *runcExecutor) run(ctx context.Context, id, bundle string, process execu
|
||||
})
|
||||
}
|
||||
|
||||
func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo) error {
|
||||
return w.callWithIO(ctx, id, bundle, process, func(ctx context.Context, started chan<- int, io runc.IO) error {
|
||||
func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess *specs.Process, process executor.ProcessInfo, started func()) error {
|
||||
return w.callWithIO(ctx, id, bundle, process, started, func(ctx context.Context, started chan<- int, io runc.IO) error {
|
||||
return w.runc.Exec(ctx, id, *specsProcess, &runc.ExecOpts{
|
||||
Started: started,
|
||||
IO: io,
|
||||
@@ -44,12 +43,28 @@ func (w *runcExecutor) exec(ctx context.Context, id, bundle string, specsProcess
|
||||
|
||||
type runcCall func(ctx context.Context, started chan<- int, io runc.IO) error
|
||||
|
||||
func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, process executor.ProcessInfo, call runcCall) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, process executor.ProcessInfo, started func(), call runcCall) error {
|
||||
runcProcess := &startingProcess{
|
||||
ready: make(chan struct{}),
|
||||
}
|
||||
defer runcProcess.Release()
|
||||
|
||||
var eg errgroup.Group
|
||||
egCtx, cancel := context.WithCancel(ctx)
|
||||
defer eg.Wait()
|
||||
defer cancel()
|
||||
|
||||
startedCh := make(chan int, 1)
|
||||
eg.Go(func() error {
|
||||
return runcProcess.WaitForStart(egCtx, startedCh, started)
|
||||
})
|
||||
|
||||
eg.Go(func() error {
|
||||
return handleSignals(egCtx, runcProcess, process.Signal)
|
||||
})
|
||||
|
||||
if !process.Meta.Tty {
|
||||
return call(ctx, nil, &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr})
|
||||
return call(ctx, startedCh, &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr})
|
||||
}
|
||||
|
||||
ptm, ptsName, err := console.NewPty()
|
||||
@@ -63,15 +78,13 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces
|
||||
return err
|
||||
}
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
defer func() {
|
||||
if process.Stdin != nil {
|
||||
process.Stdin.Close()
|
||||
}
|
||||
pts.Close()
|
||||
ptm.Close()
|
||||
cancel() // this will shutdown resize loop
|
||||
cancel() // this will shutdown resize and signal loops
|
||||
err := eg.Wait()
|
||||
if err != nil {
|
||||
bklog.G(ctx).Warningf("error while shutting down tty io: %s", err)
|
||||
@@ -105,29 +118,14 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces
|
||||
})
|
||||
}
|
||||
|
||||
started := make(chan int, 1)
|
||||
|
||||
eg.Go(func() error {
|
||||
startedCtx, timeout := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer timeout()
|
||||
var runcProcess *os.Process
|
||||
select {
|
||||
case <-startedCtx.Done():
|
||||
return errors.New("runc started message never received")
|
||||
case pid, ok := <-started:
|
||||
if !ok {
|
||||
return errors.New("runc process failed to send pid")
|
||||
}
|
||||
runcProcess, err = os.FindProcess(pid)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unable to find runc process for pid %d", pid)
|
||||
}
|
||||
defer runcProcess.Release()
|
||||
err := runcProcess.WaitForReady(egCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-egCtx.Done():
|
||||
return nil
|
||||
case resize := <-process.Resize:
|
||||
err = ptm.Resize(console.WinSize{
|
||||
@@ -137,7 +135,7 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces
|
||||
if err != nil {
|
||||
bklog.G(ctx).Errorf("failed to resize ptm: %s", err)
|
||||
}
|
||||
err = runcProcess.Signal(signal.SIGWINCH)
|
||||
err = runcProcess.Process.Signal(signal.SIGWINCH)
|
||||
if err != nil {
|
||||
bklog.G(ctx).Errorf("failed to send SIGWINCH to process: %s", err)
|
||||
}
|
||||
@@ -156,5 +154,5 @@ func (w *runcExecutor) callWithIO(ctx context.Context, id, bundle string, proces
|
||||
runcIO.stderr = pts
|
||||
}
|
||||
|
||||
return call(ctx, started, runcIO)
|
||||
return call(ctx, startedCh, runcIO)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user