feat(testing): add reconfiguration completion detection mechanism
This change adds a new response header, X-Kong-Transaction-Id, to the
Admin API. It contains the (ever-incrementing) PostgreSQL transaction
ID of the change that was made. The value can then be sent in the
X-If-Kong-Transaction-Id header of a request to the proxy path. The
request is rejected with a 503 error if the proxy path has not yet
been reconfigured with this or a later transaction ID.

The mechanism is useful in testing, when changes are made through the
Admin API and their effect on the proxy path then needs to be verified.
Rather than waiting for a fixed period or retrying the proxy path
request until the expected result is observed, the proxy path client
sends the last transaction ID received from the Admin API in the
X-If-Kong-Transaction-Id header and retries the request only when a
503 error is received.
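
For illustration, an integration test might exercise the new mechanism like this (a rough sketch assuming Kong's busted-based spec.helpers utilities; the service name and route are made up):

-- Illustrative sketch, not part of this commit.
local helpers = require "spec.helpers"

-- Make a change through the Admin API and note the transaction ID it reports.
local admin = helpers.admin_client()
local res = assert(admin:send {
  method = "PATCH",
  path = "/services/example-service",
  body = { host = "changed.example.com" },
  headers = { ["Content-Type"] = "application/json" },
})
assert.res_status(200, res)
local txid = res.headers["X-Kong-Transaction-Id"]
admin:close()

-- Retry the proxy path until the node has processed that transaction:
-- a 503 means the reconfiguration is still pending.
helpers.wait_until(function()
  local proxy = helpers.proxy_client()
  local pres = proxy:send {
    method = "GET",
    path = "/",
    headers = { ["X-If-Kong-Transaction-Id"] = txid },
  }
  local status = pres and pres.status
  proxy:close()
  return status ~= nil and status ~= 503
end, 10)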
hanshuebner committed Oct 27, 2023
1 parent 310a50b commit 1627287
Showing 10 changed files with 244 additions and 79 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/reconfiguration-completion-detection.yml
@@ -0,0 +1,3 @@
message: Provide mechanism to detect completion of reconfiguration on the proxy path
type: feature
scope: Core
11 changes: 9 additions & 2 deletions kong/clustering/config_helper.lua
@@ -202,7 +202,12 @@ local function fill_empty_hashes(hashes)
end
end

function _M.update(declarative_config, config_table, config_hash, hashes)
function _M.update(declarative_config, msg)

local config_table = msg.config_table
local config_hash = msg.config_hash
local hashes = msg.hashes

assert(type(config_table) == "table")

if not config_hash then
@@ -236,11 +241,13 @@ function _M.update(declarative_config, config_table, config_hash, hashes)
-- executed by worker 0

local res
res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes)
res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes, msg.current_transaction_id)
if not res then
return nil, err
end

ngx_log(ngx.NOTICE, "loaded configuration with transaction ID " .. msg.current_transaction_id)

return true
end

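For reference, update now receives the whole reconfigure message rather than individual arguments; a condensed sketch of the table it expects (only the fields referenced in this commit are shown, all values are placeholders):

-- Condensed sketch of the "reconfigure" message built by the control plane
-- (see control_plane.lua below); all values are placeholders.
local msg = {
  type                   = "reconfigure",
  timestamp              = ngx.now(),
  config_table           = config_table,            -- declarative configuration
  config_hash            = config_hash,             -- hash of config_table
  hashes                 = hashes,                  -- per-entity hashes
  current_transaction_id = current_transaction_id,  -- PostgreSQL transaction ID at export time
}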
5 changes: 5 additions & 0 deletions kong/clustering/control_plane.lua
@@ -11,6 +11,7 @@ local compat = require("kong.clustering.compat")
local constants = require("kong.constants")
local events = require("kong.clustering.events")
local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash
local global = require("kong.global")


local string = string
@@ -115,8 +116,10 @@ function _M:export_deflated_reconfigure_payload()

local config_hash, hashes = calculate_config_hash(config_table)

local current_transaction_id = global.get_current_transaction_id()
local payload = {
type = "reconfigure",
current_transaction_id = current_transaction_id,
timestamp = ngx_now(),
config_table = config_table,
config_hash = config_hash,
@@ -143,6 +146,8 @@ function _M:export_deflated_reconfigure_payload()
self.current_config_hash = config_hash
self.deflated_reconfigure_payload = payload

ngx_log(ngx_NOTICE, "exported configuration with transaction id " .. current_transaction_id)

return payload, nil, config_hash
end

5 changes: 1 addition & 4 deletions kong/clustering/data_plane.lua
@@ -213,10 +213,7 @@ function _M:communicate(premature)
msg.timestamp and " with timestamp: " .. msg.timestamp or "",
log_suffix)

local config_table = assert(msg.config_table)

local pok, res, err = pcall(config_helper.update, self.declarative_config,
config_table, msg.config_hash, msg.hashes)
local pok, res, err = pcall(config_helper.update, self.declarative_config, msg)
if pok then
ping_immediately = true
end
7 changes: 6 additions & 1 deletion kong/db/declarative/import.lua
@@ -507,7 +507,7 @@ do
local DECLARATIVE_LOCK_KEY = "declarative:lock"

-- make sure no matter which path it exits, we released the lock.
load_into_cache_with_events = function(entities, meta, hash, hashes)
load_into_cache_with_events = function(entities, meta, hash, hashes, transaction_id)
local kong_shm = ngx.shared.kong

local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL)
@@ -522,6 +522,11 @@ do
end

ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes)

if ok and transaction_id then
ok, err = kong_shm:set("declarative:current-transaction-id", transaction_id)
end

kong_shm:delete(DECLARATIVE_LOCK_KEY)

return ok, err
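The shared-dict key written here is the one the runloop reads back after a rebuild (see handler.lua below); a minimal sketch of the round trip, with the key name taken from this diff:

-- Write side: set under the declarative lock once the new configuration
-- has been loaded into the cache.
local kong_shm = ngx.shared.kong
kong_shm:set("declarative:current-transaction-id", transaction_id)

-- Read side: the runloop's reconfigure handler promotes the value to
-- global.CURRENT_TRANSACTION_ID for use in the access phase.
local current = kong_shm:get("declarative:current-transaction-id") or 0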
8 changes: 6 additions & 2 deletions kong/db/strategies/postgres/connector.lua
@@ -519,10 +519,11 @@ function _mt:query(sql, operation)
end

local phase = get_phase()
local in_admin_api = phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE

if not operation or
not self.config_ro or
(phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE)
not self.config_ro or
in_admin_api
then
-- admin API requests skips the replica optimization
-- to ensure all its results are always strongly consistent
@@ -552,6 +553,9 @@ function _mt:query(sql, operation)

res, err, partial, num_queries = conn:query(sql)

if in_admin_api and operation == "write" and res and res[1] and res[1]._pg_transaction_id then
kong.response.set_header('X-Kong-Transaction-ID', res[1]._pg_transaction_id)
end
-- if err is string then either it is a SQL error
-- or it is a socket error, here we abort connections
-- that encounter errors instead of reusing them, for
2 changes: 2 additions & 0 deletions kong/db/strategies/postgres/init.lua
@@ -987,6 +987,8 @@ function _M.new(connector, schema, errors)
insert(upsert_expressions, ttl_escaped .. " = " .. "EXCLUDED." .. ttl_escaped)
end

insert(select_expressions, "pg_current_xact_id() as _pg_transaction_id")

local primary_key_escaped = {}
for i, key in ipairs(primary_key) do
local primary_key_field = primary_key_fields[key]
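With pg_current_xact_id() added to the select expressions, the statements generated by this strategy return the server-side transaction ID alongside the entity columns, which the connector change above then copies into the X-Kong-Transaction-ID response header. Roughly (illustrative SQL only, not the exact statement Kong generates):

-- Illustrative only: the write now also returns the ID of the transaction
-- that performed it.
local sql = [[
  UPDATE services
     SET host = $1
   WHERE id = $2
  RETURNING *, pg_current_xact_id() AS _pg_transaction_id
]]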
13 changes: 12 additions & 1 deletion kong/global.lua
@@ -68,7 +68,8 @@ end


local _GLOBAL = {
phases = phase_checker.phases,
phases = phase_checker.phases,
CURRENT_TRANSACTION_ID = 0,
}


@@ -294,4 +295,14 @@ function _GLOBAL.init_timing()
end


function _GLOBAL.get_current_transaction_id()
local rows, err = kong.db.connector:query("select pg_current_xact_id() as _pg_transaction_id")
if not rows then
return nil, "could not query postgres for current transaction id: " .. err
else
return tonumber(rows[1]._pg_transaction_id)
end
end


return _GLOBAL
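
This helper is what the control plane calls when exporting a configuration (see control_plane.lua above); a minimal usage sketch:

-- Minimal usage sketch; error handling mirrors the callers in this commit.
local global = require "kong.global"

local txid, err = global.get_current_transaction_id()
if not txid then
  ngx.log(ngx.ERR, err)
else
  ngx.log(ngx.NOTICE, "current PostgreSQL transaction ID: ", txid)
end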
126 changes: 57 additions & 69 deletions kong/runloop/handler.lua
@@ -13,8 +13,7 @@ local concurrency = require "kong.concurrency"
local lrucache = require "resty.lrucache"
local ktls = require "resty.kong.tls"
local request_id = require "kong.tracing.request_id"


local global = require "kong.global"


local PluginsIterator = require "kong.runloop.plugins_iterator"
@@ -748,6 +747,8 @@ do
wasm.set_state(wasm_state)
end

global.CURRENT_TRANSACTION_ID = kong_shm:get("declarative:current-transaction-id") or 0

return true
end) -- concurrency.with_coroutine_mutex

@@ -765,11 +766,6 @@ end
end


local function register_events()
events.register_events(reconfigure_handler)
end


local balancer_prepare
do
local function sleep_once_for_balancer_init()
@@ -921,7 +917,7 @@ return {
return
end

register_events()
events.register_events(reconfigure_handler)

-- initialize balancers for active healthchecks
timer_at(0, function()
@@ -967,84 +963,59 @@ return {
if strategy ~= "off" then
local worker_state_update_frequency = kong.configuration.worker_state_update_frequency or 1

local router_async_opts = {
name = "router",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_router_timer(premature)
local function rebuild_timer(premature)
if premature then
return
end

-- Don't wait for the semaphore (timeout = 0) when updating via the
-- timer.
-- If the semaphore is locked, that means that the rebuild is
-- already ongoing.
local ok, err = rebuild_router(router_async_opts)
if not ok then
log(ERR, "could not rebuild router via timer: ", err)
-- Before rebuiding the internal structures, retrieve the current PostgreSQL transaction ID to make it the
-- current transaction ID after the rebuild has finished.
local rebuild_transaction_id, err = global.get_current_transaction_id()
if not rebuild_transaction_id then
log(ERR, err)
end
end

local _, err = kong.timer:named_every("router-rebuild",
worker_state_update_frequency,
rebuild_router_timer)
if err then
log(ERR, "could not schedule timer to rebuild router: ", err)
end

local plugins_iterator_async_opts = {
name = "plugins_iterator",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_plugins_iterator_timer(premature)
if premature then
return
end

local _, err = rebuild_plugins_iterator(plugins_iterator_async_opts)
if err then
log(ERR, "could not rebuild plugins iterator via timer: ", err)
local router_update_status, err = rebuild_router({
name = "router",
timeout = 0,
on_timeout = "return_true",
})
if not router_update_status then
log(ERR, "could not rebuild router via timer: ", err)
end
end

local _, err = kong.timer:named_every("plugins-iterator-rebuild",
worker_state_update_frequency,
rebuild_plugins_iterator_timer)
if err then
log(ERR, "could not schedule timer to rebuild plugins iterator: ", err)
end


if wasm.enabled() then
local wasm_async_opts = {
name = "wasm",
local plugins_iterator_update_status, err = rebuild_plugins_iterator({
name = "plugins_iterator",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_wasm_filter_chains_timer(premature)
if premature then
return
end
})
if not plugins_iterator_update_status then
log(ERR, "could not rebuild plugins iterator via timer: ", err)
end

local _, err = rebuild_wasm_state(wasm_async_opts)
if err then
if wasm.enabled() then
local wasm_update_status, err = rebuild_wasm_state({
name = "wasm",
timeout = 0,
on_timeout = "return_true",
})
if not wasm_update_status then
log(ERR, "could not rebuild wasm filter chains via timer: ", err)
end
end

local _, err = kong.timer:named_every("wasm-filter-chains-rebuild",
worker_state_update_frequency,
rebuild_wasm_filter_chains_timer)
if err then
log(ERR, "could not schedule timer to rebuild wasm filter chains: ", err)
if rebuild_transaction_id then
log(NOTICE, "configuration processing completed for transaction ID " .. rebuild_transaction_id)
global.CURRENT_TRANSACTION_ID = rebuild_transaction_id
end
end

local _, err = kong.timer:named_every("rebuild",
worker_state_update_frequency,
rebuild_timer)
if err then
log(ERR, "could not schedule timer to rebuild: ", err)
end
end
end,
},
@@ -1134,6 +1105,23 @@ return {
},
access = {
before = function(ctx)
-- If this is a version-conditional request, abort it if this dataplane has not processed at least the
-- specified configuration version yet.
local if_kong_transaction_id = kong.request and kong.request.get_header('x-if-kong-transaction-id')
if if_kong_transaction_id then
if_kong_transaction_id = tonumber(if_kong_transaction_id)
if if_kong_transaction_id and if_kong_transaction_id >= global.CURRENT_TRANSACTION_ID then
return kong.response.error(
503,
"Service Unavailable",
{
["X-Kong-Reconfiguration-Status"] = "pending",
["Retry-After"] = tostring(kong.configuration.worker_state_update_frequency or 1),
}
)
end
end

-- if there is a gRPC service in the context, don't re-execute the pre-access
-- phase handler - it has been executed before the internal redirect
if ctx.service and (ctx.service.protocol == "grpc" or
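Note that the comparison is strict: a request carrying X-If-Kong-Transaction-Id: T is only forwarded once the worker's CURRENT_TRANSACTION_ID is greater than T; until then the client gets a 503 with X-Kong-Reconfiguration-Status: pending and a Retry-After hint. A condensed restatement of the check, with a worked example in the comments:

-- Condensed restatement of the access-phase check above; the numbers in the
-- example are made up.
--   client sends  X-If-Kong-Transaction-Id: 4711
--   CURRENT_TRANSACTION_ID = 4711  ->  503, reconfiguration still pending
--   CURRENT_TRANSACTION_ID = 4712  ->  request is forwarded as usual
local wanted = tonumber(kong.request.get_header("x-if-kong-transaction-id"))
if wanted and wanted >= global.CURRENT_TRANSACTION_ID then
  return kong.response.error(503, "Service Unavailable", {
    ["X-Kong-Reconfiguration-Status"] = "pending",
    ["Retry-After"] = tostring(kong.configuration.worker_state_update_frequency or 1),
  })
end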