feat(testing): add reconfiguration completion detection mechanism #11581

Merged · 3 commits · Nov 6, 2023
3 changes: 3 additions & 0 deletions changelog/unreleased/reconfiguration-completion-detection.yml
@@ -0,0 +1,3 @@
+message: Provide mechanism to detect completion of reconfiguration on the proxy path
+type: feature
+scope: Core
11 changes: 9 additions & 2 deletions kong/clustering/config_helper.lua
@@ -202,7 +202,12 @@ local function fill_empty_hashes(hashes)
   end
 end

-function _M.update(declarative_config, config_table, config_hash, hashes)
+function _M.update(declarative_config, msg)
+
+  local config_table = msg.config_table
+  local config_hash = msg.config_hash
+  local hashes = msg.hashes
+
   assert(type(config_table) == "table")

   if not config_hash then
@@ -236,11 +241,13 @@ function _M.update(declarative_config, config_table, config_hash, hashes)
   -- executed by worker 0

   local res
-  res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes)
+  res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes, msg.current_transaction_id)
   if not res then
     return nil, err
   end

+  ngx_log(ngx.NOTICE, "loaded configuration with transaction ID " .. msg.current_transaction_id)
+
   return true
 end

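For orientation, here is a minimal sketch of the msg table that _M.update() now consumes in place of the three positional arguments. The field names are taken from the reconfigure payload assembled in control_plane.lua below; all values, and the surrounding locals, are illustrative only:

-- Illustrative sketch only: the reconfigure message passed to _M.update().
local config_helper = require("kong.clustering.config_helper")

local msg = {
  type = "reconfigure",
  current_transaction_id = 12345,    -- PostgreSQL transaction id captured at export time
  timestamp = 1699264800,            -- ngx_now() on the control plane
  config_table = { _format_version = "3.0" },
  config_hash = "0123456789abcdef0123456789abcdef",
  hashes = {},                       -- per-entity hashes from calculate_config_hash()
}

-- declarative_config is the schema object the data plane already holds;
-- msg is what arrives over the clustering connection (see data_plane.lua).
local ok, err = config_helper.update(declarative_config, msg)
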
5 changes: 5 additions & 0 deletions kong/clustering/control_plane.lua
@@ -11,6 +11,7 @@ local compat = require("kong.clustering.compat")
 local constants = require("kong.constants")
 local events = require("kong.clustering.events")
 local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash
+local global = require("kong.global")


 local string = string
@@ -115,8 +116,10 @@ function _M:export_deflated_reconfigure_payload()

   local config_hash, hashes = calculate_config_hash(config_table)

+  local current_transaction_id = global.get_current_transaction_id()
   local payload = {
     type = "reconfigure",
+    current_transaction_id = current_transaction_id,
     timestamp = ngx_now(),
     config_table = config_table,
     config_hash = config_hash,
@@ -143,6 +146,8 @@ function _M:export_deflated_reconfigure_payload()
   self.current_config_hash = config_hash
   self.deflated_reconfigure_payload = payload

+  ngx_log(ngx_NOTICE, "exported configuration with transaction id " .. current_transaction_id)
+
   return payload, nil, config_hash
 end

5 changes: 1 addition & 4 deletions kong/clustering/data_plane.lua
@@ -213,10 +213,7 @@ function _M:communicate(premature)
                 msg.timestamp and " with timestamp: " .. msg.timestamp or "",
                 log_suffix)

-      local config_table = assert(msg.config_table)
-
-      local pok, res, err = pcall(config_helper.update, self.declarative_config,
-                                  config_table, msg.config_hash, msg.hashes)
+      local pok, res, err = pcall(config_helper.update, self.declarative_config, msg)
       if pok then
         ping_immediately = true
       end

7 changes: 6 additions & 1 deletion kong/db/declarative/import.lua
@@ -507,7 +507,7 @@ do
   local DECLARATIVE_LOCK_KEY = "declarative:lock"

   -- make sure no matter which path it exits, we released the lock.
-  load_into_cache_with_events = function(entities, meta, hash, hashes)
+  load_into_cache_with_events = function(entities, meta, hash, hashes, transaction_id)
     local kong_shm = ngx.shared.kong

     local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL)
@@ -522,6 +522,11 @@
     end

     ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes)
+
+    if ok and transaction_id then
+      ok, err = kong_shm:set("declarative:current-transaction-id", transaction_id)
+    end
+
     kong_shm:delete(DECLARATIVE_LOCK_KEY)

     return ok, err

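The transaction id is stored in the kong shared dict rather than in a per-worker variable so that every worker in the node can observe it. A minimal sketch of the read side, matching what runloop/handler.lua does further down:

-- Minimal sketch: read back the transaction id of the configuration that
-- was last loaded into the cache (0 if nothing has been stored yet).
local kong_shm = ngx.shared.kong
local current_txid = kong_shm:get("declarative:current-transaction-id") or 0
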
8 changes: 6 additions & 2 deletions kong/db/strategies/postgres/connector.lua
@@ -519,10 +519,11 @@ function _mt:query(sql, operation)
   end

   local phase = get_phase()
+  local in_admin_api = phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE

   if not operation or
-     not self.config_ro or
-     (phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE)
+     not self.config_ro or
+     in_admin_api
   then
     -- admin API requests skips the replica optimization
     -- to ensure all its results are always strongly consistent
@@ -552,6 +553,9 @@ function _mt:query(sql, operation)

   res, err, partial, num_queries = conn:query(sql)

+  if in_admin_api and operation == "write" and res and res[1] and res[1]._pg_transaction_id then
+    kong.response.set_header('X-Kong-Transaction-ID', res[1]._pg_transaction_id)
+  end
   -- if err is string then either it is a SQL error
   -- or it is a socket error, here we abort connections
   -- that encounter errors instead of reusing them, for
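The client-visible effect of this change is that every successful Admin API write now answers with an X-Kong-Transaction-ID header. A hedged sketch using the lua-resty-http client; the listener address, service name and body are illustrative assumptions, not part of this PR:

-- Sketch only: perform an Admin API write and capture its transaction id.
local http = require "resty.http"

local client = http.new()
local res, err = client:request_uri("http://localhost:8001/services", {
  method = "POST",
  headers = { ["Content-Type"] = "application/json" },
  body = '{"name":"example","url":"http://upstream.example"}',
})
assert(res, err)

-- The PostgreSQL transaction id that performed this write, e.g. "12345".
local txid = res.headers["X-Kong-Transaction-ID"]
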
2 changes: 2 additions & 0 deletions kong/db/strategies/postgres/init.lua
@@ -987,6 +987,8 @@ function _M.new(connector, schema, errors)
     insert(upsert_expressions, ttl_escaped .. " = " .. "EXCLUDED." .. ttl_escaped)
   end

+  insert(select_expressions, "pg_current_xact_id() as _pg_transaction_id")
+
   local primary_key_escaped = {}
   for i, key in ipairs(primary_key) do
     local primary_key_field = primary_key_fields[key]
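With the extra select expression in place, a generated write roughly takes the following shape; this is a sketch, not the literal SQL Kong emits, and it is where the _pg_transaction_id column read by connector.lua above comes from:

-- Rough illustration of a generated upsert after this change:
--
--   INSERT INTO "services" (...) VALUES (...)
--   ON CONFLICT ("id") DO UPDATE SET ...
--   RETURNING *, pg_current_xact_id() AS _pg_transaction_id;
--
-- res[1]._pg_transaction_id then carries the transaction id of the write.
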
13 changes: 12 additions & 1 deletion kong/global.lua
@@ -68,7 +68,8 @@ end


 local _GLOBAL = {
-  phases = phase_checker.phases,
+  phases                 = phase_checker.phases,
+  CURRENT_TRANSACTION_ID = 0,
 }


@@ -294,4 +295,14 @@ function _GLOBAL.init_timing()
 end


+function _GLOBAL.get_current_transaction_id()
+  local rows, err = kong.db.connector:query("select pg_current_xact_id() as _pg_transaction_id")
+  if not rows then
+    return nil, "could not query postgres for current transaction id: " .. err
+  else
+    return tonumber(rows[1]._pg_transaction_id)
+  end
+end
+
+
 return _GLOBAL
129 changes: 60 additions & 69 deletions kong/runloop/handler.lua
@@ -13,8 +13,7 @@ local concurrency = require "kong.concurrency"
 local lrucache = require "resty.lrucache"
 local ktls = require "resty.kong.tls"
 local request_id = require "kong.tracing.request_id"
-
-
+local global = require "kong.global"


 local PluginsIterator = require "kong.runloop.plugins_iterator"
@@ -748,6 +747,8 @@ do
       wasm.set_state(wasm_state)
     end

+    global.CURRENT_TRANSACTION_ID = kong_shm:get("declarative:current-transaction-id") or 0
+
     return true
   end)  -- concurrency.with_coroutine_mutex

@@ -765,11 +766,6 @@
 end


-local function register_events()
-  events.register_events(reconfigure_handler)
-end
-
-
 local balancer_prepare
 do
   local function sleep_once_for_balancer_init()
@@ -921,7 +917,7 @@ return {
         return
       end

-      register_events()
+      events.register_events(reconfigure_handler)

       -- initialize balancers for active healthchecks
       timer_at(0, function()
@@ -967,84 +963,62 @@ return {
       if strategy ~= "off" then
         local worker_state_update_frequency = kong.configuration.worker_state_update_frequency or 1

-        local router_async_opts = {
-          name = "router",
-          timeout = 0,
-          on_timeout = "return_true",
-        }
-
-        local function rebuild_router_timer(premature)
-          if premature then
-            return
-          end
-
-          -- Don't wait for the semaphore (timeout = 0) when updating via the
-          -- timer.
-          -- If the semaphore is locked, that means that the rebuild is
-          -- already ongoing.
-          local ok, err = rebuild_router(router_async_opts)
-          if not ok then
-            log(ERR, "could not rebuild router via timer: ", err)
-          end
-        end
-
-        local _, err = kong.timer:named_every("router-rebuild",
-                                              worker_state_update_frequency,
-                                              rebuild_router_timer)
-        if err then
-          log(ERR, "could not schedule timer to rebuild router: ", err)
-        end
-
-        local plugins_iterator_async_opts = {
-          name = "plugins_iterator",
-          timeout = 0,
-          on_timeout = "return_true",
-        }
-
-        local function rebuild_plugins_iterator_timer(premature)
-          if premature then
-            return
-          end
-
-          local _, err = rebuild_plugins_iterator(plugins_iterator_async_opts)
-          if err then
-            log(ERR, "could not rebuild plugins iterator via timer: ", err)
-          end
-        end
-
-        local _, err = kong.timer:named_every("plugins-iterator-rebuild",
-                                              worker_state_update_frequency,
-                                              rebuild_plugins_iterator_timer)
-        if err then
-          log(ERR, "could not schedule timer to rebuild plugins iterator: ", err)
-        end
-
-
-        if wasm.enabled() then
-          local wasm_async_opts = {
-            name = "wasm",
-            timeout = 0,
-            on_timeout = "return_true",
-          }
-
-          local function rebuild_wasm_filter_chains_timer(premature)
-            if premature then
-              return
-            end
-
-            local _, err = rebuild_wasm_state(wasm_async_opts)
-            if err then
-              log(ERR, "could not rebuild wasm filter chains via timer: ", err)
-            end
-          end
-
-          local _, err = kong.timer:named_every("wasm-filter-chains-rebuild",
-                                                worker_state_update_frequency,
-                                                rebuild_wasm_filter_chains_timer)
-          if err then
-            log(ERR, "could not schedule timer to rebuild wasm filter chains: ", err)
-          end
-        end
+        local function rebuild_timer(premature)
+          if premature then
+            return
+          end
+
+          -- Before rebuilding the internal structures, retrieve the current PostgreSQL transaction ID to make it the
+          -- current transaction ID after the rebuild has finished.
+          local rebuild_transaction_id, err = global.get_current_transaction_id()
+          if not rebuild_transaction_id then
+            log(ERR, err)
+          end
+
+          local router_update_status, err = rebuild_router({
+            name = "router",
+            timeout = 0,
+            on_timeout = "return_true",
+          })
+          if not router_update_status then
+            log(ERR, "could not rebuild router via timer: ", err)
+          end
+
+          local plugins_iterator_update_status, err = rebuild_plugins_iterator({
+            name = "plugins_iterator",
+            timeout = 0,
+            on_timeout = "return_true",
+          })
+          if not plugins_iterator_update_status then
+            log(ERR, "could not rebuild plugins iterator via timer: ", err)
+          end
+
+          if wasm.enabled() then
+            local wasm_update_status, err = rebuild_wasm_state({
+              name = "wasm",
+              timeout = 0,
+              on_timeout = "return_true",
+            })
+            if not wasm_update_status then
+              log(ERR, "could not rebuild wasm filter chains via timer: ", err)
+            end
+          end
+
+          if rebuild_transaction_id then
+            -- Yield to process any pending invalidations
+            utils.yield()
+
+            log(NOTICE, "configuration processing completed for transaction ID " .. rebuild_transaction_id)
+            global.CURRENT_TRANSACTION_ID = rebuild_transaction_id
+          end
+        end
+
+        local _, err = kong.timer:named_every("rebuild",
+                                              worker_state_update_frequency,
+                                              rebuild_timer)
+        if err then
+          log(ERR, "could not schedule timer to rebuild: ", err)
+        end
       end
     end,
   },
@@ -1134,6 +1108,23 @@ return {
   },
   access = {
     before = function(ctx)
+      -- If this is a version-conditional request, abort it if this dataplane has not processed at least the
+      -- specified configuration version yet.
+      local if_kong_transaction_id = kong.request and kong.request.get_header('x-if-kong-transaction-id')
+      if if_kong_transaction_id then
+        if_kong_transaction_id = tonumber(if_kong_transaction_id)
+        if if_kong_transaction_id and if_kong_transaction_id >= global.CURRENT_TRANSACTION_ID then
+          return kong.response.error(
+            503,
+            "Service Unavailable",
+            {
+              ["X-Kong-Reconfiguration-Status"] = "pending",
+              ["Retry-After"] = tostring(kong.configuration.worker_state_update_frequency or 1),
+            }
+          )
+        end
+      end
+
       -- if there is a gRPC service in the context, don't re-execute the pre-access
       -- phase handler - it has been executed before the internal redirect
       if ctx.service and (ctx.service.protocol == "grpc" or
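Putting the pieces together, a client can now write through the Admin API and block until the proxy path reflects that write: take the X-Kong-Transaction-ID from the write response, then poll the proxy with X-If-Kong-Transaction-ID until the node stops answering 503 with a pending status. A sketch under the same assumptions as above (lua-resty-http available, local Admin and proxy listeners, illustrative route):

-- Sketch only: wait until this node has caught up with an Admin API write.
local http = require "resty.http"

-- 1. Create a route via the Admin API and capture its transaction id.
local client = http.new()
local res = assert(client:request_uri("http://localhost:8001/routes", {
  method = "POST",
  headers = { ["Content-Type"] = "application/json" },
  body = '{"name":"r1","paths":["/r1"]}',
}))
local txid = res.headers["X-Kong-Transaction-ID"]

-- 2. Poll the proxy path until the reconfiguration containing the write
--    has completed on this node (ngx.sleep assumes an OpenResty context).
repeat
  res = assert(client:request_uri("http://localhost:8000/r1", {
    headers = { ["X-If-Kong-Transaction-ID"] = txid },
  }))
  local pending = res.headers["X-Kong-Reconfiguration-Status"] == "pending"
  if pending then
    ngx.sleep(tonumber(res.headers["Retry-After"]) or 1)
  end
until not pending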