Skip to content

Commit

Permalink
feat(scheduler): gracefully shutdown
Browse files Browse the repository at this point in the history
wait for workers to finish before

Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Oct 20, 2023
1 parent a44ca57 commit 14d84be
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type Scheduler struct {
stopCh chan struct{}
RateLimit time.Duration
NumWorkers int
workerChan chan Task
workerWg *sync.WaitGroup
}

func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
Expand All @@ -94,13 +96,17 @@ func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
// default value
RateLimit: rateLimit,
NumWorkers: numWorkers,
workerChan: make(chan Task, numWorkers),
workerWg: new(sync.WaitGroup),
}
}

func (scheduler *Scheduler) poolWorker(ctx context.Context, numWorkers int, tasks chan Task) {
for i := 0; i < numWorkers; i++ {
func (scheduler *Scheduler) poolWorker(ctx context.Context) {
for i := 0; i < scheduler.NumWorkers; i++ {
go func(workerID int) {
for task := range tasks {
defer scheduler.workerWg.Done()

for task := range scheduler.workerChan {
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task")

if err := task.DoWork(ctx); err != nil {
Expand All @@ -113,23 +119,31 @@ func (scheduler *Scheduler) poolWorker(ctx context.Context, numWorkers int, task
}
}

// gracefully shutdown.
func (scheduler *Scheduler) Shutdown() {
close(scheduler.workerChan)
close(scheduler.stopCh)
scheduler.workerWg.Wait()
scheduler.log.Info().Msg("scheduler: shutdown")
}

func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
throttle := time.NewTicker(rateLimit).C

numWorkers := scheduler.NumWorkers
tasksWorker := make(chan Task, numWorkers)

// wait all workers to finish their work before exiting from RunScheduler
scheduler.workerWg.Add(numWorkers)

// start worker pool
go scheduler.poolWorker(ctx, numWorkers, tasksWorker)
go scheduler.poolWorker(ctx)

go func() {
for {
select {
case <-ctx.Done():
close(tasksWorker)
close(scheduler.stopCh)

scheduler.log.Debug().Msg("scheduler: received stop signal, exiting...")
scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...")
scheduler.Shutdown()

return
default:
Expand All @@ -139,7 +153,7 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
if task != nil {
// push tasks into worker pool
scheduler.log.Debug().Msg("scheduler: pushing task into worker pool")
tasksWorker <- task
scheduler.workerChan <- task
}
i++
}
Expand Down

0 comments on commit 14d84be

Please sign in to comment.