Skip to content

Commit

Permalink
Moved/added Job related fields
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed May 11, 2024
1 parent 3390502 commit 8feb98f
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 13 deletions.
3 changes: 3 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ APScheduler, see the :doc:`migration section <migration>`.
- **BREAKING** Made publishing ``JobReleased`` events the responsibility of the
``DataStore`` implementation, rather than the scheduler, for consistency with the
``acquire_jobs()`` method
- **BREAKING** The ``started_at`` field was moved from ``Job`` to ``JobResult``
- Added the ability to pause and unpause schedules (PR by @WillDaSilva)
- Added the ``scheduled_start`` field to the ``JobAcquired`` event
- Added the ``scheduled_start`` and ``started_at`` fields to the ``JobReleased`` event
- Fixed large parts of ``MongoDBDataStore`` still calling blocking functions in the
event loop thread
- Fixed JSON serialization of triggers that had been used at least once
Expand Down
15 changes: 15 additions & 0 deletions src/apscheduler/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,17 @@ class JobAcquired(SchedulerEvent):
:param job_id: the ID of the job that was acquired
:param scheduler_id: the ID of the scheduler that acquired the job
:param task_id: ID of the task the job belongs to
:param schedule_id: ID of the schedule that
:param scheduled_start: the time the job was scheduled to start via a schedule (if
any)
"""

job_id: UUID = attrs.field(converter=as_uuid)
scheduler_id: str
task_id: str
schedule_id: str | None = None
scheduled_start: datetime | None = attrs.field(converter=as_aware_datetime)

@classmethod
def from_job(cls, job: Job, scheduler_id: str) -> JobAcquired:
Expand All @@ -236,6 +241,7 @@ def from_job(cls, job: Job, scheduler_id: str) -> JobAcquired:
scheduler_id=scheduler_id,
task_id=job.task_id,
schedule_id=job.schedule_id,
scheduled_start=job.scheduled_fire_time,
)


Expand All @@ -246,6 +252,10 @@ class JobReleased(SchedulerEvent):
:param uuid.UUID job_id: the ID of the job that was released
:param scheduler_id: the ID of the scheduler that released the job
:param scheduled_start: the time the job was scheduled to start via a schedule (if
any)
:param started_at: the time the executor actually started running the job (``None``
if the job was skipped due to missing its start deadline)
:param outcome: the outcome of the job
:param exception_type: the fully qualified name of the exception if ``outcome`` is
:attr:`JobOutcome.error`
Expand All @@ -259,6 +269,8 @@ class JobReleased(SchedulerEvent):
scheduler_id: str
task_id: str
schedule_id: str | None = None
scheduled_start: datetime | None = attrs.field(converter=as_aware_datetime)
started_at: datetime | None = attrs.field(converter=as_aware_datetime)
outcome: JobOutcome = attrs.field(converter=as_enum(JobOutcome))
exception_type: str | None = None
exception_message: str | None = None
Expand All @@ -285,11 +297,14 @@ def from_result(cls, job: Job, result: JobResult, scheduler_id: str) -> JobRelea
exception_type = exception_message = exception_traceback = None

return cls(
timestamp=result.finished_at,
job_id=result.job_id,
scheduler_id=scheduler_id,
task_id=job.task_id,
schedule_id=job.schedule_id,
outcome=result.outcome,
scheduled_start=job.scheduled_fire_time,
started_at=result.started_at,
exception_type=exception_type,
exception_message=exception_message,
exception_traceback=exception_traceback,
Expand Down
9 changes: 4 additions & 5 deletions src/apscheduler/_schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,9 +1024,7 @@ async def _run_job(self, job: Job, func: Callable, executor: str) -> None:
start_time = datetime.now(timezone.utc)
if job.start_deadline is not None and start_time > job.start_deadline:
result = JobResult.from_job(
job,
outcome=JobOutcome.missed_start_deadline,
finished_at=start_time,
job, JobOutcome.missed_start_deadline, finished_at=start_time
)
await self.data_store.release_job(self.identity, job, result)
return
Expand All @@ -1043,8 +1041,7 @@ async def _run_job(self, job: Job, func: Callable, executor: str) -> None:
self.logger.info("Job %s was cancelled", job.id)
with CancelScope(shield=True):
result = JobResult.from_job(
job,
outcome=JobOutcome.cancelled,
job, JobOutcome.cancelled, started_at=start_time
)
await self.data_store.release_job(self.identity, job, result)
except BaseException as exc:
Expand All @@ -1058,6 +1055,7 @@ async def _run_job(self, job: Job, func: Callable, executor: str) -> None:
result = JobResult.from_job(
job,
JobOutcome.error,
started_at=start_time,
exception=exc,
)
await self.data_store.release_job(self.identity, job, result)
Expand All @@ -1068,6 +1066,7 @@ async def _run_job(self, job: Job, func: Callable, executor: str) -> None:
result = JobResult.from_job(
job,
JobOutcome.success,
started_at=start_time,
return_value=retval,
)
await self.data_store.release_job(self.identity, job, result)
Expand Down
16 changes: 11 additions & 5 deletions src/apscheduler/_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,6 @@ class Job:
converter=as_aware_datetime,
factory=partial(datetime.now, timezone.utc),
)
started_at: datetime | None = attrs.field(
eq=False, order=False, converter=as_aware_datetime, default=None
)
acquired_by: str | None = attrs.field(eq=False, order=False, default=None)
acquired_until: datetime | None = attrs.field(
eq=False, order=False, converter=as_aware_datetime, default=None
Expand Down Expand Up @@ -251,7 +248,10 @@ class JobResult:
:var ~uuid.UUID job_id: the unique identifier of the job
:var JobOutcome outcome: indicates how the job ended
:var ~datetime.datetime finished_at: the time when the job ended
:var ~datetime.datetime started_at: the time when the job was submitted to the
executor (``None`` if the job never started in the first place)
:var ~datetime.datetime finished_at: the time when the job ended (``None`` if the
job never started in the first place)
:var BaseException | None exception: the exception object if the job ended due to an
exception being raised
:var return_value: the return value from the task function (if the job ran to
Expand All @@ -262,11 +262,15 @@ class JobResult:
outcome: JobOutcome = attrs.field(
eq=False, order=False, converter=as_enum(JobOutcome)
)
started_at: datetime | None = attrs.field(
eq=False,
order=False,
converter=as_aware_datetime,
)
finished_at: datetime = attrs.field(
eq=False,
order=False,
converter=as_aware_datetime,
factory=partial(datetime.now, timezone.utc),
)
expires_at: datetime = attrs.field(
eq=False, converter=as_aware_datetime, order=False
Expand All @@ -281,6 +285,7 @@ def from_job(
outcome: JobOutcome,
*,
finished_at: datetime | None = None,
started_at: datetime | None = None,
exception: BaseException | None = None,
return_value: Any = None,
) -> JobResult:
Expand All @@ -289,6 +294,7 @@ def from_job(
return cls(
job_id=job.id,
outcome=outcome,
started_at=started_at,
finished_at=real_finished_at,
expires_at=expires_at,
exception=exception,
Expand Down
4 changes: 2 additions & 2 deletions src/apscheduler/datastores/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ def get_table_definitions(self) -> MetaData:
Column("start_deadline", timestamp_type),
Column("result_expiration_time", interval_type),
Column("created_at", timestamp_type, nullable=False),
Column("started_at", timestamp_type),
Column("acquired_by", Unicode(500)),
Column("acquired_until", timestamp_type),
)
Expand All @@ -326,7 +325,8 @@ def get_table_definitions(self) -> MetaData:
metadata,
Column("job_id", Uuid, primary_key=True),
Column("outcome", Enum(JobOutcome), nullable=False),
Column("finished_at", timestamp_type, index=True),
Column("started_at", timestamp_type, index=True),
Column("finished_at", timestamp_type, nullable=False, index=True),
Column("expires_at", timestamp_type, nullable=False, index=True),
Column("exception", LargeBinary),
Column("return_value", LargeBinary),
Expand Down
13 changes: 12 additions & 1 deletion tests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,16 @@ async def test_add_job_wait_result(self, raw_datastore: DataStore) -> None:
assert event.job_id == job_id
assert event.task_id == f"{__name__}:dummy_async_job"
assert event.schedule_id is None
assert event.scheduled_start is None
acquired_at = event.timestamp

event = await receive.receive()
assert isinstance(event, JobReleased)
assert event.job_id == job_id
assert event.task_id == f"{__name__}:dummy_async_job"
assert event.schedule_id is None
assert event.scheduled_start is None
assert event.started_at > acquired_at
assert event.outcome is JobOutcome.success

result = await scheduler.get_job_result(job_id)
Expand Down Expand Up @@ -341,14 +345,18 @@ async def test_run_job(self, raw_datastore: DataStore, success: bool) -> None:
assert event.job_id == job_id
assert event.task_id == f"{__name__}:dummy_async_job"
assert event.schedule_id is None
assert event.scheduled_start is None
assert event.scheduler_id == scheduler.identity
acquired_at = event.timestamp

# The scheduler released the job
event = await receive.receive()
assert isinstance(event, JobReleased)
assert event.job_id == job_id
assert event.task_id == f"{__name__}:dummy_async_job"
assert event.schedule_id is None
assert event.scheduled_start is None
assert event.started_at > acquired_at
assert event.scheduler_id == scheduler.identity

# The scheduler was stopped
Expand Down Expand Up @@ -424,7 +432,8 @@ async def test_scheduled_job_missed_deadline(
self, raw_datastore: DataStore, timezone: ZoneInfo
) -> None:
send, receive = create_memory_object_stream[Event](4)
trigger = DateTrigger(datetime.now(timezone) - timedelta(seconds=1))
one_second_in_past = datetime.now(timezone) - timedelta(seconds=1)
trigger = DateTrigger(one_second_in_past)
async with AsyncScheduler(data_store=raw_datastore) as scheduler:
await scheduler.add_schedule(
dummy_async_job, trigger, misfire_grace_time=0, id="foo"
Expand Down Expand Up @@ -462,6 +471,8 @@ async def test_scheduled_job_missed_deadline(
assert event.job_id == job_id
assert event.task_id == "test_schedulers:dummy_async_job"
assert event.schedule_id == "foo"
assert event.scheduled_start == one_second_in_past
assert event.started_at is None
assert event.outcome is JobOutcome.missed_start_deadline

# The scheduler was stopped
Expand Down
3 changes: 3 additions & 0 deletions tests/test_serializers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import datetime, timezone
from uuid import uuid4

import pytest
Expand All @@ -26,6 +27,8 @@
task_id="task",
schedule_id="schedule",
outcome=JobOutcome.success,
scheduled_start=datetime.now(timezone.utc),
started_at=datetime.now(timezone.utc),
),
id="job_released",
),
Expand Down

0 comments on commit 8feb98f

Please sign in to comment.