Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wake up the worker when resources are freed up #882

Merged
merged 7 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ APScheduler, see the :doc:`migration section <migration>`.
- Fixed dialect name checks in the SQLAlchemy job store
- Fixed JSON and CBOR serializers unable to serialize enums
- Fixed infinite loop in CalendarIntervalTrigger with UTC timezone (PR by unights)
- Fixed scheduler not resuming job processing when ``max_concurrent_jobs`` had been
reached and then a job was completed, thus making job processing possible again
(PR by MohammadAmin Vahedinia)

**4.0.0a4**

Expand Down
8 changes: 6 additions & 2 deletions src/apscheduler/_schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ def _get_task_callable(self, task: Task) -> Callable:
async def _process_jobs(self, *, task_status: TaskStatus) -> None:
wakeup_event = anyio.Event()

async def job_added(event: Event) -> None:
async def check_queue_capacity(event: Event) -> None:
if len(self._running_jobs) < self.max_concurrent_jobs:
wakeup_event.set()

Expand All @@ -993,7 +993,11 @@ async def job_added(event: Event) -> None:
task_group = await exit_stack.enter_async_context(create_task_group())

# Fetch new jobs every time
exit_stack.enter_context(self.event_broker.subscribe(job_added, {JobAdded}))
exit_stack.enter_context(
self.event_broker.subscribe(
check_queue_capacity, {JobAdded, JobReleased}
)
)

# Signal that we are ready, and wait for the scheduler start event
task_status.started()
Expand Down
30 changes: 29 additions & 1 deletion tests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@

import anyio
import pytest
from anyio import WouldBlock, create_memory_object_stream, fail_after, sleep
from anyio import (
Lock,
WouldBlock,
create_memory_object_stream,
fail_after,
sleep,
)
from pytest import MonkeyPatch
from pytest_mock import MockerFixture, MockFixture

Expand Down Expand Up @@ -819,6 +825,28 @@ async def test_wait_until_stopped(self) -> None:
# This should be a no-op
await scheduler.wait_until_stopped()

async def test_max_concurrent_jobs(self) -> None:
    """Verify the scheduler resumes processing once a job slot frees up.

    With ``max_concurrent_jobs=1`` the two submitted jobs must run strictly
    one after the other; if both ran at once, ``acquire_nowait`` on the
    shared lock would raise ``WouldBlock`` and fail the test.  If the
    scheduler never wakes up after the first job releases its slot, the
    ``fail_after(3)`` timeout trips instead.
    """
    mutex = Lock()
    scheduler = AsyncScheduler(max_concurrent_jobs=1)
    completed = 0

    async def guarded_job() -> None:
        nonlocal completed
        # Non-blocking acquire: raises if another job holds the lock,
        # i.e. the concurrency limit was violated.
        mutex.acquire_nowait()
        await sleep(0.1)
        completed += 1
        if completed == 2:
            await scheduler.stop()

        mutex.release()

    with fail_after(3):
        async with scheduler:
            await scheduler.configure_task("dummyjob", func=guarded_job)
            await scheduler.add_job("dummyjob")
            await scheduler.add_job("dummyjob")
            await scheduler.run_until_stopped()


class TestSyncScheduler:
def test_configure(self) -> None:
Expand Down
Loading