From f8f06ca4b02b46ec9e34c12039e7ccdfdbbb872a Mon Sep 17 00:00:00 2001
From: Petu Eusebiu
Date: Fri, 20 Oct 2023 17:28:10 +0300
Subject: [PATCH] feat(scheduler): wait for workers to finish before shutting
 down

Should fix tests reporting that they couldn't remove rootDir because it
was still being written to by tasks.

Signed-off-by: Petu Eusebiu
---
 pkg/api/controller.go            | 24 +++++++++-------
 pkg/extensions/sync/sync_test.go |  7 +++--
 pkg/scheduler/scheduler.go       | 49 ++++++++++++++++++--------------
 3 files changed, 45 insertions(+), 35 deletions(-)

diff --git a/pkg/api/controller.go b/pkg/api/controller.go
index aa36222910..c172510c68 100644
--- a/pkg/api/controller.go
+++ b/pkg/api/controller.go
@@ -49,6 +49,7 @@ type Controller struct {
 	SyncOnDemand   SyncOnDemand
 	RelyingParties map[string]rp.RelyingParty
 	CookieStore    *CookieStore
+	taskScheduler  *scheduler.Scheduler
 	// runtime params
 	chosenPort int // kernel-chosen port
 }
@@ -345,13 +346,14 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.
 }
 
 func (c *Controller) Shutdown() {
+	c.taskScheduler.Shutdown()
 	ctx := context.Background()
 	_ = c.Server.Shutdown(ctx)
 }
 
 func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
-	taskScheduler := scheduler.NewScheduler(c.Config, c.Log)
-	taskScheduler.RunScheduler(reloadCtx)
+	c.taskScheduler = scheduler.NewScheduler(c.Config, c.Log)
+	c.taskScheduler.RunScheduler(reloadCtx)
 
 	// Enable running garbage-collect periodically for DefaultStore
 	if c.Config.Storage.GC {
@@ -361,16 +363,16 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
 			RetentionDelay: c.Config.Storage.UntaggedImageRetentionDelay,
 		}, c.Log)
 
-		gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, taskScheduler)
+		gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, c.taskScheduler)
 	}
 
 	// Enable running dedupe blobs both ways (dedupe or restore deduped blobs)
-	c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
+	c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)
 
 	// Enable extensions if extension config is provided for DefaultStore
 	if c.Config != nil && c.Config.Extensions != nil {
 		ext.EnableMetricsExtension(c.Config, c.Log, c.Config.Storage.RootDirectory)
-		ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, taskScheduler, c.CveScanner, c.Log)
+		ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, c.taskScheduler, c.CveScanner, c.Log)
 	}
 
 	if c.Config.Storage.SubPaths != nil {
@@ -384,7 +386,7 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
 				RetentionDelay: storageConfig.UntaggedImageRetentionDelay,
 			}, c.Log)
 
-			gc.CleanImageStorePeriodically(storageConfig.GCInterval, taskScheduler)
+			gc.CleanImageStorePeriodically(storageConfig.GCInterval, c.taskScheduler)
 		}
 
 		// Enable extensions if extension config is provided for subImageStore
@@ -395,15 +397,15 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
 			// Enable running dedupe blobs both ways (dedupe or restore deduped blobs) for subpaths
 			substore := c.StoreController.SubStore[route]
 			if substore != nil {
-				substore.RunDedupeBlobs(time.Duration(0), taskScheduler)
+				substore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)
 			}
 		}
 	}
 
 	if c.Config.Extensions != nil {
-		ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler)
+		ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, c.taskScheduler)
 		//nolint: contextcheck
-		syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, taskScheduler, c.Log)
+		syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.Log)
 		if err != nil {
 			c.Log.Error().Err(err).Msg("unable to start sync extension")
 		}
@@ -412,11 +414,11 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
 	}
 
 	if c.CookieStore != nil {
-		c.CookieStore.RunSessionCleaner(taskScheduler)
+		c.CookieStore.RunSessionCleaner(c.taskScheduler)
 	}
 
 	// we can later move enabling the other scheduled tasks inside the call below
-	ext.EnableScheduledTasks(c.Config, taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
+	ext.EnableScheduledTasks(c.Config, c.taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
 }
 
 type SyncOnDemand interface {
diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go
index 52b5dcd737..818b00a9bb 100644
--- a/pkg/extensions/sync/sync_test.go
+++ b/pkg/extensions/sync/sync_test.go
@@ -1883,9 +1883,6 @@ func TestConfigReloader(t *testing.T) {
 		destConfig.Log.Output = logFile.Name()
 
 		dctlr := api.NewController(destConfig)
-		dcm := test.NewControllerManager(dctlr)
-
-		defer dcm.StopServer()
 
 		//nolint: dupl
 		Convey("Reload config without sync", func() {
@@ -1927,6 +1924,8 @@ func TestConfigReloader(t *testing.T) {
 				time.Sleep(100 * time.Millisecond)
 			}
 
+			defer dctlr.Shutdown()
+
 			// let it sync
 			time.Sleep(3 * time.Second)
 
@@ -2075,6 +2074,8 @@ func TestConfigReloader(t *testing.T) {
 				time.Sleep(100 * time.Millisecond)
 			}
 
+			defer dctlr.Shutdown()
+
 			// let it sync
 			time.Sleep(3 * time.Second)
 
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 2d397c5df5..4b0a655baf 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"runtime"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"zotregistry.io/zot/pkg/api/config"
@@ -68,11 +69,11 @@ type Scheduler struct {
 	waitingGenerators []*generator
 	generatorsLock    *sync.Mutex
 	log               log.Logger
-	stopCh            chan struct{}
 	RateLimit         time.Duration
 	NumWorkers        int
 	workerChan        chan Task
 	workerWg          *sync.WaitGroup
+	inShutdown        atomic.Bool
 }
 
 func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
@@ -92,7 +93,6 @@
 		generators:      generatorPQ,
 		generatorsLock:  new(sync.Mutex),
 		log:             log.Logger{Logger: sublogger},
-		stopCh:          make(chan struct{}),
 		// default value
 		RateLimit:  rateLimit,
 		NumWorkers: numWorkers,
@@ -119,12 +119,13 @@ func (scheduler *Scheduler) poolWorker(ctx context.Context) {
 	}
 }
 
-// gracefully shutdown.
 func (scheduler *Scheduler) Shutdown() {
-	close(scheduler.workerChan)
-	close(scheduler.stopCh)
+	if !scheduler.inShutdown.Load() {
+		scheduler.inShutdown.Store(true)
+		close(scheduler.workerChan)
+	}
+
 	scheduler.workerWg.Wait()
-	scheduler.log.Info().Msg("scheduler: shutdown")
 }
 
 func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
@@ -132,7 +133,7 @@
 
 	numWorkers := scheduler.NumWorkers
 
-	// wait all workers to finish their work before exiting from RunScheduler
+	// wait for all workers to finish their work before exiting from Shutdown()
 	scheduler.workerWg.Add(numWorkers)
 
 	// start worker pool
@@ -142,20 +143,26 @@
 	for {
 		select {
 		case <-ctx.Done():
+			if !scheduler.inShutdown.Load() {
+				scheduler.inShutdown.Store(true)
+				close(scheduler.workerChan)
+			}
+
 			scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...")
-			scheduler.Shutdown()
 
 			return
 		default:
-			i := 0
-			for i < numWorkers {
-				task := scheduler.getTask()
-				if task != nil {
-					// push tasks into worker pool
-					scheduler.log.Debug().Msg("scheduler: pushing task into worker pool")
-					scheduler.workerChan <- task
+			if !scheduler.inShutdown.Load() {
+				i := 0
+				for i < numWorkers {
+					task := scheduler.getTask()
+					if task != nil {
+						// push tasks into worker pool
+						scheduler.log.Debug().Msg("scheduler: pushing task into worker pool")
+						scheduler.workerChan <- task
+					}
+					i++
 				}
-				i++
 			}
 		}
 
@@ -265,17 +272,17 @@ func (scheduler *Scheduler) SubmitTask(task Task, priority Priority) {
 	}
 
-	// check if the scheduler it's still running in order to add the task to the channel
-	select {
-	case <-scheduler.stopCh:
+	// check if the scheduler is still running before adding the task to the channel
+	if scheduler.inShutdown.Load() {
 		return
-	default:
 	}
 
 	select {
-	case <-scheduler.stopCh:
-		return
 	case tasksQ <- task:
 		scheduler.log.Info().Msg("scheduler: adding a new task")
+	default:
+		if scheduler.inShutdown.Load() {
+			return
+		}
 	}
 }
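
Reviewer note, not part of the patch: the change swaps the old stopCh channel for an atomic inShutdown flag, closes workerChan exactly once, and makes Shutdown() block on workerWg.Wait(), so every in-flight task finishes before the controller (and the tests) tear down rootDir. Below is a minimal, self-contained sketch of that shutdown pattern. All names (pool, taskCh, submit, shutdown) are illustrative only and are not zot APIs; the sketch also uses CompareAndSwap instead of the patch's Load/Store pair, a small variation that keeps the close exactly-once even under concurrent callers.

// Reviewer sketch: the graceful-shutdown pattern used by the patch,
// reduced to a standalone program. Illustrative names, not zot's API.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type pool struct {
	inShutdown atomic.Bool    // mirrors Scheduler.inShutdown
	taskCh     chan func()    // mirrors Scheduler.workerChan
	workerWg   sync.WaitGroup // mirrors Scheduler.workerWg
}

func newPool(numWorkers int) *pool {
	p := &pool{taskCh: make(chan func(), numWorkers)}
	p.workerWg.Add(numWorkers)

	for i := 0; i < numWorkers; i++ {
		go func() {
			defer p.workerWg.Done()
			// range exits only after taskCh is closed and drained,
			// so in-flight tasks always run to completion.
			for task := range p.taskCh {
				task()
			}
		}()
	}

	return p
}

// submit mirrors SubmitTask: refuse new work once shutdown has begun.
// In this sketch submit must not race with shutdown; the patch avoids
// that case because SubmitTask writes to per-generator task queues,
// not to the worker channel that Shutdown closes.
func (p *pool) submit(task func()) {
	if p.inShutdown.Load() {
		return
	}
	p.taskCh <- task
}

// shutdown closes taskCh exactly once (CompareAndSwap makes it safe
// to call from several goroutines) and then blocks until every worker
// has drained its queue and returned, which is the property the patch
// needs before tests may remove rootDir.
func (p *pool) shutdown() {
	if p.inShutdown.CompareAndSwap(false, true) {
		close(p.taskCh)
	}
	p.workerWg.Wait()
}

func main() {
	p := newPool(4)

	for i := 0; i < 8; i++ {
		n := i
		p.submit(func() { fmt.Println("task", n) })
	}

	p.shutdown() // returns only after all eight tasks have finished
	fmt.Println("workers drained, safe to remove rootDir")
}

The design point the patch relies on: closing the worker channel lets each worker's range loop drain any queued tasks and exit, and only then does workerWg.Wait() return. That is why Controller.Shutdown() can call taskScheduler.Shutdown() before stopping the HTTP server, and why the tests above can defer dctlr.Shutdown() instead of stopping the server directly.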