Added scheduler methods for job export/import
agronholm committed Nov 24, 2024
1 parent 40431e5 commit 50f67c8
Showing 3 changed files with 182 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
@@ -9,6 +9,8 @@ APScheduler, see the :doc:`migration section <migration>`.
- Dropped support for Python 3.6 and 3.7
- Added support for ``ZoneInfo`` time zones and deprecated support for pytz time zones
- Added ``CalendarIntervalTrigger``, backported from the 4.x series
- Added the ability to export and import jobs via ``scheduler.export_jobs()`` and
  ``scheduler.import_jobs()``, respectively (usage sketch below)
- Removed the dependency on ``six``
- Changed ``ProcessPoolExecutor`` to spawn new subprocesses from scratch instead of
  forking on all platforms
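
A minimal round-trip sketch of the new API (the scheduler class, the job, and the file name "jobs.json" are illustrative choices, not part of this commit):

# Hypothetical round trip: export jobs from one scheduler, import into another.
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.start(paused=True)
scheduler.add_job(print, "interval", minutes=5, args=["tick"])

# Accepts a path or a text-mode file object; exports from all job stores
# unless a specific jobstore alias is passed
scheduler.export_jobs("jobs.json")

# Import into a fresh scheduler; jobs land in the "default" job store
new_scheduler = BackgroundScheduler()
new_scheduler.start(paused=True)
new_scheduler.import_jobs("jobs.json")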
105 changes: 104 additions & 1 deletion src/apscheduler/schedulers/base.py
@@ -1,7 +1,8 @@
import sys
import warnings
from abc import ABCMeta, abstractmethod
from collections.abc import MutableMapping
from collections.abc import Mapping, MutableMapping
from contextlib import ExitStack
from datetime import datetime, timedelta
from importlib.metadata import entry_points
from logging import getLogger
@@ -44,6 +45,8 @@
    asint,
    astimezone,
    maybe_ref,
    obj_to_ref,
    ref_to_obj,
    undefined,
)

@@ -781,6 +784,106 @@ def print_jobs(self, jobstore=None, out=None):
                    else:
                        print("    No scheduled jobs", file=out)

    def export_jobs(self, outfile, jobstore=None):
        """
        Export stored jobs as JSON.

        :param outfile: either a file object opened in text write mode ("w"), or a
            path to the target file
        :param jobstore: alias of the job store to export jobs from (if omitted,
            export from all configured job stores)
        """
        import json
        import pickle
        from base64 import b64encode

        from apscheduler import version

        if self.state == STATE_STOPPED:
            raise RuntimeError(
                "the scheduler must have been started for job export to work"
            )

        def encode_with_pickle(obj):
            return b64encode(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)).decode("ascii")

        def json_default(obj):
            if hasattr(obj, "__getstate__") and hasattr(obj, "__setstate__"):
                state = obj.__getstate__()
                if isinstance(state, Mapping):
                    return {
                        "__apscheduler_class__": obj_to_ref(obj.__class__),
                        "__apscheduler_state__": state,
                    }

            return {"__apscheduler_pickle__": encode_with_pickle(obj)}

        with self._jobstores_lock:
            all_jobs = [
                job
                for alias, store in self._jobstores.items()
                for job in store.get_all_jobs()
                if jobstore in (None, alias)
            ]

        with ExitStack() as stack:
            if not hasattr(outfile, "write"):
                outfile = stack.enter_context(open(outfile, "w"))

            json.dump(
                {
                    "version": 1,
                    "scheduler_version": version,
                    "jobs": [job.__getstate__() for job in all_jobs],
                },
                outfile,
                default=json_default,
            )

    def import_jobs(self, infile, jobstore="default"):
        """
        Import jobs previously exported via :meth:`export_jobs`.

        :param infile: either a file object opened in text read mode ("r") or a
            path to a JSON file containing previously exported jobs
        :param jobstore: the alias of the job store to import the jobs to
        """
        import json
        import pickle
        from base64 import b64decode

        def json_object_hook(dct):
            if pickle_data := dct.get("__apscheduler_pickle__"):
                return pickle.loads(b64decode(pickle_data))

            if obj_class := dct.get("__apscheduler_class__"):
                if obj_state := dct.get("__apscheduler_state__"):
                    obj_class = ref_to_obj(obj_class)
                    obj = object.__new__(obj_class)
                    obj.__setstate__(obj_state)
                    return obj

            return dct

        jobstore = self._jobstores[jobstore]
        with ExitStack() as stack:
            if not hasattr(infile, "read"):
                infile = stack.enter_context(open(infile))

            data = json.load(infile, object_hook=json_object_hook)
            if not isinstance(data, dict):
                raise ValueError("expected a JSON document containing exported jobs")

            if (version := data.get("version", None)) != 1:
                raise ValueError(f"unrecognized version: {version}")

            for job_state in data["jobs"]:
                job = object.__new__(Job)
                job.__setstate__(job_state)
                jobstore.add_job(job)

    @abstractmethod
    def wakeup(self):
        """
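
To make the serialization scheme above concrete, here is a standalone sketch of the same encode/decode logic. DemoTrigger is a hypothetical stand-in for any object exposing __getstate__/__setstate__; obj_to_ref and ref_to_obj are the real apscheduler.util helpers this diff imports. Run it as a script so the "__main__:DemoTrigger" reference resolves.

# Standalone sketch of the encoding/decoding used by export_jobs()/import_jobs().
import json
import pickle
from base64 import b64decode, b64encode
from collections.abc import Mapping

from apscheduler.util import obj_to_ref, ref_to_obj


class DemoTrigger:
    """Hypothetical stand-in for an object with __getstate__/__setstate__."""

    def __init__(self, interval):
        self.interval = interval

    def __getstate__(self):
        return {"interval": self.interval}

    def __setstate__(self, state):
        self.interval = state["interval"]


def json_default(obj):
    # Prefer a class reference plus state mapping; fall back to pickled bytes
    # wrapped in base64 so they survive JSON's text-only payloads
    if hasattr(obj, "__getstate__") and hasattr(obj, "__setstate__"):
        state = obj.__getstate__()
        if isinstance(state, Mapping):
            return {
                "__apscheduler_class__": obj_to_ref(obj.__class__),
                "__apscheduler_state__": state,
            }

    return {
        "__apscheduler_pickle__": b64encode(
            pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
        ).decode("ascii")
    }


def json_object_hook(dct):
    # Reverse of json_default: unpickle, or reinstantiate from class + state
    if pickle_data := dct.get("__apscheduler_pickle__"):
        return pickle.loads(b64decode(pickle_data))

    if obj_class := dct.get("__apscheduler_class__"):
        if obj_state := dct.get("__apscheduler_state__"):
            obj = object.__new__(ref_to_obj(obj_class))
            obj.__setstate__(obj_state)
            return obj

    return dct


payload = json.dumps({"trigger": DemoTrigger(30)}, default=json_default)
restored = json.loads(payload, object_hook=json_object_hook)
assert restored["trigger"].interval == 30

Objects without a suitable state mapping take the base64-pickle branch instead, which keeps the export valid JSON at the cost of readability.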
76 changes: 76 additions & 0 deletions tests/test_schedulers.py
@@ -1,4 +1,5 @@
import asyncio
import json
import logging
import pickle
from copy import deepcopy
@@ -10,6 +11,7 @@

import pytest

from apscheduler import version
from apscheduler.events import (
    EVENT_ALL,
    EVENT_ALL_JOBS_REMOVED,
@@ -735,6 +737,80 @@ def test_print_jobs(self, scheduler, start_scheduler, jobstore):
"""
)

    def test_export_import_jobs(self, scheduler):
        scheduler.start(paused=True)
        scheduler.add_job(
            print,
            "cron",
            name="cronjob",
            args=["foo", "bar"],
            kwargs={"end": ""},
            hour="*/3",
        )
        scheduler.add_job(
            print,
            "date",
            name="datejob",
            args=("foo", "bar"),
            kwargs={"end": ""},
            run_date="2024-11-24 14:04:00+02:00",
        )
        scheduler.add_job(
            print,
            "interval",
            name="intervaljob",
            args=("foo", "bar"),
            kwargs={"end": ""},
            weeks=3,
            start_date="2024-11-24 14:04:00+02:00",
        )
        buffer = StringIO()
        scheduler.export_jobs(buffer)
        data = json.loads(buffer.getvalue())
        assert isinstance(data, dict)
        assert data["version"] == 1
        assert data["scheduler_version"] == version

        new_scheduler = DummyScheduler()
        new_scheduler.start(paused=True)
        buffer.seek(0)
        new_scheduler.import_jobs(buffer)
        jobs = new_scheduler.get_jobs()
        jobs.sort(key=lambda job: job.name)

        job = jobs.pop(0)
        assert job.name == "cronjob"
        assert job.args == ["foo", "bar"]
        assert job.kwargs == {"end": ""}
        assert (
            repr(job.trigger) == "<CronTrigger (hour='*/3', timezone='Europe/Berlin')>"
        )

        job = jobs.pop(0)
        assert job.name == "datejob"
        assert job.args == ["foo", "bar"]
        assert job.kwargs == {"end": ""}
        assert repr(job.trigger) == (
            "<DateTrigger (run_date='2024-11-24 14:04:00 UTC+02:00')>"
        )

        job = jobs.pop(0)
        assert job.name == "intervaljob"
        assert job.args == ["foo", "bar"]
        assert job.kwargs == {"end": ""}
        assert repr(job.trigger) == (
            "<IntervalTrigger (interval=datetime.timedelta(days=21), "
            "start_date='2024-11-24 14:04:00 UTC+02:00', "
            "timezone='Europe/Berlin')>"
        )

    def test_export_jobs_scheduler_not_started(self, scheduler):
        with pytest.raises(
            RuntimeError,
            match="the scheduler must have been started for job export to work",
        ):
            scheduler.export_jobs(StringIO())

    @pytest.mark.parametrize(
        "config",
        [
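
For reference, the top-level shape of the exported document that test_export_import_jobs asserts against (field values illustrative):

# Shape of the document written by export_jobs() (values illustrative):
exported = {
    "version": 1,                   # format version, validated by import_jobs()
    "scheduler_version": "3.11.0",  # hypothetical release string from apscheduler.version
    "jobs": [],                     # one job.__getstate__() mapping per stored job
}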
