From 16272874aaff490c4b410ba27e2f8686845bddb9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Hans=20H=C3=BCbner?=
Date: Fri, 15 Sep 2023 15:05:48 +0200
Subject: [PATCH 1/3] feat(testing): add reconfiguration completion detection
 mechanism

This change adds a new response header X-Kong-Transaction-Id to the
Admin API. It contains the (ever-incrementing) PostgreSQL transaction
ID of the change that was made. The value can then be put into the
X-If-Kong-Transaction-Id header of a request to the proxy path. The
request will be rejected with a 503 error if the proxy path has not yet
been reconfigured with this or a later transaction ID.

The mechanism is useful in testing, when changes are made through the
Admin API and their effect on the proxy path then needs to be verified.
Rather than waiting for a static period or retrying the proxy path
request until the expected result is received, the proxy path client
specifies the last transaction ID received from the Admin API in the
X-If-Kong-Transaction-Id header and retries the request if a 503 error
is received.
---
 .../reconfiguration-completion-detection.yml |   3 +
 kong/clustering/config_helper.lua            |  11 +-
 kong/clustering/control_plane.lua            |   5 +
 kong/clustering/data_plane.lua               |   5 +-
 kong/db/declarative/import.lua               |   7 +-
 kong/db/strategies/postgres/connector.lua    |   8 +-
 kong/db/strategies/postgres/init.lua         |   2 +
 kong/global.lua                              |  13 +-
 kong/runloop/handler.lua                     | 126 +++++++--------
 .../24-reconfiguration-completion_spec.lua   | 143 ++++++++++++++++++
 10 files changed, 244 insertions(+), 79 deletions(-)
 create mode 100644 changelog/unreleased/reconfiguration-completion-detection.yml
 create mode 100644 spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua

diff --git a/changelog/unreleased/reconfiguration-completion-detection.yml b/changelog/unreleased/reconfiguration-completion-detection.yml
new file mode 100644
index 000000000000..4389fd362a78
--- /dev/null
+++ b/changelog/unreleased/reconfiguration-completion-detection.yml
@@ -0,0 +1,3 @@
+message: Provide mechanism to detect completion of reconfiguration on the proxy path
+type: feature
+scope: Core
diff --git a/kong/clustering/config_helper.lua b/kong/clustering/config_helper.lua
index 790f3e72c15d..1c0083b82ec9 100644
--- a/kong/clustering/config_helper.lua
+++ b/kong/clustering/config_helper.lua
@@ -202,7 +202,12 @@ local function fill_empty_hashes(hashes)
   end
 end
 
-function _M.update(declarative_config, config_table, config_hash, hashes)
+function _M.update(declarative_config, msg)
+
+  local config_table = msg.config_table
+  local config_hash = msg.config_hash
+  local hashes = msg.hashes
+
   assert(type(config_table) == "table")
 
   if not config_hash then
@@ -236,11 +241,13 @@ function _M.update(declarative_config, config_table, config_hash, hashes)
   -- executed by worker 0
 
   local res
-  res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes)
+  res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes, msg.current_transaction_id)
  if not res then
     return nil, err
   end
 
+  ngx_log(ngx.NOTICE, "loaded configuration with transaction ID " .. msg.current_transaction_id)
+
   return true
 end
diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua
index a2696f9a3eb1..6939d7a78a5f 100644
--- a/kong/clustering/control_plane.lua
+++ b/kong/clustering/control_plane.lua
@@ -11,6 +11,7 @@ local compat = require("kong.clustering.compat")
 local constants = require("kong.constants")
 local events = require("kong.clustering.events")
 local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash
+local global = require("kong.global")
 
 local string = string
@@ -115,8 +116,10 @@ function _M:export_deflated_reconfigure_payload()
 
   local config_hash, hashes = calculate_config_hash(config_table)
 
+  local current_transaction_id = global.get_current_transaction_id()
   local payload = {
     type = "reconfigure",
+    current_transaction_id = current_transaction_id,
     timestamp = ngx_now(),
     config_table = config_table,
     config_hash = config_hash,
@@ -143,6 +146,8 @@ function _M:export_deflated_reconfigure_payload()
 
   self.current_config_hash = config_hash
   self.deflated_reconfigure_payload = payload
 
+  ngx_log(ngx_NOTICE, "exported configuration with transaction ID " .. current_transaction_id)
+
   return payload, nil, config_hash
 end
diff --git a/kong/clustering/data_plane.lua b/kong/clustering/data_plane.lua
index d0f0e1e020a9..4030b3174b05 100644
--- a/kong/clustering/data_plane.lua
+++ b/kong/clustering/data_plane.lua
@@ -213,10 +213,7 @@ function _M:communicate(premature)
                  msg.timestamp and " with timestamp: " .. msg.timestamp or "",
                  log_suffix)
 
-        local config_table = assert(msg.config_table)
-
-        local pok, res, err = pcall(config_helper.update, self.declarative_config,
-                                    config_table, msg.config_hash, msg.hashes)
+        local pok, res, err = pcall(config_helper.update, self.declarative_config, msg)
         if pok then
           ping_immediately = true
         end
diff --git a/kong/db/declarative/import.lua b/kong/db/declarative/import.lua
index 4908e3d6a8e3..3c30a31da262 100644
--- a/kong/db/declarative/import.lua
+++ b/kong/db/declarative/import.lua
@@ -507,7 +507,7 @@ do
   local DECLARATIVE_LOCK_KEY = "declarative:lock"
 
   -- make sure no matter which path it exits, we released the lock.
-  load_into_cache_with_events = function(entities, meta, hash, hashes)
+  load_into_cache_with_events = function(entities, meta, hash, hashes, transaction_id)
     local kong_shm = ngx.shared.kong
 
     local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL)
@@ -522,6 +522,11 @@ do
     end
 
     ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes)
+
+    if ok and transaction_id then
+      ok, err = kong_shm:set("declarative:current-transaction-id", transaction_id)
+    end
+
     kong_shm:delete(DECLARATIVE_LOCK_KEY)
 
     return ok, err
diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua
index fd5e9259066a..b5b9c257d8fa 100644
--- a/kong/db/strategies/postgres/connector.lua
+++ b/kong/db/strategies/postgres/connector.lua
@@ -519,10 +519,11 @@ function _mt:query(sql, operation)
   end
 
   local phase = get_phase()
+  local in_admin_api = phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE
 
   if not operation or
-     not self.config_ro or
-     (phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE)
+    not self.config_ro or
+    in_admin_api
   then
     -- admin API requests skips the replica optimization
     -- to ensure all its results are always strongly consistent
@@ -552,6 +553,9 @@ function _mt:query(sql, operation)
 
   res, err, partial, num_queries = conn:query(sql)
 
+  if in_admin_api and operation == "write" and res and res[1] and res[1]._pg_transaction_id then
+    kong.response.set_header('X-Kong-Transaction-ID', res[1]._pg_transaction_id)
+  end
   -- if err is string then either it is a SQL error
   -- or it is a socket error, here we abort connections
   -- that encounter errors instead of reusing them, for
diff --git a/kong/db/strategies/postgres/init.lua b/kong/db/strategies/postgres/init.lua
index 74da93465aa6..804f4fb0b34a 100644
--- a/kong/db/strategies/postgres/init.lua
+++ b/kong/db/strategies/postgres/init.lua
@@ -987,6 +987,8 @@ function _M.new(connector, schema, errors)
     insert(upsert_expressions, ttl_escaped .. " = " .. "EXCLUDED." .. ttl_escaped)
   end
 
+  insert(select_expressions, "pg_current_xact_id() as _pg_transaction_id")
+
   local primary_key_escaped = {}
   for i, key in ipairs(primary_key) do
     local primary_key_field = primary_key_fields[key]
diff --git a/kong/global.lua b/kong/global.lua
index cdceaa7f58ef..2c2449b5c64f 100644
--- a/kong/global.lua
+++ b/kong/global.lua
@@ -68,7 +68,8 @@ end
 
 local _GLOBAL = {
-  phases = phase_checker.phases,
+  phases                 = phase_checker.phases,
+  CURRENT_TRANSACTION_ID = 0,
 }
 
@@ -294,4 +295,14 @@ function _GLOBAL.init_timing()
 end
 
+
+function _GLOBAL.get_current_transaction_id()
+  local rows, err = kong.db.connector:query("select pg_current_xact_id() as _pg_transaction_id")
+  if not rows then
+    return nil, "could not query postgres for current transaction id: " .. err
+  else
+    return tonumber(rows[1]._pg_transaction_id)
+  end
+end
+
 
 return _GLOBAL
diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua
index 250d712f55b9..b22fc739086c 100644
--- a/kong/runloop/handler.lua
+++ b/kong/runloop/handler.lua
@@ -13,8 +13,7 @@ local concurrency = require "kong.concurrency"
 local lrucache = require "resty.lrucache"
 local ktls = require "resty.kong.tls"
 local request_id = require "kong.tracing.request_id"
-
-
+local global = require "kong.global"
 
 local PluginsIterator = require "kong.runloop.plugins_iterator"
@@ -748,6 +747,8 @@ do
         wasm.set_state(wasm_state)
       end
 
+      global.CURRENT_TRANSACTION_ID = kong_shm:get("declarative:current-transaction-id") or 0
+
       return true
     end) -- concurrency.with_coroutine_mutex
@@ -765,11 +766,6 @@ do
 end
 
 
-local function register_events()
-  events.register_events(reconfigure_handler)
-end
-
-
 local balancer_prepare
 do
   local function sleep_once_for_balancer_init()
@@ -921,7 +917,7 @@ return {
         return
       end
 
-      register_events()
+      events.register_events(reconfigure_handler)
 
       -- initialize balancers for active healthchecks
       timer_at(0, function()
@@ -967,84 +963,59 @@ return {
 
       if strategy ~= "off" then
         local worker_state_update_frequency = kong.configuration.worker_state_update_frequency or 1
 
-        local router_async_opts = {
-          name = "router",
-          timeout = 0,
-          on_timeout = "return_true",
-        }
-
-        local function rebuild_router_timer(premature)
+        local function rebuild_timer(premature)
           if premature then
             return
           end
 
-          -- Don't wait for the semaphore (timeout = 0) when updating via the
-          -- timer.
-          -- If the semaphore is locked, that means that the rebuild is
-          -- already ongoing.
-          local ok, err = rebuild_router(router_async_opts)
-          if not ok then
-            log(ERR, "could not rebuild router via timer: ", err)
+          -- Before rebuilding the internal structures, retrieve the current PostgreSQL transaction ID to make it the
+          -- current transaction ID after the rebuild has finished.
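+          -- Sampling the ID before the rebuild starts is conservative: a change that commits while the rebuild
+          -- is in flight may not be reflected in this run, but it will be picked up by the next timer run.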
+          local rebuild_transaction_id, err = global.get_current_transaction_id()
+          if not rebuild_transaction_id then
+            log(ERR, err)
           end
 
-        local _, err = kong.timer:named_every("router-rebuild",
-                                              worker_state_update_frequency,
-                                              rebuild_router_timer)
-        if err then
-          log(ERR, "could not schedule timer to rebuild router: ", err)
-        end
-
-        local plugins_iterator_async_opts = {
-          name = "plugins_iterator",
-          timeout = 0,
-          on_timeout = "return_true",
-        }
-
-        local function rebuild_plugins_iterator_timer(premature)
-          if premature then
-            return
-          end
-
-          local _, err = rebuild_plugins_iterator(plugins_iterator_async_opts)
-          if err then
-            log(ERR, "could not rebuild plugins iterator via timer: ", err)
+          local router_update_status, err = rebuild_router({
+            name = "router",
+            timeout = 0,
+            on_timeout = "return_true",
+          })
+          if not router_update_status then
+            log(ERR, "could not rebuild router via timer: ", err)
           end
-
-        local _, err = kong.timer:named_every("plugins-iterator-rebuild",
-                                              worker_state_update_frequency,
-                                              rebuild_plugins_iterator_timer)
-        if err then
-          log(ERR, "could not schedule timer to rebuild plugins iterator: ", err)
-        end
-
-        if wasm.enabled() then
-          local wasm_async_opts = {
-            name = "wasm",
+          local plugins_iterator_update_status, err = rebuild_plugins_iterator({
+            name = "plugins_iterator",
             timeout = 0,
             on_timeout = "return_true",
-          }
-
-          local function rebuild_wasm_filter_chains_timer(premature)
-            if premature then
-              return
-            end
+          })
+          if not plugins_iterator_update_status then
+            log(ERR, "could not rebuild plugins iterator via timer: ", err)
+          end
 
-            local _, err = rebuild_wasm_state(wasm_async_opts)
-            if err then
+          if wasm.enabled() then
+            local wasm_update_status, err = rebuild_wasm_state({
+              name = "wasm",
+              timeout = 0,
+              on_timeout = "return_true",
+            })
+            if not wasm_update_status then
               log(ERR, "could not rebuild wasm filter chains via timer: ", err)
             end
           end
 
-          local _, err = kong.timer:named_every("wasm-filter-chains-rebuild",
-                                                worker_state_update_frequency,
-                                                rebuild_wasm_filter_chains_timer)
-          if err then
-            log(ERR, "could not schedule timer to rebuild wasm filter chains: ", err)
+          if rebuild_transaction_id then
+            log(NOTICE, "configuration processing completed for transaction ID " .. rebuild_transaction_id)
+            global.CURRENT_TRANSACTION_ID = rebuild_transaction_id
           end
         end
+
+        local _, err = kong.timer:named_every("rebuild",
+                                              worker_state_update_frequency,
+                                              rebuild_timer)
+        if err then
+          log(ERR, "could not schedule timer to rebuild: ", err)
+        end
       end
     end,
   },
@@ -1134,6 +1105,23 @@ return {
   },
   access = {
     before = function(ctx)
+      -- If this is a version-conditional request, abort it if this dataplane has not processed at least the
+      -- specified configuration version yet.
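+      -- CURRENT_TRANSACTION_ID is advanced by the rebuild timer (traditional mode) or by the reconfigure
+      -- handler (hybrid mode), so the 503 below is returned while this worker's last-seen transaction ID
+      -- is still at or below the requested one.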
+      local if_kong_transaction_id = kong.request and kong.request.get_header('x-if-kong-transaction-id')
+      if if_kong_transaction_id then
+        if_kong_transaction_id = tonumber(if_kong_transaction_id)
+        if if_kong_transaction_id and if_kong_transaction_id >= global.CURRENT_TRANSACTION_ID then
+          return kong.response.error(
+            503,
+            "Service Unavailable",
+            {
+              ["X-Kong-Reconfiguration-Status"] = "pending",
+              ["Retry-After"] = tostring(kong.configuration.worker_state_update_frequency or 1),
+            }
+          )
+        end
+      end
+
       -- if there is a gRPC service in the context, don't re-execute the pre-access
       -- phase handler - it has been executed before the internal redirect
       if ctx.service and (ctx.service.protocol == "grpc" or
diff --git a/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua b/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua
new file mode 100644
index 000000000000..c3c70775e3a3
--- /dev/null
+++ b/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua
@@ -0,0 +1,143 @@
+local helpers = require "spec.helpers"
+local cjson = require "cjson"
+
+describe("Admin API - Reconfiguration Completion -", function()
+
+  local WORKER_STATE_UPDATE_FREQ = 1
+
+  local admin_client
+  local proxy_client
+
+  local function run_tests()
+
+    local res = admin_client:post("/services", {
+      body = {
+        name = "test-service",
+        url = "http://example.com",
+      },
+      headers = { ["Content-Type"] = "application/json" },
+    })
+    local body = assert.res_status(201, res)
+    local service = cjson.decode(body)
+
+    -- We're running the route setup in `eventually` to cover for the unlikely case that reconfiguration completes
+    -- between adding the route and requesting the path through the proxy.
+
+    local next_path do
+      local path_suffix = 0
+      function next_path()
+        path_suffix = path_suffix + 1
+        return "/" .. tostring(path_suffix)
+      end
+    end
+
+    local service_path
+    local kong_transaction_id
+
+    assert.eventually(function()
+      service_path = next_path()
+
+      res = admin_client:post("/services/" .. service.id .. "/routes", {
"/routes", { + body = { + paths = { service_path } + }, + headers = { ["Content-Type"] = "application/json" }, + }) + assert.res_status(201, res) + kong_transaction_id = res.headers['x-kong-transaction-id'] + assert.is_string(kong_transaction_id) + + res = proxy_client:get(service_path, + { + headers = { + ["X-If-Kong-Transaction-Id"] = kong_transaction_id + } + }) + assert.res_status(503, res) + assert.equals("pending", res.headers['x-kong-reconfiguration-status']) + local retry_after = tonumber(res.headers['retry-after']) + ngx.sleep(retry_after) + end) + .has_no_error() + + assert.eventually(function() + res = proxy_client:get(service_path, + { + headers = { + ["X-If-Kong-Transaction-Id"] = kong_transaction_id + } + }) + assert.res_status(200, res) + end) + .has_no_error() + end + + describe("#traditional mode -", function() + lazy_setup(function() + helpers.get_db_utils() + assert(helpers.start_kong({ + worker_consistency = "eventual", + worker_state_update_frequency = WORKER_STATE_UPDATE_FREQ, + })) + admin_client = helpers.admin_client() + proxy_client = helpers.proxy_client() + end) + + teardown(function() + if admin_client then + admin_client:close() + end + if proxy_client then + proxy_client:close() + end + helpers.stop_kong() + end) + + it("rejects proxy requests if worker state has not been updated yet", run_tests) + end) + + describe("#hybrid mode -", function() + lazy_setup(function() + helpers.get_db_utils() + + assert(helpers.start_kong({ + role = "control_plane", + database = "postgres", + prefix = "cp", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + lua_ssl_trusted_certificate = "spec/fixtures/kong_clustering.crt", + cluster_listen = "127.0.0.1:9005", + cluster_telemetry_listen = "127.0.0.1:9006", + nginx_conf = "spec/fixtures/custom_nginx.template", + })) + + assert(helpers.start_kong({ + role = "data_plane", + database = "off", + prefix = "dp", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + lua_ssl_trusted_certificate = "spec/fixtures/kong_clustering.crt", + cluster_control_plane = "127.0.0.1:9005", + cluster_telemetry_endpoint = "127.0.0.1:9006", + proxy_listen = "0.0.0.0:9002", + })) + admin_client = helpers.admin_client() + proxy_client = helpers.proxy_client("127.0.0.1", 9002) + end) + + teardown(function() + if admin_client then + admin_client:close() + end + if proxy_client then + proxy_client:close() + end + helpers.stop_kong("dp") + helpers.stop_kong("cp") + end) + + it("rejects proxy requests if worker state has not been updated yet", run_tests) + end) +end) From ca4d6060c2a0714c22aee509de8d31f3d2f15826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20H=C3=BCbner?= Date: Thu, 26 Oct 2023 11:45:38 +0200 Subject: [PATCH 2/3] fix(test): remove external dependency --- .../24-reconfiguration-completion_spec.lua | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua b/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua index c3c70775e3a3..9f528c4bb46b 100644 --- a/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua +++ b/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua @@ -10,10 +10,22 @@ describe("Admin API - Reconfiguration Completion -", function() local function run_tests() - local res = admin_client:post("/services", { + local res = admin_client:post("/plugins", { + 
+      body = {
+        name = "request-termination",
+        config = {
+          status_code = 200,
+          body = "kong terminated the request",
+        }
+      },
+      headers = { ["Content-Type"] = "application/json" },
+    })
+    assert.res_status(201, res)
+
+    res = admin_client:post("/services", {
       body = {
         name = "test-service",
-        url = "http://example.com",
+        url = "http://127.0.0.1",
       },
       headers = { ["Content-Type"] = "application/json" },
     })
@@ -67,7 +79,8 @@ describe("Admin API - Reconfiguration Completion -", function()
           ["X-If-Kong-Transaction-Id"] = kong_transaction_id
         }
       })
-      assert.res_status(200, res)
+      body = assert.res_status(200, res)
+      assert.equals("kong terminated the request", body)
     end)
     .has_no_error()
   end

From 2adca37a9aa1c125f8f0ea385abc267f02b6a3be Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Hans=20H=C3=BCbner?=
Date: Thu, 26 Oct 2023 12:40:54 +0200
Subject: [PATCH 3/3] fix(core): yield before updating
 global.CURRENT_TRANSACTION_ID

---
 kong/runloop/handler.lua | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua
index b22fc739086c..e2759287ed4c 100644
--- a/kong/runloop/handler.lua
+++ b/kong/runloop/handler.lua
@@ -1005,6 +1005,9 @@ return {
         end
 
         if rebuild_transaction_id then
+          -- Yield to process any pending invalidations
+          utils.yield()
+
           log(NOTICE, "configuration processing completed for transaction ID " .. rebuild_transaction_id)
           global.CURRENT_TRANSACTION_ID = rebuild_transaction_id
         end
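
Usage sketch (illustrative, not part of the patches above): a proxy-path client that wants to wait
for an Admin API change to become effective captures the X-Kong-Transaction-Id response header and
retries the proxy request until the conditional 503 clears, as described in PATCH 1/3. A minimal
example in Lua, assuming the lua-resty-http client, an OpenResty context in which ngx.sleep is
available, and a proxy listener on 127.0.0.1:8000:

    local http = require "resty.http"

    -- Poll `path` on the proxy until the configuration change identified by `txid`
    -- (the X-Kong-Transaction-Id value returned by the Admin API) has been applied,
    -- then return the response served from the new configuration.
    local function wait_for_config(txid, path)
      while true do
        local client = assert(http.new())
        local res = assert(client:request_uri("http://127.0.0.1:8000" .. path, {
          headers = { ["X-If-Kong-Transaction-Id"] = txid },
        }))
        if res.status ~= 503
           or res.headers["X-Kong-Reconfiguration-Status"] ~= "pending" then
          return res
        end
        -- The patches set Retry-After from worker_state_update_frequency
        ngx.sleep(tonumber(res.headers["Retry-After"]) or 1)
      end
    end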