From 8feb98fa0e671f85049603694663b7bb471be974 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Fri, 10 May 2024 11:16:38 +0300 Subject: [PATCH] Moved/added Job related fields --- docs/versionhistory.rst | 3 +++ src/apscheduler/_events.py | 15 +++++++++++++++ src/apscheduler/_schedulers/async_.py | 9 ++++----- src/apscheduler/_structures.py | 16 +++++++++++----- src/apscheduler/datastores/sqlalchemy.py | 4 ++-- tests/test_schedulers.py | 13 ++++++++++++- tests/test_serializers.py | 3 +++ 7 files changed, 50 insertions(+), 13 deletions(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 9cd5fa490..aa78f7ad5 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -15,7 +15,10 @@ APScheduler, see the :doc:`migration section `. - **BREAKING** Made publishing ``JobReleased`` events the responsibility of the ``DataStore`` implementation, rather than the scheduler, for consistency with the ``acquire_jobs()`` method +- **BREAKING** The ``started_at`` field was moved from ``Job`` to ``JobResult`` - Added the ability to pause and unpause schedules (PR by @WillDaSilva) +- Added the ``scheduled_start`` field to the ``JobAcquired`` event +- Added the ``scheduled_start`` and ``started_at`` fields to the ``JobReleased`` event - Fixed large parts of ``MongoDBDataStore`` still calling blocking functions in the event loop thread - Fixed JSON serialization of triggers that had been used at least once diff --git a/src/apscheduler/_events.py b/src/apscheduler/_events.py index d9297ff1b..be2b9fa26 100644 --- a/src/apscheduler/_events.py +++ b/src/apscheduler/_events.py @@ -214,12 +214,17 @@ class JobAcquired(SchedulerEvent): :param job_id: the ID of the job that was acquired :param scheduler_id: the ID of the scheduler that acquired the job + :param task_id: ID of the task the job belongs to + :param schedule_id: ID of the schedule that the job was created from (if any) + :param scheduled_start: the time the job was scheduled to start via a schedule (if + any) """ 
job_id: UUID = attrs.field(converter=as_uuid) scheduler_id: str task_id: str schedule_id: str | None = None + scheduled_start: datetime | None = attrs.field(converter=as_aware_datetime) @classmethod def from_job(cls, job: Job, scheduler_id: str) -> JobAcquired: @@ -236,6 +241,7 @@ def from_job(cls, job: Job, scheduler_id: str) -> JobAcquired: scheduler_id=scheduler_id, task_id=job.task_id, schedule_id=job.schedule_id, + scheduled_start=job.scheduled_fire_time, ) @@ -246,6 +252,10 @@ class JobReleased(SchedulerEvent): :param uuid.UUID job_id: the ID of the job that was released :param scheduler_id: the ID of the scheduler that released the job + :param scheduled_start: the time the job was scheduled to start via a schedule (if + any) + :param started_at: the time the executor actually started running the job (``None`` + if the job was skipped due to missing its start deadline) :param outcome: the outcome of the job :param exception_type: the fully qualified name of the exception if ``outcome`` is :attr:`JobOutcome.error` @@ -259,6 +269,8 @@ class JobReleased(SchedulerEvent): scheduler_id: str task_id: str schedule_id: str | None = None + scheduled_start: datetime | None = attrs.field(converter=as_aware_datetime) + started_at: datetime | None = attrs.field(converter=as_aware_datetime) outcome: JobOutcome = attrs.field(converter=as_enum(JobOutcome)) exception_type: str | None = None exception_message: str | None = None @@ -285,11 +297,14 @@ def from_result(cls, job: Job, result: JobResult, scheduler_id: str) -> JobRelea exception_type = exception_message = exception_traceback = None return cls( + timestamp=result.finished_at, job_id=result.job_id, scheduler_id=scheduler_id, task_id=job.task_id, schedule_id=job.schedule_id, outcome=result.outcome, + scheduled_start=job.scheduled_fire_time, + started_at=result.started_at, exception_type=exception_type, exception_message=exception_message, exception_traceback=exception_traceback, diff --git 
a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 986083565..00e3772a1 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -1024,9 +1024,7 @@ async def _run_job(self, job: Job, func: Callable, executor: str) -> None: start_time = datetime.now(timezone.utc) if job.start_deadline is not None and start_time > job.start_deadline: result = JobResult.from_job( - job, - outcome=JobOutcome.missed_start_deadline, - finished_at=start_time, + job, JobOutcome.missed_start_deadline, finished_at=start_time ) await self.data_store.release_job(self.identity, job, result) return @@ -1043,8 +1041,7 @@ async def _run_job(self, job: Job, func: Callable, executor: str) -> None: self.logger.info("Job %s was cancelled", job.id) with CancelScope(shield=True): result = JobResult.from_job( - job, - outcome=JobOutcome.cancelled, + job, JobOutcome.cancelled, started_at=start_time ) await self.data_store.release_job(self.identity, job, result) except BaseException as exc: @@ -1058,6 +1055,7 @@ async def _run_job(self, job: Job, func: Callable, executor: str) -> None: result = JobResult.from_job( job, JobOutcome.error, + started_at=start_time, exception=exc, ) await self.data_store.release_job(self.identity, job, result) @@ -1068,6 +1066,7 @@ async def _run_job(self, job: Job, func: Callable, executor: str) -> None: result = JobResult.from_job( job, JobOutcome.success, + started_at=start_time, return_value=retval, ) await self.data_store.release_job(self.identity, job, result) diff --git a/src/apscheduler/_structures.py b/src/apscheduler/_structures.py index 55e2d9a1c..1f7e372e0 100644 --- a/src/apscheduler/_structures.py +++ b/src/apscheduler/_structures.py @@ -211,9 +211,6 @@ class Job: converter=as_aware_datetime, factory=partial(datetime.now, timezone.utc), ) - started_at: datetime | None = attrs.field( - eq=False, order=False, converter=as_aware_datetime, default=None - ) acquired_by: str | None = 
attrs.field(eq=False, order=False, default=None) acquired_until: datetime | None = attrs.field( eq=False, order=False, converter=as_aware_datetime, default=None @@ -251,7 +248,10 @@ class JobResult: :var ~uuid.UUID job_id: the unique identifier of the job :var JobOutcome outcome: indicates how the job ended - :var ~datetime.datetime finished_at: the time when the job ended + :var ~datetime.datetime started_at: the time when the job was submitted to the + executor (``None`` if the job never started in the first place) + :var ~datetime.datetime finished_at: the time when the job ended (``None`` if the + job never started in the first place) :var BaseException | None exception: the exception object if the job ended due to an exception being raised :var return_value: the return value from the task function (if the job ran to @@ -262,11 +262,15 @@ class JobResult: outcome: JobOutcome = attrs.field( eq=False, order=False, converter=as_enum(JobOutcome) ) + started_at: datetime | None = attrs.field( + eq=False, + order=False, + converter=as_aware_datetime, + ) finished_at: datetime = attrs.field( eq=False, order=False, converter=as_aware_datetime, - factory=partial(datetime.now, timezone.utc), ) expires_at: datetime = attrs.field( eq=False, converter=as_aware_datetime, order=False @@ -281,6 +285,7 @@ def from_job( outcome: JobOutcome, *, finished_at: datetime | None = None, + started_at: datetime | None = None, exception: BaseException | None = None, return_value: Any = None, ) -> JobResult: @@ -289,6 +294,7 @@ def from_job( return cls( job_id=job.id, outcome=outcome, + started_at=started_at, finished_at=real_finished_at, expires_at=expires_at, exception=exception, diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 17f2ec132..fbbebf430 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -317,7 +317,6 @@ def get_table_definitions(self) -> MetaData: 
Column("start_deadline", timestamp_type), Column("result_expiration_time", interval_type), Column("created_at", timestamp_type, nullable=False), - Column("started_at", timestamp_type), Column("acquired_by", Unicode(500)), Column("acquired_until", timestamp_type), ) @@ -326,7 +325,8 @@ def get_table_definitions(self) -> MetaData: metadata, Column("job_id", Uuid, primary_key=True), Column("outcome", Enum(JobOutcome), nullable=False), - Column("finished_at", timestamp_type, index=True), + Column("started_at", timestamp_type, index=True), + Column("finished_at", timestamp_type, nullable=False, index=True), Column("expires_at", timestamp_type, nullable=False, index=True), Column("exception", LargeBinary), Column("return_value", LargeBinary), diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index e9c3dfce1..520dc8adf 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -294,12 +294,16 @@ async def test_add_job_wait_result(self, raw_datastore: DataStore) -> None: assert event.job_id == job_id assert event.task_id == f"{__name__}:dummy_async_job" assert event.schedule_id is None + assert event.scheduled_start is None + acquired_at = event.timestamp event = await receive.receive() assert isinstance(event, JobReleased) assert event.job_id == job_id assert event.task_id == f"{__name__}:dummy_async_job" assert event.schedule_id is None + assert event.scheduled_start is None + assert event.started_at > acquired_at assert event.outcome is JobOutcome.success result = await scheduler.get_job_result(job_id) @@ -341,7 +345,9 @@ async def test_run_job(self, raw_datastore: DataStore, success: bool) -> None: assert event.job_id == job_id assert event.task_id == f"{__name__}:dummy_async_job" assert event.schedule_id is None + assert event.scheduled_start is None assert event.scheduler_id == scheduler.identity + acquired_at = event.timestamp # The scheduler released the job event = await receive.receive() @@ -349,6 +355,8 @@ async def test_run_job(self, 
raw_datastore: DataStore, success: bool) -> None: assert event.job_id == job_id assert event.task_id == f"{__name__}:dummy_async_job" assert event.schedule_id is None + assert event.scheduled_start is None + assert event.started_at > acquired_at assert event.scheduler_id == scheduler.identity # The scheduler was stopped @@ -424,7 +432,8 @@ async def test_scheduled_job_missed_deadline( self, raw_datastore: DataStore, timezone: ZoneInfo ) -> None: send, receive = create_memory_object_stream[Event](4) - trigger = DateTrigger(datetime.now(timezone) - timedelta(seconds=1)) + one_second_in_past = datetime.now(timezone) - timedelta(seconds=1) + trigger = DateTrigger(one_second_in_past) async with AsyncScheduler(data_store=raw_datastore) as scheduler: await scheduler.add_schedule( dummy_async_job, trigger, misfire_grace_time=0, id="foo" @@ -462,6 +471,8 @@ async def test_scheduled_job_missed_deadline( assert event.job_id == job_id assert event.task_id == "test_schedulers:dummy_async_job" assert event.schedule_id == "foo" + assert event.scheduled_start == one_second_in_past + assert event.started_at is None assert event.outcome is JobOutcome.missed_start_deadline # The scheduler was stopped diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 5bd52b5f5..f58dc87d5 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datetime import datetime, timezone from uuid import uuid4 import pytest @@ -26,6 +27,8 @@ task_id="task", schedule_id="schedule", outcome=JobOutcome.success, + scheduled_start=datetime.now(timezone.utc), + started_at=datetime.now(timezone.utc), ), id="job_released", ),