feat(scheduler): gracefully shutdown
wait for workers to finish before exiting

should fix tests reporting they couldn't remove rootDir because it's still being
written to by tasks

Signed-off-by: Petu Eusebiu <[email protected]>
eusebiu-constantin-petu-dbk committed Oct 23, 2023
1 parent 4cb7a6c commit c951284
Showing 3 changed files with 62 additions and 30 deletions.
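For readers skimming the diff, the mechanism behind the change is a common Go worker-pool shutdown pattern: an atomic flag stops new submissions, closing the task channel lets the workers drain whatever is already queued, and a WaitGroup lets Shutdown() block until every in-flight task has returned. The sketch below is a minimal, self-contained illustration of that pattern, not code from the zot repository; the names (pool, newPool, submit, shutdown) are invented for the example, and it assumes submit is no longer called concurrently once shutdown starts.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// pool sketches the mechanism introduced by this commit: workerWg tracks the
// worker goroutines, shuttingDown rejects new tasks, and closing tasks lets
// the workers drain the queue and exit their range loop.
type pool struct {
    tasks        chan func()
    workerWg     sync.WaitGroup
    shuttingDown atomic.Bool
}

func newPool(numWorkers int) *pool {
    p := &pool{tasks: make(chan func(), numWorkers)}
    p.workerWg.Add(numWorkers)

    for i := 0; i < numWorkers; i++ {
        go func() {
            defer p.workerWg.Done()

            for task := range p.tasks { // exits once tasks is closed and drained
                task()
            }
        }()
    }

    return p
}

// submit drops the task if shutdown has already started; sending on a closed
// channel would panic otherwise. (Assumes no submit races with shutdown.)
func (p *pool) submit(task func()) {
    if p.shuttingDown.Load() {
        return
    }

    p.tasks <- task
}

// shutdown flips the flag, closes the channel exactly once, and then blocks
// until all workers have finished their current task.
func (p *pool) shutdown() {
    if p.shuttingDown.CompareAndSwap(false, true) {
        close(p.tasks)
    }

    p.workerWg.Wait()
}

func main() {
    p := newPool(2)

    p.submit(func() {
        time.Sleep(100 * time.Millisecond)
        fmt.Println("task done")
    })

    p.shutdown() // returns only after "task done" is printed
}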
24 changes: 13 additions & 11 deletions pkg/api/controller.go
@@ -49,6 +49,7 @@ type Controller struct {
SyncOnDemand SyncOnDemand
RelyingParties map[string]rp.RelyingParty
CookieStore *CookieStore
taskScheduler *scheduler.Scheduler
// runtime params
chosenPort int // kernel-chosen port
}
@@ -345,13 +346,14 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.
}

func (c *Controller) Shutdown() {
c.taskScheduler.Shutdown()
ctx := context.Background()
_ = c.Server.Shutdown(ctx)
}

func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
taskScheduler := scheduler.NewScheduler(c.Config, c.Log)
taskScheduler.RunScheduler(reloadCtx)
c.taskScheduler = scheduler.NewScheduler(c.Config, c.Log)
c.taskScheduler.RunScheduler(reloadCtx)

// Enable running garbage-collect periodically for DefaultStore
if c.Config.Storage.GC {
@@ -361,16 +363,16 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
RetentionDelay: c.Config.Storage.UntaggedImageRetentionDelay,
}, c.Log)

gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, taskScheduler)
gc.CleanImageStorePeriodically(c.Config.Storage.GCInterval, c.taskScheduler)
}

// Enable running dedupe blobs both ways (dedupe or restore deduped blobs)
c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)

// Enable extensions if extension config is provided for DefaultStore
if c.Config != nil && c.Config.Extensions != nil {
ext.EnableMetricsExtension(c.Config, c.Log, c.Config.Storage.RootDirectory)
ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, taskScheduler, c.CveScanner, c.Log)
ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, c.taskScheduler, c.CveScanner, c.Log)
}

if c.Config.Storage.SubPaths != nil {
@@ -384,7 +386,7 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
RetentionDelay: storageConfig.UntaggedImageRetentionDelay,
}, c.Log)

gc.CleanImageStorePeriodically(storageConfig.GCInterval, taskScheduler)
gc.CleanImageStorePeriodically(storageConfig.GCInterval, c.taskScheduler)
}

// Enable extensions if extension config is provided for subImageStore
@@ -395,15 +397,15 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
// Enable running dedupe blobs both ways (dedupe or restore deduped blobs) for subpaths
substore := c.StoreController.SubStore[route]
if substore != nil {
substore.RunDedupeBlobs(time.Duration(0), taskScheduler)
substore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)
}
}
}

if c.Config.Extensions != nil {
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler)
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, c.taskScheduler)
//nolint: contextcheck
syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, taskScheduler, c.Log)
syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.Log)
if err != nil {
c.Log.Error().Err(err).Msg("unable to start sync extension")
}
Expand All @@ -412,11 +414,11 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
}

if c.CookieStore != nil {
c.CookieStore.RunSessionCleaner(taskScheduler)
c.CookieStore.RunSessionCleaner(c.taskScheduler)
}

// we can later move enabling the other scheduled tasks inside the call below
ext.EnableScheduledTasks(c.Config, taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
ext.EnableScheduledTasks(c.Config, c.taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
}

type SyncOnDemand interface {
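controller.go now keeps the scheduler as a field so Shutdown() can stop background tasks, and wait for them, before shutting down the HTTP server. The snippet below shows one way a caller could drive that graceful stop from OS signals; it is illustrative glue rather than zot's actual entrypoint, and the shutdowner interface, handleSignals helper, and noopController stand-in are invented for the example; the only thing taken from the diff is that the controller exposes Shutdown().

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"
)

// shutdowner matches the Shutdown() method the controller gains in this
// commit; a small interface keeps the example self-contained.
type shutdowner interface {
    Shutdown()
}

// noopController is a stand-in for *api.Controller, used only so this sketch
// compiles and runs on its own.
type noopController struct{}

func (noopController) Shutdown() { log.Println("shutdown called") }

// handleSignals blocks until SIGINT or SIGTERM, then calls Shutdown, which
// after this commit first waits for scheduler workers to finish their current
// tasks and only then stops the HTTP server.
func handleSignals(ctlr shutdowner) {
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    sig := <-sigCh
    log.Printf("received %v, shutting down gracefully", sig)

    ctlr.Shutdown()
}

func main() {
    handleSignals(noopController{})
}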
7 changes: 4 additions & 3 deletions pkg/extensions/sync/sync_test.go
@@ -1883,9 +1883,6 @@ func TestConfigReloader(t *testing.T) {
destConfig.Log.Output = logFile.Name()

dctlr := api.NewController(destConfig)
dcm := test.NewControllerManager(dctlr)

defer dcm.StopServer()

//nolint: dupl
Convey("Reload config without sync", func() {
@@ -1927,6 +1924,8 @@ func TestConfigReloader(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

defer dctlr.Shutdown()

// let it sync
time.Sleep(3 * time.Second)

@@ -2075,6 +2074,8 @@ func TestConfigReloader(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

defer dctlr.Shutdown()

// let it sync
time.Sleep(3 * time.Second)

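The test change is about ordering: the old defer dcm.StopServer() was registered right after api.NewController, while the new defer dctlr.Shutdown() is registered only once the server is confirmed to be answering, so teardown can wait for background tasks before the test removes its rootDir. A generic sketch of that pattern follows; the startAndAwait helper, the /v2/ readiness probe, and the timings are illustrative assumptions, not code from the zot test suite.

package example

import (
    "net/http"
    "testing"
    "time"
)

// startAndAwait registers a graceful-shutdown teardown only after the server
// under test is reachable, mirroring the updated test: poll until the
// endpoint answers, then t.Cleanup(shutdown), so teardown can wait for
// background tasks before the test deletes its temporary rootDir.
func startAndAwait(t *testing.T, baseURL string, shutdown func()) {
    t.Helper()

    for i := 0; i < 50; i++ {
        resp, err := http.Get(baseURL + "/v2/")
        if err == nil {
            resp.Body.Close()

            break
        }

        time.Sleep(100 * time.Millisecond)
    }

    t.Cleanup(shutdown)
}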
61 changes: 45 additions & 16 deletions pkg/scheduler/scheduler.go
@@ -5,6 +5,7 @@ import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"

"zotregistry.io/zot/pkg/api/config"
@@ -68,9 +69,11 @@ type Scheduler struct {
waitingGenerators []*generator
generatorsLock *sync.Mutex
log log.Logger
stopCh chan struct{}
RateLimit time.Duration
NumWorkers int
workerChan chan Task
workerWg *sync.WaitGroup
isShuttingDown atomic.Bool
}

func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
Expand All @@ -90,17 +93,20 @@ func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
generators: generatorPQ,
generatorsLock: new(sync.Mutex),
log: log.Logger{Logger: sublogger},
stopCh: make(chan struct{}),
// default value
RateLimit: rateLimit,
NumWorkers: numWorkers,
workerChan: make(chan Task, numWorkers),
workerWg: new(sync.WaitGroup),
}
}

func (scheduler *Scheduler) poolWorker(ctx context.Context, numWorkers int, tasks chan Task) {
for i := 0; i < numWorkers; i++ {
func (scheduler *Scheduler) poolWorker(ctx context.Context) {
for i := 0; i < scheduler.NumWorkers; i++ {
go func(workerID int) {
for task := range tasks {
defer scheduler.workerWg.Done()

for task := range scheduler.workerChan {
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task")

if err := task.DoWork(ctx); err != nil {
@@ -113,33 +119,56 @@ func (scheduler *Scheduler) poolWorker(ctx context.Context, numWorkers int, task
}
}

func (scheduler *Scheduler) Shutdown() {
if !scheduler.inShutdown() {
scheduler.shutdown()
}

scheduler.workerWg.Wait()
}

func (scheduler *Scheduler) inShutdown() bool {
return scheduler.isShuttingDown.Load()
}

func (scheduler *Scheduler) shutdown() {
close(scheduler.workerChan)
scheduler.isShuttingDown.Store(true)
}

func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
throttle := time.NewTicker(rateLimit).C

numWorkers := scheduler.NumWorkers
tasksWorker := make(chan Task, numWorkers)

// wait for all workers to finish their work before exiting from Shutdown()
scheduler.workerWg.Add(numWorkers)

// start worker pool
go scheduler.poolWorker(ctx, numWorkers, tasksWorker)
go scheduler.poolWorker(ctx)

go func() {
for {
select {
case <-ctx.Done():
close(tasksWorker)
close(scheduler.stopCh)
if !scheduler.inShutdown() {
scheduler.shutdown()
}

scheduler.log.Debug().Msg("scheduler: received stop signal, exiting...")
scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...")

return
default:
i := 0
for i < numWorkers {
task := scheduler.getTask()

if task != nil {
// push tasks into worker pool
scheduler.log.Debug().Msg("scheduler: pushing task into worker pool")
tasksWorker <- task
if !scheduler.inShutdown() {
scheduler.workerChan <- task
}
}
i++
}
@@ -251,17 +280,17 @@ func (scheduler *Scheduler) SubmitTask(task Task, priority Priority) {
}

// check if the scheduler is still running before adding the task to the channel
select {
case <-scheduler.stopCh:
if scheduler.inShutdown() {
return
default:
}

select {
case <-scheduler.stopCh:
return
case tasksQ <- task:
scheduler.log.Info().Msg("scheduler: adding a new task")
default:
if scheduler.inShutdown() {
return
}
}
}

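Because Shutdown() now blocks until workers return from DoWork, long-running tasks should honour the context that poolWorker hands them so a graceful stop stays prompt when the scheduler's context is cancelled. The sketch below is an illustrative task implementation, not code from the zot repository; the local Task interface simply mirrors the DoWork(ctx) call visible in the worker loop above, and cleanupTask, cleanupOne, and the work loop are invented for the example.

package example

import (
    "context"
    "errors"
)

// Task mirrors the DoWork(ctx) call used by poolWorker in the diff above; it
// is declared locally here only to keep the sketch self-contained.
type Task interface {
    DoWork(ctx context.Context) error
}

// cleanupTask is a hypothetical long-running task. Because Shutdown waits on
// workerWg, DoWork should check ctx and return early when the scheduler's
// context is cancelled, instead of blocking the graceful stop.
type cleanupTask struct {
    items []string
}

func (t *cleanupTask) DoWork(ctx context.Context) error {
    for _, item := range t.items {
        select {
        case <-ctx.Done():
            // scheduler is stopping; bail out so workerWg.Wait() in
            // Shutdown() does not hang on this task
            return ctx.Err()
        default:
        }

        if err := cleanupOne(item); err != nil {
            return err
        }
    }

    return nil
}

// cleanupOne stands in for one small unit of real work.
func cleanupOne(item string) error {
    if item == "" {
        return errors.New("empty item")
    }

    return nil
}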
