From 3bf77d7455d019455c017e8e43d05e444bf78c86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20H=C3=BCbner?= Date: Fri, 25 Aug 2023 08:21:16 +0200 Subject: [PATCH] fix(queues): shutdown flag needs to be checked while waiting (#11456) The previous fix (#11376) to enable batching during shutdown failed to check the shutdown flag while waiting for more items to batch. If the coalescing delay was large enough, this could cause the last batch to be lost. --- kong/tools/queue.lua | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index e0845939172e..d632cca8112e 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -234,10 +234,6 @@ function Queue:process_once() -- We've got our first entry from the queue. Collect more entries until max_coalescing_delay expires or we've collected -- max_batch_size entries to send - if ngx.worker.exiting() then - -- minimize coalescing delay during shutdown to quickly process remaining entries - self.max_coalescing_delay = COALESCE_MIN_TIME - end while entry_count < self.max_batch_size and self.max_coalescing_delay - (now() - data_started) >= COALESCE_MIN_TIME do @@ -245,6 +241,12 @@ function Queue:process_once() -- so that we can check for worker shutdown periodically. local wait_time = math_min(self.max_coalescing_delay - (now() - data_started), COALESCE_POLL_TIME) + if ngx.worker.exiting() then + -- minimize coalescing delay during shutdown to quickly process remaining entries + self.max_coalescing_delay = COALESCE_MIN_TIME + wait_time = COALESCE_MIN_TIME + end + ok, err = self.semaphore:wait(wait_time) if not ok and err ~= "timeout" then self:log_err("could not wait for semaphore: %s", err)