diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index cc771d065c..2d397c5df5 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -71,6 +71,8 @@ type Scheduler struct { stopCh chan struct{} RateLimit time.Duration NumWorkers int + workerChan chan Task + workerWg *sync.WaitGroup } func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler { @@ -94,13 +96,17 @@ func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler { // default value RateLimit: rateLimit, NumWorkers: numWorkers, + workerChan: make(chan Task, numWorkers), + workerWg: new(sync.WaitGroup), } } -func (scheduler *Scheduler) poolWorker(ctx context.Context, numWorkers int, tasks chan Task) { - for i := 0; i < numWorkers; i++ { +func (scheduler *Scheduler) poolWorker(ctx context.Context) { + for i := 0; i < scheduler.NumWorkers; i++ { go func(workerID int) { - for task := range tasks { + defer scheduler.workerWg.Done() + + for task := range scheduler.workerChan { scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task") if err := task.DoWork(ctx); err != nil { @@ -113,23 +119,31 @@ func (scheduler *Scheduler) poolWorker(ctx context.Context, numWorkers int, task } } +// gracefully shutdown. +func (scheduler *Scheduler) Shutdown() { + close(scheduler.workerChan) + close(scheduler.stopCh) + scheduler.workerWg.Wait() + scheduler.log.Info().Msg("scheduler: shutdown") +} + func (scheduler *Scheduler) RunScheduler(ctx context.Context) { throttle := time.NewTicker(rateLimit).C numWorkers := scheduler.NumWorkers - tasksWorker := make(chan Task, numWorkers) + + // wait all workers to finish their work before exiting from RunScheduler + scheduler.workerWg.Add(numWorkers) // start worker pool - go scheduler.poolWorker(ctx, numWorkers, tasksWorker) + go scheduler.poolWorker(ctx) go func() { for { select { case <-ctx.Done(): - close(tasksWorker) - close(scheduler.stopCh) - - scheduler.log.Debug().Msg("scheduler: received stop signal, exiting...") + scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...") + scheduler.Shutdown() return default: @@ -139,7 +153,7 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { if task != nil { // push tasks into worker pool scheduler.log.Debug().Msg("scheduler: pushing task into worker pool") - tasksWorker <- task + scheduler.workerChan <- task } i++ }