-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtimer.lua
265 lines (246 loc) · 9.64 KB
/
timer.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
--------------------------------------------------------------------------
-- Extended timer.
--
-- @copyright 2017 - 2018 Kong Inc.
-- @author Thijs Schreijer
-- @license Apache 2.0
-- Timer primitive used by `schedule` to create every timer.
local timer_at = ngx.timer.at

-- Capture varargs in a table, recording the real argument count in `n` so
-- that `nil` arguments are not lost when unpacking again.
local function pack(...)
  return { n = select("#", ...), ... }
end

-- `unpack` is a global in Lua 5.1/LuaJIT and lives in `table` in later versions.
local _unpack = unpack or table.unpack

-- Unpack that defaults the upper bound to the `n` field (as written by
-- `pack`), falling back to the length operator when `n` is absent.
local function unpack(t, i, j)
  return _unpack(t, i or 1, j or t.n or #t)
end

-- Registries mapping timer id -> timer object.
-- `anchor_registry` is a plain table (used for detached timers), while
-- `gc_registry` holds its values weakly.
local anchor_registry = {}
local gc_registry = setmetatable({}, { __mode = "v" })

-- Counter used to hand out timer ids.
local timer_id = 0

-- Prefixes for shm keys and log messages.
local KEY_PREFIX, LOG_PREFIX = "[lua-resty-timer]", "[resty-timer] "

-- Reasons passed to the 'cancel' callback.
local CANCEL_GC, CANCEL_SYSTEM, CANCEL_USER = "GC", "SYSTEM", "USER"
--- Cancel the timer.
-- Invokes the 'cancel'-callback when one was provided. Cancellation only
-- affects the timer in the current worker.
-- @function timer:cancel
-- @return the results of the 'cancel' callback, or `true` when no callback was
-- provided, or `nil + "already cancelled"` when called more than once
-- @usage local t, err = resty_timer(options)  -- create a timer
-- if t then
--   t:cancel()  -- immediately cancel the timer again
-- end
local function cancel(self)
  if self.cancel_flag then
    return nil, "already cancelled"
  end

  -- drop the timer from whichever registry it was stored in
  local id = self.id
  if id then
    local registry = gc_registry
    if self.detached then
      registry = anchor_registry
    end
    registry[id] = nil
    self.id = nil
  end

  self.cancel_flag = true

  -- a reason set earlier (GC / SYSTEM) wins; otherwise the user cancelled
  if not self.premature_reason then
    self.premature_reason = CANCEL_USER
  end

  local on_cancel = self.cb_cancel
  if not on_cancel then
    return true
  end
  return on_cancel(self.premature_reason, unpack(self.args))
end
-- Callback handed to `ngx.timer.at` by `schedule`; handles one expiry.
--
-- Looks up the timer object by id and returns silently when it is gone or
-- already cancelled. A premature run cancels the timer with reason
-- CANCEL_SYSTEM (unless a reason was already set). Recurring timers are
-- rescheduled before the user callback runs; one-shot timers are removed
-- from the registry. For node-wide timers (`key_name` set) the callback only
-- runs when this worker succeeds in adding the shm key.
--
-- @param premature truthy when the timer is being run prematurely
-- @param id the registry id that `schedule` passed to `ngx.timer.at`
-- @return nothing, or the results of `timer:cancel()` on a premature run
local handler = function(premature, id)
  local self = gc_registry[id] or anchor_registry[id]
  if not self then  -- timer was garbage collected, exit
    return
  end

  if self.cancel_flag then  -- timer was cancelled, but not yet GC'ed, exit
    return
  end

  if premature then  -- premature, so we're being cancelled by the system
    self.premature_reason = self.premature_reason or CANCEL_SYSTEM
    return self:cancel()
  end

  if self.recurring then
    self:schedule()  -- no error checking required
  else
    -- not recurring, so must make available for GC
    local registry = self.detached and anchor_registry or gc_registry
    registry[id] = nil
    -- the object stores its id in `id` (see `cancel`/`schedule`); clear that
    -- field rather than a non-existent `timer_id` one
    self.id = nil
  end

  if self.key_name then
    -- node wide timer, so validate we're up to run
    -- NOTE(review): `interval - 0.001` is zero or negative for intervals of
    -- 0.001 or less; confirm how ngx.shared.DICT:add treats such an exptime.
    local ok, err = self.shm:add(self.key_name, true, self.interval - 0.001)
    if not ok then
      if err == "exists" then
        return  -- we're not up
      end
      -- any other failure is logged, and the callback still runs
      ngx.log(ngx.ERR, LOG_PREFIX, "failed to add key '", self.key_name, "': ", err)
    end
  end

  self.cb_expire(unpack(self.args))  -- already rescheduled, so no pcall required
end
-- Create the underlying `ngx.timer.at` timer for a timer object and store the
-- object in its registry.
--
-- On the first call (no `id` yet) a new id is taken from the module counter
-- and, when `immediate` is set, the delay is 0 instead of `sub_interval`.
--
-- @param self the timer object
-- @return the timer object on success, otherwise the falsy result of
-- `ngx.timer.at` followed by its error value
local function schedule(self)
  local delay = self.sub_interval
  local id = self.id

  if not id then
    -- first scheduling of this timer: assign an id
    timer_id = timer_id + 1
    id = timer_id
    self.id = id
    if self.immediate then
      delay = 0
    end
  end

  local created, err = timer_at(delay, handler, id)
  if not created then
    ngx.log(ngx.ERR, LOG_PREFIX, "failed to create timer: " .. err)
    return created, err
  end

  -- detached timers go in the plain registry, all others in the weak one
  local registry = gc_registry
  if self.detached then
    registry = anchor_registry
  end
  registry[id] = self

  return self, err
end
--- Create a new timer.
-- The `opts` table is not stored nor altered, and can hence be safely reused to
-- create multiple timers. It supports the following parameters:
--
-- * `interval` : (number) interval in seconds after which the timer expires
--
-- * `recurring` : (boolean) set to `true` to make it a recurring timer
--
-- * `immediate` : (boolean) will do the first run immediately (the initial
-- interval will be set to 0 seconds). This option requires the `recurring` option.
--
-- * `detached` : (boolean) if set to `true` the timer will keep running detached, if
-- set to `false` the timer will be garbage collected unless anchored
-- by the user.
--
-- * `expire` : (function) callback called as `function(...)` with the arguments passed
-- as extra beyond the `opts` table to this `new` function.
--
-- * `cancel` : (optional, function) callback called as `function(reason, ...)`. Where
-- `reason` indicates why it was cancelled. The additional arguments will be the
-- arguments as passed to this `new` function, beyond the `opts` table. See the
-- usage example below for possible values for `reason`.
--
-- * `shm_name` : (optional, string) name of the shm to use to synchronize with the
-- other workers if `key_name` is set.
--
-- * `key_name` : (optional, string) key name to use in shm `shm_name`. If this key is given
-- the timer will only be executed in a single worker. All timers (across all workers) with the same
-- key will share this. The key will always be prefixed with this module's
-- name to prevent name collisions in the shm. This option requires the `shm_name` option.
--
-- * `sub_interval` : (optional, number) interval in seconds to check whether
-- the timer needs to run. Only used for cross-worker timers. This setting reduces
-- the maximum delay when a worker that currently runs the timer exits. In this case the
-- maximum delay could be `interval * 2` before another worker picks it up. With
-- this option set, the maximum delay will be `interval + sub_interval`.
-- This option requires the `immediate` and `key_name` options.
--
-- @function new
-- @param opts table with options
-- @param ... arguments to pass to the callbacks `expire` and `cancel`.
-- @return `timer` object or `nil + err`
-- @usage
-- local object = {
-- name = "myName",
-- }
--
-- function object:timer_callback(...)
-- -- Note: here we use colon-":" syntax
-- print("starting ", self.name, ": ", ...) --> "starting myName: 1 two 3"
-- end
--
-- function object.cancel_callback(reason, self, ...)
-- -- Note: here we cannot use colon-":" syntax, due to the 'reason' parameter
-- print("stopping ", self.name, ": ", ...) --> "stopping myName: 1 two 3"
-- if reason == resty_timer.CANCEL_USER then
-- -- user called `timer:cancel`
-- elseif reason == resty_timer.CANCEL_GC then
-- -- the timer was garbage-collected
-- elseif reason == resty_timer.CANCEL_SYSTEM then
-- -- prematurely cancelled by the system (worker is exiting)
-- else
-- -- should not happen
-- end
-- end
--
-- function object:start()
-- if self.timer then return end
-- self.timer = resty_timer({
-- interval = 1,
-- expire = self.timer_callback,
-- cancel = self.cancel_callback,
-- }, self, 1, " two ", 3) -- 'self' + 3 parameters to pass to the callbacks
-- end
--
-- function object:stop()
-- if self.timer then
-- self.timer:cancel()
-- self.timer = nil
-- end
-- end
local function new(opts, ...)
  -- fail with a clear message instead of an "attempt to index" error
  assert(type(opts) == "table", "expected 'opts' to be a table")

  local self = {
    -- timer basics
    interval = tonumber(opts.interval),  -- interval in seconds
    recurring = opts.recurring,          -- should the timer be recurring?
    immediate = opts.immediate,          -- do first run immediately, at 0 seconds
    detached = opts.detached,            -- should run detached, prevent GC
    args = pack(...),                    -- arguments to pass along
    -- callbacks
    cb_expire = opts.expire,             -- the callback function
    cb_cancel = opts.cancel,             -- callback function on cancellation
    -- shm info for node-wide timers
    shm = nil,                           -- the shm to use based on `opts.shm_name` (set below)
    key_name = opts.key_name,            -- unique shm key, if provided it will be a node-wide timer
    sub_interval = opts.sub_interval,    -- delay in seconds passed to `ngx.timer.at` (defaults to `interval` below)
    -- methods
    cancel = cancel,                     -- cancel method
    schedule = schedule,                 -- schedule method
    -- internal stuff
    id = nil,                            -- timer id in the registry
    cancel_flag = nil,                   -- indicator timer was cancelled
    premature_reason = nil,              -- indicator why we're being cancelled
    gc_proxy = nil,                      -- userdata proxy to track GC
  }

  assert(self.interval, "expected 'interval' to be a number")
  assert(self.interval >= 0, "expected 'interval' to be greater than or equal to 0")
  assert(type(self.cb_expire) == "function", "expected 'expire' to be a function")
  if not self.recurring then
    assert(not self.immediate, "the 'immediate' option requires 'recurring'")
  end

  if self.cb_cancel then
    assert(type(self.cb_cancel) == "function", "expected 'cancel' to be a function")
    if not self.detached then
      -- add a proxy to track GC; its `__gc` metamethod cancels the timer
      -- with reason CANCEL_GC unless another reason was already recorded
      self.gc_proxy = newproxy(true)
      getmetatable(self.gc_proxy).__gc = function()
        self.premature_reason = self.premature_reason or CANCEL_GC
        return self:cancel()
      end
    end
  end

  if self.sub_interval then
    self.sub_interval = tonumber(self.sub_interval)
    assert(self.sub_interval, "expected 'sub_interval' to be a number")
    assert(self.key_name, "'key_name' is required when specifying 'sub_interval'")
    assert(self.immediate, "'immediate' is required when specifying 'sub_interval'")
    assert(self.sub_interval >= 0, "expected 'sub_interval' to be greater than or equal to 0")
    assert(self.sub_interval <= self.interval, "expected 'sub_interval' to be less than or equal to 'interval'")
  else
    -- without a sub_interval the timer simply fires every `interval`
    self.sub_interval = self.interval
  end

  if self.key_name then
    assert(type(self.key_name) == "string", "expected 'key_name' to be a string")
    assert(opts.shm_name, "'shm_name' is required when specifying 'key_name'")
    self.shm = ngx.shared[opts.shm_name]
    assert(self.shm, "shm by name '" .. tostring(opts.shm_name) .. "' not found")
    -- prefix the key with the module name
    self.key_name = KEY_PREFIX .. self.key_name
  end

  return self:schedule()
end
-- Public module table: the constructor plus the cancellation-reason constants.
local exports = {
  new = new,
  CANCEL_GC = CANCEL_GC,
  CANCEL_SYSTEM = CANCEL_SYSTEM,
  CANCEL_USER = CANCEL_USER,
  -- __anchor = anchor_registry, -- for test purposes
  -- __gc = gc_registry, -- for test purposes
}

-- Calling the module table itself forwards to `new`; the first `__call`
-- argument (the module table) is dropped.
local function call_module(_, ...)
  return new(...)
end

return setmetatable(exports, { __call = call_module })