diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 1e80906b4649..76b853eb16c7 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -13,9 +13,11 @@ local events = require("kong.runloop.events") local insert_entity_for_txn = declarative.insert_entity_for_txn local delete_entity_for_txn = declarative.delete_entity_for_txn local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY +local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } +local MAX_RETRY = 5 local ipairs = ipairs @@ -146,6 +148,7 @@ end function _M:init_dp(manager) + local kong_shm = ngx.shared.kong -- DP -- Method: kong.sync.v2.notify_new_version -- Params: new_versions: list of namespaces and their new versions, like: @@ -164,6 +167,8 @@ function _M:init_dp(manager) local lmdb_ver = tonumber(declarative.get_current_hash()) or 0 if lmdb_ver < version then + -- set lastest version to shm + kong_shm:set(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY, version) return self:sync_once() end @@ -363,29 +368,19 @@ local function sync_handler(premature) return end - local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() - -- `do_sync()` is run twice in a row to report back new version number - -- to CP quickly after sync. (`kong.sync.v2.get_delta` is used for both pulling delta - -- as well as status reporting) - for _ = 1, 2 do - local ok, err = do_sync() - if not ok then - return nil, err - end - end -- for - - return true - end) + local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, do_sync) if not res and err ~= "timeout" then ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) end end -local function start_sync_timer(timer_func, delay) - local hdl, err = timer_func(delay, sync_handler) +local sync_once - if not hdl then + +local function start_sync_once_timer(retry_count) + local ok, err = ngx.timer.at(0, sync_once, retry_count or 0) + if not ok then return nil, err end @@ -393,15 +388,49 @@ local function start_sync_timer(timer_func, delay) end +function sync_once(premature, retry_count) + if premature then + return + end + + sync_handler() + + local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY) + local current_version = tonumber(declarative.get_current_hash()) or 0 + + -- retry if the version is not updated + if not latest_notified_version or current_version < latest_notified_version then + retry_count = retry_count or 0 + if retry_count > MAX_RETRY then + ngx_log(ngx_ERR, "sync_once retry count exceeded. retry_count: ", retry_count) + return + end + + return start_sync_once_timer(retry_count + 1) + end +end + + function _M:sync_once(delay) --- XXX TODO: check rpc connection is ready + local hdl, err = ngx.timer.at(delay or 0, sync_once, 0) + + if not hdl then + return nil, err + end - return start_sync_timer(ngx.timer.at, delay or 0) + return true end function _M:sync_every(delay) - return start_sync_timer(ngx.timer.every, delay) + local hdl, err = ngx.timer.every(delay, sync_handler) + + if not hdl then + return nil, err + end + + return true end diff --git a/kong/constants.lua b/kong/constants.lua index 7a05f24cf530..03373c89f279 100644 --- a/kong/constants.lua +++ b/kong/constants.lua @@ -233,6 +233,7 @@ local constants = { RELOAD = "configuration reload failed", GENERIC = "generic or unknown error", }, + CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = "clustering_data_planes:latest_version", CLEAR_HEALTH_STATUS_DELAY = 300, -- 300 seconds