fix(worker/broker): threads were killed prematurely
### Summary

This issue is discussed in FTI-5930, and a workaround for it was proposed in:
https://github.com/Kong/kong/pull/13004

Before (dbless node):

1. Worker 0 gets "reconfigure" event and acquires coroutine mutex
2. Worker 1 gets "reconfigure" event and acquires coroutine mutex
3. Worker 0 crashes and with it the events broker crashes too
4. Worker 1 restarts its event processing because of a connection issue
   to the events broker, and it prematurely kills the executing
   "reconfigure" event handler, which then never releases its mutex.
5. Worker 0 restarts with a pristine state (only possible node-level
   locks, which eventually time out, are retained)
6. Worker 0 gets "reconfigure" event and acquires coroutine mutex
7. Worker 1 gets "reconfigure" event and is now in a deadlock

After (dbless node):

1. Worker 0 gets "reconfigure" event and acquires coroutine mutex
2. Worker 1 gets "reconfigure" event and acquires coroutine mutex
3. Worker 0 crashes and with it the events broker crashes too
4. Worker 1 restarts its event processing because of a connection issue
   to the events broker, but it lets the executing event handlers finish,
   allowing them to release their locks
5. Worker 0 restarts with a pristine state (only possible node-level
   locks, which eventually time out, are retained)
6. Worker 0 gets "reconfigure" event and acquires coroutine mutex
7. Worker 1 gets "reconfigure" event and acquires coroutine mutex
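
The failure in the "Before" sequence above boils down to killing a light thread while it still holds a coroutine-level lock. A minimal sketch of that interaction, assuming an `ngx.semaphore`-based mutex; `reconfigure_handler` and `mutex` are illustrative names, not Kong's actual handler code:

```lua
local semaphore = require "ngx.semaphore"

-- a one-permit semaphore used as a coroutine mutex (hypothetical name)
local mutex = semaphore.new(1)

local function reconfigure_handler(data)
  local ok, err = mutex:wait(60)           -- acquire the mutex
  if not ok then
    return nil, "failed to acquire reconfigure lock: " .. err
  end

  -- ... rebuild router/plugins from the new declarative config ...

  mutex:post(1)                            -- released only if we reach this line
  return true
end

local handler = ngx.thread.spawn(reconfigure_handler, {})

-- Before the fix: on a broker reconnect the running handler thread was killed,
-- so mutex:post() above never ran and the next "reconfigure" event waited on
-- the mutex forever:
--   ngx.thread.kill(handler)
--
-- After the fix: the running handler is waited on instead, so the mutex is
-- always released before event processing is restarted:
ngx.thread.wait(handler)
```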
bungle committed May 17, 2024
1 parent 1454851 commit c58af36
Showing 4 changed files with 250 additions and 260 deletions.
227 changes: 106 additions & 121 deletions lualib/resty/events/broker.lua
@@ -1,16 +1,13 @@
local cjson = require "cjson.safe"
local nkeys = require "table.nkeys"
local codec = require "resty.events.codec"
local lrucache = require "resty.lrucache"

local que = require "resty.events.queue"
local queue = require "resty.events.queue"
local server = require("resty.events.protocol").server
local is_timeout = server.is_timeout
local is_closed = server.is_closed

local pairs = pairs
local setmetatable = setmetatable
local str_sub = string.sub
local random = math.random

local ngx = ngx
local log = ngx.log
@@ -28,64 +25,125 @@ local decode = codec.decode
local cjson_encode = cjson.encode

local MAX_UNIQUE_EVENTS = 1024
local WEAK_KEYS_MT = { __mode = "k", }

local function is_closed(err)
return err and
(str_sub(err, -6) == "closed" or
str_sub(err, -11) == "broken pipe")
local function terminating(self, worker_connection)
return self._clients[worker_connection] == nil or exiting()
end

-- broadcast to all/unique workers
local function broadcast_events(self, unique, data)
local n = 0

-- if unique, schedule to a random worker
local idx = unique and random(1, nkeys(self._clients))
local function get_event_data(self, event_data)
local unique = event_data.spec.unique
if unique then
local uniques = self._uniques
if uniques:get(unique) then
if not exiting() then
log(DEBUG, "unique event is duplicate: ", unique)
end

for _, q in pairs(self._clients) do
return
end

-- skip some and broadcast to one workers
if unique then
idx = idx - 1
uniques:set(unique, 1, self._opts.unique_timeout)
end
return event_data.data, unique
end

if idx > 0 then
goto continue
end
end
-- broadcast to all/unique workers
local function broadcast_events(self, event_data)
local data, unique = get_event_data(self, event_data)
if not data then
return
end

local ok, err = q:push(data)
local n = 0

if not ok then
-- pairs is "random" enough for unique
for _, client_queue in pairs(self._clients) do
local _, err = client_queue:push(data)
if err then
log(ERR, "failed to publish event: ", err, ". ",
"data is :", cjson_encode(decode(data)))

else
n = n + 1

if unique then
break
end
end
end

log(DEBUG, "event published to ", n, " workers")
end

local function read_thread(self, worker_connection)
while not terminating(self, worker_connection) do
local data, err = worker_connection:recv_frame()
if err then
if not is_timeout(err) then
return nil, "failed to read event from worker: " .. err
end

-- timeout
goto continue
end

if not data then
if not exiting() then
log(ERR, "did not receive event from worker")
end
goto continue
end

local event_data, err = decode(data)
if not event_data then
if not exiting() then
log(ERR, "failed to decode event data: ", err)
end
goto continue
end

broadcast_events(self, event_data)

::continue::
end -- for q in pairs(_clients)
end -- while not exiting

log(DEBUG, "event published to ", n, " workers")
return true
end

local function write_thread(self, worker_connection)
while not terminating(self, worker_connection) do
local payload, err = self._clients[worker_connection]:pop()
if not payload then
if not is_timeout(err) then
return nil, "semaphore wait error: " .. err
end

goto continue
end

local _, err = worker_connection:send_frame(payload)
if err then
return nil, "failed to send event: " .. err
end

::continue::
end -- while not exiting

return true
end

local _M = {
_VERSION = '0.1.3',
}

local _MT = { __index = _M, }

function _M.new(opts)
local self = {
return setmetatable({
_opts = opts,
_uniques = nil,
_clients = nil,
}

return setmetatable(self, _MT)
}, _MT)
end

function _M:init()
@@ -96,108 +154,33 @@ function _M:init()
return nil, "failed to create the events cache: " .. (err or "unknown")
end

local _clients = setmetatable({}, { __mode = "k", })

self._uniques = _uniques
self._clients = _clients
self._clients = setmetatable({}, WEAK_KEYS_MT)

return true
end

function _M:run()
local conn, err = server:new()

if not conn then
local worker_connection, err = server.new()
if not worker_connection then
log(ERR, "failed to init socket: ", err)
exit(444)
end

local queue = que.new(self._opts.max_queue_len)

self._clients[conn] = queue

local read_thread = spawn(function()
while not exiting() do
local data, err = conn:recv_frame()

if exiting() then
return
end

if err then
if not is_timeout(err) then
return nil, err
end

-- timeout
goto continue
end

if not data then
return nil, "did not receive event from worker"
end
self._clients[worker_connection] = queue.new(self._opts.max_queue_len)

local d, err
local read_thread_co = spawn(read_thread, self, worker_connection)
local write_thread_co = spawn(write_thread, self, worker_connection)

d, err = decode(data)
if not d then
log(ERR, "failed to decode event data: ", err)
goto continue
end
local ok, err, perr = wait(write_thread_co, read_thread_co)

-- unique event
local unique = d.spec.unique
if unique then
if self._uniques:get(unique) then
log(DEBUG, "unique event is duplicate: ", unique)
goto continue
end
self._clients[worker_connection] = nil

self._uniques:set(unique, 1, self._opts.unique_timeout)
end

-- broadcast to all/unique workers
broadcast_events(self, unique, d.data)

::continue::
end -- while not exiting
end) -- read_thread

local write_thread = spawn(function()
while not exiting() do
local payload, err = queue:pop()

if not payload then
if not is_timeout(err) then
return nil, "semaphore wait error: " .. err
end

goto continue
end

if exiting() then
return
end

local _, err = conn:send_frame(payload)
if err then
log(ERR, "failed to send event: ", err)
end

if is_closed(err) then
return
end

::continue::
end -- while not exiting
end) -- write_thread

local ok, err, perr = wait(write_thread, read_thread)

self._clients[conn] = nil

kill(write_thread)
kill(read_thread)
if exiting() then
kill(read_thread_co)
kill(write_thread_co)
return
end

if not ok and not is_closed(err) then
log(ERR, "event broker failed: ", err)
@@ -209,8 +192,10 @@ function _M:run()
return exit(ngx.ERROR)
end

wait(read_thread_co)
wait(write_thread_co)

return exit(ngx.OK)
end

return _M

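The net effect of the broker.lua change above: the per-connection read and write loops are now plain module-level functions that stop on their own once the connection is dropped from `self._clients` (or the worker is exiting), and `run()` only kills them when the process is shutting down; otherwise it waits for them to finish. A condensed sketch of that control flow, with illustrative names (`read_loop`, `write_loop`, `run`, `conn`), not the module's exact code:

```lua
local function read_loop(self, conn)
  -- stops on its own when the connection is unregistered or the worker exits
  while self._clients[conn] and not ngx.worker.exiting() do
    -- conn:recv_frame(), decode, broadcast to the client queues ...
  end
  return true
end

local function write_loop(self, conn)
  while self._clients[conn] and not ngx.worker.exiting() do
    -- pop from this client's queue and conn:send_frame() ...
  end
  return true
end

local function run(self, conn)
  local read_co  = ngx.thread.spawn(read_loop,  self, conn)
  local write_co = ngx.thread.spawn(write_loop, self, conn)

  ngx.thread.wait(write_co, read_co)   -- returns when either loop finishes

  self._clients[conn] = nil            -- tells both loops to wind down

  if ngx.worker.exiting() then
    ngx.thread.kill(read_co)           -- shutting down anyway: killing is safe
    ngx.thread.kill(write_co)
    return
  end

  ngx.thread.wait(read_co)             -- otherwise let the loops finish so
  ngx.thread.wait(write_co)            -- in-flight work can release its locks
end
```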
17 changes: 8 additions & 9 deletions lualib/resty/events/protocol.lua
@@ -23,6 +23,11 @@ local function is_timeout(err)
return err and str_sub(err, -7) == "timeout"
end

local function is_closed(err)
return err and (str_sub(err, -6) == "closed" or
str_sub(err, -11) == "broken pipe")
end

local function recv_frame(self)
local sock = self.sock
if not sock then
@@ -32,7 +37,6 @@ local function recv_frame(self)
return _recv_frame(sock)
end


local function send_frame(self, payload)
local sock = self.sock
if not sock then
@@ -42,7 +46,6 @@ local function send_frame(self, payload)
return _send_frame(sock, payload)
end


local _Server = {
_VERSION = "0.1.0",
is_timeout = is_timeout,
@@ -52,9 +55,7 @@ local _Server = {

local _SERVER_MT = { __index = _Server, }


function _Server.new(self)

function _Server.new()
if subsystem == "http" then
if ngx.headers_sent then
return nil, "response header already sent"
@@ -90,15 +91,15 @@ end

local _Client = {
_VERSION = "0.1.0",
is_closed = is_closed,
is_timeout = is_timeout,
recv_frame = recv_frame,
send_frame = send_frame,
}

local _CLIENT_MT = { __index = _Client, }


function _Client.new(self)
function _Client.new()
local sock, err = tcp()
if not sock then
return nil, err
@@ -111,7 +112,6 @@ function _Client.new(self)
}, _CLIENT_MT)
end


function _Client.connect(self, addr)
local sock = self.sock
if not sock then
@@ -157,7 +157,6 @@ function _Client.connect(self, addr)
return true
end


return {
server = _Server,
client = _Client,
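
protocol.lua now exports `is_closed` next to `is_timeout`, so callers no longer need the local string check that broker.lua used to carry. A hypothetical client-side usage sketch (the reconnect/retry handling is illustrative, not the library's actual code):

```lua
local protocol = require "resty.events.protocol"
local client = protocol.client

local conn, err = client.new()
if not conn then
  return nil, err
end

-- conn:connect(addr) and event encoding elided; "payload" stands in for an
-- already-encoded frame
local payload = "..."

local _, send_err = conn:send_frame(payload)
if send_err then
  if client.is_closed(send_err) then
    -- peer went away ("closed" / "broken pipe"): reconnect rather than
    -- treating this as a fatal error
  elseif client.is_timeout(send_err) then
    -- transient timeout: retry later
  end
end
```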