From 9d163e0fa59c93a36a09a54588bfb29dea88b025 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Mon, 4 Mar 2024 23:18:16 +0000 Subject: [PATCH] estuary-cdk: `common` routines should yield to the event loop Ensure all tasks get a chance to make progress between each task's iterations. --- estuary-cdk/estuary_cdk/capture/common.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/estuary-cdk/estuary_cdk/capture/common.py b/estuary-cdk/estuary_cdk/capture/common.py index d4b410b70f..84f84f100a 100644 --- a/estuary-cdk/estuary_cdk/capture/common.py +++ b/estuary-cdk/estuary_cdk/capture/common.py @@ -453,6 +453,10 @@ async def _binding_snapshot_task( ) while True: + # Yield to the event loop to prevent starvation. + # Note that wait_for does *not* yield if sleep_for has already elapsed. + await asyncio.sleep(0) + next_sync = state.updated_at + binding.resourceConfig.interval sleep_for = next_sync - datetime.now(tz=UTC) @@ -523,6 +527,9 @@ async def _binding_backfill_task( task.log.info(f"beginning backfill", state) while True: + # Yield to the event loop to prevent starvation. + await asyncio.sleep(0) + page, next_page = await fetch_page(task.log, state.next_page, state.cutoff) for doc in page: task.captured(binding_index, doc) @@ -553,6 +560,8 @@ async def _binding_incremental_task( task.log.info(f"resuming incremental replication", state) while True: + # Yield to the event loop to prevent starvation. + await asyncio.sleep(0) checkpoints = 0 pending = False