From e7871130edd2b22e50f3f7f8120e2be598e1641c Mon Sep 17 00:00:00 2001 From: William Baker Date: Wed, 14 Jul 2021 12:11:19 -0500 Subject: [PATCH] fix: update cron package to obtain "1w" support (#21844) * fix: update cron spackage to obtain "w" support * chore: update CHANGELOG * test: add a regression test for the 1w interval in the task scheduler --- CHANGELOG.md | 1 + go.mod | 2 +- go.sum | 4 +- task/backend/scheduler/scheduler_test.go | 172 ++++++++++------------- 4 files changed, 75 insertions(+), 104 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05e75488182..483112c2d72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ This release adds an embedded SQLite database for storing metadata required by t 1. [21747](https://github.com/influxdata/influxdb/pull/21747): Rename arm rpms with yum-compatible names. 1. [21800](https://github.com/influxdata/influxdb/pull/21800): Return an error instead of panicking when InfluxQL statement rewrites fail. 1. [21840](https://github.com/influxdata/influxdb/pull/21840): Run migrations on restored bolt & SQLite metadata databases as part of the restore process. +1. [21844](https://github.com/influxdata/influxdb/pull/21844): Upgrade to latest version of `influxdata/cron` so that tasks can be created with interval of `every: 1w`. ## v2.0.7 [2021-06-04] diff --git a/go.mod b/go.mod index 6bb8fb13370..bd93d44b708 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/hashicorp/raft v1.0.0 // indirect github.com/hashicorp/vault/api v1.0.2 github.com/imdario/mergo v0.3.9 // indirect - github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6 + github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe github.com/influxdata/flux v0.120.1 github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 github.com/influxdata/influx-cli/v2 v2.0.0-20210702141951-3ca681b1dd48 diff --git a/go.sum b/go.sum index f8d81800a27..b9d9cb4d2a7 100644 --- a/go.sum +++ b/go.sum @@ -337,8 +337,8 @@ github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg= github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6 h1:OtjKkeWDjUbyMi82C7XXy7Tvm2LXMwiBBXyFIGNPaGA= -github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og= +github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe h1:7j4SdN/BvQwN6WoUq7mv0kg5U9NhnFBxPGMafYRKym0= +github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og= github.com/influxdata/flux v0.120.1 h1:M4x6e25+ao95N98kB65wd59juA+RV7WDhcsYuxL5/6M= github.com/influxdata/flux v0.120.1/go.mod h1:pGSAvyAA5d3et7SSzajaYShWYXmnRnJJq2qWi+WWZ2I= github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU= diff --git a/task/backend/scheduler/scheduler_test.go b/task/backend/scheduler/scheduler_test.go index f8af52c829a..f50ba2bb09d 100644 --- a/task/backend/scheduler/scheduler_test.go +++ b/task/backend/scheduler/scheduler_test.go @@ -61,114 +61,84 @@ func (m *mockSchedulableService) UpdateLastScheduled(ctx context.Context, id ID, func TestSchedule_Next(t *testing.T) { t.Run("@every fires on appropriate boundaries", func(t *testing.T) { - t.Run("@every 1m", func(t *testing.T) { - mockTime := clock.NewMock() - mockTime.Set(time.Now()) - c := make(chan time.Time, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { - select { - case <-ctx.Done(): - t.Log("ctx done") - case c <- scheduledAt: - } - }} - sch, _, err := NewScheduler( - exe, - &mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error { - return nil - }}, - WithTime(mockTime), - WithMaxConcurrentWorkers(20)) - if err != nil { - t.Fatal(err) - } - defer sch.Stop() - schedule, ts, err := NewSchedule("@every 1m", mockTime.Now().UTC()) - if err != nil { - t.Fatal(err) - } + // For these tests, the "timeElapsed" is the amount of time that is + // simulated to pass for the purposes of verifying that the task fires the + // correct amount of times. It is multiplied by a factor within the tests to + // simulated firing multiple times. + tests := []struct { + name string // also used as the cron time string + timeElapsed time.Duration + }{ + { + name: "@every 1m", + timeElapsed: 1 * time.Minute, + }, + { + name: "@every 1h", + timeElapsed: 1 * time.Hour, + }, + { + name: "@every 1w", // regression test for https://github.com/influxdata/influxdb/issues/21842 + timeElapsed: 7 * 24 * time.Hour, // 1 week + }, + } - err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: ts}) - if err != nil { - t.Fatal(err) - } - go func() { - sch.mu.Lock() - mockTime.Set(mockTime.Now().UTC().Add(17 * time.Minute)) - sch.mu.Unlock() - }() - - after := time.After(6 * time.Second) - oldCheckC := ts - for i := 0; i < 16; i++ { - select { - case checkC := <-c: - if checkC.Sub(oldCheckC) != time.Minute { - t.Fatalf("task didn't fire on correct interval fired on %s interval", checkC.Sub(oldCheckC)) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := make(chan time.Time, 100) + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + select { + case <-ctx.Done(): + t.Log("ctx done") + case c <- scheduledAt: } - if !checkC.Truncate(time.Minute).Equal(checkC) { - t.Fatalf("task didn't fire at the correct time boundary") - } - oldCheckC = checkC - case <-after: - t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i) + }} + mockTime := clock.NewMock() + mockTime.Set(time.Now()) + sch, _, err := NewScheduler( + exe, + &mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error { + return nil + }}, + WithTime(mockTime), + WithMaxConcurrentWorkers(20)) + if err != nil { + t.Fatal(err) } - } - }) - t.Run("@every 1h", func(t *testing.T) { - c := make(chan time.Time, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { - select { - case <-ctx.Done(): - t.Log("ctx done") - case c <- scheduledAt: + defer sch.Stop() + schedule, ts, err := NewSchedule(tt.name, mockTime.Now().UTC()) + if err != nil { + t.Fatal(err) } - }} - mockTime := clock.NewMock() - mockTime.Set(time.Now()) - sch, _, err := NewScheduler( - exe, - &mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error { - return nil - }}, - WithTime(mockTime), - WithMaxConcurrentWorkers(20)) - if err != nil { - t.Fatal(err) - } - defer sch.Stop() - schedule, ts, err := NewSchedule("@every 1h", mockTime.Now().UTC()) - if err != nil { - t.Fatal(err) - } - err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: ts}) - if err != nil { - t.Fatal(err) - } - go func() { - sch.mu.Lock() - mockTime.Set(mockTime.Now().UTC().Add(17 * time.Hour)) - sch.mu.Unlock() - }() - - after := time.After(6 * time.Second) - oldCheckC := ts - for i := 0; i < 16; i++ { - select { - case checkC := <-c: - if checkC.Sub(oldCheckC) != time.Hour { - t.Fatalf("task didn't fire on correct interval fired on %s interval", checkC.Sub(oldCheckC)) - } - if !checkC.Truncate(time.Hour).Equal(checkC) { - t.Fatalf("task didn't fire at the correct time boundary") + err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: ts}) + if err != nil { + t.Fatal(err) + } + go func() { + sch.mu.Lock() + mockTime.Set(mockTime.Now().UTC().Add(17 * tt.timeElapsed)) + sch.mu.Unlock() + }() + + after := time.After(6 * time.Second) + oldCheckC := ts + for i := 0; i < 16; i++ { + select { + case checkC := <-c: + if checkC.Sub(oldCheckC) != tt.timeElapsed { + t.Fatalf("task didn't fire on correct interval fired on %s interval", checkC.Sub(oldCheckC)) + } + if !checkC.Truncate(tt.timeElapsed).Equal(checkC) { + t.Fatalf("task didn't fire at the correct time boundary") + } + oldCheckC = checkC + case <-after: + t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i) } - oldCheckC = checkC - case <-after: - t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i) } - } - }) + }) + } }) t.Run("fires properly with non-mocked time", func(t *testing.T) { now := time.Now()