Added automatic clean-up of expired job results and finished schedules
agronholm committed Nov 26, 2023
1 parent 01523ba commit 56274e4
Showing 10 changed files with 396 additions and 120 deletions.
8 changes: 8 additions & 0 deletions docs/userguide.rst
@@ -451,6 +451,14 @@ When **distributed** event brokers (that is, other than the default one) are being used,
events other than the ones relating to the life cycles of schedulers and workers, will
be sent to all schedulers and workers connected to that event broker.

Clean-up of expired jobs and schedules
======================================

Expired job results and finished schedules are, by default, automatically cleaned up
by each running scheduler at 15-minute intervals (counting from the scheduler's start
time). This can be adjusted (or disabled entirely) through the ``cleanup_interval``
configuration option.
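
For example, clean-up can be made more frequent, or handled manually instead (a
minimal sketch; it assumes the top-level ``Scheduler`` export used elsewhere in this
guide):

.. code-block:: python

    from datetime import timedelta

    from apscheduler import Scheduler

    # Purge expired results and finished schedules every 5 minutes instead of 15
    with Scheduler(cleanup_interval=timedelta(minutes=5)) as scheduler:
        scheduler.run_until_stopped()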

Deployment
==========

9 changes: 9 additions & 0 deletions docs/versionhistory.rst
@@ -4,6 +4,15 @@ Version history
To find out how to migrate your application from a previous version of
APScheduler, see the :doc:`migration section <migration>`.

**UNRELEASED**

- **BREAKING** Added the ``cleanup()`` scheduler method and a configuration option
(``cleanup_interval``). A corresponding abstract method was added to the ``DataStore``
class. This method purges expired job results and schedules that have exhausted their
triggers and have no more associated jobs running. Previously, schedules were deleted
automatically as soon as their triggers could no longer produce any fire times.

**4.0.0a4**

- **BREAKING** Renamed any leftover fields named ``executor`` to ``job_executor``
43 changes: 36 additions & 7 deletions src/apscheduler/_schedulers/async_.py
@@ -22,12 +22,14 @@
create_task_group,
get_cancelled_exc_class,
move_on_after,
sleep,
)
from anyio.abc import TaskGroup, TaskStatus
from attr.validators import instance_of

from .. import JobAdded, SerializationError, TaskLookupError
from .._context import current_async_scheduler, current_job
from .._converters import as_timedelta
from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState, SchedulerRole
from .._events import (
Event,
Expand Down Expand Up @@ -81,6 +83,8 @@ class AsyncScheduler:
:param event_broker: the event broker to use for publishing and subscribing to events
:param max_concurrent_jobs: Maximum number of jobs the worker will run at once
:param role: specifies what the scheduler should be doing when running
:param cleanup_interval: interval (as seconds or timedelta) between automatic
calls to :meth:`cleanup`, or ``None`` to disable automatic clean-up
"""

data_store: DataStore = attrs.field(
@@ -97,6 +101,9 @@
job_executors: MutableMapping[str, JobExecutor] = attrs.field(
kw_only=True, factory=dict
)
cleanup_interval: timedelta | None = attrs.field(
kw_only=True, converter=as_timedelta, default=timedelta(minutes=15)
)
default_job_executor: str | None = attrs.field(kw_only=True, default=None)
logger: Logger = attrs.field(kw_only=True, default=getLogger(__name__))

@@ -169,11 +176,23 @@ def _check_initialized(self) -> None:
"than run_until_stopped()."
)

async def _cleanup_loop(self) -> None:
delay = self.cleanup_interval.total_seconds()
assert delay > 0
while self._state in (RunState.starting, RunState.started):
await self.cleanup()
await sleep(delay)

@property
def state(self) -> RunState:
"""The current running state of the scheduler."""
return self._state

async def cleanup(self) -> None:
"""Clean up expired job results and finished schedules."""
await self.data_store.cleanup()
self.logger.info("Cleaned up expired job results and finished schedules")

def subscribe(
self,
callback: Callable[[Event], Any],
@@ -540,8 +559,9 @@ async def get_job_result(self, job_id: UUID, *, wait: bool = True) -> JobResult:
:param job_id: the ID of the job
:param wait: if ``True``, wait until the job has ended (one way or another),
``False`` to raise an exception if the result is not yet available
:raises JobLookupError: if ``wait=False`` and the job result does not exist in
the data store
:raises JobLookupError: if neither the job nor its result exists in the data
store, or if the job exists but the result is not ready yet and ``wait=False``
is set
"""
self._check_initialized()
@@ -552,13 +572,14 @@ def listener(event: JobReleased) -> None:
wait_event.set()

with self.event_broker.subscribe(listener, {JobReleased}):
result = await self.data_store.get_job_result(job_id)
if result:
job_exists = bool(await self.data_store.get_jobs([job_id]))
if result := await self.data_store.get_job_result(job_id):
return result
elif not wait:
raise JobLookupError(job_id)

await wait_event.wait()
if job_exists and wait:
await wait_event.wait()
else:
raise JobLookupError(job_id)

return await self.data_store.get_job_result(job_id)

@@ -668,6 +689,14 @@ async def run_until_stopped(
self._scheduler_cancel_scope = task_group.cancel_scope
exit_stack.callback(setattr, self, "_scheduler_cancel_scope", None)

# Start periodic cleanups
if self.cleanup_interval:
task_group.start_soon(self._cleanup_loop)
self.logger.debug(
"Started internal cleanup loop with interval: %s",
self.cleanup_interval,
)

# Start processing due schedules, if configured to do so
if self.role in (SchedulerRole.scheduler, SchedulerRole.both):
await task_group.start(self._process_schedules)
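
The automatic loop can also be turned off and the new method driven by hand. A
minimal sketch (assuming ``AsyncScheduler`` is importable from the top-level
``apscheduler`` package, as in the 4.0 alphas):

import asyncio

from apscheduler import AsyncScheduler


async def main() -> None:
    # cleanup_interval=None disables the internal clean-up loop entirely;
    # cleanup() is then awaited explicitly instead
    async with AsyncScheduler(cleanup_interval=None) as scheduler:
        await scheduler.cleanup()


asyncio.run(main())
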
6 changes: 6 additions & 0 deletions src/apscheduler/_schedulers/sync.py
@@ -38,6 +38,7 @@ def __init__(
*,
identity: str | None = None,
role: SchedulerRole = SchedulerRole.both,
cleanup_interval: float | timedelta | None = None,
job_executors: MutableMapping[str, JobExecutor] | None = None,
default_job_executor: str | None = None,
logger: Logger | None = None,
@@ -55,6 +56,7 @@
identity=identity,
role=role,
job_executors=job_executors,
cleanup_interval=cleanup_interval,
default_job_executor=default_job_executor,
logger=logger or logging.getLogger(__name__),
**kwargs,
@@ -129,6 +131,10 @@ def _ensure_services_ready(self, exit_stack: ExitStack | None = None) -> None:
self._portal.wrap_async_context_manager(self._async_scheduler)
)

def cleanup(self) -> None:
self._ensure_services_ready()
return self._portal.call(self._async_scheduler.cleanup)

def subscribe(
self,
callback: Callable[[Event], Any],
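
Because the synchronous ``cleanup()`` simply calls the async scheduler through the
portal, it can be invoked from ordinary blocking code (same top-level export
assumption as above):

from apscheduler import Scheduler

with Scheduler(cleanup_interval=None) as scheduler:
    scheduler.cleanup()  # delegates to AsyncScheduler.cleanup() via the portal
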
7 changes: 7 additions & 0 deletions src/apscheduler/abc.py
@@ -332,6 +332,13 @@ async def get_job_result(self, job_id: UUID) -> JobResult | None:
:return: the result, or ``None`` if the result was not found
"""

@abstractmethod
async def cleanup(self) -> None:
"""
Purge expired job results and finished schedules that have no running jobs
associated with them.
"""


class JobExecutor(metaclass=ABCMeta):
async def start(self, exit_stack: AsyncExitStack) -> None:
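
Because the method is abstract, every data store implementation must now provide it.
A hypothetical subclass (the ``LoggingMemoryDataStore`` name is illustrative) that
reuses the built-in memory store's logic while adding a log line:

import logging

from apscheduler.datastores.memory import MemoryDataStore


class LoggingMemoryDataStore(MemoryDataStore):
    async def cleanup(self) -> None:
        # Delegate the actual purging to the built-in implementation
        await super().cleanup()
        logging.getLogger(__name__).debug("data store clean-up finished")
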
76 changes: 54 additions & 22 deletions src/apscheduler/datastores/memory.py
@@ -104,6 +104,9 @@ class MemoryDataStore(BaseDataStore):
_jobs_by_task_id: dict[str, set[JobState]] = attrs.Factory(
partial(defaultdict, set)
)
_jobs_by_schedule_id: dict[str, set[JobState]] = attrs.Factory(
partial(defaultdict, set)
)
_job_results: dict[UUID, JobResult] = attrs.Factory(dict)

def _find_schedule_index(self, state: ScheduleState) -> int | None:
Expand Down Expand Up @@ -222,29 +225,22 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
async def release_schedules(
self, scheduler_id: str, schedules: list[Schedule]
) -> None:
# Send update events for schedules that have a next time
finished_schedule_ids: list[str] = []
# Send update events for schedules
for s in schedules:
if s.next_fire_time is not None:
# Remove the schedule
schedule_state = self._schedules_by_id.get(s.id)
index = self._find_schedule_index(schedule_state)
del self._schedules[index]

# Re-add the schedule to its new position
schedule_state.next_fire_time = s.next_fire_time
schedule_state.acquired_by = None
schedule_state.acquired_until = None
insort_right(self._schedules, schedule_state)
event = ScheduleUpdated(
schedule_id=s.id, task_id=s.task_id, next_fire_time=s.next_fire_time
)
await self._event_broker.publish(event)
else:
finished_schedule_ids.append(s.id)
# Remove the schedule
schedule_state = self._schedules_by_id.get(s.id)
index = self._find_schedule_index(schedule_state)
del self._schedules[index]

# Remove schedules that didn't get a new next fire time
await self.remove_schedules(finished_schedule_ids, finished=True)
# Re-add the schedule to its new position
schedule_state.next_fire_time = s.next_fire_time
schedule_state.acquired_by = None
schedule_state.acquired_until = None
insort_right(self._schedules, schedule_state)
event = ScheduleUpdated(
schedule_id=s.id, task_id=s.task_id, next_fire_time=s.next_fire_time
)
await self._event_broker.publish(event)

async def get_next_schedule_run_time(self) -> datetime | None:
return self._schedules[0].next_fire_time if self._schedules else None
@@ -254,6 +250,8 @@ async def add_job(self, job: Job) -> None:
self._jobs.append(state)
self._jobs_by_id[job.id] = state
self._jobs_by_task_id[job.task_id].add(state)
if job.schedule_id is not None:
self._jobs_by_schedule_id[job.schedule_id].add(state)

event = JobAdded(
job_id=job.id,
@@ -324,9 +322,43 @@ async def release_job(

# Delete the job
job_state = self._jobs_by_id.pop(result.job_id)
self._jobs_by_task_id[task_id].remove(job_state)

# Remove the job from the jobs belonging to its task
task_jobs = self._jobs_by_task_id[task_id]
task_jobs.remove(job_state)
if not task_jobs:
del self._jobs_by_task_id[task_id]

# If this was a scheduled job, remove the job from the set of jobs belonging to
# this schedule
if job_state.job.schedule_id:
schedule_jobs = self._jobs_by_schedule_id[job_state.job.schedule_id]
schedule_jobs.remove(job_state)
if not schedule_jobs:
del self._jobs_by_schedule_id[job_state.job.schedule_id]

index = self._find_job_index(job_state)
del self._jobs[index]

async def get_job_result(self, job_id: UUID) -> JobResult | None:
return self._job_results.pop(job_id, None)

async def cleanup(self) -> None:
# Clean up expired job results
now = datetime.now(timezone.utc)
expired_job_ids = [
result.job_id
for result in self._job_results.values()
if result.expires_at <= now
]
for job_id in expired_job_ids:
del self._job_results[job_id]

# Clean up finished schedules that have no running jobs
finished_schedule_ids = [
schedule_id
for schedule_id, state in self._schedules_by_id.items()
if state.next_fire_time is None
and schedule_id not in self._jobs_by_schedule_id
]
await self.remove_schedules(finished_schedule_ids, finished=True)
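
A quick way to exercise the new method in isolation (a sketch; it assumes a bare
``MemoryDataStore`` can run ``cleanup()`` without a started scheduler, which holds
here because nothing is purged and thus no events are published):

import asyncio

from apscheduler.datastores.memory import MemoryDataStore


async def demo() -> None:
    store = MemoryDataStore()
    # No expired job results and no finished schedules yet, so this is a no-op
    await store.cleanup()


asyncio.run(demo())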
