Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job-runner): track job status #6332

Merged
merged 17 commits into from
Oct 1, 2024
Merged
8 changes: 7 additions & 1 deletion snuba/cli/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import click

from snuba.manual_jobs import JobSpec
from snuba.manual_jobs import JobSpec, get_job_status
from snuba.manual_jobs.job_loader import JobLoader
from snuba.manual_jobs.manifest_reader import _ManifestReader

Expand Down Expand Up @@ -49,3 +49,9 @@ def run(*, job_type: str, job_id: str, dry_run: bool, pairs: Tuple[str, ...]) ->

job_to_run = JobLoader.get_job_instance(job_spec, dry_run)
job_to_run.execute()


@jobs.command()
@click.option("--job_id")
def status(*, job_id: str) -> str:
return get_job_status(job_id)
29 changes: 27 additions & 2 deletions snuba/manual_jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
import logging
import os
import typing
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, MutableMapping, Optional, cast

from snuba.redis import RedisClientKey, get_redis_client
from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory

logger = logging.getLogger("snuba_init")
redis_client = get_redis_client(RedisClientKey.JOB)
job_status_hash = "snuba-job-status-hash"

RUNNING = "running"
FINISHED = "finished"
NOT_STARTED = "not started"


@dataclass
class JobSpec:
job_id: str
job_type: str
params: Optional[MutableMapping[Any, Any]]
params: Optional[MutableMapping[Any, Any]] = None


class Job(ABC, metaclass=RegisteredClass):
Expand All @@ -23,9 +31,15 @@ def __init__(self, job_spec: JobSpec, dry_run: bool) -> None:
if job_spec.params:
for k, v in job_spec.params.items():
setattr(self, k, v)
self._set_status(NOT_STARTED)

@abstractmethod
def execute(self) -> None:
self._set_status(RUNNING)
self._execute()
self._set_status(FINISHED)
onewland marked this conversation as resolved.
Show resolved Hide resolved

@abstractmethod
def _execute(self) -> None:
pass

@classmethod
Expand All @@ -36,6 +50,17 @@ def config_key(cls) -> str:
def get_from_name(cls, name: str) -> "Job":
return cast("Job", cls.class_from_name(name))

def _set_status(self, status: str) -> None:
redis_client.hset(job_status_hash, self.job_spec.job_id, status)


def get_job_status(job_id: str) -> str:
status = redis_client.hget(job_status_hash, job_id)
if status is None:
raise KeyError
else:
return typing.cast(str, status.decode("utf-8"))


import_submodules_in_directory(
os.path.dirname(os.path.realpath(__file__)), "snuba.manual_jobs"
Expand Down
2 changes: 1 addition & 1 deletion snuba/manual_jobs/toy_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def _build_query(self) -> str:
else:
return "not dry run query"

def execute(self) -> None:
def _execute(self) -> None:
logger.info(
"executing job "
+ self.job_spec.job_id
Expand Down
4 changes: 4 additions & 0 deletions snuba/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class RedisClientKey(Enum):
DLQ = "dlq"
OPTIMIZE = "optimize"
ADMIN_AUTH = "admin_auth"
JOB = "job"


_redis_clients: Mapping[RedisClientKey, RedisClientType] = {
Expand Down Expand Up @@ -141,6 +142,9 @@ class RedisClientKey(Enum):
RedisClientKey.ADMIN_AUTH: _initialize_specialized_redis_cluster(
settings.REDIS_CLUSTERS["admin_auth"]
),
RedisClientKey.JOB: _initialize_specialized_redis_cluster(
settings.REDIS_CLUSTERS["job"]
),
}


Expand Down
2 changes: 2 additions & 0 deletions snuba/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class RedisClusters(TypedDict):
dlq: RedisClusterConfig | None
optimize: RedisClusterConfig | None
admin_auth: RedisClusterConfig | None
job: RedisClusterConfig | None


REDIS_CLUSTERS: RedisClusters = {
Expand All @@ -186,6 +187,7 @@ class RedisClusters(TypedDict):
"dlq": None,
"optimize": None,
"admin_auth": None,
"job": None,
}

# Query Recording Options
Expand Down
1 change: 1 addition & 0 deletions snuba/settings/settings_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
(7, "dlq"),
(8, "optimize"),
(9, "admin_auth"),
(10, "job"),
]
}
VALIDATE_DATASET_YAMLS_ON_STARTUP = True
19 changes: 18 additions & 1 deletion tests/cli/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from click.testing import CliRunner

from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest
from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest, status


def test_cmd_line_valid() -> None:
Expand Down Expand Up @@ -69,3 +70,19 @@ def test_json_valid() -> None:
],
)
assert result.exit_code == 0


@pytest.mark.redis_db
def test_jobs_status() -> None:
runner = CliRunner()
runner.invoke(
run_from_manifest,
[
"--json_manifest",
"job_manifest.json",
"--job_id",
"abc1234",
],
)
result = runner.invoke(status, ["--job_id", "abc1234"])
assert result.exit_code == 0
27 changes: 27 additions & 0 deletions tests/jobs/test_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import pytest

from snuba.manual_jobs import FINISHED, NOT_STARTED, JobSpec, get_job_status
from snuba.manual_jobs.toy_job import ToyJob

JOB_ID = "abc1234"
test_job_spec = JobSpec(job_id=JOB_ID, job_type="ToyJob")

# @pytest.fixture(autouse=True)
onewland marked this conversation as resolved.
Show resolved Hide resolved
# def setup_test_job() -> Job:
# return ToyJob(test_job_spec, dry_run=True)


@pytest.mark.redis_db
def test_job_status_changes_to_finished() -> None:
# test_job = setup_test_job
test_job = ToyJob(test_job_spec, dry_run=True)
assert get_job_status(JOB_ID) == NOT_STARTED
test_job.execute()
assert get_job_status(JOB_ID) == FINISHED


@pytest.mark.redis_db
def test_job_status_with_invalid_job_id() -> None:
assert get_job_status(JOB_ID) == NOT_STARTED
with pytest.raises(KeyError):
get_job_status("invalid_job_id")
Loading