Skip to content

Commit

Permalink
feat: Support pausing and unpausing schedules (#902)
Browse files Browse the repository at this point in the history
Closes #899.
  • Loading branch information
WillDaSilva authored May 9, 2024
1 parent 4ca9251 commit 9e9f905
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 10 deletions.
27 changes: 27 additions & 0 deletions docs/userguide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,31 @@ the schedule you want to remove as an argument. This is the ID you got from
Note that removing a schedule does not cancel any jobs derived from it, but does prevent
further jobs from being created from that schedule.

Pausing schedules
-----------------

To pause a schedule, call :meth:`~Scheduler.pause_schedule`. Pass the identifier of the
schedule you want to pause as an argument. This is the ID you got from
:meth:`~Scheduler.add_schedule`.

Pausing a schedule prevents any new jobs from being created from it, but does not cancel
any jobs that have already been created from that schedule.

The schedule can be unpaused by calling :meth:`~Scheduler.unpause_schedule` with the
identifier of the schedule you want to unpause.

By default the schedule will retain the next fire time it had when it was paused, which
may result in the schedule being considered to have misfired when it is unpaused,
resulting in whatever misfire behavior it has configured
(see :ref:`controlling-how-much-a-job-can-be-started-late` for more details).

The ``resume_from`` parameter can be used to specify the time from which the schedule
should be resumed. This can be used to avoid the misfire behavior mentioned above. It
can be either a datetime object, or the string ``"now"`` as a convenient shorthand for
the current datetime. If this parameter is provided, the schedules trigger will be
repeatedly advanced to determine a next fire time that is at or after the specified time
to resume from.

Limiting the number of concurrently executing instances of a job
----------------------------------------------------------------

Expand All @@ -344,6 +369,8 @@ still running, the later job is terminated with the outcome of
To allow more jobs to be concurrently running for a task, pass the desired maximum
number as the ``max_running_jobs`` keyword argument to :meth:`~Scheduler.add_schedule`.

.. _controlling-how-much-a-job-can-be-started-late:

Controlling how much a job can be started late
----------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ APScheduler, see the :doc:`migration section <migration>`.
- **BREAKING** Made publishing ``JobReleased`` events the responsibility of the
``DataStore`` implementation, rather than the scheduler, for consistency with the
``acquire_jobs()`` method
- Added the ability to pause and unpause schedules (PR by @WillDaSilva)
- Fixed large parts of ``MongoDBDataStore`` still calling blocking functions in the
event loop thread
- Fixed JSON serialization of triggers that had been used at least once
Expand Down
57 changes: 56 additions & 1 deletion src/apscheduler/_schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from inspect import isbuiltin, isclass, ismethod, ismodule
from logging import Logger, getLogger
from types import TracebackType
from typing import Any, Callable, Iterable, Mapping, cast, overload
from typing import Any, Callable, Iterable, Literal, Mapping, cast, overload
from uuid import UUID, uuid4

import anyio
Expand Down Expand Up @@ -410,6 +410,7 @@ async def add_schedule(
id: str | None = None,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
paused: bool = False,
job_executor: str | UnsetValue = unset,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None | UnsetValue = unset,
Expand All @@ -427,6 +428,7 @@ async def add_schedule(
based ID will be assigned)
:param args: positional arguments to be passed to the task function
:param kwargs: keyword arguments to be passed to the task function
:param paused: whether the schedule is paused
:param job_executor: name of the job executor to run the task with
:param coalesce: determines what to do when processing the schedule if multiple
fire times have become due for this schedule since the last processing
Expand Down Expand Up @@ -478,6 +480,7 @@ async def add_schedule(
trigger=trigger,
args=args,
kwargs=kwargs,
paused=paused,
coalesce=coalesce,
misfire_grace_time=task.misfire_grace_time
if misfire_grace_time is unset
Expand Down Expand Up @@ -529,6 +532,58 @@ async def remove_schedule(self, id: str) -> None:
self._check_initialized()
await self.data_store.remove_schedules({id})

async def pause_schedule(self, id: str) -> None:
"""Pause the specified schedule."""
self._check_initialized()
await self.data_store.add_schedule(
schedule=attrs.evolve(await self.get_schedule(id), paused=True),
conflict_policy=ConflictPolicy.replace,
)

async def unpause_schedule(
self,
id: str,
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
"""
Unpause the specified schedule.
:param resume_from: the time to resume the schedules from, or ``'now'`` as a
shorthand for ``datetime.now(tz=UTC)`` or ``None`` to resume from where the
schedule left off which may cause it to misfire
"""
self._check_initialized()
schedule = await self.get_schedule(id)

if resume_from == "now":
resume_from = datetime.now(tz=timezone.utc)

if resume_from is None:
next_fire_time = schedule.next_fire_time
elif (
schedule.next_fire_time is not None
and schedule.next_fire_time >= resume_from
):
next_fire_time = schedule.next_fire_time
else:
# Advance `next_fire_time` until its at or past `resume_from`, or until it's
# exhausted
while next_fire_time := schedule.trigger.next():
if next_fire_time is None or next_fire_time >= resume_from:
break

await self.data_store.add_schedule(
schedule=attrs.evolve(
schedule,
paused=False,
next_fire_time=next_fire_time,
),
conflict_policy=ConflictPolicy.replace,
)

async def add_job(
self,
func_or_task_id: TaskType,
Expand Down
25 changes: 23 additions & 2 deletions src/apscheduler/_schedulers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import threading
from collections.abc import MutableMapping, Sequence
from contextlib import ExitStack
from datetime import timedelta
from datetime import datetime, timedelta
from functools import partial
from logging import Logger
from types import TracebackType
from typing import Any, Callable, Iterable, Mapping, overload
from typing import Any, Callable, Iterable, Literal, Mapping, overload
from uuid import UUID

from anyio.from_thread import BlockingPortal, start_blocking_portal
Expand Down Expand Up @@ -238,6 +238,7 @@ def add_schedule(
id: str | None = None,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
paused: bool = False,
job_executor: str | UnsetValue = unset,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None | UnsetValue = unset,
Expand All @@ -254,6 +255,7 @@ def add_schedule(
id=id,
args=args,
kwargs=kwargs,
paused=paused,
job_executor=job_executor,
coalesce=coalesce,
misfire_grace_time=misfire_grace_time,
Expand All @@ -275,6 +277,25 @@ def remove_schedule(self, id: str) -> None:
self._ensure_services_ready()
self._portal.call(self._async_scheduler.remove_schedule, id)

def pause_schedule(self, id: str) -> None:
self._ensure_services_ready()
self._portal.call(self._async_scheduler.pause_schedule, id)

def unpause_schedule(
self,
id: str,
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
self._ensure_services_ready()
self._portal.call(
partial(
self._async_scheduler.unpause_schedule,
id,
resume_from=resume_from,
)
)

def add_job(
self,
func_or_task_id: TaskType,
Expand Down
2 changes: 2 additions & 0 deletions src/apscheduler/_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Schedule:
:var str task_id: unique identifier of the task to be run on this schedule
:var tuple args: positional arguments to pass to the task callable
:var dict[str, Any] kwargs: keyword arguments to pass to the task callable
:var bool paused: whether the schedule is paused
:var CoalescePolicy coalesce: determines what to do when processing the schedule if
multiple fire times have become due for this schedule since the last processing
:var ~datetime.timedelta | None misfire_grace_time: maximum number of seconds the
Expand Down Expand Up @@ -105,6 +106,7 @@ class Schedule:
kwargs: dict[str, Any] = attrs.field(
eq=False, order=False, converter=dict, default=()
)
paused: bool = attrs.field(eq=False, order=False, default=False)
coalesce: CoalescePolicy = attrs.field(
eq=False,
order=False,
Expand Down
6 changes: 5 additions & 1 deletion src/apscheduler/datastores/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,12 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
schedules: list[Schedule] = []
for state in self._schedules:
if state.next_fire_time is None or state.next_fire_time > now:
# The schedule is either paused or not yet due
# The schedule is either exhausted or not yet due. There will be no
# schedules that are due after this one, so we can stop here.
break
elif state.schedule.paused:
# The schedule is paused
continue
elif state.acquired_by is not None:
if state.acquired_by != scheduler_id and now <= state.acquired_until:
# The schedule has been acquired by another scheduler and the
Expand Down
16 changes: 13 additions & 3 deletions src/apscheduler/datastores/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,19 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
lambda: self._schedules.find(
{
"next_fire_time": {"$lte": now},
"$or": [
{"acquired_until": {"$exists": False}},
{"acquired_until": {"$lt": now}},
"$and": [
{
"$or": [
{"paused": {"$exists": False}},
{"paused": False},
]
},
{
"$or": [
{"acquired_until": {"$exists": False}},
{"acquired_until": {"$lt": now}},
]
},
],
},
session=session,
Expand Down
9 changes: 8 additions & 1 deletion src/apscheduler/datastores/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from anyio import CancelScope, to_thread
from sqlalchemy import (
BigInteger,
Boolean,
Column,
DateTime,
Enum,
Expand All @@ -31,6 +32,7 @@
Uuid,
and_,
bindparam,
false,
or_,
select,
)
Expand Down Expand Up @@ -293,6 +295,7 @@ def get_table_definitions(self) -> MetaData:
Column("trigger", LargeBinary),
Column("args", LargeBinary),
Column("kwargs", LargeBinary),
Column("paused", Boolean, nullable=False, server_default=literal(False)),
Column("coalesce", Enum(CoalescePolicy), nullable=False),
Column("misfire_grace_time", interval_type),
Column("max_jitter", interval_type),
Expand Down Expand Up @@ -600,6 +603,7 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
and_(
self._t_schedules.c.next_fire_time.isnot(None),
comparison,
self._t_schedules.c.paused == false(),
or_(
self._t_schedules.c.acquired_until.is_(None),
self._t_schedules.c.acquired_until < now,
Expand Down Expand Up @@ -752,7 +756,10 @@ async def get_next_schedule_run_time(self) -> datetime | None:

statenent = (
select(*columns)
.where(self._t_schedules.c.next_fire_time.isnot(None))
.where(
self._t_schedules.c.next_fire_time.isnot(None),
self._t_schedules.c.paused == false(),
)
.order_by(self._t_schedules.c.next_fire_time)
.limit(1)
)
Expand Down
26 changes: 24 additions & 2 deletions tests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ async def test_configure_task(self, raw_datastore: DataStore) -> None:
assert isinstance(event, TaskUpdated)
assert event.task_id == "mytask"

async def test_add_remove_schedule(
async def test_add_pause_unpause_remove_schedule(
self, raw_datastore: DataStore, timezone: ZoneInfo
) -> None:
send, receive = create_memory_object_stream[Event](3)
send, receive = create_memory_object_stream[Event](5)
async with AsyncScheduler(data_store=raw_datastore) as scheduler:
scheduler.subscribe(send.send)
now = datetime.now(timezone)
Expand All @@ -210,6 +210,16 @@ async def test_add_remove_schedule(
assert schedules[0].id == "foo"
assert schedules[0].task_id == f"{__name__}:dummy_async_job"

await scheduler.pause_schedule("foo")
schedule = await scheduler.get_schedule("foo")
assert schedule.paused
assert schedule.next_fire_time == now

await scheduler.unpause_schedule("foo")
schedule = await scheduler.get_schedule("foo")
assert not schedule.paused
assert schedule.next_fire_time == now

await scheduler.remove_schedule(schedule_id)
assert not await scheduler.get_schedules()

Expand All @@ -224,6 +234,18 @@ async def test_add_remove_schedule(
assert event.task_id == f"{__name__}:dummy_async_job"
assert event.next_fire_time == now

event = await receive.receive()
assert isinstance(event, ScheduleUpdated)
assert event.schedule_id == "foo"
assert event.task_id == f"{__name__}:dummy_async_job"
assert event.next_fire_time == now

event = await receive.receive()
assert isinstance(event, ScheduleUpdated)
assert event.schedule_id == "foo"
assert event.task_id == f"{__name__}:dummy_async_job"
assert event.next_fire_time == now

event = await receive.receive()
assert isinstance(event, ScheduleRemoved)
assert event.schedule_id == "foo"
Expand Down

0 comments on commit 9e9f905

Please sign in to comment.