fix(worker/broker): events thread was killed prematurely #45

Closed · wants to merge 2 commits
227 changes: 106 additions & 121 deletions lualib/resty/events/broker.lua
@@ -1,16 +1,13 @@
local cjson = require "cjson.safe"
local nkeys = require "table.nkeys"
local codec = require "resty.events.codec"
local lrucache = require "resty.lrucache"

local que = require "resty.events.queue"
local queue = require "resty.events.queue"
local server = require("resty.events.protocol").server
local is_timeout = server.is_timeout
local is_closed = server.is_closed

local pairs = pairs
local setmetatable = setmetatable
local str_sub = string.sub
local random = math.random

local ngx = ngx
local log = ngx.log
@@ -28,64 +25,125 @@ local decode = codec.decode
local cjson_encode = cjson.encode

local MAX_UNIQUE_EVENTS = 1024
local WEAK_KEYS_MT = { __mode = "k", }

local function is_closed(err)
return err and
(str_sub(err, -6) == "closed" or
str_sub(err, -11) == "broken pipe")
local function terminating(self, worker_connection)
return self._clients[worker_connection] == nil or exiting()
end

-- broadcast to all/unique workers
local function broadcast_events(self, unique, data)
local n = 0

-- if unique, schedule to a random worker
local idx = unique and random(1, nkeys(self._clients))
local function get_event_data(self, event_data)
local unique = event_data.spec.unique
if unique then
local uniques = self._uniques
if uniques:get(unique) then
if not exiting() then
log(DEBUG, "unique event is duplicate: ", unique)
end

for _, q in pairs(self._clients) do
return
end

-- skip some and broadcast to one workers
if unique then
idx = idx - 1
uniques:set(unique, 1, self._opts.unique_timeout)
end
return event_data.data, unique
end

if idx > 0 then
goto continue
end
end
-- broadcast to all/unique workers
local function broadcast_events(self, event_data)
local data, unique = get_event_data(self, event_data)
if not data then
return
end

local ok, err = q:push(data)
local n = 0

if not ok then
-- pairs is "random" enough for unique
for _, client_queue in pairs(self._clients) do
Comment on lines +60 to +61
Member commented:

My understanding is that the order of iteration of pairs depends on the hash value of each key in the table. While this is indeed "random" or unpredictable when keys are created or rehashed, doesn't that mean the iteration order will stay the same for the self._clients table until then? I.e., would this implementation risk running "unique" events on the same worker repeatedly?

bungle (Member, Author) replied on May 20, 2024:

@samugi it does mean roughly that. The order may or may not change; it is undefined. We almost never use unique events in Kong, and what this change does is that a unique event may then keep running on the same worker. Not a huge deal. There is a bigger benefit: the event is actually retried on another worker if it fails on the previous one, so it is more robust, even though it does not balance the load the way the old code did, and I have no idea why we would need that.

On success there is a break. Unique basically means "run on any one worker".
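
A minimal sketch of the "run on any one worker" dispatch described above, for illustration only (the clients table, its queue values, and the push method used here are stand-ins, not the module's exact API):

-- Try each worker's queue in pairs() iteration order and stop at the
-- first one that accepts the payload; a failed push falls through to
-- the next worker instead of dropping the event.
local function dispatch_unique(clients, payload)
    for _, q in pairs(clients) do
        local ok = q:push(payload)
        if ok then
            return true  -- delivered to exactly one worker
        end
        -- push failed on this worker; retry on the next one
    end
    return nil, "no worker accepted the unique event"
end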

local _, err = client_queue:push(data)
if err then
log(ERR, "failed to publish event: ", err, ". ",
"data is :", cjson_encode(decode(data)))

else
n = n + 1

if unique then
break
end
end
end

log(DEBUG, "event published to ", n, " workers")
end

local function read_thread(self, worker_connection)
while not terminating(self, worker_connection) do
local data, err = worker_connection:recv_frame()
if err then
if not is_timeout(err) then
return nil, "failed to read event from worker: " .. err
end

-- timeout
goto continue
end

if not data then
if not exiting() then
log(ERR, "did not receive event from worker")
end
goto continue
end

local event_data, err = decode(data)
if not event_data then
if not exiting() then
log(ERR, "failed to decode event data: ", err)
end
goto continue
end

broadcast_events(self, event_data)

::continue::
end -- for q in pairs(_clients)
end -- while not exiting

log(DEBUG, "event published to ", n, " workers")
return true
end

local function write_thread(self, worker_connection)
while not terminating(self, worker_connection) do
local payload, err = self._clients[worker_connection]:pop()
if not payload then
if not is_timeout(err) then
return nil, "semaphore wait error: " .. err
end

goto continue
end

local _, err = worker_connection:send_frame(payload)
if err then
return nil, "failed to send event: " .. err
end

::continue::
end -- while not exiting

return true
end

local _M = {
_VERSION = '0.1.3',
}

local _MT = { __index = _M, }

function _M.new(opts)
local self = {
return setmetatable({
_opts = opts,
_uniques = nil,
_clients = nil,
}

return setmetatable(self, _MT)
}, _MT)
end

function _M:init()
@@ -96,108 +154,33 @@ function _M:init()
return nil, "failed to create the events cache: " .. (err or "unknown")
end

local _clients = setmetatable({}, { __mode = "k", })

self._uniques = _uniques
self._clients = _clients
self._clients = setmetatable({}, WEAK_KEYS_MT)

return true
end

function _M:run()
local conn, err = server:new()

if not conn then
local worker_connection, err = server.new()
if not worker_connection then
log(ERR, "failed to init socket: ", err)
exit(444)
end

local queue = que.new(self._opts.max_queue_len)

self._clients[conn] = queue

local read_thread = spawn(function()
while not exiting() do
local data, err = conn:recv_frame()

if exiting() then
return
end

if err then
if not is_timeout(err) then
return nil, err
end

-- timeout
goto continue
end

if not data then
return nil, "did not receive event from worker"
end
self._clients[worker_connection] = queue.new(self._opts.max_queue_len)

local d, err
local read_thread_co = spawn(read_thread, self, worker_connection)
local write_thread_co = spawn(write_thread, self, worker_connection)

d, err = decode(data)
if not d then
log(ERR, "failed to decode event data: ", err)
goto continue
end
local ok, err, perr = wait(read_thread_co, write_thread_co)

-- unique event
local unique = d.spec.unique
if unique then
if self._uniques:get(unique) then
log(DEBUG, "unique event is duplicate: ", unique)
goto continue
end
self._clients[worker_connection] = nil

self._uniques:set(unique, 1, self._opts.unique_timeout)
end

-- broadcast to all/unique workers
broadcast_events(self, unique, d.data)

::continue::
end -- while not exiting
end) -- read_thread

local write_thread = spawn(function()
while not exiting() do
local payload, err = queue:pop()

if not payload then
if not is_timeout(err) then
return nil, "semaphore wait error: " .. err
end

goto continue
end

if exiting() then
return
end

local _, err = conn:send_frame(payload)
if err then
log(ERR, "failed to send event: ", err)
end

if is_closed(err) then
return
end

::continue::
end -- while not exiting
end) -- write_thread

local ok, err, perr = wait(write_thread, read_thread)

self._clients[conn] = nil

kill(write_thread)
kill(read_thread)
if exiting() then
kill(read_thread_co)
kill(write_thread_co)
return
end

if not ok and not is_closed(err) then
log(ERR, "event broker failed: ", err)
@@ -209,8 +192,10 @@ function _M:run()
return exit(ngx.ERROR)
end

wait(read_thread_co)
wait(write_thread_co)

return exit(ngx.OK)
end

return _M
