-
Notifications
You must be signed in to change notification settings - Fork 1
/
schedule.go
204 lines (176 loc) · 4.46 KB
/
schedule.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package schedule
import (
"math/rand"
"sync"
"time"
)
// A Job is something that can be run at a scheduled time
type Job interface {
Run()
}
// JobFunc adapts a function to the Job interface
type JobFunc func()
func (fn JobFunc) Run() {
fn()
}
// A IntervalFunc is a function that returns the next time interval
// to be used for running a job. It may return different intervals
// on each call.
type IntervalFunc func() time.Duration
// A CancelFunc is a function that may be used to cancel all future
// executions of a Job
type CancelFunc func()
// Every returns a ScheduleFunc that schedules a job to run
// every d nanoseconds.
func Every(d time.Duration) IntervalFunc {
return func() time.Duration {
return d
}
}
// RoughlyEvery returns a ScheduleFunc that schedules a job to run
// repeatedly at random intervals around ±10% of d
func RoughlyEvery(d time.Duration) IntervalFunc {
return func() time.Duration {
return roughDuration(d)
}
}
// After returns a ScheduleFunc that schedules a job to run
// once after interval d. Note that this is essentially the
// same as time.After, which should be preferred for simple
// use cases.
func After(d time.Duration) IntervalFunc {
done := false
return func() time.Duration {
if done {
return 0
}
done = true
return d
}
}
// RoughlyAfter returns a ScheduleFunc that schedules a job to run
// once after a random interval around ±10% of d
func RoughlyAfter(d time.Duration) IntervalFunc {
done := false
return func() time.Duration {
if done {
return 0
}
done = true
return roughDuration(d)
}
}
// EveryAfter schedules a job to run every e nanoseconds
// but waits until a has passed before starting the schedule.
// a and e should be > 0
func EveryAfter(e, a time.Duration) IntervalFunc {
var start bool
return func() time.Duration {
if start {
return e
}
start = true
return a
}
}
// roughDuration returns a duration that is +/- 5% of the supplied duration
func roughDuration(d time.Duration) time.Duration {
return time.Duration(float64(d) * (0.90 + rand.Float64()/5))
}
// Schedule adds job to the default scheduler and returns a function that can be
// used to cancel all future executions of the job.
func Schedule(job Job, sf IntervalFunc) CancelFunc {
defaultSchedulerMutex.Lock()
defer defaultSchedulerMutex.Unlock()
return defaultScheduler.Schedule(job, sf)
}
// CancelAll cancels all scheduled jobs managed by the default scheduler
func CancelAll() {
defaultSchedulerMutex.Lock()
defer defaultSchedulerMutex.Unlock()
defaultScheduler.CancelAll()
}
var (
defaultScheduler = &Scheduler{}
defaultSchedulerMutex sync.Mutex
)
// A Scheduler manages a set of scheduled jobs
type Scheduler struct {
jobs []*scheduledJob
}
// Schedule schedules job to run after the interval returned by sf. After
// each execution of the job, sf is called to determine the next schedule
// interval. If sf returns zero then the job will not be scheduled to run.
// Schedule returns a function that can be used to cancel all future
// executions of the job.
func (s *Scheduler) Schedule(job Job, sf IntervalFunc) CancelFunc {
interval := sf()
if interval == 0 {
// Nothing to do
return noop
}
sj := &scheduledJob{
job: job,
sf: sf,
}
sj.schedule(interval)
s.jobs = append(s.jobs, sj)
return func() {
sj.cancel()
// Note that sj remains in the jobs slice so wont be garbage collected
// Expectation is that cancelling is infrequent and/or exceptional
// behaviour so this won't matter in practice. The job's timer
// should be garbage collected since we're calling stop on it.
}
}
// CancelAll cancels all scheduled jobs managed by this scheduler
func (s *Scheduler) CancelAll() {
for _, sj := range s.jobs {
sj.cancel()
}
s.jobs = []*scheduledJob{}
}
func noop() {}
type scheduledJob struct {
mu sync.Mutex
job Job
sf IntervalFunc
timer *time.Timer
}
func (sj *scheduledJob) schedule(d time.Duration) {
if sj == nil {
return
}
sj.mu.Lock()
sj.timer = time.AfterFunc(d, func() {
sj.mu.Lock()
timer := sj.timer
job := sj.job
intervalFn := sj.sf
sj.mu.Unlock()
if timer == nil {
// Job was cancelled after we had entered
// this function and started acquiring
// lock so don't proceed
return
}
job.Run()
next := intervalFn()
if next != 0 {
sj.schedule(next)
}
})
sj.mu.Unlock()
}
func (sj *scheduledJob) cancel() {
if sj == nil {
return
}
sj.mu.Lock()
defer sj.mu.Unlock()
if sj.timer == nil {
return
}
sj.timer.Stop()
sj.timer = nil
}