Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(clustering): report node version in sync #13844

Merged
merged 16 commits into from
Nov 27, 2024
49 changes: 28 additions & 21 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function _M.new(conf, node_id)

if conf.role == "control_plane" then
self.concentrator = require("kong.clustering.rpc.concentrator").new(self, kong.db)
self.client_ips = {} -- store DP node's ip addr
self.client_info = {} -- store DP node's ip addr and version
end

return setmetatable(self, _MT)
Expand Down Expand Up @@ -96,7 +96,7 @@ function _M:_remove_socket(socket)
self.client_capabilities[node_id] = nil

if self.concentrator then
self.client_ips[node_id] = nil
self.client_info[node_id] = nil
assert(self.concentrator:_enqueue_unsubscribe(node_id))
end
end
Expand Down Expand Up @@ -180,14 +180,6 @@ function _M:_handle_meta_call(c)
assert(type(info.kong_hostname) == "string")
assert(type(info.kong_conf) == "table")

local capabilities_list = info.rpc_capabilities
local node_id = info.kong_node_id

self.client_capabilities[node_id] = {
set = pl_tablex_makeset(capabilities_list),
list = capabilities_list,
}

local payload = {
jsonrpc = "2.0",
result = {
Expand All @@ -203,6 +195,24 @@ function _M:_handle_meta_call(c)
return nil, err
end

local capabilities_list = info.rpc_capabilities
local node_id = info.kong_node_id

self.client_capabilities[node_id] = {
set = pl_tablex_makeset(capabilities_list),
list = capabilities_list,
}

-- we are on cp side
assert(self.concentrator)
assert(self.client_info)

-- store DP's ip addr and version
self.client_info[node_id] = {
ip = ngx_var.remote_addr,
version = info.kong_version,
}
Comment on lines +211 to +214
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should _handle_meta_call also assert(self.concentrator), in case self.client_info is not set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added.


return node_id
end

Expand Down Expand Up @@ -239,24 +249,24 @@ function _M:_meta_call(c, meta_cap, node_id)
end

if typ ~= "binary" then
return nil, "wrong frame type: " .. type
return nil, "wrong frame type: " .. typ
end

local payload = cjson_decode(data)
assert(payload.jsonrpc == "2.0")

-- now we only support snappy
if payload.result.rpc_frame_encoding ~= RPC_SNAPPY_FRAMED then
return nil, "unknown encoding: " .. payload.result.rpc_frame_encoding
end

chobits marked this conversation as resolved.
Show resolved Hide resolved
local capabilities_list = payload.result.rpc_capabilities

self.client_capabilities[node_id] = {
set = pl_tablex_makeset(capabilities_list),
list = capabilities_list,
}

-- now we only support snappy
if payload.result.rpc_frame_encoding ~= RPC_SNAPPY_FRAMED then
return nil, "unknown encoding: " .. payload.result.rpc_frame_encoding
end

return true
end

Expand Down Expand Up @@ -398,9 +408,6 @@ function _M:handle_websocket()
local s = socket.new(self, wb, node_id)
self:_add_socket(s)

-- store DP's ip addr
self.client_ips[node_id] = ngx_var.remote_addr

s:start()
local res, err = s:join()
self:_remove_socket(s)
Expand Down Expand Up @@ -533,8 +540,8 @@ function _M:get_peers()
end


function _M:get_peer_ip(node_id)
return self.client_ips[node_id]
function _M:get_peer_info(node_id)
return self.client_info[node_id]
end


Expand Down
21 changes: 12 additions & 9 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }


local assert = assert
local ipairs = ipairs
local fmt = string.format
local ngx_null = ngx.null
Expand Down Expand Up @@ -79,13 +80,14 @@ function _M:init_cp(manager)

-- { default = { version = 1000, }, }
local default_namespace_version = default_namespace.version
local node_info = assert(kong.rpc:get_peer_info(node_id))
chobits marked this conversation as resolved.
Show resolved Hide resolved

-- XXX TODO: follow update_sync_status() in control_plane.lua
-- follow update_sync_status() in control_plane.lua
local ok, err = kong.db.clustering_data_planes:upsert({ id = node_id }, {
last_seen = ngx.time(),
hostname = node_id,
ip = kong.rpc:get_peer_ip(node_id), -- try to get the correct ip
version = "3.8.0.0", -- XXX TODO: get from rpc call
ip = node_info.ip, -- get the correct ip
version = node_info.version, -- get from rpc call
sync_status = CLUSTERING_SYNC_STATUS.NORMAL,
config_hash = fmt("%032d", default_namespace_version),
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
Expand Down Expand Up @@ -202,12 +204,13 @@ local function do_sync()
return nil, "rpc is not ready"
end

local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
{ default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
})
local msg = { default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
}

local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg)
if not ns_deltas then
ngx_log(ngx_ERR, "sync get_delta error: ", err)
return true
Expand Down
11 changes: 10 additions & 1 deletion spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
local helpers = require "spec.helpers"
local cjson = require("cjson.safe")
local CLUSTERING_SYNC_STATUS = require("kong.constants").CLUSTERING_SYNC_STATUS

-- we need incremental sync to verify rpc
for _, inc_sync in ipairs { "on" } do
Expand Down Expand Up @@ -58,9 +59,17 @@ for _, strategy in helpers.each_strategy() do
-- TODO: perhaps need a new test method
for _, v in pairs(json.data) do
if v.ip == "127.0.0.1" and v.rpc_capabilities and #v.rpc_capabilities ~= 0 then
table.sort(v.rpc_capabilities)
assert.near(14 * 86400, v.ttl, 3)
assert.matches("^(%d+%.%d+)%.%d+", v.version)
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status)

local reg = [[^(\d+)\.(\d+)]]
local m = assert(ngx.re.match(v.version, reg))
assert(tonumber(m[1]) >= 3)
assert(tonumber(m[2]) >= 9)

-- check the available rpc service
table.sort(v.rpc_capabilities)
assert.same("kong.sync.v2", v.rpc_capabilities[1])
return true
end
Expand Down
Loading