Skip to content

Commit

Permalink
feat(clustering/rpc): use meta RPC call for handshaking with CP (#13887)
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw authored Nov 20, 2024
1 parent 05f3136 commit 7d2f2c1
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 44 deletions.
209 changes: 165 additions & 44 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ local constants = require("kong.constants")
local table_isempty = require("table.isempty")
local pl_tablex = require("pl.tablex")
local cjson = require("cjson.safe")
local string_tools = require("kong.tools.string")


local ipairs = ipairs
local ngx_var = ngx.var
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG
Expand All @@ -29,6 +31,10 @@ local validate_client_cert = clustering_tls.validate_client_cert
local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL


local RPC_MATA_V1 = "kong.meta.v1"
local RPC_SNAPPY_FRAMED = "x-snappy-framed"


local WS_OPTS = {
timeout = constants.CLUSTERING_TIMEOUT,
max_payload_len = kong.configuration.cluster_max_payload,
Expand Down Expand Up @@ -58,7 +64,7 @@ function _M.new(conf, node_id)
end


function _M:_add_socket(socket, capabilities_list)
function _M:_add_socket(socket)
local node_id = socket.node_id

local sockets = self.clients[node_id]
Expand All @@ -71,11 +77,6 @@ function _M:_add_socket(socket, capabilities_list)
self.clients[node_id] = sockets
end

self.client_capabilities[node_id] = {
set = pl_tablex_makeset(capabilities_list),
list = capabilities_list,
}

assert(not sockets[socket])

sockets[socket] = true
Expand Down Expand Up @@ -141,6 +142,124 @@ function _M:_find_node_and_check_capability(node_id, cap)
end


-- CP => DP
-- Serves the meta RPC handshake on an accepted connection: reads the peer's
-- `<RPC_MATA_V1>.hello` call, validates it, replies with our own capability
-- list and the selected frame encoding, and records the peer's capabilities.
--
-- Every failure caused by peer-supplied data is reported as `nil, err`
-- instead of raising, because the caller logs the error and closes the
-- connection.
--
-- @param c        connection object exposing `recv_frame()` and `send_binary()`
-- @param node_id  key under which the peer's capabilities are stored in
--                 `self.client_capabilities`
-- @return `true` on success; `nil` and an error message otherwise
function _M:_handle_meta_call(c, node_id)
  local data, typ, err = c:recv_frame()
  if err then
    return nil, err
  end

  if typ ~= "binary" then
    -- must use the local `typ`; concatenating the builtin `type` would throw
    return nil, "wrong frame type: " .. tostring(typ)
  end

  local request, decode_err = cjson_decode(data)
  if type(request) ~= "table" then
    return nil, "unable to decode RPC meta call: " .. tostring(decode_err)
  end

  if request.jsonrpc ~= "2.0" then
    return nil, "wrong JSON-RPC version: " .. tostring(request.jsonrpc)
  end

  if request.method ~= RPC_MATA_V1 .. ".hello" then
    return nil, "wrong RPC meta call: " .. tostring(request.method)
  end

  local params = request.params
  local info = type(params) == "table" and params[1] or nil
  if type(info) ~= "table" then
    return nil, "RPC meta call has no params"
  end

  local encodings = info.rpc_frame_encodings
  if type(encodings) ~= "table" then
    return nil, "RPC meta call has no rpc_frame_encodings"
  end

  -- now we only support snappy
  local snappy_supported = false
  for _, encoding in ipairs(encodings) do
    if encoding == RPC_SNAPPY_FRAMED then
      snappy_supported = true
      break
    end
  end

  if not snappy_supported then
    return nil, "unknown encodings: " .. tostring(cjson_encode(encodings))
  end

  -- should have necessary info
  if type(info.kong_version) ~= "string" then
    return nil, "RPC meta call has no valid kong_version"
  end

  if type(info.kong_node_id) ~= "string" then
    return nil, "RPC meta call has no valid kong_node_id"
  end

  if type(info.kong_hostname) ~= "string" then
    return nil, "RPC meta call has no valid kong_hostname"
  end

  if type(info.kong_conf) ~= "table" then
    return nil, "RPC meta call has no valid kong_conf"
  end

  local capabilities_list = info.rpc_capabilities
  if type(capabilities_list) ~= "table" then
    return nil, "RPC meta call has no valid rpc_capabilities"
  end

  local response = {
    jsonrpc = "2.0",
    result = {
      rpc_capabilities = self.callbacks:get_capabilities_list(),
      -- now we only support snappy
      rpc_frame_encoding = RPC_SNAPPY_FRAMED,
    },
    id = 1,
  }

  local bytes, send_err = c:send_binary(cjson_encode(response))
  if not bytes then
    return nil, send_err
  end

  -- record the peer's capabilities only once the handshake has fully
  -- succeeded, so a failed handshake leaves no stale entry behind
  self.client_capabilities[node_id] = {
    set = pl_tablex_makeset(capabilities_list),
    list = capabilities_list,
  }

  return true
end


-- DP => CP
-- Performs the meta RPC handshake on an outgoing connection: sends a
-- `<meta_cap>.hello` call describing this node, waits for the reply,
-- validates it and records the peer's capabilities.
--
-- Every failure caused by the peer's reply is reported as `nil, err`
-- instead of raising, because the caller logs the error and closes the
-- connection.
--
-- @param c         connection object exposing `send_binary()` and `recv_frame()`
-- @param meta_cap  meta protocol name used as the prefix of the `.hello` method
-- @param node_id   key under which the peer's capabilities are stored in
--                  `self.client_capabilities`
-- @return `true` on success; `nil` and an error message otherwise
function _M:_meta_call(c, meta_cap, node_id)
  local info = {
    rpc_capabilities = self.callbacks:get_capabilities_list(),

    -- now we only support snappy
    rpc_frame_encodings = { RPC_SNAPPY_FRAMED, },

    kong_version = KONG_VERSION,
    kong_hostname = kong.node.get_hostname(),
    kong_node_id = self.node_id,
    kong_conf = kong.configuration.remove_sensitive(),
  }

  local request = {
    jsonrpc = "2.0",
    method = meta_cap .. ".hello",
    params = { info },
    id = 1,
  }

  local bytes, send_err = c:send_binary(cjson_encode(request))
  if not bytes then
    return nil, send_err
  end

  local data, typ, err = c:recv_frame()
  if err then
    return nil, err
  end

  if typ ~= "binary" then
    -- must use the local `typ`; concatenating the builtin `type` would throw
    return nil, "wrong frame type: " .. tostring(typ)
  end

  local response, decode_err = cjson_decode(data)
  if type(response) ~= "table" then
    return nil, "unable to decode RPC meta response: " .. tostring(decode_err)
  end

  if response.jsonrpc ~= "2.0" then
    return nil, "wrong JSON-RPC version: " .. tostring(response.jsonrpc)
  end

  local result = response.result
  if type(result) ~= "table" then
    -- the peer may have answered with a JSON-RPC error object instead
    local reason = response.error
    if type(reason) == "table" then
      reason = reason.message
    end

    return nil, "RPC meta call failed: " .. tostring(reason)
  end

  -- now we only support snappy
  if result.rpc_frame_encoding ~= RPC_SNAPPY_FRAMED then
    return nil, "unknown encoding: " .. tostring(result.rpc_frame_encoding)
  end

  local capabilities_list = result.rpc_capabilities
  if type(capabilities_list) ~= "table" then
    return nil, "RPC meta response has no valid rpc_capabilities"
  end

  -- record the peer's capabilities only after the whole reply is validated,
  -- so a failed handshake leaves no stale entry behind
  self.client_capabilities[node_id] = {
    set = pl_tablex_makeset(capabilities_list),
    list = capabilities_list,
  }

  return true
end


-- low level helper used internally by :call() and concentrator
-- this one does not consider forwarding using concentrator
-- when node does not exist
Expand Down Expand Up @@ -232,61 +351,57 @@ end

-- handle incoming client connections
function _M:handle_websocket()
local kong_version = ngx_var.http_x_kong_version
local node_id = ngx_var.http_x_kong_node_id
local rpc_protocol = ngx_var.http_sec_websocket_protocol
local content_encoding = ngx_var.http_content_encoding
local rpc_capabilities = ngx_var.http_x_kong_rpc_capabilities

if not kong_version then
ngx_log(ngx_ERR, "[rpc] client did not provide version number")
return ngx_exit(ngx.HTTP_CLOSE)
end

if not node_id then
ngx_log(ngx_ERR, "[rpc] client did not provide node ID")
return ngx_exit(ngx.HTTP_CLOSE)
end

if content_encoding ~= "x-snappy-framed" then
ngx_log(ngx_ERR, "[rpc] client does use Snappy compressed frames")
return ngx_exit(ngx.HTTP_CLOSE)
local meta_v1_supported
local protocols = string_tools.split(rpc_protocol, ",")

-- choose a proper protocol
for _, v in ipairs(protocols) do
-- now we only support kong.meta.v1
if RPC_MATA_V1 == string_tools.strip(v) then
meta_v1_supported = true
break
end
end

if rpc_protocol ~= "kong.rpc.v1" then
if not meta_v1_supported then
ngx_log(ngx_ERR, "[rpc] unknown RPC protocol: " ..
tostring(rpc_protocol) ..
", doesn't know how to communicate with client")
return ngx_exit(ngx.HTTP_CLOSE)
end

if not rpc_capabilities then
ngx_log(ngx_ERR, "[rpc] client did not provide capability list")
return ngx_exit(ngx.HTTP_CLOSE)
end

rpc_capabilities = cjson_decode(rpc_capabilities)
if not rpc_capabilities then
ngx_log(ngx_ERR, "[rpc] failed to decode client capability list")
return ngx_exit(ngx.HTTP_CLOSE)
end

local cert, err = validate_client_cert(self.conf, self.cluster_cert, ngx_var.ssl_client_raw_cert)
if not cert then
ngx_log(ngx_ERR, "[rpc] client's certificate failed validation: ", err)
return ngx_exit(ngx.HTTP_CLOSE)
end

ngx.header["X-Kong-RPC-Capabilities"] = cjson_encode(self.callbacks:get_capabilities_list())
-- now we only use kong.meta.v1
ngx.header["Sec-WebSocket-Protocol"] = RPC_MATA_V1

local wb, err = server:new(WS_OPTS)
if not wb then
ngx_log(ngx_ERR, "[rpc] unable to establish WebSocket connection with client: ", err)
return ngx_exit(ngx.HTTP_CLOSE)
end

-- if timeout (default is 5s) we will close the connection
local ok, err = self:_handle_meta_call(wb, node_id)
if not ok then
ngx_log(ngx_ERR, "[rpc] unable to handshake with client: ", err)
return ngx_exit(ngx.HTTP_CLOSE)
end

local s = socket.new(self, wb, node_id)
self:_add_socket(s, rpc_capabilities)
self:_add_socket(s)

-- store DP's ip addr
self.client_ips[node_id] = ngx_var.remote_addr
Expand Down Expand Up @@ -339,13 +454,9 @@ function _M:connect(premature, node_id, host, path, cert, key)
ssl_verify = true,
client_cert = cert,
client_priv_key = key,
protocols = "kong.rpc.v1",
protocols = RPC_MATA_V1,
headers = {
"X-Kong-Version: " .. KONG_VERSION,
"X-Kong-Node-Id: " .. self.node_id,
"X-Kong-Hostname: " .. kong.node.get_hostname(),
"X-Kong-RPC-Capabilities: " .. cjson_encode(self.callbacks:get_capabilities_list()),
"Content-Encoding: x-snappy-framed"
},
}

Expand All @@ -372,24 +483,34 @@ function _M:connect(premature, node_id, host, path, cert, key)
do
local resp_headers = c:get_resp_headers()
-- FIXME: resp_headers should not be case sensitive
if not resp_headers or not resp_headers["x_kong_rpc_capabilities"] then
ngx_log(ngx_ERR, "[rpc] peer did not provide capability list, node_id: ", node_id)

if not resp_headers or not resp_headers["sec_websocket_protocol"] then
ngx_log(ngx_ERR, "[rpc] peer did not provide sec_websocket_protocol, node_id: ", node_id)
c:send_close() -- can't do much if this fails
goto err
end

-- should look like "kong.meta.v1"
local meta_cap = resp_headers["sec_websocket_protocol"]

if meta_cap ~= RPC_MATA_V1 then
ngx_log(ngx_ERR, "[rpc] did not support protocol : ", meta_cap)
c:send_close() -- can't do much if this fails
goto err
end

local capabilities = resp_headers["x_kong_rpc_capabilities"]
capabilities = cjson_decode(capabilities)
if not capabilities then
ngx_log(ngx_ERR, "[rpc] unable to decode peer capability list, node_id: ", node_id,
" list: ", capabilities)
-- if timeout (default is 5s) we will close the connection
local ok, err = self:_meta_call(c, meta_cap, node_id)
if not ok then
ngx_log(ngx_ERR, "[rpc] unable to handshake with server, node_id: ", node_id,
" err: ", err)
c:send_close() -- can't do much if this fails
goto err
end

local s = socket.new(self, c, node_id)
s:start()
self:_add_socket(s, capabilities)
self:_add_socket(s)

ok, err = s:join() -- main event loop

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ describe("DP diabled Incremental Sync RPC #" .. strategy, function()
nginx_conf = "spec/fixtures/custom_nginx.template",
nginx_worker_processes = 2, -- multiple workers

cluster_rpc = "off", -- DISABLE rpc
cluster_incremental_sync = "off", -- DISABLE incremental sync

dedicated_config_processing = dedicated, -- privileged agent
Expand Down

1 comment on commit 7d2f2c1

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:7d2f2c1354453318b16f73fccd2c9b9cf0a7e7ae
Artifacts available https://github.com/Kong/kong/actions/runs/11934101092

Please sign in to comment.