diff --git a/estuary-cdk/estuary_cdk/capture/common.py b/estuary-cdk/estuary_cdk/capture/common.py index d4b410b70f..84f84f100a 100644 --- a/estuary-cdk/estuary_cdk/capture/common.py +++ b/estuary-cdk/estuary_cdk/capture/common.py @@ -453,6 +453,10 @@ async def _binding_snapshot_task( ) while True: + # Yield to the event loop to prevent starvation. + # Note that wait_for does *not* yield if sleep_for has already elapsed. + await asyncio.sleep(0) + next_sync = state.updated_at + binding.resourceConfig.interval sleep_for = next_sync - datetime.now(tz=UTC) @@ -523,6 +527,9 @@ async def _binding_backfill_task( task.log.info(f"beginning backfill", state) while True: + # Yield to the event loop to prevent starvation. + await asyncio.sleep(0) + page, next_page = await fetch_page(task.log, state.next_page, state.cutoff) for doc in page: task.captured(binding_index, doc) @@ -553,6 +560,8 @@ async def _binding_incremental_task( task.log.info(f"resuming incremental replication", state) while True: + # Yield to the event loop to prevent starvation. + await asyncio.sleep(0) checkpoints = 0 pending = False