Skip to content

Commit

Permalink
fix: update cron package to obtain "1w" support (influxdata#21844)
Browse files Browse the repository at this point in the history
* fix: update cron spackage to obtain "w" support

* chore: update CHANGELOG

* test: add a regression test for the 1w interval in the task scheduler
  • Loading branch information
williamhbaker authored Jul 14, 2021
1 parent b5b36b2 commit e787113
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 104 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [21747](https://github.com/influxdata/influxdb/pull/21747): Rename arm rpms with yum-compatible names.
1. [21800](https://github.com/influxdata/influxdb/pull/21800): Return an error instead of panicking when InfluxQL statement rewrites fail.
1. [21840](https://github.com/influxdata/influxdb/pull/21840): Run migrations on restored bolt & SQLite metadata databases as part of the restore process.
1. [21844](https://github.com/influxdata/influxdb/pull/21844): Upgrade to latest version of `influxdata/cron` so that tasks can be created with interval of `every: 1w`.

## v2.0.7 [2021-06-04]

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/hashicorp/raft v1.0.0 // indirect
github.com/hashicorp/vault/api v1.0.2
github.com/imdario/mergo v0.3.9 // indirect
github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6
github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe
github.com/influxdata/flux v0.120.1
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influx-cli/v2 v2.0.0-20210702141951-3ca681b1dd48
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg=
github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6 h1:OtjKkeWDjUbyMi82C7XXy7Tvm2LXMwiBBXyFIGNPaGA=
github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og=
github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe h1:7j4SdN/BvQwN6WoUq7mv0kg5U9NhnFBxPGMafYRKym0=
github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og=
github.com/influxdata/flux v0.120.1 h1:M4x6e25+ao95N98kB65wd59juA+RV7WDhcsYuxL5/6M=
github.com/influxdata/flux v0.120.1/go.mod h1:pGSAvyAA5d3et7SSzajaYShWYXmnRnJJq2qWi+WWZ2I=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU=
Expand Down
172 changes: 71 additions & 101 deletions task/backend/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,114 +61,84 @@ func (m *mockSchedulableService) UpdateLastScheduled(ctx context.Context, id ID,

func TestSchedule_Next(t *testing.T) {
t.Run("@every fires on appropriate boundaries", func(t *testing.T) {
t.Run("@every 1m", func(t *testing.T) {
mockTime := clock.NewMock()
mockTime.Set(time.Now())
c := make(chan time.Time, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
select {
case <-ctx.Done():
t.Log("ctx done")
case c <- scheduledAt:
}
}}
sch, _, err := NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error {
return nil
}},
WithTime(mockTime),
WithMaxConcurrentWorkers(20))
if err != nil {
t.Fatal(err)
}
defer sch.Stop()
schedule, ts, err := NewSchedule("@every 1m", mockTime.Now().UTC())
if err != nil {
t.Fatal(err)
}
// For these tests, the "timeElapsed" is the amount of time that is
// simulated to pass for the purposes of verifying that the task fires the
// correct amount of times. It is multiplied by a factor within the tests to
// simulated firing multiple times.
tests := []struct {
name string // also used as the cron time string
timeElapsed time.Duration
}{
{
name: "@every 1m",
timeElapsed: 1 * time.Minute,
},
{
name: "@every 1h",
timeElapsed: 1 * time.Hour,
},
{
name: "@every 1w", // regression test for https://github.com/influxdata/influxdb/issues/21842
timeElapsed: 7 * 24 * time.Hour, // 1 week
},
}

err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: ts})
if err != nil {
t.Fatal(err)
}
go func() {
sch.mu.Lock()
mockTime.Set(mockTime.Now().UTC().Add(17 * time.Minute))
sch.mu.Unlock()
}()

after := time.After(6 * time.Second)
oldCheckC := ts
for i := 0; i < 16; i++ {
select {
case checkC := <-c:
if checkC.Sub(oldCheckC) != time.Minute {
t.Fatalf("task didn't fire on correct interval fired on %s interval", checkC.Sub(oldCheckC))
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := make(chan time.Time, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
select {
case <-ctx.Done():
t.Log("ctx done")
case c <- scheduledAt:
}
if !checkC.Truncate(time.Minute).Equal(checkC) {
t.Fatalf("task didn't fire at the correct time boundary")
}
oldCheckC = checkC
case <-after:
t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i)
}}
mockTime := clock.NewMock()
mockTime.Set(time.Now())
sch, _, err := NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error {
return nil
}},
WithTime(mockTime),
WithMaxConcurrentWorkers(20))
if err != nil {
t.Fatal(err)
}
}
})
t.Run("@every 1h", func(t *testing.T) {
c := make(chan time.Time, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
select {
case <-ctx.Done():
t.Log("ctx done")
case c <- scheduledAt:
defer sch.Stop()
schedule, ts, err := NewSchedule(tt.name, mockTime.Now().UTC())
if err != nil {
t.Fatal(err)
}
}}
mockTime := clock.NewMock()
mockTime.Set(time.Now())
sch, _, err := NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error {
return nil
}},
WithTime(mockTime),
WithMaxConcurrentWorkers(20))
if err != nil {
t.Fatal(err)
}
defer sch.Stop()
schedule, ts, err := NewSchedule("@every 1h", mockTime.Now().UTC())
if err != nil {
t.Fatal(err)
}

err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: ts})
if err != nil {
t.Fatal(err)
}
go func() {
sch.mu.Lock()
mockTime.Set(mockTime.Now().UTC().Add(17 * time.Hour))
sch.mu.Unlock()
}()

after := time.After(6 * time.Second)
oldCheckC := ts
for i := 0; i < 16; i++ {
select {
case checkC := <-c:
if checkC.Sub(oldCheckC) != time.Hour {
t.Fatalf("task didn't fire on correct interval fired on %s interval", checkC.Sub(oldCheckC))
}
if !checkC.Truncate(time.Hour).Equal(checkC) {
t.Fatalf("task didn't fire at the correct time boundary")
err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: ts})
if err != nil {
t.Fatal(err)
}
go func() {
sch.mu.Lock()
mockTime.Set(mockTime.Now().UTC().Add(17 * tt.timeElapsed))
sch.mu.Unlock()
}()

after := time.After(6 * time.Second)
oldCheckC := ts
for i := 0; i < 16; i++ {
select {
case checkC := <-c:
if checkC.Sub(oldCheckC) != tt.timeElapsed {
t.Fatalf("task didn't fire on correct interval fired on %s interval", checkC.Sub(oldCheckC))
}
if !checkC.Truncate(tt.timeElapsed).Equal(checkC) {
t.Fatalf("task didn't fire at the correct time boundary")
}
oldCheckC = checkC
case <-after:
t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i)
}
oldCheckC = checkC
case <-after:
t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i)
}
}
})
})
}
})
t.Run("fires properly with non-mocked time", func(t *testing.T) {
now := time.Now()
Expand Down

0 comments on commit e787113

Please sign in to comment.