diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 9b13bb515..3b235aadb 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -22,6 +22,9 @@ APScheduler, see the :doc:`migration section `. - Fixed dialect name checks in the SQLAlchemy job store - Fixed JSON and CBOR serializers unable to serialize enums - Fixed infinite loop in CalendarIntervalTrigger with UTC timezone (PR by unights) +- Fixed scheduler not resuming job processing when ``max_concurrent_jobs`` had been + reached and then a job was completed, thus making job processing possible again + (PR by MohammadAmin Vahedinia) **4.0.0a4** diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 22c64654a..986083565 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -981,7 +981,7 @@ def _get_task_callable(self, task: Task) -> Callable: async def _process_jobs(self, *, task_status: TaskStatus) -> None: wakeup_event = anyio.Event() - async def job_added(event: Event) -> None: + async def check_queue_capacity(event: Event) -> None: if len(self._running_jobs) < self.max_concurrent_jobs: wakeup_event.set() @@ -993,7 +993,11 @@ async def job_added(event: Event) -> None: task_group = await exit_stack.enter_async_context(create_task_group()) # Fetch new jobs every time - exit_stack.enter_context(self.event_broker.subscribe(job_added, {JobAdded})) + exit_stack.enter_context( + self.event_broker.subscribe( + check_queue_capacity, {JobAdded, JobReleased} + ) + ) # Signal that we are ready, and wait for the scheduler start event task_status.started() diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 38c3aaf98..e9c3dfce1 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -15,7 +15,13 @@ import anyio import pytest -from anyio import WouldBlock, create_memory_object_stream, fail_after, sleep +from anyio import ( + Lock, + WouldBlock, + create_memory_object_stream, + fail_after, + sleep, +) from pytest import MonkeyPatch from pytest_mock import MockerFixture, MockFixture @@ -819,6 +825,28 @@ async def test_wait_until_stopped(self) -> None: # This should be a no-op await scheduler.wait_until_stopped() + async def test_max_concurrent_jobs(self) -> None: + lock = Lock() + scheduler = AsyncScheduler(max_concurrent_jobs=1) + tasks_done = 0 + + async def acquire_release() -> None: + nonlocal tasks_done + lock.acquire_nowait() + await sleep(0.1) + tasks_done += 1 + if tasks_done == 2: + await scheduler.stop() + + lock.release() + + with fail_after(3): + async with scheduler: + await scheduler.configure_task("dummyjob", func=acquire_release) + await scheduler.add_job("dummyjob") + await scheduler.add_job("dummyjob") + await scheduler.run_until_stopped() + class TestSyncScheduler: def test_configure(self) -> None: