Skip to content

Commit

Permalink
Fixed trigger serialization when pausing a schedule
Browse files Browse the repository at this point in the history
Fixes #923.
  • Loading branch information
agronholm committed Jul 13, 2024
1 parent a956ae3 commit 0240dce
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 2 deletions.
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ APScheduler, see the :doc:`migration section <migration>`.
- Added the ``psycopg`` event broker
- Added useful indexes and removed useless ones in ``SQLAlchemyDatastore`` and
``MongoDBDataStore``
- Fixed serialization error with ``CronTrigger`` when pausing a schedule
(`#923 <https://github.com/agronholm/apscheduler/issues/923>`_)

**4.0.0a5**

Expand Down
4 changes: 2 additions & 2 deletions src/apscheduler/_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class Schedule:
)

def marshal(self, serializer: Serializer) -> dict[str, Any]:
marshalled = attrs.asdict(self, value_serializer=serialize)
marshalled = attrs.asdict(self, recurse=False, value_serializer=serialize)
marshalled["trigger"] = serializer.serialize(self.trigger)
marshalled["args"] = serializer.serialize(self.args)
marshalled["kwargs"] = serializer.serialize(self.kwargs)
Expand Down Expand Up @@ -222,7 +222,7 @@ def original_scheduled_time(self) -> datetime | None:
return self.scheduled_fire_time - self.jitter

def marshal(self, serializer: Serializer) -> dict[str, Any]:
marshalled = attrs.asdict(self, value_serializer=serialize)
marshalled = attrs.asdict(self, recurse=False, value_serializer=serialize)
marshalled["args"] = serializer.serialize(self.args)
marshalled["kwargs"] = serializer.serialize(self.kwargs)
if not self.acquired_by:
Expand Down
59 changes: 59 additions & 0 deletions tests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import threading
import time
from collections.abc import Callable
from contextlib import AsyncExitStack
from datetime import datetime, timedelta, timezone
from functools import partial
from pathlib import Path
Expand Down Expand Up @@ -55,6 +56,7 @@
from apscheduler.executors.async_ import AsyncJobExecutor
from apscheduler.executors.subprocess import ProcessPoolJobExecutor
from apscheduler.executors.thread import ThreadPoolJobExecutor
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger

Expand Down Expand Up @@ -858,6 +860,63 @@ async def acquire_release() -> None:
await scheduler.add_job("dummyjob")
await scheduler.run_until_stopped()

@pytest.mark.parametrize(
"trigger_type, run_job",
[
pytest.param("cron", False, id="cron"),
pytest.param("date", True, id="date"),
],
)
async def test_pause_unpause_schedule(
self,
raw_datastore: DataStore,
timezone: ZoneInfo,
trigger_type: str,
run_job: bool,
) -> None:
if trigger_type == "cron":
trigger = CronTrigger()
else:
trigger = DateTrigger(datetime.now(timezone))

async with AsyncExitStack() as exit_stack:
send, receive = create_memory_object_stream[Event](4)
exit_stack.enter_context(send)
exit_stack.enter_context(receive)
scheduler = await exit_stack.enter_async_context(
AsyncScheduler(data_store=raw_datastore, role=SchedulerRole.scheduler)
)
schedule_id = await scheduler.add_schedule(
dummy_async_job, trigger, id="foo"
)
scheduler.subscribe(send.send, {ScheduleUpdated, JobAdded})

# Pause the schedule and wait for the schedule update event
await scheduler.pause_schedule(schedule_id)
schedule = await scheduler.get_schedule(schedule_id)
assert schedule.paused
event = await receive.receive()
assert isinstance(event, ScheduleUpdated)

if run_job:
# Make sure that no jobs are added when the scheduler is started
await scheduler.start_in_background()
assert not await scheduler.get_jobs()

# Unpause the schedule and wait for the schedule update event
await scheduler.unpause_schedule(schedule_id)
schedule = await scheduler.get_schedule(schedule_id)
assert not schedule.paused
event = await receive.receive()
assert isinstance(event, ScheduleUpdated)

if run_job:
with fail_after(3):
job_added_event = await receive.receive()

assert isinstance(job_added_event, JobAdded)
assert job_added_event.schedule_id == schedule_id


class TestSyncScheduler:
def test_configure(self) -> None:
Expand Down

0 comments on commit 0240dce

Please sign in to comment.