Skip to content

Commit

Permalink
feat(queue): implement functions Quque.is_full & Queue.can_enqueue (
Browse files Browse the repository at this point in the history
#13164)

* feat(queue): implement functions `Quque.is_full` & `Queue.will_full`

Generating entries is expensive in some cases,
so we'd better have a way to observe the state of the Queue.

Co-authored-by: Hans Hübner <[email protected]>
Co-authored-by: Chrono <[email protected]>
  • Loading branch information
3 people authored Jun 7, 2024
1 parent c09a099 commit d1994f7
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 33 deletions.
226 changes: 193 additions & 33 deletions kong/tools/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,106 @@ local Queue_mt = {
}


local function make_queue_key(name)
local function _make_queue_key(name)
return (workspaces.get_workspace_id() or "") .. "." .. name
end


local function _remaining_capacity(self)
local remaining_entries = self.max_entries - self:count()
local max_bytes = self.max_bytes

-- we enqueue entries one by one,
-- so it is impossible to have a negative value
assert(remaining_entries >= 0, "queue should not be over capacity")

if not max_bytes then
return remaining_entries
end

local remaining_bytes = max_bytes - self.bytes_queued

-- we check remaining_bytes before enqueueing an entry,
-- so it is impossible to have a negative value
assert(remaining_bytes >= 0, "queue should not be over capacity")

return remaining_entries, remaining_bytes
end


local function _is_reaching_max_entries(self)
-- `()` is used to get the first return value only
return (_remaining_capacity(self)) == 0
end


local function _will_exceed_max_entries(self)
-- `()` is used to get the first return value only
return (_remaining_capacity(self)) - 1 < 0
end


local function _is_entry_too_large(self, entry)
local max_bytes = self.max_bytes

if not max_bytes then
return false
end

if type(entry) ~= "string" then
-- handle non-string entry, including `nil`
return false
end

return #entry > max_bytes
end


local function _is_reaching_max_bytes(self)
if not self.max_bytes then
return false
end

local _, remaining_bytes = _remaining_capacity(self)
return remaining_bytes == 0
end


local function _will_exceed_max_bytes(self, entry)
if not self.max_bytes then
return false
end

if type(entry) ~= "string" then
-- handle non-string entry, including `nil`
return false
end

local _, remaining_bytes = _remaining_capacity(self)
return #entry > remaining_bytes
end


local function _is_full(self)
return _is_reaching_max_entries(self) or _is_reaching_max_bytes(self)
end


local function _can_enqueue(self, entry)
return not (
_is_full(self) or
_is_entry_too_large(self, entry) or
_will_exceed_max_entries(self) or
_will_exceed_max_bytes(self, entry)
)
end


local queues = {}


function Queue.exists(name)
return queues[make_queue_key(name)] and true or false
return queues[_make_queue_key(name)] and true or false
end

-------------------------------------------------------------------------------
Expand All @@ -115,7 +205,7 @@ end
local function get_or_create_queue(queue_conf, handler, handler_conf)

local name = assert(queue_conf.name)
local key = make_queue_key(name)
local key = _make_queue_key(name)

local queue = queues[key]
if queue then
Expand Down Expand Up @@ -193,6 +283,28 @@ function Queue:count()
end


function Queue.is_full(queue_conf)
local queue = queues[_make_queue_key(queue_conf.name)]
if not queue then
-- treat non-existing queues as not full as they will be created on demand
return false
end

return _is_full(queue)
end


function Queue.can_enqueue(queue_conf, entry)
local queue = queues[_make_queue_key(queue_conf.name)]
if not queue then
-- treat non-existing queues as not full as they will be created on demand
return false
end

return _can_enqueue(queue, entry)
end


-- Delete the frontmost entry from the queue and adjust the current utilization variables.
function Queue:delete_frontmost_entry()
if self.max_bytes then
Expand Down Expand Up @@ -260,9 +372,9 @@ function Queue:process_once()
for _ = 1, entry_count do
self:delete_frontmost_entry()
end
if self.queue_full then
if self.already_dropped_entries then
self:log_info('queue resumed processing')
self.queue_full = false
self.already_dropped_entries = false
end

local start_time = now()
Expand Down Expand Up @@ -393,38 +505,54 @@ local function enqueue(self, entry)
self.warned = nil
end

if self:count() == self.max_entries then
if not self.queue_full then
self.queue_full = true
self:log_err("queue full, dropping old entries until processing is successful again")
end
if _is_reaching_max_entries(self) then
self:log_err("queue full, dropping old entries until processing is successful again")
self:drop_oldest_entry()
self.already_dropped_entries = true
end

if _is_entry_too_large(self, entry) then
local err_msg = string.format(
"string to be queued is longer (%d bytes) than the queue's max_bytes (%d bytes)",
#entry,
self.max_bytes
)
self:log_err(err_msg)

return nil, err_msg
end

if _will_exceed_max_bytes(self, entry) then
local dropped = 0

repeat
self:drop_oldest_entry()
dropped = dropped + 1
self.already_dropped_entries = true
until not _will_exceed_max_bytes(self, entry)

self:log_err("byte capacity exceeded, %d queue entries were dropped", dropped)
end

-- safety guard
-- The queue should not be full if we are running into this situation.
-- Since the dropping logic is complicated,
-- further maintenancers might introduce bugs,
-- so I added this assertion to detect this kind of bug early.
-- It's better to crash early than leak memory
-- as analyze memory leak is hard.
assert(
-- assert that enough space is available on the queue now
_can_enqueue(self, entry),
"queue should not be full after dropping entries"
)

if self.max_bytes then
if type(entry) ~= "string" then
self:log_err("queuing non-string entry to a queue that has queue.max_bytes set, capacity monitoring will not be correct")
else
if #entry > self.max_bytes then
local message = string.format(
"string to be queued is longer (%d bytes) than the queue's max_bytes (%d bytes)",
#entry, self.max_bytes)
self:log_err(message)
return nil, message
end

local dropped = 0
while self:count() > 0 and (self.bytes_queued + #entry) > self.max_bytes do
self:drop_oldest_entry()
dropped = dropped + 1
end
if dropped > 0 then
self.queue_full = true
self:log_err("byte capacity exceeded, %d queue entries were dropped", dropped)
end

self.bytes_queued = self.bytes_queued + #entry
end

self.bytes_queued = self.bytes_queued + #entry
end

self.entries[self.back] = entry
Expand All @@ -442,24 +570,56 @@ function Queue.enqueue(queue_conf, handler, handler_conf, value)
assert(type(handler) == "function",
"arg #2 (handler) must be a function")
assert(handler_conf == nil or type(handler_conf) == "table",
"arg #3 (handler_conf) must be a table")

"arg #3 (handler_conf) must be a table or nil")
assert(type(queue_conf.name) == "string",
"arg #1 (queue_conf) must include a name")

assert(
type(queue_conf.max_batch_size) == "number",
"arg #1 (queue_conf) max_batch_size must be a number"
)
assert(
type(queue_conf.max_coalescing_delay) == "number",
"arg #1 (queue_conf) max_coalescing_delay must be a number"
)
assert(
type(queue_conf.max_entries) == "number",
"arg #1 (queue_conf) max_entries must be a number"
)
assert(
type(queue_conf.max_retry_time) == "number",
"arg #1 (queue_conf) max_retry_time must be a number"
)
assert(
type(queue_conf.initial_retry_delay) == "number",
"arg #1 (queue_conf) initial_retry_delay must be a number"
)
assert(
type(queue_conf.max_retry_delay) == "number",
"arg #1 (queue_conf) max_retry_delay must be a number"
)

local max_bytes_type = type(queue_conf.max_bytes)
assert(
max_bytes_type == "nil" or max_bytes_type == "number",
"arg #1 (queue_conf) max_bytes must be a number or nil"
)

local queue = get_or_create_queue(queue_conf, handler, handler_conf)
return enqueue(queue, value)
end

-- For testing, the _exists() function is provided to allow a test to wait for the
-- queue to have been completely processed.
function Queue._exists(name)
local queue = queues[make_queue_key(name)]
local queue = queues[_make_queue_key(name)]
return queue and queue:count() > 0
end


-- [[ For testing purposes only
Queue._CAPACITY_WARNING_THRESHOLD = CAPACITY_WARNING_THRESHOLD
-- ]]


return Queue
76 changes: 76 additions & 0 deletions spec/01-unit/27-queue_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -788,4 +788,80 @@ describe("plugin queue", function()
assert.match_re(log_messages, 'WARN \\[\\] queue continue-processing: handler could not process entries: .*: hard error')
assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries, giving up after \\d retries. 1 queue entries were lost')
end)

it("sanity check for function Queue.is_full() & Queue.can_enqueue()", function()
local queue_conf = {
name = "queue-full-checking-too-many-entries",
max_batch_size = 99999, -- avoiding automatically flushing,
max_entries = 2,
max_bytes = nil, -- avoiding bytes limit
max_coalescing_delay = 99999, -- avoiding automatically flushing,
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
}

local function enqueue(queue_conf, entry)
Queue.enqueue(
queue_conf,
function()
return true
end,
nil,
entry
)
end

assert.is_false(Queue.is_full(queue_conf))
assert.is_false(Queue.can_enqueue(queue_conf, "One"))
enqueue(queue_conf, "One")
assert.is_false(Queue.is_full(queue_conf))

assert.is_true(Queue.can_enqueue(queue_conf, "Two"))
enqueue(queue_conf, "Two")
assert.is_true(Queue.is_full(queue_conf))

assert.is_false(Queue.can_enqueue(queue_conf, "Three"))


queue_conf = {
name = "queue-full-checking-too-many-bytes",
max_batch_size = 99999, -- avoiding automatically flushing,
max_entries = 99999, -- big enough to avoid entries limit
max_bytes = 2,
max_coalescing_delay = 99999, -- avoiding automatically flushing,
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
}

assert.is_false(Queue.is_full(queue_conf))
assert.is_false(Queue.can_enqueue(queue_conf, "1"))
enqueue(queue_conf, "1")
assert.is_false(Queue.is_full(queue_conf))

assert.is_true(Queue.can_enqueue(queue_conf, "2"))
enqueue(queue_conf, "2")
assert.is_true(Queue.is_full(queue_conf))

assert.is_false(Queue.can_enqueue(queue_conf, "3"))

queue_conf = {
name = "queue-full-checking-too-large-entry",
max_batch_size = 99999, -- avoiding automatically flushing,
max_entries = 99999, -- big enough to avoid entries limit
max_bytes = 3,
max_coalescing_delay = 99999, -- avoiding automatically flushing,
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
}

enqueue(queue_conf, "1")

assert.is_false(Queue.is_full(queue_conf))
assert.is_true(Queue.can_enqueue(queue_conf, "1"))
assert.is_true(Queue.can_enqueue(queue_conf, "11"))
assert.is_false(Queue.can_enqueue(queue_conf, "111"))
end)
end)

1 comment on commit d1994f7

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:d1994f7d9121f2ca22ccbffcaf146cbc4ff6b14b
Artifacts available https://github.com/Kong/kong/actions/runs/9413579273

Please sign in to comment.