diff --git a/snuba/cli/jobs.py b/snuba/cli/jobs.py index f85b8604e3..8f5720bdd2 100644 --- a/snuba/cli/jobs.py +++ b/snuba/cli/jobs.py @@ -2,7 +2,7 @@ import click -from snuba.manual_jobs import JobSpec +from snuba.manual_jobs import JobSpec, get_job_status from snuba.manual_jobs.job_loader import JobLoader from snuba.manual_jobs.manifest_reader import _ManifestReader @@ -49,3 +49,9 @@ def run(*, job_type: str, job_id: str, dry_run: bool, pairs: Tuple[str, ...]) -> job_to_run = JobLoader.get_job_instance(job_spec, dry_run) job_to_run.execute() + + +@jobs.command() +@click.option("--job_id") +def status(*, job_id: str) -> str: + return get_job_status(job_id) diff --git a/snuba/manual_jobs/__init__.py b/snuba/manual_jobs/__init__.py index a4d52e3e89..9191ae0ba5 100644 --- a/snuba/manual_jobs/__init__.py +++ b/snuba/manual_jobs/__init__.py @@ -1,19 +1,27 @@ import logging import os +import typing from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Any, MutableMapping, Optional, cast +from snuba.redis import RedisClientKey, get_redis_client from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory logger = logging.getLogger("snuba_init") +redis_client = get_redis_client(RedisClientKey.JOB) +job_status_hash = "snuba-job-status-hash" + +RUNNING = "running" +FINISHED = "finished" +NOT_STARTED = "not started" @dataclass class JobSpec: job_id: str job_type: str - params: Optional[MutableMapping[Any, Any]] + params: Optional[MutableMapping[Any, Any]] = None class Job(ABC, metaclass=RegisteredClass): @@ -23,9 +31,15 @@ def __init__(self, job_spec: JobSpec, dry_run: bool) -> None: if job_spec.params: for k, v in job_spec.params.items(): setattr(self, k, v) + self._set_status(NOT_STARTED) - @abstractmethod def execute(self) -> None: + self._set_status(RUNNING) + self._execute() + self._set_status(FINISHED) + + @abstractmethod + def _execute(self) -> None: pass @classmethod @@ -36,6 +50,17 @@ def config_key(cls) -> str: def get_from_name(cls, name: str) -> "Job": return cast("Job", cls.class_from_name(name)) + def _set_status(self, status: str) -> None: + redis_client.hset(job_status_hash, self.job_spec.job_id, status) + + +def get_job_status(job_id: str) -> str: + status = redis_client.hget(job_status_hash, job_id) + if status is None: + raise KeyError + else: + return typing.cast(str, status.decode("utf-8")) + import_submodules_in_directory( os.path.dirname(os.path.realpath(__file__)), "snuba.manual_jobs" diff --git a/snuba/manual_jobs/toy_job.py b/snuba/manual_jobs/toy_job.py index a70c829345..65caa5595b 100644 --- a/snuba/manual_jobs/toy_job.py +++ b/snuba/manual_jobs/toy_job.py @@ -15,7 +15,7 @@ def _build_query(self) -> str: else: return "not dry run query" - def execute(self) -> None: + def _execute(self) -> None: logger.info( "executing job " + self.job_spec.job_id diff --git a/snuba/redis.py b/snuba/redis.py index 575948018e..771e4aea69 100644 --- a/snuba/redis.py +++ b/snuba/redis.py @@ -114,6 +114,7 @@ class RedisClientKey(Enum): DLQ = "dlq" OPTIMIZE = "optimize" ADMIN_AUTH = "admin_auth" + JOB = "job" _redis_clients: Mapping[RedisClientKey, RedisClientType] = { @@ -141,6 +142,9 @@ class RedisClientKey(Enum): RedisClientKey.ADMIN_AUTH: _initialize_specialized_redis_cluster( settings.REDIS_CLUSTERS["admin_auth"] ), + RedisClientKey.JOB: _initialize_specialized_redis_cluster( + settings.REDIS_CLUSTERS["job"] + ), } diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index f681b4dd99..063219ca84 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -175,6 +175,7 @@ class RedisClusters(TypedDict): dlq: RedisClusterConfig | None optimize: RedisClusterConfig | None admin_auth: RedisClusterConfig | None + job: RedisClusterConfig | None REDIS_CLUSTERS: RedisClusters = { @@ -186,6 +187,7 @@ class RedisClusters(TypedDict): "dlq": None, "optimize": None, "admin_auth": None, + "job": None, } # Query Recording Options diff --git a/snuba/settings/settings_test.py b/snuba/settings/settings_test.py index c39978f00e..c015219b01 100644 --- a/snuba/settings/settings_test.py +++ b/snuba/settings/settings_test.py @@ -60,6 +60,7 @@ (7, "dlq"), (8, "optimize"), (9, "admin_auth"), + (10, "job"), ] } VALIDATE_DATASET_YAMLS_ON_STARTUP = True diff --git a/tests/cli/test_jobs.py b/tests/cli/test_jobs.py index 841fe1f7c5..6cd04641f5 100644 --- a/tests/cli/test_jobs.py +++ b/tests/cli/test_jobs.py @@ -1,6 +1,7 @@ +import pytest from click.testing import CliRunner -from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest +from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest, status def test_cmd_line_valid() -> None: @@ -69,3 +70,19 @@ def test_json_valid() -> None: ], ) assert result.exit_code == 0 + + +@pytest.mark.redis_db +def test_jobs_status() -> None: + runner = CliRunner() + runner.invoke( + run_from_manifest, + [ + "--json_manifest", + "job_manifest.json", + "--job_id", + "abc1234", + ], + ) + result = runner.invoke(status, ["--job_id", "abc1234"]) + assert result.exit_code == 0 diff --git a/tests/jobs/test_job.py b/tests/jobs/test_job.py new file mode 100644 index 0000000000..4476cface2 --- /dev/null +++ b/tests/jobs/test_job.py @@ -0,0 +1,27 @@ +import pytest + +from snuba.manual_jobs import FINISHED, NOT_STARTED, JobSpec, get_job_status +from snuba.manual_jobs.toy_job import ToyJob + +JOB_ID = "abc1234" +test_job_spec = JobSpec(job_id=JOB_ID, job_type="ToyJob") + +# @pytest.fixture(autouse=True) +# def setup_test_job() -> Job: +# return ToyJob(test_job_spec, dry_run=True) + + +@pytest.mark.redis_db +def test_job_status_changes_to_finished() -> None: + # test_job = setup_test_job + test_job = ToyJob(test_job_spec, dry_run=True) + assert get_job_status(JOB_ID) == NOT_STARTED + test_job.execute() + assert get_job_status(JOB_ID) == FINISHED + + +@pytest.mark.redis_db +def test_job_status_with_invalid_job_id() -> None: + assert get_job_status(JOB_ID) == NOT_STARTED + with pytest.raises(KeyError): + get_job_status("invalid_job_id")