From c45cd8fa1634bafd02209495aa34a652096ab076 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Thu, 14 Nov 2024 21:55:51 -0800 Subject: [PATCH] history: handle gracefulstop when history is active When GracefulStop is called, gRPC waits for current requests to finish before closing. While this is generally the behavior we want, it is not always the case for the History.Listen endpoint. That endpoint is usually open even if buildkit is not actively processing any builds, because a client may be waiting for new events. The new logic is that when GracefulStop happens, history will close active listeners if there are no active builds. If there are active builds, then active listeners will be closed after all the active builds have completed their finalizers. Signed-off-by: Tonis Tiigi --- cmd/buildkitd/main.go | 5 +++-- control/control.go | 2 ++ solver/llbsolver/history.go | 31 +++++++++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index 523c605104f2..71d696d3f936 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -341,7 +341,7 @@ func main() { return err } - controller, err := newController(c, &cfg) + controller, err := newController(ctx, c, &cfg) if err != nil { return err } @@ -758,7 +758,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) { return tlsConf, nil } -func newController(c *cli.Context, cfg *config.Config) (*control.Controller, error) { +func newController(ctx context.Context, c *cli.Context, cfg *config.Config) (*control.Controller, error) { sessionManager, err := session.NewManager() if err != nil { return nil, err @@ -851,6 +851,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err ContentStore: w.ContentStore(), HistoryConfig: cfg.History, GarbageCollect: w.GarbageCollect, + GracefulStop: ctx.Done(), }) } diff --git a/control/control.go b/control/control.go index f3a10b5b0993..01408839aa86 100644 --- 
a/control/control.go +++ b/control/control.go @@ -71,6 +71,7 @@ type Opt struct { ContentStore *containerdsnapshot.Store HistoryConfig *config.HistoryConfig GarbageCollect func(context.Context) error + GracefulStop <-chan struct{} } type Controller struct { // TODO: ControlService @@ -95,6 +96,7 @@ func NewController(opt Opt) (*Controller, error) { ContentStore: opt.ContentStore, CleanConfig: opt.HistoryConfig, GarbageCollect: opt.GarbageCollect, + GracefulStop: opt.GracefulStop, }) if err != nil { return nil, errors.Wrap(err, "failed to create history queue") diff --git a/solver/llbsolver/history.go b/solver/llbsolver/history.go index f342952e80ba..e0b19f7512f0 100644 --- a/solver/llbsolver/history.go +++ b/solver/llbsolver/history.go @@ -47,6 +47,7 @@ type HistoryQueueOpt struct { ContentStore *containerdsnapshot.Store CleanConfig *config.HistoryConfig GarbageCollect func(context.Context) error + GracefulStop <-chan struct{} } type HistoryQueue struct { @@ -137,6 +138,16 @@ func NewHistoryQueue(opt HistoryQueueOpt) (*HistoryQueue, error) { } }() + go func() { + <-h.opt.GracefulStop + h.mu.Lock() + defer h.mu.Unlock() + // if active builds then close will happen in finalizer + if len(h.finalizers) == 0 && len(h.active) == 0 { + go h.ps.Close() + } + }() + return h, nil } @@ -637,6 +648,14 @@ func (h *HistoryQueue) AcquireFinalizer(ref string) (<-chan struct{}, func()) { <-f.done h.mu.Lock() delete(h.finalizers, ref) + // if gracefulstop then release listeners after finalize + if len(h.finalizers) == 0 { + select { + case <-h.opt.GracefulStop: + go h.ps.Close() + default: + } + } h.mu.Unlock() }() return trigger, sync.OnceFunc(func() { @@ -1032,6 +1051,18 @@ func (p *pubsub[T]) Send(v T) { p.mu.Unlock() } +func (p *pubsub[T]) Close() { + p.mu.Lock() + channels := make([]*channel[T], 0, len(p.m)) + for c := range p.m { + channels = append(channels, c) + } + p.mu.Unlock() + for _, c := range channels { + c.close() + } +} + type channel[T any] struct { ps *pubsub[T] 
ch chan T