feat(scheduler): wait for workers to finish before shutting down
Should fix tests reporting that they couldn't remove rootDir because it was still being written to by tasks.

Signed-off-by: Petu Eusebiu <[email protected]>
eusebiu-constantin-petu-dbk committed Oct 20, 2023 · 1 parent 14d84be · commit f8f06ca
Showing 3 changed files with 45 additions and 35 deletions.
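For context, the fix follows the standard Go recipe for draining a worker pool: close the task channel so each worker's range loop exits, then block on a sync.WaitGroup until every worker has finished. A minimal, runnable sketch of that recipe (names are illustrative, not zot's API):

package main

import (
	"fmt"
	"sync"
)

func main() {
	tasks := make(chan int)

	var wg sync.WaitGroup

	const numWorkers = 3

	wg.Add(numWorkers)

	for w := 0; w < numWorkers; w++ {
		go func(id int) {
			defer wg.Done()

			// the loop exits once tasks is closed and drained
			for t := range tasks {
				fmt.Printf("worker %d: task %d\n", id, t)
			}
		}(w)
	}

	for i := 0; i < 5; i++ {
		tasks <- i
	}

	close(tasks) // signal workers that no more work is coming
	wg.Wait()    // block until every in-flight task has completed
	fmt.Println("all workers done; rootDir can be removed safely")
}

Until Wait returns, a task may still be writing to disk, which is exactly why tests that delete rootDir right after stopping the server were flaky before this change.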
pkg/api/controller.go — 24 changes: 13 additions & 11 deletions
@@ -49,6 +49,7 @@ type Controller struct {
 	SyncOnDemand   SyncOnDemand
 	RelyingParties map[string]rp.RelyingParty
 	CookieStore    *CookieStore
+	taskScheduler  *scheduler.Scheduler
 	// runtime params
 	chosenPort int // kernel-chosen port
 }
@@ -345,13 +346,14 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.
 }
 
 func (c *Controller) Shutdown() {
+	c.taskScheduler.Shutdown()
 	ctx := context.Background()
 	_ = c.Server.Shutdown(ctx)
 }
 
 func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
-	taskScheduler := scheduler.NewScheduler(c.Config, c.Log)
-	taskScheduler.RunScheduler(reloadCtx)
+	c.taskScheduler = scheduler.NewScheduler(c.Config, c.Log)
+	c.taskScheduler.RunScheduler(reloadCtx)
 
 	// Enable running garbage-collect periodically for DefaultStore
 	if c.Config.Storage.GC {
@@ -361,16 +363,16 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
 			RetentionDelay: c.Config.Storage.UntaggedImageRetentionDelay,
 		}, c.Log)
 
-		gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, taskScheduler)
+		gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, c.taskScheduler)
 	}
 
 	// Enable running dedupe blobs both ways (dedupe or restore deduped blobs)
-	c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
+	c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)
 
 	// Enable extensions if extension config is provided for DefaultStore
 	if c.Config != nil && c.Config.Extensions != nil {
 		ext.EnableMetricsExtension(c.Config, c.Log, c.Config.Storage.RootDirectory)
-		ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, taskScheduler, c.CveScanner, c.Log)
+		ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, c.taskScheduler, c.CveScanner, c.Log)
 	}
 
 	if c.Config.Storage.SubPaths != nil {
@@ -384,7 +386,7 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
 				RetentionDelay: storageConfig.UntaggedImageRetentionDelay,
 			}, c.Log)
 
-			gc.CleanImageStorePeriodically(storageConfig.GCInterval, taskScheduler)
+			gc.CleanImageStorePeriodically(storageConfig.GCInterval, c.taskScheduler)
 		}
 
 			// Enable extensions if extension config is provided for subImageStore
@@ -395,15 +397,15 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
 			// Enable running dedupe blobs both ways (dedupe or restore deduped blobs) for subpaths
 			substore := c.StoreController.SubStore[route]
 			if substore != nil {
-				substore.RunDedupeBlobs(time.Duration(0), taskScheduler)
+				substore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)
 			}
 		}
 	}
 
 	if c.Config.Extensions != nil {
-		ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler)
+		ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, c.taskScheduler)
 		//nolint: contextcheck
-		syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, taskScheduler, c.Log)
+		syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.Log)
 		if err != nil {
 			c.Log.Error().Err(err).Msg("unable to start sync extension")
 		}
@@ -412,11 +414,11 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
 	}
 
 	if c.CookieStore != nil {
-		c.CookieStore.RunSessionCleaner(taskScheduler)
+		c.CookieStore.RunSessionCleaner(c.taskScheduler)
 	}
 
 	// we can later move enabling the other scheduled tasks inside the call below
-	ext.EnableScheduledTasks(c.Config, taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
+	ext.EnableScheduledTasks(c.Config, c.taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
 }
 
 type SyncOnDemand interface {
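The ordering in Controller.Shutdown above is the point of the change: the task scheduler is stopped, and waited on, before the HTTP server is shut down, so no background task can still be writing under rootDir afterwards. A condensed sketch of that ordering (toyScheduler and controller are illustrative stand-ins, not zot's actual types):

package main

import (
	"context"
	"net/http"
	"sync"
)

// toyScheduler mimics the relevant surface of zot's scheduler:
// a task channel plus a WaitGroup tracking the workers.
type toyScheduler struct {
	tasks chan func()
	wg    sync.WaitGroup
}

func (s *toyScheduler) shutdown() {
	close(s.tasks) // let workers drain and exit
	s.wg.Wait()    // block until they have finished
}

type controller struct {
	sched *toyScheduler
	srv   *http.Server
}

// shutdown mirrors the diff: stop (and wait for) background
// workers first, then drain the HTTP server.
func (c *controller) shutdown() {
	c.sched.shutdown()
	_ = c.srv.Shutdown(context.Background())
}

func main() {
	c := &controller{
		sched: &toyScheduler{tasks: make(chan func())},
		srv:   &http.Server{Addr: ":0"},
	}
	c.shutdown()
}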
pkg/extensions/sync/sync_test.go — 7 changes: 4 additions & 3 deletions
@@ -1883,9 +1883,6 @@ func TestConfigReloader(t *testing.T) {
 		destConfig.Log.Output = logFile.Name()
 
 		dctlr := api.NewController(destConfig)
-		dcm := test.NewControllerManager(dctlr)
-
-		defer dcm.StopServer()
 
 		//nolint: dupl
 		Convey("Reload config without sync", func() {
@@ -1927,6 +1924,8 @@ func TestConfigReloader(t *testing.T) {
 				time.Sleep(100 * time.Millisecond)
 			}
 
+			defer dctlr.Shutdown()
+
 			// let it sync
 			time.Sleep(3 * time.Second)
 
@@ -2075,6 +2074,8 @@ func TestConfigReloader(t *testing.T) {
 				time.Sleep(100 * time.Millisecond)
 			}
 
+			defer dctlr.Shutdown()
+
 			// let it sync
 			time.Sleep(3 * time.Second)
 
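The test changes replace the controller-manager defer with a direct dctlr.Shutdown(), registered only after the poll loop has seen the server come up, so a controller that never started is never shut down. A self-contained sketch of that start/poll/defer shape (plain net/http stand-ins, no zot test helpers):

package main

import (
	"fmt"
	"net"
	"net/http"
	"time"
)

func main() {
	// serve on an ephemeral port, as the destination controller does
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}

	srv := &http.Server{Handler: http.NewServeMux()}
	go func() { _ = srv.Serve(ln) }()

	// poll until the server answers, mirroring the test's sleep loop
	url := "http://" + ln.Addr().String()
	for {
		resp, err := http.Get(url)
		if err == nil {
			resp.Body.Close()

			break
		}

		time.Sleep(100 * time.Millisecond)
	}

	// only now is the cleanup registered, like defer dctlr.Shutdown()
	defer func() {
		_ = srv.Close()
		fmt.Println("shut down cleanly")
	}()
}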
pkg/scheduler/scheduler.go — 49 changes: 28 additions & 21 deletions
@@ -5,6 +5,7 @@ import (
 	"context"
 	"runtime"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"zotregistry.io/zot/pkg/api/config"
@@ -68,11 +69,11 @@ type Scheduler struct {
 	waitingGenerators []*generator
 	generatorsLock    *sync.Mutex
 	log               log.Logger
-	stopCh            chan struct{}
 	RateLimit         time.Duration
 	NumWorkers        int
 	workerChan        chan Task
 	workerWg          *sync.WaitGroup
+	inShutdown        atomic.Bool
 }
 
 func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
@@ -92,7 +93,6 @@ func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
 		generators:     generatorPQ,
 		generatorsLock: new(sync.Mutex),
 		log:            log.Logger{Logger: sublogger},
-		stopCh:         make(chan struct{}),
 		// default value
 		RateLimit:  rateLimit,
 		NumWorkers: numWorkers,
@@ -119,20 +119,21 @@ func (scheduler *Scheduler) poolWorker(ctx context.Context) {
 	}
 }
 
+// gracefully shutdown.
 func (scheduler *Scheduler) Shutdown() {
-	close(scheduler.workerChan)
-	close(scheduler.stopCh)
+	if !scheduler.inShutdown.Load() {
+		scheduler.inShutdown.Store(true)
+		close(scheduler.workerChan)
+	}
 
 	scheduler.workerWg.Wait()
 	scheduler.log.Info().Msg("scheduler: shutdown")
 }
 
 func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
 	throttle := time.NewTicker(rateLimit).C
 
 	numWorkers := scheduler.NumWorkers
 
-	// wait all workers to finish their work before exiting from RunScheduler
+	// wait all workers to finish their work before exiting from Shutdown()
 	scheduler.workerWg.Add(numWorkers)
 
 	// start worker pool
@@ -142,20 +143,26 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
+			if !scheduler.inShutdown.Load() {
+				scheduler.inShutdown.Store(true)
+				close(scheduler.workerChan)
+			}
+
 			scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...")
-			scheduler.Shutdown()
 
 			return
 		default:
-			i := 0
-			for i < numWorkers {
-				task := scheduler.getTask()
-				if task != nil {
-					// push tasks into worker pool
-					scheduler.log.Debug().Msg("scheduler: pushing task into worker pool")
-					scheduler.workerChan <- task
+			if !scheduler.inShutdown.Load() {
+				i := 0
+				for i < numWorkers {
+					task := scheduler.getTask()
+					if task != nil {
+						// push tasks into worker pool
+						scheduler.log.Debug().Msg("scheduler: pushing task into worker pool")
+						scheduler.workerChan <- task
+					}
+					i++
 				}
-				i++
 			}
 		}
 
@@ -265,17 +272,17 @@ func (scheduler *Scheduler) SubmitTask(task Task, priority Priority) {
 	}
 
 	// check if the scheduler it's still running in order to add the task to the channel
-	select {
-	case <-scheduler.stopCh:
+	if scheduler.inShutdown.Load() {
 		return
-	default:
 	}
 
 	select {
-	case <-scheduler.stopCh:
-		return
 	case tasksQ <- task:
 		scheduler.log.Info().Msg("scheduler: adding a new task")
 	default:
+		if scheduler.inShutdown.Load() {
+			return
+		}
 	}
 }
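Two Go facts drive the scheduler changes: close() on an already-closed channel panics, so the atomic.Bool makes shutdown idempotent whether it is reached via Shutdown() or via context cancellation, and a send on a closed channel also panics, so SubmitTask must check the flag before queueing. A condensed, runnable sketch of the same pattern (pool is an illustrative stand-in; it uses CompareAndSwap, a slightly stricter form of the Load/Store pair in the diff):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type pool struct {
	tasks      chan string
	wg         sync.WaitGroup
	inShutdown atomic.Bool
}

// shutdown is safe to call more than once: CompareAndSwap lets exactly
// one caller close the channel, and every caller waits for the workers.
func (p *pool) shutdown() {
	if p.inShutdown.CompareAndSwap(false, true) {
		close(p.tasks)
	}

	p.wg.Wait()
}

// submit drops the task once shutdown has begun, mirroring the
// inShutdown checks in SubmitTask: sending would panic after close.
func (p *pool) submit(task string) {
	if p.inShutdown.Load() {
		return
	}

	select {
	case p.tasks <- task:
	default: // no capacity right now; drop, as the scheduler's select does
	}
}

func main() {
	p := &pool{tasks: make(chan string, 4)}

	p.wg.Add(1)

	go func() {
		defer p.wg.Done()

		for t := range p.tasks {
			fmt.Println("ran:", t)
		}
	}()

	p.submit("gc")
	p.submit("dedupe")

	p.shutdown()
	p.shutdown()         // second call only waits; no double close
	p.submit("too late") // ignored: shutdown already started
}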
