From 0240dce08e84c5d2e8fa8c124947d703721870d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 13 Jul 2024 13:25:05 +0300 Subject: [PATCH] Fixed trigger serialization when pausing a schedule Fixes #923. --- docs/versionhistory.rst | 2 ++ src/apscheduler/_structures.py | 4 +-- tests/test_schedulers.py | 59 ++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index b041b5fd9..8a9261af1 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -11,6 +11,8 @@ APScheduler, see the :doc:`migration section `. - Added the ``psycopg`` event broker - Added useful indexes and removed useless ones in ``SQLAlchemyDatastore`` and ``MongoDBDataStore`` +- Fixed serialization error with ``CronTrigger`` when pausing a schedule + (`#923 `_) **4.0.0a5** diff --git a/src/apscheduler/_structures.py b/src/apscheduler/_structures.py index 183af280b..90b04095c 100644 --- a/src/apscheduler/_structures.py +++ b/src/apscheduler/_structures.py @@ -139,7 +139,7 @@ class Schedule: ) def marshal(self, serializer: Serializer) -> dict[str, Any]: - marshalled = attrs.asdict(self, value_serializer=serialize) + marshalled = attrs.asdict(self, recurse=False, value_serializer=serialize) marshalled["trigger"] = serializer.serialize(self.trigger) marshalled["args"] = serializer.serialize(self.args) marshalled["kwargs"] = serializer.serialize(self.kwargs) @@ -222,7 +222,7 @@ def original_scheduled_time(self) -> datetime | None: return self.scheduled_fire_time - self.jitter def marshal(self, serializer: Serializer) -> dict[str, Any]: - marshalled = attrs.asdict(self, value_serializer=serialize) + marshalled = attrs.asdict(self, recurse=False, value_serializer=serialize) marshalled["args"] = serializer.serialize(self.args) marshalled["kwargs"] = serializer.serialize(self.kwargs) if not self.acquired_by: diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 66bdac237..507234c0e 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -7,6 +7,7 @@ import threading import time from collections.abc import Callable +from contextlib import AsyncExitStack from datetime import datetime, timedelta, timezone from functools import partial from pathlib import Path @@ -55,6 +56,7 @@ from apscheduler.executors.async_ import AsyncJobExecutor from apscheduler.executors.subprocess import ProcessPoolJobExecutor from apscheduler.executors.thread import ThreadPoolJobExecutor +from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger @@ -858,6 +860,63 @@ async def acquire_release() -> None: await scheduler.add_job("dummyjob") await scheduler.run_until_stopped() + @pytest.mark.parametrize( + "trigger_type, run_job", + [ + pytest.param("cron", False, id="cron"), + pytest.param("date", True, id="date"), + ], + ) + async def test_pause_unpause_schedule( + self, + raw_datastore: DataStore, + timezone: ZoneInfo, + trigger_type: str, + run_job: bool, + ) -> None: + if trigger_type == "cron": + trigger = CronTrigger() + else: + trigger = DateTrigger(datetime.now(timezone)) + + async with AsyncExitStack() as exit_stack: + send, receive = create_memory_object_stream[Event](4) + exit_stack.enter_context(send) + exit_stack.enter_context(receive) + scheduler = await exit_stack.enter_async_context( + AsyncScheduler(data_store=raw_datastore, role=SchedulerRole.scheduler) + ) + schedule_id = await scheduler.add_schedule( + dummy_async_job, trigger, id="foo" + ) + scheduler.subscribe(send.send, {ScheduleUpdated, JobAdded}) + + # Pause the schedule and wait for the schedule update event + await scheduler.pause_schedule(schedule_id) + schedule = await scheduler.get_schedule(schedule_id) + assert schedule.paused + event = await receive.receive() + assert isinstance(event, ScheduleUpdated) + + if run_job: + # Make sure that no jobs are added when the scheduler is started + await scheduler.start_in_background() + assert not await scheduler.get_jobs() + + # Unpause the schedule and wait for the schedule update event + await scheduler.unpause_schedule(schedule_id) + schedule = await scheduler.get_schedule(schedule_id) + assert not schedule.paused + event = await receive.receive() + assert isinstance(event, ScheduleUpdated) + + if run_job: + with fail_after(3): + job_added_event = await receive.receive() + + assert isinstance(job_added_event, JobAdded) + assert job_added_event.schedule_id == schedule_id + class TestSyncScheduler: def test_configure(self) -> None: