Skip to content

Commit

Permalink
perf(worker) optimize worker queue (#17)
Browse files Browse the repository at this point in the history
* rename queue

* rename to events_thread

* push sub events in queue

* change error msg

* log event data when push queue failed

* bump to 0.1.2

* small clean

* check broken pipe

* yield in events_thread
  • Loading branch information
chronolaw authored Jul 11, 2022
1 parent 9212740 commit 8c984bf
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 17 deletions.
12 changes: 9 additions & 3 deletions lualib/resty/events/broker.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local cjson = require "cjson.safe"
local codec = require "resty.events.codec"
local lrucache = require "resty.lrucache"

Expand All @@ -22,10 +23,14 @@ local wait = ngx.thread.wait

local decode = codec.decode

local cjson_encode = cjson.encode

local MAX_UNIQUE_EVENTS = 1024

local function is_closed(err)
return err and str_sub(err, -6) == "closed"
return err and
(str_sub(err, -6) == "closed" or
str_sub(err, -11) == "broken pipe")
end

local _M = {
Expand Down Expand Up @@ -96,7 +101,7 @@ function _M:run()

d, err = decode(data)
if not d then
log(ERR, "worker-events: failed decoding event data: ", err)
log(ERR, "failed to decode event data: ", err)
goto continue
end

Expand All @@ -117,7 +122,8 @@ function _M:run()
local ok, err = q:push(d.data)

if not ok then
log(ERR, "failed to publish event: ", err)
log(ERR, "failed to publish event: ", err, ". ",
"data is :", cjson_encode(decode(d.data)))

else
n = n + 1
Expand Down
2 changes: 1 addition & 1 deletion lualib/resty/events/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ local str_sub = string.sub
local worker_count = ngx.worker.count()

local _M = {
_VERSION = '0.1.1',
_VERSION = '0.1.2',
}
local _MT = { __index = _M, }

Expand Down
36 changes: 23 additions & 13 deletions lualib/resty/events/worker.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local cjson = require "cjson.safe"
local codec = require "resty.events.codec"
local que = require "resty.events.queue"
local callback = require "resty.events.callback"
Expand All @@ -12,6 +13,7 @@ local random = math.random

local ngx = ngx
local log = ngx.log
local sleep = ngx.sleep
local exiting = ngx.worker.exiting
local ERR = ngx.ERR
local DEBUG = ngx.DEBUG
Expand All @@ -24,6 +26,7 @@ local timer_at = ngx.timer.at

local encode = codec.encode
local decode = codec.decode
local cjson_encode = cjson.encode

local EMPTY_T = {}

Expand Down Expand Up @@ -85,8 +88,8 @@ function _M.new(opts)
local max_queue_len = opts.max_queue_len

local self = {
_queue = que.new(max_queue_len),
_current_queue = que.new(max_queue_len),
_pub_queue = que.new(max_queue_len),
_sub_queue = que.new(max_queue_len),
_callback = callback.new(),
_connected = nil,
_opts = opts,
Expand Down Expand Up @@ -149,19 +152,23 @@ function _M:communicate(premature)

local d, err = decode(data)
if not d then
return nil, "worker-events: failed decoding event data: " .. err
return nil, "failed to decode event data: " .. err
end

-- got an event data, callback
do_event(self, d)
-- got an event data, push to queue, callback in events_thread
local ok, err = self._sub_queue:push(d)
if not ok then
log(ERR, "failed to store event: ", err, ". ",
"data is :", cjson_encode(d))
end

::continue::
end -- while not exiting
end) -- read_thread

local write_thread = spawn(function()
while not exiting() do
local payload, err = self._queue:pop()
local payload, err = self._pub_queue:pop()

if not payload then
if not is_timeout(err) then
Expand All @@ -186,9 +193,9 @@ function _M:communicate(premature)
end -- while not exiting
end) -- write_thread

local current_thread = spawn(function()
local events_thread = spawn(function()
while not exiting() do
local data, err = self._current_queue:pop()
local data, err = self._sub_queue:pop()

if not data then
if not is_timeout(err) then
Expand All @@ -206,15 +213,18 @@ function _M:communicate(premature)
-- got an event data, callback
do_event(self, data)

-- yield, not block other threads
sleep(0)

::continue::
end -- while not exiting
end) -- current_thread
end) -- events_thread

local ok, err, perr = wait(write_thread, read_thread, current_thread)
local ok, err, perr = wait(write_thread, read_thread, events_thread)

kill(write_thread)
kill(read_thread)
kill(current_thread)
kill(events_thread)

self._connected = nil

Expand Down Expand Up @@ -265,7 +275,7 @@ local function post_event(self, source, event, data, spec)
return nil, err
end

local ok, err = self._queue:push(str)
local ok, err = self._pub_queue:push(str)
if not ok then
return nil, "failed to publish event: " .. err
end
Expand All @@ -285,7 +295,7 @@ function _M:publish(target, source, event, data)
assert(type(event) == "string" and event ~= "", "event is required")

if target == "current" then
ok, err = self._current_queue:push({
ok, err = self._sub_queue:push({
source = source,
event = event,
data = data,
Expand Down

0 comments on commit 8c984bf

Please sign in to comment.