Skip to content

Commit

Permalink
scheduler: implement custom RunScheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
yashlamba committed May 24, 2024
1 parent c53a8ee commit 19abf92
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 1 deletion.
11 changes: 10 additions & 1 deletion invenio_jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from inspect import signature

from celery import current_app as current_celery_app
from celery.schedules import crontab
from invenio_accounts.models import User
from invenio_db import db
from sqlalchemy.dialects import postgresql
Expand Down Expand Up @@ -40,12 +41,20 @@ class Job(db.Model, Timestamp):
default_args = db.Column(JSON, default=lambda: dict(), nullable=True)
schedule = db.Column(JSON, nullable=True)

last_run_at = db.Column(db.DateTime, nullable=True)

# TODO: See if we move this to an API class
@property
def last_run(self):
    """Return the most recently created run of this job.

    The job's runs are ordered by ``Run.created`` in descending order and
    the first result of that query is returned.
    """
    newest_first = self.runs.order_by(Run.created.desc())
    return newest_first.first()

@property
def parsed_schedule(self):
    """Return the Celery schedule described by ``self.schedule``.

    Only the ``"crontab"`` key is understood for now; its value is passed
    as keyword arguments to ``crontab``.

    :returns: A ``crontab`` instance, or ``None`` when no schedule is set
        or the schedule has no ``"crontab"`` entry.
    """
    # TODO For testing purpose, will be updated
    # ``schedule`` is a nullable column: guard against ``None`` before the
    # membership test, otherwise ``"crontab" in None`` raises ``TypeError``.
    if not self.schedule or "crontab" not in self.schedule:
        return None
    return crontab(**self.schedule["crontab"])


class RunStatusEnum(enum.Enum):
"""Enumeration of a run's possible states."""
Expand All @@ -70,7 +79,7 @@ class Run(db.Model, Timestamp):
started_by = db.relationship(User)

started_at = db.Column(db.DateTime, nullable=True)
finished_at = db.Column(db.DateTime, nullable=False)
finished_at = db.Column(db.DateTime, nullable=True)

status = db.Column(
ChoiceType(RunStatusEnum, impl=db.String(1)),
Expand Down
120 changes: 120 additions & 0 deletions invenio_jobs/services/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-

Check failure on line 1 in invenio_jobs/services/scheduler.py

View workflow job for this annotation

GitHub Actions / Tests (3.9, postgresql14, opensearch2, 18.x)

pydocstyle-check /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:1 at module level: D100: Missing docstring in public module /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:22 in public class `JobEntry`: D101: Missing docstring in public class /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:27 in public method `__init__`: D107: Missing docstring in __init__ /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:56 in public method `from_job`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:69 in public class `RunScheduler`: D101: Missing docstring in public class /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:78 in public method `schedule`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:81 in public method `setup_schedule`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:84 in public method `reserve`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:88 in public method `apply_entry`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:101 in public method `sync`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:111 in public method `create_run`: D102: Missing docstring in public method

Check failure on line 1 in invenio_jobs/services/scheduler.py

View workflow job for this annotation

GitHub Actions / Tests (3.9, postgresql14, opensearch2, 20.x)

pydocstyle-check /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:1 at module level: D100: Missing docstring in public module /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:22 in public class `JobEntry`: D101: Missing docstring in public class /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:27 in public method `__init__`: D107: Missing docstring in __init__ /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:56 in public method `from_job`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:69 in public class `RunScheduler`: D101: Missing docstring in public class /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:78 in public method `schedule`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:81 in public method `setup_schedule`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:84 in public method `reserve`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:88 in public method `apply_entry`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:101 in public method `sync`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:111 in public method `create_run`: D102: Missing docstring in public method

Check failure on line 1 in invenio_jobs/services/scheduler.py

View workflow job for this annotation

GitHub Actions / Tests (3.12, postgresql14, opensearch2, 20.x)

pydocstyle-check /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:1 at module level: D100: Missing docstring in public module /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:22 in public class `JobEntry`: D101: Missing docstring in public class /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:27 in public method `__init__`: D107: Missing docstring in __init__ /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:56 in public method `from_job`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:69 in public class `RunScheduler`: D101: Missing docstring in public class /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:78 in public method `schedule`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:81 in public method `setup_schedule`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:84 in public method `reserve`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:88 in public method `apply_entry`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:101 in public method `sync`: D102: Missing docstring in public method /home/runner/work/invenio-jobs/invenio-jobs/invenio_jobs/services/scheduler.py:111 in public method `create_run`: D102: Missing docstring in public method
#
# Copyright (C) 2024 CERN.
#
# Invenio-Jobs is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

import traceback
from typing import Any

from celery.beat import ScheduleEntry, Scheduler, debug, error, info
from celery.schedules import crontab
from invenio_access.permissions import system_identity
from invenio_app.factory import create_api
from invenio_db import db

from invenio_jobs.models import Job, Run

# Application object built once at import time. Its application context wraps
# the database access performed in ``RunScheduler.sync`` and
# ``RunScheduler.create_run``.
app = create_api()


class JobEntry(ScheduleEntry):
    """Celery beat schedule entry that remembers which ``Job`` it came from."""

    #: Job ID
    job_id = None

    def __init__(
        self,
        job_id=None,
        name=None,
        task=None,
        last_run_at=None,
        total_run_count=None,
        schedule=None,
        args=(),
        kwargs=None,
        options=None,
        relative=False,
        app=None,
    ):
        """Initialize the entry.

        :param job_id: Identifier of the ``Job`` this entry was built from.
        :param args: Arguments for the task. Defaults to an empty tuple; the
            previous ``...`` default handed the ``Ellipsis`` object to the
            base class as the task arguments.

        All remaining parameters are forwarded unchanged, in order, to
        ``ScheduleEntry.__init__``.
        """
        # Stored as an instance attribute before delegating to the base class.
        self.job_id = job_id
        super().__init__(
            name,
            task,
            last_run_at,
            total_run_count,
            schedule,
            args,
            kwargs,
            options,
            relative,
            app,
        )

    @classmethod
    def from_job(cls, job):
        """Create an entry from a ``Job`` model instance.

        :param job: Object exposing ``id``, ``title``, ``parsed_schedule``,
            ``default_args``, ``task`` and ``last_run_at``.
        :returns: A new instance of ``cls`` with empty ``kwargs``/``options``.
        """
        return cls(
            job_id=job.id,
            name=job.title,
            schedule=job.parsed_schedule,
            # NOTE(review): ``default_args`` looks like a JSON mapping while
            # this slot is the task's ``args`` -- confirm whether it should
            # be passed as ``kwargs`` instead.
            args=job.default_args,
            task=job.task,
            kwargs={},
            options={},
            last_run_at=job.last_run_at,
        )


class RunScheduler(Scheduler):
    """Celery beat scheduler whose entries are built from active ``Job`` rows.

    Entries are kept in ``self.entries`` keyed by job id. ``sync`` writes each
    entry's ``last_run_at`` back to its job and then rebuilds the mapping from
    the database; ``apply_entry`` records a ``Run`` before sending the task.
    """

    Entry = JobEntry

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the database scheduler."""
        # Per-instance mapping of job id -> JobEntry (a class-level dict would
        # be shared by every scheduler instance). It is created before the
        # base initializer runs because that initializer may already call
        # ``setup_schedule``, which needs the mapping.
        self.entries = {}
        Scheduler.__init__(self, *args, **kwargs)

    @property
    def schedule(self):
        """Return the mapping of job id to ``JobEntry``."""
        return self.entries

    def setup_schedule(self):
        """Load the initial schedule by running ``sync``."""
        self.sync()

    def reserve(self, entry):
        """Advance ``entry`` and store the result under its job id.

        :param entry: The ``JobEntry`` about to be run.
        :returns: The new entry produced by ``next(entry)``.
        """
        new_entry = self.schedule[entry.job_id] = next(entry)
        return new_entry

    def apply_entry(self, entry, producer=None):
        """Record a run for ``entry`` and send its task.

        Any exception raised while creating the run or sending the task is
        logged and not re-raised.

        :param entry: The due ``JobEntry``.
        :param producer: Forwarded to ``apply_async``.
        """
        info("Scheduler: Sending due task %s (%s)", entry.name, entry.task)
        try:
            self.create_run(entry)
            result = self.apply_async(entry, producer=producer, advance=False)
        except Exception as exc:  # pylint: disable=broad-except
            error("Message Error: %s\n%s", exc, traceback.format_stack(), exc_info=True)
        else:
            if result and hasattr(result, "id"):
                debug("%s sent. id->%s", entry.task, result.id)
            else:
                debug("%s sent.", entry.task)

    def sync(self):
        """Persist run timestamps, then reload entries from the database.

        First each known entry's ``last_run_at`` is written to its ``Job``
        row; entries whose job no longer exists are skipped. Then the entry
        mapping is rebuilt from the jobs that are active and have a schedule,
        so jobs that were deactivated or deleted stop being scheduled.
        """
        with app.app_context():
            for job_id, entry in self.schedule.items():
                job = Job.query.filter_by(id=job_id).one_or_none()
                if job is None:
                    # The job was removed since the last sync; nothing to
                    # update (``.one()`` would raise here).
                    continue
                # Assign the datetime itself -- a trailing comma here would
                # store a one-element tuple in the DateTime column.
                job.last_run_at = entry.last_run_at
            db.session.commit()

            active_jobs = Job.query.filter(Job.active == True).all()  # noqa: E712
            refreshed = {}
            for job in active_jobs:
                if not job.schedule:
                    # ``schedule`` is nullable; a job without one cannot be
                    # turned into a schedule entry.
                    continue
                refreshed[job.id] = JobEntry.from_job(job)

            # Replace the contents in place so stale entries disappear while
            # the mapping object itself stays the same.
            self.entries.clear()
            self.entries.update(refreshed)

    def create_run(self, entry):
        """Create and commit a ``Run`` row for the job behind ``entry``.

        :param entry: ``JobEntry`` providing ``job_id`` and ``args``.
        :raises: Whatever ``Query.one()`` raises when the job is missing.
        """
        with app.app_context():
            job = Job.query.filter_by(id=entry.job_id).one()
            run = Run()
            run.job = job
            # run.started_by = started_by or "system"
            run.args = entry.args
            # run.queue = entry.default_queue  # TODO Not working/considered for now
            # Add explicitly instead of relying on the relationship
            # assignment to cascade the new object into the session.
            db.session.add(run)
            db.session.commit()

0 comments on commit 19abf92

Please sign in to comment.