diff --git a/lualib/resty/events/broker.lua b/lualib/resty/events/broker.lua index 537beb43..39abde02 100644 --- a/lualib/resty/events/broker.lua +++ b/lualib/resty/events/broker.lua @@ -30,7 +30,7 @@ local cjson_encode = cjson.encode local MAX_UNIQUE_EVENTS = 1024 local function is_closed(err) - return err and + return type(err) == "string" and (str_sub(err, -6) == "closed" or str_sub(err, -11) == "broken pipe") end diff --git a/lualib/resty/events/callback.lua b/lualib/resty/events/callback.lua index 1da6a8b0..10d1cb2d 100644 --- a/lualib/resty/events/callback.lua +++ b/lualib/resty/events/callback.lua @@ -110,9 +110,19 @@ function _M:do_event(d) local event = d.event local data = d.data local wid = d.wid + local time = d.time + + ngx.update_time() + local now = ngx.now() + + if time then + log(DEBUG, "worker-events [receive]: source=", source, + ", event=", event, ", wid=", wid, ", time=", now - time, + ", data=", encode(data)) + end log(DEBUG, "worker-events: handling event; source=", source, - ", event=", event, ", wid=", wid) + ", event=", event, ", wid=", wid, ", data=", encode(data)) local funcs = self._funcs local list @@ -128,6 +138,12 @@ function _M:do_event(d) -- source+event callback list = get_callback_list(self, source, event) do_handlerlist(funcs, list, source, event, data, wid) + + ngx.update_time() + log(DEBUG, "worker-events [done] : source=", source, + ", event=", event, ", wid=", wid, ", time=", ngx.now() - now, + ", data=", encode(data)) + end return _M diff --git a/lualib/resty/events/codec.lua b/lualib/resty/events/codec.lua index eba7ec01..42a7ef02 100644 --- a/lualib/resty/events/codec.lua +++ b/lualib/resty/events/codec.lua @@ -9,6 +9,7 @@ end local options = { dict = { "spec", "data", "source", "event", "wid", "unique", + "time", }, } diff --git a/lualib/resty/events/queue.lua b/lualib/resty/events/queue.lua index 5260afc7..177b286c 100644 --- a/lualib/resty/events/queue.lua +++ b/lualib/resty/events/queue.lua @@ -1,3 +1,5 @@ +local cjson = require "cjson.safe" +local codec = require "resty.events.codec" local semaphore = require "ngx.semaphore" local table_new = require "table.new" @@ -6,6 +8,8 @@ local assert = assert local setmetatable = setmetatable local math_min = math.min +local decode = codec.decode +local cjson_encode = cjson.encode local _M = {} local _MT = { __index = _M, } @@ -20,8 +24,14 @@ function _M.new(max_len) elts = table_new(math_min(max_len, DEFAULT_QUEUE_LEN), 0), first = 0, last = -1, + + -- debug + outcome = 0, + income = 0, } + ngx.log(ngx.DEBUG, "worker-events [queue]: init, max_len=", self.max_len) + return setmetatable(self, _MT) end @@ -29,7 +39,12 @@ end function _M:push(item) local last = self.last - if last - self.first + 1 >= self.max then + local count = last - self.first + 1 + + ngx.log(ngx.DEBUG, "worker-events [queue]: push , len=", count, + ", income=", self.income, ", outcome=", self.outcome) + + if count >= self.max then return nil, "queue overflow" end @@ -37,6 +52,8 @@ function _M:push(item) self.last = last self.elts[last] = item + self.income = self.income + 1 + self.semaphore:post() return true @@ -59,6 +76,13 @@ function _M:pop() self.elts[first] = nil self.first = first + 1 + self.outcome = self.outcome + 1 + + local count = self.last - self.first + 1 + + ngx.log(ngx.DEBUG, "worker-events [queue]: pop , len=", count, + ", income=", self.income, ", outcome=", self.outcome) + return item end diff --git a/lualib/resty/events/worker.lua b/lualib/resty/events/worker.lua index 40b6f1f9..d9ef7931 100644 --- a/lualib/resty/events/worker.lua +++ b/lualib/resty/events/worker.lua @@ -40,6 +40,7 @@ local EVENT_T = { event = '', data = '', wid = '', + time = '', } local SPEC_T = { @@ -172,6 +173,8 @@ function _M:communicate(premature) return nil, "failed to decode event data: " .. err end + ngx.log(ngx.DEBUG, "worker-events [queue]: push sub_queue, data=", cjson_encode(d)) + -- got an event data, push to queue, callback in events_thread local ok, err = self._sub_queue:push(d) if not ok then @@ -198,6 +201,9 @@ function _M:communicate(premature) goto continue end + local obj = decode(decode(payload).data) + ngx.log(ngx.DEBUG, "worker-events [queue]: pop pub_queue, data=", cjson_encode(obj)) + if exiting() then return end @@ -232,6 +238,8 @@ function _M:communicate(premature) goto continue end + ngx.log(ngx.DEBUG, "worker-events [queue]: pop sub_queue, data=", cjson_encode(data)) + if exiting() then return end @@ -286,6 +294,11 @@ local function post_event(self, source, event, data, spec) EVENT_T.data = data EVENT_T.wid = _worker_id + ngx.update_time() + EVENT_T.time = ngx.now() + + ngx.log(ngx.DEBUG, "worker-events [queue]: push pub_queue, data=", cjson_encode(EVENT_T)) + -- encode event info str, err = encode(EVENT_T) @@ -331,6 +344,8 @@ function _M:publish(target, source, event, data) assert(type(source) == "string" and source ~= "", "source is required") assert(type(event) == "string" and event ~= "", "event is required") + log(DEBUG, "[publish] source=", source, ", event=", event, ", data=", cjson_encode(data)) + -- fall back to local events if self._opts.testing == true then log(DEBUG, "event published to 1 workers") @@ -345,6 +360,10 @@ function _M:publish(target, source, event, data) end if target == "current" then + log(DEBUG, "event published to local worker") + + ngx.log(ngx.DEBUG, "worker-events [queue]: push sub_queue, data=", cjson_encode(data)) + ok, err = self._sub_queue:push({ source = source, event = event,