diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index aa2259312..b80ef862a 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -3,6 +3,7 @@ package scheduler
 import (
 	"container/heap"
 	"context"
+	"math"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -26,7 +27,7 @@ func (pq generatorsPriorityQueue) Len() int {
 }
 
 func (pq generatorsPriorityQueue) Less(i, j int) bool {
-	return pq[i].priority > pq[j].priority
+	return pq[i].getRanking() > pq[j].getRanking()
 }
 
 func (pq generatorsPriorityQueue) Swap(i, j int) {
@@ -331,7 +332,13 @@ func (scheduler *Scheduler) generateTasks() {
 
 	// check if the generator with highest priority is ready to run
 	if scheduler.generators[0].getState() == Ready {
+		// we are not popping it as we will generate multiple tasks until it is done
+		// we are going to pop after all tasks are generated
 		gen = scheduler.generators[0]
+
+		// trigger a generator reorder, as generating a task may impact the order
+		// equivalent of pop/remove followed by push, but more efficient
+		heap.Fix(&scheduler.generators, 0)
 	} else {
 		gen, _ = heap.Pop(&scheduler.generators).(*generator)
 		if gen.getState() == Waiting {
@@ -439,6 +446,7 @@ type generator struct {
 	taskGenerator TaskGenerator
 	remainingTask Task
 	index         int
+	taskCount     int64
 }
 
 func (gen *generator) generate(sch *Scheduler) {
@@ -460,6 +468,7 @@ func (gen *generator) generate(sch *Scheduler) {
 		if gen.taskGenerator.IsDone() {
 			gen.done = true
 			gen.lastRun = time.Now()
+			gen.taskCount = 0
 			gen.taskGenerator.Reset()
 
 			return
@@ -468,6 +477,9 @@ func (gen *generator) generate(sch *Scheduler) {
 		task = nextTask
 	}
 
+	// keep track of generated task count to use it for generator ordering
+	gen.taskCount++
+
 	// check if it's possible to add a new task to the channel
 	// if not, keep the generated task and retry to add it next time
 	select {
@@ -502,12 +514,19 @@ func (gen *generator) getState() State {
 	return Ready
 }
 
+func (gen *generator) getRanking() float64 {
+	// take into account the priority, but also how many tasks of
+	// a specific generator were executed in the current generator run
+	return math.Pow(10, float64(gen.priority)) / (1 + float64(gen.taskCount)) //nolint:gomnd
+}
+
 func (scheduler *Scheduler) SubmitGenerator(taskGenerator TaskGenerator, interval time.Duration, priority Priority) {
 	newGenerator := &generator{
 		interval:      interval,
 		done:          false,
 		priority:      priority,
 		taskGenerator: taskGenerator,
+		taskCount:     0,
 		remainingTask: nil,
 	}
 
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index defcf6b15..a91476b77 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"os"
 	"runtime"
+	"strings"
 	"testing"
 	"time"
 
@@ -18,9 +19,10 @@ import (
 )
 
 type task struct {
-	log log.Logger
-	msg string
-	err bool
+	log   log.Logger
+	msg   string
+	err   bool
+	delay time.Duration
 }
 
 var errInternal = errors.New("task: internal error")
@@ -35,7 +37,7 @@ func (t *task) DoWork(ctx context.Context) error {
 			return ctx.Err()
 		}
 
-		time.Sleep(100 * time.Millisecond)
+		time.Sleep(t.delay)
 	}
 
 	t.log.Info().Msg(t.msg)
@@ -52,15 +54,17 @@ func (t *task) Name() string {
 }
 
 type generator struct {
-	log      log.Logger
-	priority string
-	done     bool
-	index    int
-	step     int
+	log       log.Logger
+	priority  string
+	done      bool
+	index     int
+	step      int
+	limit     int
+	taskDelay time.Duration
 }
 
 func (g *generator) Next() (scheduler.Task, error) {
-	if g.step > 100 {
+	if g.step > g.limit {
 		g.done = true
 	}
 	g.step++
@@ -74,7 +78,12 @@ func (g *generator) Next() (scheduler.Task, error) {
 		return nil, errInternal
 	}
 
-	return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil
+	return &task{
+		log:   g.log,
+		msg:   fmt.Sprintf("executing %s task; index: %d", g.priority, g.index),
+		err:   false,
+		delay: g.taskDelay,
+	}, nil
 }
 
 func (g *generator) IsDone() bool {
@@ -154,13 +163,13 @@ func TestScheduler(t *testing.T) {
 		sch := scheduler.NewScheduler(cfg, metrics, logger)
 		sch.RateLimit = 5 * time.Second
 
-		genL := &generator{log: logger, priority: "low priority"}
+		genL := &generator{log: logger, priority: "low priority", limit: 100, taskDelay: 100 * time.Millisecond}
 		sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)
 
-		genM := &generator{log: logger, priority: "medium priority"}
+		genM := &generator{log: logger, priority: "medium priority", limit: 100, taskDelay: 100 * time.Millisecond}
 		sch.SubmitGenerator(genM, time.Duration(0), scheduler.MediumPriority)
 
-		genH := &generator{log: logger, priority: "high priority"}
+		genH := &generator{log: logger, priority: "high priority", limit: 100, taskDelay: 100 * time.Millisecond}
		sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority)
 
 		sch.RunScheduler()
@@ -176,6 +185,107 @@ func TestScheduler(t *testing.T) {
 		So(string(data), ShouldNotContainSubstring, "failed to execute task")
 	})
 
+	Convey("Test reordering of generators in queue", t, func() {
+		logFile, err := os.CreateTemp("", "zot-log*.txt")
+		So(err, ShouldBeNil)
+
+		defer os.Remove(logFile.Name()) // clean up
+
+		logger := log.NewLogger("debug", logFile.Name())
+		cfg := config.New()
+		cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
+		metrics := monitoring.NewMetricsServer(true, logger)
+		sch := scheduler.NewScheduler(cfg, metrics, logger)
+		sch.RateLimit = 1 * time.Nanosecond
+
+		// Testing reordering of generators using the same medium priority, as well as reordering with
+		// a low priority generator
+
+		genL := &generator{log: logger, priority: "low priority", limit: 110, taskDelay: time.Nanosecond}
+		sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)
+
+		genM := &generator{log: logger, priority: "medium 1 priority", limit: 110, taskDelay: time.Nanosecond}
+		sch.SubmitGenerator(genM, time.Duration(0), scheduler.MediumPriority)
+
+		genH := &generator{log: logger, priority: "medium 2 priority", limit: 110, taskDelay: time.Nanosecond}
+		sch.SubmitGenerator(genH, time.Duration(0), scheduler.MediumPriority)
+
+		sch.RunScheduler()
+		time.Sleep(1 * time.Second)
+		sch.Shutdown()
+
+		data, err := os.ReadFile(logFile.Name())
+		So(err, ShouldBeNil)
+
+		// Check all tasks show up in the logs
+		for i := 1; i < 110; i++ {
+			if i%11 == 0 || i%13 == 0 {
+				continue
+			}
+
+			So(string(data), ShouldContainSubstring, fmt.Sprintf("executing medium 1 priority task; index: %d", i))
+			So(string(data), ShouldContainSubstring, fmt.Sprintf("executing medium 2 priority task; index: %d", i))
+			So(string(data), ShouldContainSubstring, fmt.Sprintf("executing low priority task; index: %d", i))
+		}
+
+		taskCounter := 0
+		priorityFlippedCounter := 0
+		samePriorityFlippedCounter := 0
+		lastPriority := "medium"
+		lastMediumGenerator := "1"
+
+		for _, line := range strings.Split(strings.TrimSuffix(string(data), "\n"), "\n") {
+			if !strings.Contains(line, "priority task; index: ") {
+				continue
+			}
+
+			taskCounter++
+
+			// low priority tasks start executing later
+			// medium priority generators are prioritized until the rank 100/9 (8 generated tasks)
+			// starting with 100/10, a low priority generator could potentially be prioritized instead
+			// there will be at least 8 * 2 medium priority tasks executed before low priority tasks are pushed
+			if taskCounter < 17 {
+				So(line, ShouldContainSubstring, "executing medium")
+			}
+
+			// 2*110 medium priority tasks should have been generated,
+			// medium priority generators should be done
+			// add around 10 low priority tasks to the counter
+			// and an additional margin of 5 to make sure the test is stable
+			if taskCounter > 225 {
+				So(line, ShouldContainSubstring, "executing low priority")
+			}
+
+			if strings.Contains(line, "executing medium") {
+				if !strings.Contains(line, fmt.Sprintf("executing medium %s", lastMediumGenerator)) {
+					samePriorityFlippedCounter++
+					if lastMediumGenerator == "1" {
+						lastMediumGenerator = "2"
+					} else {
+						lastMediumGenerator = "1"
+					}
+				}
+			}
+
+			if !strings.Contains(line, fmt.Sprintf("executing %s", lastPriority)) {
+				priorityFlippedCounter++
+				if lastPriority == "low" {
+					lastPriority = "medium"
+				} else {
+					lastPriority = "low"
+				}
+			}
+		}
+
+		// fairness: make sure none of the medium priority generators is favored by the algorithm
+		So(samePriorityFlippedCounter, ShouldBeGreaterThanOrEqualTo, 60)
+		t.Logf("Switched between medium priority generators %d times", samePriorityFlippedCounter)
+		// fairness: make sure the algorithm alternates between generator priorities
+		So(priorityFlippedCounter, ShouldBeGreaterThanOrEqualTo, 10)
+		t.Logf("Switched between generator priorities %d times", priorityFlippedCounter)
+	})
+
 	Convey("Test task returning an error", t, func() {
 		logFile, err := os.CreateTemp("", "zot-log*.txt")
 		So(err, ShouldBeNil)
@@ -209,7 +319,7 @@ func TestScheduler(t *testing.T) {
 		metrics := monitoring.NewMetricsServer(true, logger)
 		sch := scheduler.NewScheduler(config.New(), metrics, logger)
 
-		genL := &generator{log: logger, priority: "low priority"}
+		genL := &generator{log: logger, priority: "low priority", limit: 100, taskDelay: 100 * time.Millisecond}
 		sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority)
 
 		sch.RunScheduler()
@@ -272,7 +382,7 @@ func TestScheduler(t *testing.T) {
 		metrics := monitoring.NewMetricsServer(true, logger)
 		sch := scheduler.NewScheduler(config.New(), metrics, logger)
 
-		genL := &generator{log: logger, priority: "medium priority"}
+		genL := &generator{log: logger, priority: "medium priority", limit: 100, taskDelay: 100 * time.Millisecond}
 		sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.MediumPriority)
 
 		sch.RunScheduler()
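
For reference, a minimal standalone sketch of the ranking curve introduced by getRanking() above. It is illustrative only and not part of the patch; it assumes LowPriority maps to 1 and MediumPriority to 2, which matches the "rank 100/9" / "100/10" arithmetic in the new test's comments but is not shown in this diff.

// ranking_sketch.go - illustrative only, not part of the patch above.
// The numeric priority values below are assumptions.
package main

import (
	"fmt"
	"math"
)

// rank mirrors generator.getRanking(): a base weight of 10^priority that
// decays with the number of tasks the generator produced in the current run.
func rank(priority int, taskCount int64) float64 {
	return math.Pow(10, float64(priority)) / (1 + float64(taskCount))
}

func main() {
	const low, medium = 1, 2 // assumed values of the Priority constants

	freshLow := rank(low, 0) // 10.0

	for taskCount := int64(0); taskCount <= 10; taskCount++ {
		fmt.Printf("medium generator after %2d tasks: rank %6.2f (fresh low generator: %.2f)\n",
			taskCount, rank(medium, taskCount), freshLow)
	}
}

Under these assumed values, a medium priority generator outranks a fresh low priority one while it has generated at most 8 tasks (100/9 > 10); once its rank drops to 100/10 == 10 the low priority generator ties and can be picked, which is the crossover the test's "taskCounter < 17" assertion leans on with two medium generators in the queue.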