Skip to content

Commit

Permalink
print data for debug
Browse files Browse the repository at this point in the history
fix typo

add time field for debug

debug queue

debug log

queue log

log change

log format

fix

logs

decode binary data

fix broker

push/pop log

log
  • Loading branch information
chronolaw committed Nov 1, 2023
1 parent 51007b9 commit 147036e
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 3 deletions.
2 changes: 1 addition & 1 deletion lualib/resty/events/broker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ local cjson_encode = cjson.encode
local MAX_UNIQUE_EVENTS = 1024

local function is_closed(err)
return err and
return type(err) == "string" and
(str_sub(err, -6) == "closed" or
str_sub(err, -11) == "broken pipe")
end
Expand Down
18 changes: 17 additions & 1 deletion lualib/resty/events/callback.lua
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,19 @@ function _M:do_event(d)
local event = d.event
local data = d.data
local wid = d.wid
local time = d.time

ngx.update_time()
local now = ngx.now()

if time then
log(DEBUG, "worker-events [receive]: source=", source,
", event=", event, ", wid=", wid, ", time=", now - time,
", data=", encode(data))
end

log(DEBUG, "worker-events: handling event; source=", source,
", event=", event, ", wid=", wid)
", event=", event, ", wid=", wid, ", data=", encode(data))

local funcs = self._funcs
local list
Expand All @@ -128,6 +138,12 @@ function _M:do_event(d)
-- source+event callback
list = get_callback_list(self, source, event)
do_handlerlist(funcs, list, source, event, data, wid)

ngx.update_time()
log(DEBUG, "worker-events [done] : source=", source,
", event=", event, ", wid=", wid, ", time=", ngx.now() - now,
", data=", encode(data))

end

return _M
1 change: 1 addition & 0 deletions lualib/resty/events/codec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ end
local options = {
dict = { "spec", "data",
"source", "event", "wid", "unique",
"time",
},
}

Expand Down
26 changes: 25 additions & 1 deletion lualib/resty/events/queue.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
local cjson = require "cjson.safe"
local codec = require "resty.events.codec"
local semaphore = require "ngx.semaphore"

local table_new = require "table.new"
Expand All @@ -6,6 +8,8 @@ local assert = assert
local setmetatable = setmetatable
local math_min = math.min

local decode = codec.decode
local cjson_encode = cjson.encode

local _M = {}
local _MT = { __index = _M, }
Expand All @@ -20,23 +24,36 @@ function _M.new(max_len)
elts = table_new(math_min(max_len, DEFAULT_QUEUE_LEN), 0),
first = 0,
last = -1,

-- debug
outcome = 0,
income = 0,
}

ngx.log(ngx.DEBUG, "worker-events [queue]: init, max_len=", self.max_len)

return setmetatable(self, _MT)
end


function _M:push(item)
local last = self.last

if last - self.first + 1 >= self.max then
local count = last - self.first + 1

ngx.log(ngx.DEBUG, "worker-events [queue]: push , len=", count,
", income=", self.income, ", outcome=", self.outcome)

if count >= self.max then
return nil, "queue overflow"
end

last = last + 1
self.last = last
self.elts[last] = item

self.income = self.income + 1

self.semaphore:post()

return true
Expand All @@ -59,6 +76,13 @@ function _M:pop()
self.elts[first] = nil
self.first = first + 1

self.outcome = self.outcome + 1

local count = self.last - self.first + 1

ngx.log(ngx.DEBUG, "worker-events [queue]: pop , len=", count,
", income=", self.income, ", outcome=", self.outcome)

return item
end

Expand Down
19 changes: 19 additions & 0 deletions lualib/resty/events/worker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ local EVENT_T = {
event = '',
data = '',
wid = '',
time = '',
}

local SPEC_T = {
Expand Down Expand Up @@ -172,6 +173,8 @@ function _M:communicate(premature)
return nil, "failed to decode event data: " .. err
end

ngx.log(ngx.DEBUG, "worker-events [queue]: push sub_queue, data=", cjson_encode(d))

-- got an event data, push to queue, callback in events_thread
local ok, err = self._sub_queue:push(d)
if not ok then
Expand All @@ -198,6 +201,9 @@ function _M:communicate(premature)
goto continue
end

local obj = decode(decode(payload).data)
ngx.log(ngx.DEBUG, "worker-events [queue]: pop pub_queue, data=", cjson_encode(obj))

if exiting() then
return
end
Expand Down Expand Up @@ -232,6 +238,8 @@ function _M:communicate(premature)
goto continue
end

ngx.log(ngx.DEBUG, "worker-events [queue]: pop sub_queue, data=", cjson_encode(data))

if exiting() then
return
end
Expand Down Expand Up @@ -286,6 +294,11 @@ local function post_event(self, source, event, data, spec)
EVENT_T.data = data
EVENT_T.wid = _worker_id

ngx.update_time()
EVENT_T.time = ngx.now()

ngx.log(ngx.DEBUG, "worker-events [queue]: push pub_queue, data=", cjson_encode(EVENT_T))

-- encode event info
str, err = encode(EVENT_T)

Expand Down Expand Up @@ -331,6 +344,8 @@ function _M:publish(target, source, event, data)
assert(type(source) == "string" and source ~= "", "source is required")
assert(type(event) == "string" and event ~= "", "event is required")

log(DEBUG, "[publish] source=", source, ", event=", event, ", data=", cjson_encode(data))

-- fall back to local events
if self._opts.testing == true then
log(DEBUG, "event published to 1 workers")
Expand All @@ -345,6 +360,10 @@ function _M:publish(target, source, event, data)
end

if target == "current" then
log(DEBUG, "event published to local worker")

ngx.log(ngx.DEBUG, "worker-events [queue]: push sub_queue, data=", cjson_encode(data))

ok, err = self._sub_queue:push({
source = source,
event = event,
Expand Down

0 comments on commit 147036e

Please sign in to comment.