diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index 7bfd8f7d94c3..71a6a4b11b81 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -341,7 +341,7 @@ func main() { return err } - controller, err := newController(c, &cfg) + controller, err := newController(ctx, c, &cfg) if err != nil { return err } @@ -758,7 +758,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) { return tlsConf, nil } -func newController(c *cli.Context, cfg *config.Config) (*control.Controller, error) { +func newController(ctx context.Context, c *cli.Context, cfg *config.Config) (*control.Controller, error) { sessionManager, err := session.NewManager() if err != nil { return nil, err @@ -851,6 +851,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err ContentStore: w.ContentStore(), HistoryConfig: cfg.History, GarbageCollect: w.GarbageCollect, + GracefulStop: ctx.Done(), }) } diff --git a/control/control.go b/control/control.go index f3a10b5b0993..01408839aa86 100644 --- a/control/control.go +++ b/control/control.go @@ -71,6 +71,7 @@ type Opt struct { ContentStore *containerdsnapshot.Store HistoryConfig *config.HistoryConfig GarbageCollect func(context.Context) error + GracefulStop <-chan struct{} } type Controller struct { // TODO: ControlService @@ -95,6 +96,7 @@ func NewController(opt Opt) (*Controller, error) { ContentStore: opt.ContentStore, CleanConfig: opt.HistoryConfig, GarbageCollect: opt.GarbageCollect, + GracefulStop: opt.GracefulStop, }) if err != nil { return nil, errors.Wrap(err, "failed to create history queue") diff --git a/solver/llbsolver/history.go b/solver/llbsolver/history.go index f342952e80ba..e0b19f7512f0 100644 --- a/solver/llbsolver/history.go +++ b/solver/llbsolver/history.go @@ -47,6 +47,7 @@ type HistoryQueueOpt struct { ContentStore *containerdsnapshot.Store CleanConfig *config.HistoryConfig GarbageCollect func(context.Context) error + GracefulStop <-chan struct{} } type HistoryQueue struct { @@ -137,6 +138,16 @@ func NewHistoryQueue(opt HistoryQueueOpt) (*HistoryQueue, error) { } }() + go func() { + <-h.opt.GracefulStop + h.mu.Lock() + defer h.mu.Unlock() + // if active builds then close will happen in finalizer + if len(h.finalizers) == 0 && len(h.active) == 0 { + go h.ps.Close() + } + }() + return h, nil } @@ -637,6 +648,14 @@ func (h *HistoryQueue) AcquireFinalizer(ref string) (<-chan struct{}, func()) { <-f.done h.mu.Lock() delete(h.finalizers, ref) + // if gracefulstop then release listeners after finalize + if len(h.finalizers) == 0 { + select { + case <-h.opt.GracefulStop: + go h.ps.Close() + default: + } + } h.mu.Unlock() }() return trigger, sync.OnceFunc(func() { @@ -1032,6 +1051,18 @@ func (p *pubsub[T]) Send(v T) { p.mu.Unlock() } +func (p *pubsub[T]) Close() { + p.mu.Lock() + channels := make([]*channel[T], 0, len(p.m)) + for c := range p.m { + channels = append(channels, c) + } + p.mu.Unlock() + for _, c := range channels { + c.close() + } +} + type channel[T any] struct { ps *pubsub[T] ch chan T