Merge pull request #5522 from tonistiigi/history-gracefulstop-fix

history: handle gracefulstop when history is active
This commit is contained in:
Tõnis Tiigi
2024-11-20 11:48:59 -08:00
committed by GitHub
3 changed files with 36 additions and 2 deletions

View File

@@ -341,7 +341,7 @@ func main() {
return err
}
controller, err := newController(c, &cfg)
controller, err := newController(ctx, c, &cfg)
if err != nil {
return err
}
@@ -758,7 +758,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) {
return tlsConf, nil
}
func newController(c *cli.Context, cfg *config.Config) (*control.Controller, error) {
func newController(ctx context.Context, c *cli.Context, cfg *config.Config) (*control.Controller, error) {
sessionManager, err := session.NewManager()
if err != nil {
return nil, err
@@ -851,6 +851,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err
ContentStore: w.ContentStore(),
HistoryConfig: cfg.History,
GarbageCollect: w.GarbageCollect,
GracefulStop: ctx.Done(),
})
}

View File

@@ -71,6 +71,7 @@ type Opt struct {
ContentStore *containerdsnapshot.Store
HistoryConfig *config.HistoryConfig
GarbageCollect func(context.Context) error
GracefulStop <-chan struct{}
}
type Controller struct { // TODO: ControlService
@@ -95,6 +96,7 @@ func NewController(opt Opt) (*Controller, error) {
ContentStore: opt.ContentStore,
CleanConfig: opt.HistoryConfig,
GarbageCollect: opt.GarbageCollect,
GracefulStop: opt.GracefulStop,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create history queue")

View File

@@ -47,6 +47,7 @@ type HistoryQueueOpt struct {
ContentStore *containerdsnapshot.Store
CleanConfig *config.HistoryConfig
GarbageCollect func(context.Context) error
GracefulStop <-chan struct{}
}
type HistoryQueue struct {
@@ -137,6 +138,16 @@ func NewHistoryQueue(opt HistoryQueueOpt) (*HistoryQueue, error) {
}
}()
go func() {
<-h.opt.GracefulStop
h.mu.Lock()
defer h.mu.Unlock()
// if active builds then close will happen in finalizer
if len(h.finalizers) == 0 && len(h.active) == 0 {
go h.ps.Close()
}
}()
return h, nil
}
@@ -637,6 +648,14 @@ func (h *HistoryQueue) AcquireFinalizer(ref string) (<-chan struct{}, func()) {
<-f.done
h.mu.Lock()
delete(h.finalizers, ref)
// if gracefulstop then release listeners after finalize
if len(h.finalizers) == 0 {
select {
case <-h.opt.GracefulStop:
go h.ps.Close()
default:
}
}
h.mu.Unlock()
}()
return trigger, sync.OnceFunc(func() {
@@ -1032,6 +1051,18 @@ func (p *pubsub[T]) Send(v T) {
p.mu.Unlock()
}
func (p *pubsub[T]) Close() {
p.mu.Lock()
channels := make([]*channel[T], 0, len(p.m))
for c := range p.m {
channels = append(channels, c)
}
p.mu.Unlock()
for _, c := range channels {
c.close()
}
}
type channel[T any] struct {
ps *pubsub[T]
ch chan T