Skip to content

Commit

Permalink
fix(sync): avoiding long delay caused by race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
StarlightIbuki committed Nov 27, 2024
1 parent 23b2ae1 commit 87f7eec
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 18 deletions.
54 changes: 36 additions & 18 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ local events = require("kong.runloop.events")
local insert_entity_for_txn = declarative.insert_entity_for_txn
local delete_entity_for_txn = declarative.delete_entity_for_txn
local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY
local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }
local MAX_RETRY = 5


local ipairs = ipairs
Expand Down Expand Up @@ -146,6 +148,7 @@ end


function _M:init_dp(manager)
local kong_shm = ngx.shared.kong
-- DP
-- Method: kong.sync.v2.notify_new_version
-- Params: new_versions: list of namespaces and their new versions, like:
Expand All @@ -164,6 +167,8 @@ function _M:init_dp(manager)

local lmdb_ver = tonumber(declarative.get_current_hash()) or 0
if lmdb_ver < version then
-- set lastest version to shm
kong_shm:set(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY, version)
return self:sync_once()
end

Expand Down Expand Up @@ -383,43 +388,56 @@ local function sync_handler(premature)
return
end

local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
-- `do_sync()` is run twice in a row to report back new version number
-- to CP quickly after sync. (`kong.sync.v2.get_delta` is used for both pulling delta
-- as well as status reporting)
for _ = 1, 2 do
local ok, err = do_sync()
if not ok then
return nil, err
end
end -- for

return true
end)
local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, do_sync)
if not res and err ~= "timeout" then
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err)
end
end


local function start_sync_timer(timer_func, delay)
local hdl, err = timer_func(delay, sync_handler)
local sync_once_impl

if not hdl then

local function start_sync_once_timer(retry_count)
local ok, err = ngx.timer.at(0, sync_once_impl, retry_count or 0)
if not ok then
return nil, err
end

return true
end


function sync_once_impl(premature, retry_count)
if premature then
return
end

sync_handler()

local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY)
local current_version = tonumber(declarative.get_current_hash()) or 0

-- retry if the version is not updated
if not latest_notified_version or current_version < latest_notified_version then
retry_count = retry_count or 0
if retry_count > MAX_RETRY then
ngx_log(ngx_ERR, "sync_once retry count exceeded. retry_count: ", retry_count)
return
end

return start_sync_once_timer(retry_count + 1)
end
end


function _M:sync_once(delay)
return start_sync_timer(ngx.timer.at, delay or 0)
return ngx.timer.at(delay or 0, sync_once_impl, 0)
end


function _M:sync_every(delay)
return start_sync_timer(ngx.timer.every, delay)
return ngx.timer.every(delay, sync_handler)
end


Expand Down
1 change: 1 addition & 0 deletions kong/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ local constants = {
RELOAD = "configuration reload failed",
GENERIC = "generic or unknown error",
},
CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = "clustering_data_planes:latest_version",

CLEAR_HEALTH_STATUS_DELAY = 300, -- 300 seconds

Expand Down

0 comments on commit 87f7eec

Please sign in to comment.