From c58af36c1eda019a5ee44685ec236d3717f208b9 Mon Sep 17 00:00:00 2001 From: Aapo Talvensaari Date: Tue, 14 May 2024 15:32:09 +0300 Subject: [PATCH] fix(worker/broker): threads were killed prematurely ### Summary This is discussed in FTI-5930 and a workaround for this issue was proposed in: https://github.com/Kong/kong/pull/13004 Before (dbless node): 1. Worker 0 gets "reconfigure" event and acquires coroutine mutex 2. Worker 1 gets "reconfigure" event and acquires coroutine mutex 3. Worker 0 crashes and with it the events broker crashes too 4. Worker 1 restarts its events processing because of connection issue to events broker and it prematurely kills the executing "reconfigure" event handler as well, that then doesn't release its mutex. 5. Worker 0 restarts with pristine state (only the possible and time-outing node level locks are retained) 6. Worker 0 gets "reconfigure" event and acquires coroutine mutex 7. Worker 1 gets "reconfigure" event and is now in deadlock situation After (dbless node): 1. Worker 0 gets "reconfigure" event and acquires coroutine mutex 2. Worker 1 gets "reconfigure" event and acquires coroutine mutex 3. Worker 0 crashes and with it the events broker crashes too 4. Worker 1 restarts its events processing because of connection issue to events broker but it finishes the execution of an executing event handlers allowing the release of locks 5. Worker 0 restarts with pristine state (only the possible and time-outing node level locks are retained) 6. Worker 0 gets "reconfigure" event and acquires coroutine mutex 7. Worker 1 gets "reconfigure" event and acquires coroutine mutex --- lualib/resty/events/broker.lua | 227 +++++++++++++-------------- lualib/resty/events/protocol.lua | 17 +-- lualib/resty/events/queue.lua | 11 +- lualib/resty/events/worker.lua | 255 ++++++++++++++++--------------- 4 files changed, 250 insertions(+), 260 deletions(-) diff --git a/lualib/resty/events/broker.lua b/lualib/resty/events/broker.lua index 537beb43..85b3c33b 100644 --- a/lualib/resty/events/broker.lua +++ b/lualib/resty/events/broker.lua @@ -1,16 +1,13 @@ local cjson = require "cjson.safe" -local nkeys = require "table.nkeys" local codec = require "resty.events.codec" local lrucache = require "resty.lrucache" - -local que = require "resty.events.queue" +local queue = require "resty.events.queue" local server = require("resty.events.protocol").server local is_timeout = server.is_timeout +local is_closed = server.is_closed local pairs = pairs local setmetatable = setmetatable -local str_sub = string.sub -local random = math.random local ngx = ngx local log = ngx.log @@ -28,37 +25,44 @@ local decode = codec.decode local cjson_encode = cjson.encode local MAX_UNIQUE_EVENTS = 1024 +local WEAK_KEYS_MT = { __mode = "k", } -local function is_closed(err) - return err and - (str_sub(err, -6) == "closed" or - str_sub(err, -11) == "broken pipe") +local function terminating(self, worker_connection) + return self._clients[worker_connection] == nil or exiting() end --- broadcast to all/unique workers -local function broadcast_events(self, unique, data) - local n = 0 - - -- if unique, schedule to a random worker - local idx = unique and random(1, nkeys(self._clients)) +local function get_event_data(self, event_data) + local unique = event_data.spec.unique + if unique then + local uniques = self._uniques + if uniques:get(unique) then + if not exiting() then + log(DEBUG, "unique event is duplicate: ", unique) + end - for _, q in pairs(self._clients) do + return + end - -- skip some and broadcast to one workers - if unique then - idx = idx - 1 + uniques:set(unique, 1, self._opts.unique_timeout) + end + return event_data.data, unique +end - if idx > 0 then - goto continue - end - end +-- broadcast to all/unique workers +local function broadcast_events(self, event_data) + local data, unique = get_event_data(self, event_data) + if not data then + return + end - local ok, err = q:push(data) + local n = 0 - if not ok then + -- pairs is "random" enough for unique + for _, client_queue in pairs(self._clients) do + local _, err = client_queue:push(data) + if err then log(ERR, "failed to publish event: ", err, ". ", "data is :", cjson_encode(decode(data))) - else n = n + 1 @@ -66,26 +70,80 @@ local function broadcast_events(self, unique, data) break end end + end + + log(DEBUG, "event published to ", n, " workers") +end + +local function read_thread(self, worker_connection) + while not terminating(self, worker_connection) do + local data, err = worker_connection:recv_frame() + if err then + if not is_timeout(err) then + return nil, "failed to read event from worker: " .. err + end + + -- timeout + goto continue + end + + if not data then + if not exiting() then + log(ERR, "did not receive event from worker") + end + goto continue + end + + local event_data, err = decode(data) + if not event_data then + if not exiting() then + log(ERR, "failed to decode event data: ", err) + end + goto continue + end + + broadcast_events(self, event_data) ::continue:: - end -- for q in pairs(_clients) + end -- while not exiting - log(DEBUG, "event published to ", n, " workers") + return true +end + +local function write_thread(self, worker_connection) + while not terminating(self, worker_connection) do + local payload, err = self._clients[worker_connection]:pop() + if not payload then + if not is_timeout(err) then + return nil, "semaphore wait error: " .. err + end + + goto continue + end + + local _, err = worker_connection:send_frame(payload) + if err then + return nil, "failed to send event: " .. err + end + + ::continue:: + end -- while not exiting + + return true end local _M = { _VERSION = '0.1.3', } + local _MT = { __index = _M, } function _M.new(opts) - local self = { + return setmetatable({ _opts = opts, _uniques = nil, _clients = nil, - } - - return setmetatable(self, _MT) + }, _MT) end function _M:init() @@ -96,108 +154,33 @@ function _M:init() return nil, "failed to create the events cache: " .. (err or "unknown") end - local _clients = setmetatable({}, { __mode = "k", }) - self._uniques = _uniques - self._clients = _clients + self._clients = setmetatable({}, WEAK_KEYS_MT) return true end function _M:run() - local conn, err = server:new() - - if not conn then + local worker_connection, err = server.new() + if not worker_connection then log(ERR, "failed to init socket: ", err) exit(444) end - local queue = que.new(self._opts.max_queue_len) - - self._clients[conn] = queue - - local read_thread = spawn(function() - while not exiting() do - local data, err = conn:recv_frame() - - if exiting() then - return - end - - if err then - if not is_timeout(err) then - return nil, err - end - - -- timeout - goto continue - end - - if not data then - return nil, "did not receive event from worker" - end + self._clients[worker_connection] = queue.new(self._opts.max_queue_len) - local d, err + local read_thread_co = spawn(read_thread, self, worker_connection) + local write_thread_co = spawn(write_thread, self, worker_connection) - d, err = decode(data) - if not d then - log(ERR, "failed to decode event data: ", err) - goto continue - end + local ok, err, perr = wait(write_thread_co, read_thread_co) - -- unique event - local unique = d.spec.unique - if unique then - if self._uniques:get(unique) then - log(DEBUG, "unique event is duplicate: ", unique) - goto continue - end + self._clients[worker_connection] = nil - self._uniques:set(unique, 1, self._opts.unique_timeout) - end - - -- broadcast to all/unique workers - broadcast_events(self, unique, d.data) - - ::continue:: - end -- while not exiting - end) -- read_thread - - local write_thread = spawn(function() - while not exiting() do - local payload, err = queue:pop() - - if not payload then - if not is_timeout(err) then - return nil, "semaphore wait error: " .. err - end - - goto continue - end - - if exiting() then - return - end - - local _, err = conn:send_frame(payload) - if err then - log(ERR, "failed to send event: ", err) - end - - if is_closed(err) then - return - end - - ::continue:: - end -- while not exiting - end) -- write_thread - - local ok, err, perr = wait(write_thread, read_thread) - - self._clients[conn] = nil - - kill(write_thread) - kill(read_thread) + if exiting() then + kill(read_thread_co) + kill(write_thread_co) + return + end if not ok and not is_closed(err) then log(ERR, "event broker failed: ", err) @@ -209,8 +192,10 @@ function _M:run() return exit(ngx.ERROR) end + wait(read_thread_co) + wait(write_thread_co) + return exit(ngx.OK) end return _M - diff --git a/lualib/resty/events/protocol.lua b/lualib/resty/events/protocol.lua index 19f96d7e..16668888 100644 --- a/lualib/resty/events/protocol.lua +++ b/lualib/resty/events/protocol.lua @@ -23,6 +23,11 @@ local function is_timeout(err) return err and str_sub(err, -7) == "timeout" end +local function is_closed(err) + return err and (str_sub(err, -6) == "closed" or + str_sub(err, -11) == "broken pipe") +end + local function recv_frame(self) local sock = self.sock if not sock then @@ -32,7 +37,6 @@ local function recv_frame(self) return _recv_frame(sock) end - local function send_frame(self, payload) local sock = self.sock if not sock then @@ -42,7 +46,6 @@ local function send_frame(self, payload) return _send_frame(sock, payload) end - local _Server = { _VERSION = "0.1.0", is_timeout = is_timeout, @@ -52,9 +55,7 @@ local _Server = { local _SERVER_MT = { __index = _Server, } - -function _Server.new(self) - +function _Server.new() if subsystem == "http" then if ngx.headers_sent then return nil, "response header already sent" @@ -90,6 +91,7 @@ end local _Client = { _VERSION = "0.1.0", + is_closed = is_closed, is_timeout = is_timeout, recv_frame = recv_frame, send_frame = send_frame, @@ -97,8 +99,7 @@ local _Client = { local _CLIENT_MT = { __index = _Client, } - -function _Client.new(self) +function _Client.new() local sock, err = tcp() if not sock then return nil, err @@ -111,7 +112,6 @@ function _Client.new(self) }, _CLIENT_MT) end - function _Client.connect(self, addr) local sock = self.sock if not sock then @@ -157,7 +157,6 @@ function _Client.connect(self, addr) return true end - return { server = _Server, client = _Client, diff --git a/lualib/resty/events/queue.lua b/lualib/resty/events/queue.lua index 5260afc7..be617d4e 100644 --- a/lualib/resty/events/queue.lua +++ b/lualib/resty/events/queue.lua @@ -1,7 +1,7 @@ local semaphore = require "ngx.semaphore" - local table_new = require "table.new" + local assert = assert local setmetatable = setmetatable local math_min = math.min @@ -10,14 +10,16 @@ local math_min = math.min local _M = {} local _MT = { __index = _M, } -local DEFAULT_QUEUE_LEN = 4096 + +local MAX_QUEUE_PREALLOCATE = 4096 + function _M.new(max_len) local self = { semaphore = assert(semaphore.new()), max = max_len, - elts = table_new(math_min(max_len, DEFAULT_QUEUE_LEN), 0), + elts = table_new(math_min(max_len, MAX_QUEUE_PREALLOCATE), 0), first = 0, last = -1, } @@ -28,7 +30,6 @@ end function _M:push(item) local last = self.last - if last - self.first + 1 >= self.max then return nil, "queue overflow" end @@ -50,7 +51,6 @@ function _M:pop() end local first = self.first - if first > self.last then return nil, "queue is empty" end @@ -58,7 +58,6 @@ function _M:pop() local item = self.elts[first] self.elts[first] = nil self.first = first + 1 - return item end diff --git a/lualib/resty/events/worker.lua b/lualib/resty/events/worker.lua index 40b6f1f9..e5915fa7 100644 --- a/lualib/resty/events/worker.lua +++ b/lualib/resty/events/worker.lua @@ -1,13 +1,12 @@ local cjson = require "cjson.safe" local codec = require "resty.events.codec" -local que = require "resty.events.queue" +local queue = require "resty.events.queue" local callback = require "resty.events.callback" +local frame_validate = require("resty.events.frame").validate local client = require("resty.events.protocol").client local is_timeout = client.is_timeout -local frame_validate = require("resty.events.frame").validate - local type = type local assert = assert local setmetatable = setmetatable @@ -65,14 +64,20 @@ local function random_delay() return random(10, 50) / 1000 end -local function do_event(self, d) - self._callback:do_event(d) +local function communicate(premature, self) + if premature then + return + end + + self:communicate() end local function start_timer(self, delay) - assert(timer_at(delay, function(premature) - self:communicate(premature) - end)) + assert(timer_at(delay, communicate, self)) +end + +local function terminating(self) + return self._connected ~= true or exiting() end local check_sock_exist @@ -80,7 +85,7 @@ do local ffi = require "ffi" local C = ffi.C ffi.cdef [[ - int access(const char *pathname, int mode); + int access(const char *pathname, int mode); ]] -- remove prefix 'unix:' @@ -94,8 +99,8 @@ function _M.new(opts) local max_queue_len = opts.max_queue_len local self = { - _pub_queue = que.new(max_queue_len), - _sub_queue = que.new(max_queue_len), + _pub_queue = queue.new(max_queue_len), + _sub_queue = queue.new(max_queue_len), _callback = callback.new(), _connected = nil, _opts = opts, @@ -104,12 +109,106 @@ function _M.new(opts) return setmetatable(self, _MT) end -function _M:communicate(premature) - if premature then - -- worker wants to exit - return - end +local function read_thread(self, broker_connection) + while not terminating(self) do + local data, err = broker_connection:recv_frame() + if err then + if not is_timeout(err) then + return nil, "failed to read event: " .. err + end + + -- timeout + goto continue + end + + if not data then + if not exiting() then + log(ERR, "did not receive event from broker") + end + goto continue + end + + local event_data, err = decode(data) + if err then + if not exiting() then + log(ERR, "failed to decode event data: ", err) + end + goto continue + end + + -- got an event data, push to queue, callback in events_thread + local _, err = self._sub_queue:push(event_data) + if err then + if not exiting() then + log(ERR, "failed to store event: ", err, ". data is: ", + cjson_encode(event_data)) + end + goto continue + end + + ::continue:: + end -- while not terminating + + return true +end + +local function write_thread(self, broker_connection) + local counter = 0 + + while not terminating(self) do + local payload, err = self._pub_queue:pop() + if err then + if not is_timeout(err) then + return nil, "semaphore wait error: " .. err + end + + -- timeout + goto continue + end + + local _, err = broker_connection:send_frame(payload) + if err then + return nil, "failed to send event: " .. err + end + + -- events rate limiting + counter = counter + 1 + if counter >= EVENTS_COUNT_LIMIT then + sleep(EVENTS_SLEEP_TIME) + counter = 0 + end + + ::continue:: + end -- while not terminating + + return true +end + +local function events_thread(self) + while not terminating(self) do + local data, err = self._sub_queue:pop() + if err then + if not is_timeout(err) then + return nil, "semaphore wait error: " .. err + end + + -- timeout + goto continue + end + + -- got an event data, callback + self._callback:do_event(data) + + -- yield, not block other threads + sleep(0) + + ::continue:: + end -- while not terminating + return true +end + +function _M:communicate() -- only for testing, skip read/write/events threads if self._opts.testing == true then self._connected = true @@ -126,9 +225,9 @@ function _M:communicate(premature) return end - local conn = assert(client:new()) + local broker_connection = assert(client.new()) - local ok, err = conn:connect(listening) + local ok, err = broker_connection:connect(listening) if exiting() then return @@ -143,118 +242,22 @@ function _M:communicate(premature) return end - self._connected = true log(DEBUG, _worker_id, " on (", listening, ") is ready") - local read_thread = spawn(function() - while not exiting() do - local data, err = conn:recv_frame() - - if exiting() then - return - end - - if err then - if not is_timeout(err) then - return nil, err - end - - -- timeout - goto continue - end - - if not data then - return nil, "did not receive event from broker" - end - - local d, err = decode(data) - if not d then - return nil, "failed to decode event data: " .. err - end - - -- got an event data, push to queue, callback in events_thread - local ok, err = self._sub_queue:push(d) - if not ok then - log(ERR, "failed to store event: ", err, ". ", - "data is :", cjson_encode(d)) - end - - ::continue:: - end -- while not exiting - end) -- read_thread - - local write_thread = spawn(function() - local counter = 0 - - while not exiting() do - local payload, err = self._pub_queue:pop() - - if not payload then - if not is_timeout(err) then - return nil, "semaphore wait error: " .. err - end - - -- timeout - goto continue - end - - if exiting() then - return - end - - local _, err = conn:send_frame(payload) - if err then - log(ERR, "failed to send event: ", err) - return - end - - -- events rate limiting - counter = counter + 1 - if counter >= EVENTS_COUNT_LIMIT then - sleep(EVENTS_SLEEP_TIME) - counter = 0 - end - - ::continue:: - end -- while not exiting - end) -- write_thread - - local events_thread = spawn(function() - while not exiting() do - local data, err = self._sub_queue:pop() - - if not data then - if not is_timeout(err) then - return nil, "semaphore wait error: " .. err - end - - -- timeout - goto continue - end - - if exiting() then - return - end - - -- got an event data, callback - do_event(self, data) - - -- yield, not block other threads - sleep(0) - - ::continue:: - end -- while not exiting - end) -- events_thread + self._connected = true - local ok, err, perr = wait(write_thread, read_thread, events_thread) + local read_thread_co = spawn(read_thread, self, broker_connection) + local write_thread_co = spawn(write_thread, self, broker_connection) + local events_thread_co = spawn(events_thread, self) - kill(write_thread) - kill(read_thread) - kill(events_thread) + local ok, err, perr = wait(write_thread_co, read_thread_co, events_thread_co) self._connected = nil if exiting() then + kill(read_thread_co) + kill(write_thread_co) + kill(events_thread_co) return end @@ -266,6 +269,10 @@ function _M:communicate(premature) log(ERR, "event worker failed: ", perr) end + wait(read_thread_co) + wait(write_thread_co) + wait(events_thread_co) + start_timer(self, random_delay()) end @@ -335,7 +342,7 @@ function _M:publish(target, source, event, data) if self._opts.testing == true then log(DEBUG, "event published to 1 workers") - do_event(self, { + self._callback:do_event({ source = source, event = event, data = data, @@ -368,7 +375,7 @@ end function _M:subscribe(source, event, callback) assert(type(source) == "string" and source ~= "", "source is required") assert(type(event) == "string" and event ~= "", "event is required") - assert(type(callback) == "function", "expected function, got: ".. + assert(type(callback) == "function", "expected function, got: " .. type(callback)) return self._callback:subscribe(source, event, callback)