Skip to content

Commit

Permalink
refactor(clustering/rpc): simplify the implementation (#14026)
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw authored Dec 23, 2024
1 parent ab888cc commit d07425b
Show file tree
Hide file tree
Showing 14 changed files with 35 additions and 652 deletions.
1 change: 1 addition & 0 deletions kong-3.10.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ build = {
["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua",
["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua",
["kong.db.migrations.core.024_380_to_390"] = "kong/db/migrations/core/024_380_to_390.lua",
["kong.db.migrations.core.025_390_to_3100"] = "kong/db/migrations/core/025_390_to_3100.lua",
["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua",
["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua",
["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua",
Expand Down
58 changes: 2 additions & 56 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ local EMPTY = require("kong.tools.table").EMPTY


local ipairs = ipairs
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG
Expand Down Expand Up @@ -74,29 +73,8 @@ function _M:notify_all_nodes()
end


local function gen_delta(entity, name, options, ws_id, is_delete)
-- composite key, like { id = ... }
local schema = kong.db[name].schema
local pk = schema:extract_pk_values(entity)

assert(schema:validate_primary_key(pk))

local delta = {
type = name,
pk = pk,
ws_id = ws_id,
entity = is_delete and ngx_null or entity,
}

return delta
end


function _M:entity_delta_writer(entity, name, options, ws_id, is_delete)
local d = gen_delta(entity, name, options, ws_id, is_delete)
local deltas = { d, }

local res, err = self.strategy:insert_delta(deltas)
local res, err = self.strategy:insert_delta()
if not res then
self.strategy:cancel_txn()
return nil, err
Expand Down Expand Up @@ -164,39 +142,7 @@ function _M:register_dao_hooks()

ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name)

-- set lmdb value to ngx_null then return entity

local d = gen_delta(entity, name, options, ws_id, true)
local deltas = { d, }

-- delete other related entities
for i, item in ipairs(cascade_entries or EMPTY) do
local e = item.entity
local name = item.dao.schema.name

ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to cascade deleting ", name)

d = gen_delta(e, name, options, e.ws_id, true)

-- #1 item is initial entity
deltas[i + 1] = d
end

local res, err = self.strategy:insert_delta(deltas)
if not res then
self.strategy:cancel_txn()
return nil, err
end

res, err = self.strategy:commit_txn()
if not res then
self.strategy:cancel_txn()
return nil, err
end

self:notify_all_nodes()

return entity -- for other hooks
return self:entity_delta_writer(entity, name, options, ws_id)
end

local dao_hooks = {
Expand Down
53 changes: 4 additions & 49 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,9 @@ local fmt = string.format
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_INFO = ngx.INFO
local ngx_DEBUG = ngx.DEBUG


-- number of versions behind before a full sync is forced
local DEFAULT_FULL_SYNC_THRESHOLD = 512


function _M.new(strategy)
local self = {
strategy = strategy,
Expand All @@ -43,8 +38,8 @@ function _M.new(strategy)
end


local function inc_sync_result(res)
return { default = { deltas = res, wipe = false, }, }
local function empty_sync_result()
return { default = { deltas = {}, wipe = false, }, }
end


Expand All @@ -62,10 +57,6 @@ end
function _M:init_cp(manager)
local purge_delay = manager.conf.cluster_data_plane_purge_delay

-- number of versions behind before a full sync is forced
local FULL_SYNC_THRESHOLD = manager.conf.cluster_full_sync_threshold or
DEFAULT_FULL_SYNC_THRESHOLD

-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
Expand Down Expand Up @@ -107,48 +98,12 @@ function _M:init_cp(manager)
return nil, err
end

-- is the node empty? If so, just do a full sync to bring it up to date faster
if default_namespace_version == 0 or
latest_version - default_namespace_version > FULL_SYNC_THRESHOLD
then
-- we need to full sync because holes are found

ngx_log(ngx_INFO,
"[kong.sync.v2] database is empty or too far behind for node_id: ", node_id,
", current_version: ", default_namespace_version,
", forcing a full sync")

default_namespace_version < latest_version then
return full_sync_result()
end

-- do we need an incremental sync?

local res, err = self.strategy:get_delta(default_namespace_version)
if not res then
return nil, err
end

if isempty(res) then
-- node is already up to date
return inc_sync_result(res)
end

-- some deltas are returned, are they contiguous?
if res[1].version == default_namespace_version + 1 then
-- doesn't wipe dp lmdb, incremental sync
return inc_sync_result(res)
end

-- we need to full sync because holes are found
-- in the delta, meaning the oldest version is no longer
-- available

ngx_log(ngx_INFO,
"[kong.sync.v2] delta for node_id no longer available: ", node_id,
", current_version: ", default_namespace_version,
", forcing a full sync")

return full_sync_result()
return empty_sync_result()
end)
end

Expand Down
75 changes: 3 additions & 72 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,7 @@ local _M = {}
local _MT = { __index = _M }


local cjson = require("cjson.safe")
local buffer = require("string.buffer")


local string_format = string.format
local cjson_encode = cjson.encode
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR


local KEEP_VERSION_COUNT = 100
local CLEANUP_TIME_DELAY = 3600 -- 1 hour


function _M.new(db)
Expand All @@ -26,32 +14,8 @@ function _M.new(db)
end


local PURGE_QUERY = [[
DELETE FROM clustering_sync_version
WHERE "version" < (
SELECT MAX("version") - %d
FROM clustering_sync_version
);
]]


-- reserved for future
function _M:init_worker()
local function cleanup_handler(premature)
if premature then
return
end

local res, err = self.connector:query(string_format(PURGE_QUERY, KEEP_VERSION_COUNT))
if not res then
ngx_log(ngx_ERR,
"[incremental] unable to purge old data from incremental delta table, err: ",
err)

return
end
end

assert(ngx.timer.every(CLEANUP_TIME_DELAY, cleanup_handler))
end


Expand All @@ -61,37 +25,12 @@ local NEW_VERSION_QUERY = [[
new_version integer;
BEGIN
INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version;
INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s;
END $$;
]]


-- deltas: {
-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- }
function _M:insert_delta(deltas)
local buf = buffer.new()

local count = #deltas
for i = 1, count do
local d = deltas[i]

buf:putf("(new_version, %s, %s, %s, %s)",
self.connector:escape_literal(d.type),
self.connector:escape_literal(cjson_encode(d.pk)),
self.connector:escape_literal(d.ws_id or kong.default_workspace),
self.connector:escape_literal(cjson_encode(d.entity)))

-- sql values should be separated by comma
if i < count then
buf:put(",")
end
end

local sql = string_format(NEW_VERSION_QUERY, buf:get())

return self.connector:query(sql)
function _M:insert_delta()
return self.connector:query(NEW_VERSION_QUERY)
end


Expand All @@ -112,14 +51,6 @@ function _M:get_latest_version()
end


function _M:get_delta(version)
local sql = "SELECT * FROM clustering_sync_delta" ..
" WHERE version > " .. self.connector:escape_literal(version) ..
" ORDER BY version ASC"
return self.connector:query(sql)
end


function _M:begin_txn()
return self.connector:query("BEGIN;")
end
Expand Down
1 change: 0 additions & 1 deletion kong/conf_loader/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,6 @@ local CONF_PARSERS = {
cluster_dp_labels = { typ = "array" },
cluster_rpc = { typ = "boolean" },
cluster_rpc_sync = { typ = "boolean" },
cluster_full_sync_threshold = { typ = "number" },
cluster_cjson = { typ = "boolean" },

kic = { typ = "boolean" },
Expand Down
9 changes: 0 additions & 9 deletions kong/db/migrations/core/024_380_to_390.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,6 @@ return {
CREATE TABLE IF NOT EXISTS clustering_sync_version (
"version" SERIAL PRIMARY KEY
);
CREATE TABLE IF NOT EXISTS clustering_sync_delta (
"version" INT NOT NULL,
"type" TEXT NOT NULL,
"pk" JSON NOT NULL,
"ws_id" UUID NOT NULL,
"entity" JSON,
FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS clustering_sync_delta_version_idx ON clustering_sync_delta (version);
END;
$$;
]]
Expand Down
12 changes: 12 additions & 0 deletions kong/db/migrations/core/025_390_to_3100.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
return {
postgres = {
up = [[
DO $$
BEGIN
DROP TABLE IF EXISTS clustering_sync_delta;
DROP INDEX IF EXISTS clustering_sync_delta_version_idx;
END;
$$;
]]
}
}
1 change: 0 additions & 1 deletion kong/templates/kong_defaults.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ cluster_use_proxy = off
cluster_dp_labels = NONE
cluster_rpc = off
cluster_rpc_sync = off
cluster_full_sync_threshold = 512
cluster_cjson = off
lmdb_environment_path = dbless.lmdb
Expand Down
6 changes: 6 additions & 0 deletions spec/02-integration/09-hybrid_mode/01-sync_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,12 @@ describe("CP/DP config sync #" .. strategy .. " rpc_sync=" .. rpc_sync, function
end
end, 5)

-- TODO: it may cause flakiness
-- wait for rpc sync finishing
if rpc_sync == "on" then
ngx.sleep(0.5)
end

for i = 5, 2, -1 do
res = proxy_client:get("/" .. i)
assert.res_status(404, res)
Expand Down
1 change: 0 additions & 1 deletion spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ describe("lazy_export with #".. strategy .. " rpc_sync=" .. rpc_sync, function()
touch_config()
if rpc_sync == "on" then
assert.logfile().has.line("[kong.sync.v2] config push (connected client)", true)
assert.logfile().has.line("[kong.sync.v2] database is empty or too far behind for node_id", true)

else
assert.logfile().has.line("[clustering] exporting config", true)
Expand Down
Loading

1 comment on commit d07425b

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:d07425b0a7b5a3e7e29f719dd068f826e36829d9
Artifacts available https://github.com/Kong/kong/actions/runs/12462088044

Please sign in to comment.