From c0270dfc90b0145a103aae3d0f49e2cdc81303f3 Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Wed, 18 Sep 2024 16:10:31 -0700 Subject: [PATCH 01/14] c --- snuba/cli/jobs.py | 8 +++++++- snuba/manual_jobs/__init__.py | 29 +++++++++++++++++++++++++++-- snuba/manual_jobs/toy_job.py | 2 +- snuba/redis.py | 4 ++++ snuba/settings/__init__.py | 2 ++ snuba/settings/settings_test.py | 1 + tests/cli/test_jobs.py | 19 ++++++++++++++++++- tests/jobs/test_job.py | 27 +++++++++++++++++++++++++++ 8 files changed, 87 insertions(+), 5 deletions(-) create mode 100644 tests/jobs/test_job.py diff --git a/snuba/cli/jobs.py b/snuba/cli/jobs.py index f85b8604e3..8f5720bdd2 100644 --- a/snuba/cli/jobs.py +++ b/snuba/cli/jobs.py @@ -2,7 +2,7 @@ import click -from snuba.manual_jobs import JobSpec +from snuba.manual_jobs import JobSpec, get_job_status from snuba.manual_jobs.job_loader import JobLoader from snuba.manual_jobs.manifest_reader import _ManifestReader @@ -49,3 +49,9 @@ def run(*, job_type: str, job_id: str, dry_run: bool, pairs: Tuple[str, ...]) -> job_to_run = JobLoader.get_job_instance(job_spec, dry_run) job_to_run.execute() + + +@jobs.command() +@click.option("--job_id") +def status(*, job_id: str) -> str: + return get_job_status(job_id) diff --git a/snuba/manual_jobs/__init__.py b/snuba/manual_jobs/__init__.py index a4d52e3e89..9191ae0ba5 100644 --- a/snuba/manual_jobs/__init__.py +++ b/snuba/manual_jobs/__init__.py @@ -1,19 +1,27 @@ import logging import os +import typing from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Any, MutableMapping, Optional, cast +from snuba.redis import RedisClientKey, get_redis_client from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory logger = logging.getLogger("snuba_init") +redis_client = get_redis_client(RedisClientKey.JOB) +job_status_hash = "snuba-job-status-hash" + +RUNNING = "running" +FINISHED = "finished" +NOT_STARTED = "not started" @dataclass class JobSpec: job_id: str job_type: str - params: Optional[MutableMapping[Any, Any]] + params: Optional[MutableMapping[Any, Any]] = None class Job(ABC, metaclass=RegisteredClass): @@ -23,9 +31,15 @@ def __init__(self, job_spec: JobSpec, dry_run: bool) -> None: if job_spec.params: for k, v in job_spec.params.items(): setattr(self, k, v) + self._set_status(NOT_STARTED) - @abstractmethod def execute(self) -> None: + self._set_status(RUNNING) + self._execute() + self._set_status(FINISHED) + + @abstractmethod + def _execute(self) -> None: pass @classmethod @@ -36,6 +50,17 @@ def config_key(cls) -> str: def get_from_name(cls, name: str) -> "Job": return cast("Job", cls.class_from_name(name)) + def _set_status(self, status: str) -> None: + redis_client.hset(job_status_hash, self.job_spec.job_id, status) + + +def get_job_status(job_id: str) -> str: + status = redis_client.hget(job_status_hash, job_id) + if status is None: + raise KeyError + else: + return typing.cast(str, status.decode("utf-8")) + import_submodules_in_directory( os.path.dirname(os.path.realpath(__file__)), "snuba.manual_jobs" diff --git a/snuba/manual_jobs/toy_job.py b/snuba/manual_jobs/toy_job.py index a70c829345..65caa5595b 100644 --- a/snuba/manual_jobs/toy_job.py +++ b/snuba/manual_jobs/toy_job.py @@ -15,7 +15,7 @@ def _build_query(self) -> str: else: return "not dry run query" - def execute(self) -> None: + def _execute(self) -> None: logger.info( "executing job " + self.job_spec.job_id diff --git a/snuba/redis.py b/snuba/redis.py index 575948018e..771e4aea69 100644 --- a/snuba/redis.py +++ b/snuba/redis.py @@ -114,6 +114,7 @@ class RedisClientKey(Enum): DLQ = "dlq" OPTIMIZE = "optimize" ADMIN_AUTH = "admin_auth" + JOB = "job" _redis_clients: Mapping[RedisClientKey, RedisClientType] = { @@ -141,6 +142,9 @@ class RedisClientKey(Enum): RedisClientKey.ADMIN_AUTH: _initialize_specialized_redis_cluster( settings.REDIS_CLUSTERS["admin_auth"] ), + RedisClientKey.JOB: _initialize_specialized_redis_cluster( + settings.REDIS_CLUSTERS["job"] + ), } diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index f681b4dd99..063219ca84 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -175,6 +175,7 @@ class RedisClusters(TypedDict): dlq: RedisClusterConfig | None optimize: RedisClusterConfig | None admin_auth: RedisClusterConfig | None + job: RedisClusterConfig | None REDIS_CLUSTERS: RedisClusters = { @@ -186,6 +187,7 @@ class RedisClusters(TypedDict): "dlq": None, "optimize": None, "admin_auth": None, + "job": None, } # Query Recording Options diff --git a/snuba/settings/settings_test.py b/snuba/settings/settings_test.py index c39978f00e..c015219b01 100644 --- a/snuba/settings/settings_test.py +++ b/snuba/settings/settings_test.py @@ -60,6 +60,7 @@ (7, "dlq"), (8, "optimize"), (9, "admin_auth"), + (10, "job"), ] } VALIDATE_DATASET_YAMLS_ON_STARTUP = True diff --git a/tests/cli/test_jobs.py b/tests/cli/test_jobs.py index 841fe1f7c5..6cd04641f5 100644 --- a/tests/cli/test_jobs.py +++ b/tests/cli/test_jobs.py @@ -1,6 +1,7 @@ +import pytest from click.testing import CliRunner -from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest +from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest, status def test_cmd_line_valid() -> None: @@ -69,3 +70,19 @@ def test_json_valid() -> None: ], ) assert result.exit_code == 0 + + +@pytest.mark.redis_db +def test_jobs_status() -> None: + runner = CliRunner() + runner.invoke( + run_from_manifest, + [ + "--json_manifest", + "job_manifest.json", + "--job_id", + "abc1234", + ], + ) + result = runner.invoke(status, ["--job_id", "abc1234"]) + assert result.exit_code == 0 diff --git a/tests/jobs/test_job.py b/tests/jobs/test_job.py new file mode 100644 index 0000000000..4476cface2 --- /dev/null +++ b/tests/jobs/test_job.py @@ -0,0 +1,27 @@ +import pytest + +from snuba.manual_jobs import FINISHED, NOT_STARTED, JobSpec, get_job_status +from snuba.manual_jobs.toy_job import ToyJob + +JOB_ID = "abc1234" +test_job_spec = JobSpec(job_id=JOB_ID, job_type="ToyJob") + +# @pytest.fixture(autouse=True) +# def setup_test_job() -> Job: +# return ToyJob(test_job_spec, dry_run=True) + + +@pytest.mark.redis_db +def test_job_status_changes_to_finished() -> None: + # test_job = setup_test_job + test_job = ToyJob(test_job_spec, dry_run=True) + assert get_job_status(JOB_ID) == NOT_STARTED + test_job.execute() + assert get_job_status(JOB_ID) == FINISHED + + +@pytest.mark.redis_db +def test_job_status_with_invalid_job_id() -> None: + assert get_job_status(JOB_ID) == NOT_STARTED + with pytest.raises(KeyError): + get_job_status("invalid_job_id") From d03131feb5f5107971b9be4b2fd653de5ecd5eea Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 27 Sep 2024 09:52:57 -0700 Subject: [PATCH 02/14] [wip] refactor of status tracking --- snuba/cli/jobs.py | 27 ++++++++---- snuba/manual_jobs/__init__.py | 31 ++----------- snuba/manual_jobs/job_loader.py | 2 +- snuba/manual_jobs/runner.py | 77 +++++++++++++++++++++++++++++++-- snuba/manual_jobs/toy_job.py | 2 +- tests/jobs/test_job.py | 3 +- 6 files changed, 98 insertions(+), 44 deletions(-) diff --git a/snuba/cli/jobs.py b/snuba/cli/jobs.py index cb45448994..0241baab76 100644 --- a/snuba/cli/jobs.py +++ b/snuba/cli/jobs.py @@ -2,9 +2,13 @@ import click -from snuba.manual_jobs import JobSpec, get_job_status -from snuba.manual_jobs.job_loader import JobLoader -from snuba.manual_jobs.runner import list_job_specs +from snuba.manual_jobs import JobSpec +from snuba.manual_jobs.runner import ( + MANIFEST_FILENAME, + get_job_status, + list_job_specs, + run_job, +) JOB_SPECIFICATION_ERROR_MSG = "Missing job type and/or job id" @@ -15,7 +19,14 @@ def jobs() -> None: @jobs.command() -@click.option("--json_manifest", required=True) +@click.option("--json_manifest", default=MANIFEST_FILENAME) +def list(*, json_manifest: str) -> None: + job_specs = list_job_specs(json_manifest) + click.echo(job_specs) + + +@jobs.command() +@click.option("--json_manifest", default=MANIFEST_FILENAME) @click.option("--job_id") @click.option( "--dry_run", @@ -26,8 +37,7 @@ def run_from_manifest(*, json_manifest: str, job_id: str, dry_run: bool) -> None if job_id not in job_specs.keys(): raise click.ClickException("Provide a valid job id") - job_to_run = JobLoader.get_job_instance(job_specs[job_id], dry_run) - job_to_run.execute() + run_job(job_specs[job_id], dry_run) def _parse_params(pairs: Tuple[str, ...]) -> MutableMapping[Any, Any]: @@ -47,11 +57,10 @@ def run(*, job_type: str, job_id: str, dry_run: bool, pairs: Tuple[str, ...]) -> raise click.ClickException(JOB_SPECIFICATION_ERROR_MSG) job_spec = JobSpec(job_id=job_id, job_type=job_type, params=_parse_params(pairs)) - job_to_run = JobLoader.get_job_instance(job_spec, dry_run) - job_to_run.execute() + run_job(job_spec, dry_run) @jobs.command() @click.option("--job_id") def status(*, job_id: str) -> str: - return get_job_status(job_id) + click.echo(get_job_status(job_id)) diff --git a/snuba/manual_jobs/__init__.py b/snuba/manual_jobs/__init__.py index 9191ae0ba5..270e8281c3 100644 --- a/snuba/manual_jobs/__init__.py +++ b/snuba/manual_jobs/__init__.py @@ -1,20 +1,12 @@ import logging import os -import typing from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Any, MutableMapping, Optional, cast -from snuba.redis import RedisClientKey, get_redis_client from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory -logger = logging.getLogger("snuba_init") -redis_client = get_redis_client(RedisClientKey.JOB) -job_status_hash = "snuba-job-status-hash" - -RUNNING = "running" -FINISHED = "finished" -NOT_STARTED = "not started" +logger = logging.getLogger("snuba.manual_jobs") @dataclass @@ -31,16 +23,10 @@ def __init__(self, job_spec: JobSpec, dry_run: bool) -> None: if job_spec.params: for k, v in job_spec.params.items(): setattr(self, k, v) - self._set_status(NOT_STARTED) - - def execute(self) -> None: - self._set_status(RUNNING) - self._execute() - self._set_status(FINISHED) @abstractmethod - def _execute(self) -> None: - pass + def execute(self) -> None: + raise NotImplementedError @classmethod def config_key(cls) -> str: @@ -50,17 +36,6 @@ def config_key(cls) -> str: def get_from_name(cls, name: str) -> "Job": return cast("Job", cls.class_from_name(name)) - def _set_status(self, status: str) -> None: - redis_client.hset(job_status_hash, self.job_spec.job_id, status) - - -def get_job_status(job_id: str) -> str: - status = redis_client.hget(job_status_hash, job_id) - if status is None: - raise KeyError - else: - return typing.cast(str, status.decode("utf-8")) - import_submodules_in_directory( os.path.dirname(os.path.realpath(__file__)), "snuba.manual_jobs" diff --git a/snuba/manual_jobs/job_loader.py b/snuba/manual_jobs/job_loader.py index 45e4080d68..6bf72155aa 100644 --- a/snuba/manual_jobs/job_loader.py +++ b/snuba/manual_jobs/job_loader.py @@ -9,7 +9,7 @@ def __init__(self, job_type: str): super().__init__(f"Job does not exist. Did you make a file {job_type}.py yet?") -class JobLoader: +class _JobLoader: @staticmethod def get_job_instance(job_spec: JobSpec, dry_run: bool) -> "Job": job_type_class = Job.class_from_name(job_spec.job_type) diff --git a/snuba/manual_jobs/runner.py b/snuba/manual_jobs/runner.py index f8cb70044d..901e5e9ad6 100644 --- a/snuba/manual_jobs/runner.py +++ b/snuba/manual_jobs/runner.py @@ -1,12 +1,29 @@ import os -from typing import Any, Mapping +from enum import Enum +from typing import Any, Mapping, Optional import simplejson +from sentry_sdk import capture_exception from snuba.manual_jobs import JobSpec +from snuba.manual_jobs.job_loader import _JobLoader +from snuba.redis import RedisClientKey, get_redis_client +from snuba.utils.serializable_exception import SerializableException +redis_client = get_redis_client(RedisClientKey.JOB) -def _read_from_path(filename: str) -> Mapping[str, JobSpec]: + +class JobStatus(Enum): + RUNNING = "running" + FINISHED = "finished" + NOT_STARTED = "not_started" + FAILED = "failed" + + +MANIFEST_FILENAME = "job_manifest.json" + + +def _read_manifest_from_path(filename: str) -> Mapping[str, JobSpec]: local_root = os.path.dirname(__file__) with open(os.path.join(local_root, filename)) as stream: @@ -34,7 +51,59 @@ def _build_job_spec_from_entry(content: Any) -> JobSpec: return job_spec +def _job_lock_key(job_id: str) -> str: + return f"snuba:manual_jobs:{job_id}:lock" + + +def _job_status_key(job_id: str) -> str: + return f"snuba:manual_jobs:{job_id}:execution_status" + + +def _acquire_job_lock(job_id: str) -> bool: + return redis_client.set( + name=_job_lock_key(job_id), value=1, nx=True, ex=(24 * 60 * 60) + ) + + +def _release_job_lock(job_id: str) -> bool: + redis_client.delete(_job_lock_key(job_id)) + + +def _set_job_status(job_id: str, status: JobStatus) -> bool: + return redis_client.set(name=_job_status_key(job_id), value=status.name) + + +def get_job_status(job_id: str) -> Optional[JobStatus]: + return redis_client.get(name=_job_status_key(job_id)) + + def list_job_specs( - jobs_filename: str = "job_manifest.json", + manifest_filename: str = MANIFEST_FILENAME, ) -> Mapping[str, JobSpec]: - return _read_from_path(jobs_filename) + return _read_manifest_from_path(manifest_filename) + + +def run_job(job_spec: JobSpec, dry_run: bool): + current_job_status = get_job_status(job_spec.job_id) + if current_job_status is not None and current_job_status != JobStatus.NOT_STARTED: + raise SerializableException( + f"attempting to run job that has been started, status = {current_job_status}" + ) + + have_lock = _acquire_job_lock(job_spec.job_id) + if not have_lock: + raise SerializableException("could not acquire lock, job already running") + + _set_job_status(job_spec.job_id, JobStatus.NOT_STARTED) + + job_to_run = _JobLoader.get_job_instance(job_spec, dry_run) + + try: + _set_job_status(job_spec.job_id, JobStatus.RUNNING) + job_to_run.execute() + _set_job_status(job_spec.job_id, JobStatus.FINISHED) + except BaseException: + _set_job_status(job_spec.job_id, JobStatus.FAILED) + capture_exception() + finally: + _release_job_lock(job_spec.job_id) diff --git a/snuba/manual_jobs/toy_job.py b/snuba/manual_jobs/toy_job.py index 65caa5595b..a70c829345 100644 --- a/snuba/manual_jobs/toy_job.py +++ b/snuba/manual_jobs/toy_job.py @@ -15,7 +15,7 @@ def _build_query(self) -> str: else: return "not dry run query" - def _execute(self) -> None: + def execute(self) -> None: logger.info( "executing job " + self.job_spec.job_id diff --git a/tests/jobs/test_job.py b/tests/jobs/test_job.py index 4476cface2..e264e7b9e1 100644 --- a/tests/jobs/test_job.py +++ b/tests/jobs/test_job.py @@ -1,6 +1,7 @@ import pytest -from snuba.manual_jobs import FINISHED, NOT_STARTED, JobSpec, get_job_status +from snuba.manual_jobs import JobSpec, get_job_status +from snuba.manual_jobs.runner import FINISHED, NOT_STARTED from snuba.manual_jobs.toy_job import ToyJob JOB_ID = "abc1234" From 34a50fe3f4ad4311fa98c00585eb02bee5aeca56 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 27 Sep 2024 10:27:11 -0700 Subject: [PATCH 03/14] fix typing, tests --- snuba/cli/jobs.py | 11 ++++++--- snuba/manual_jobs/runner.py | 43 +++++++++++++++++++++------------ snuba/redis.py | 6 ++--- snuba/settings/__init__.py | 4 +-- snuba/settings/settings_test.py | 2 +- tests/jobs/test_job.py | 21 +++++----------- 6 files changed, 47 insertions(+), 40 deletions(-) diff --git a/snuba/cli/jobs.py b/snuba/cli/jobs.py index 0241baab76..a5f996074e 100644 --- a/snuba/cli/jobs.py +++ b/snuba/cli/jobs.py @@ -25,6 +25,11 @@ def list(*, json_manifest: str) -> None: click.echo(job_specs) +def _run_job_and_echo_status(job_spec: JobSpec, dry_run: bool) -> None: + status = run_job(job_spec, dry_run) + click.echo(f"resulting job status = {status}") + + @jobs.command() @click.option("--json_manifest", default=MANIFEST_FILENAME) @click.option("--job_id") @@ -37,7 +42,7 @@ def run_from_manifest(*, json_manifest: str, job_id: str, dry_run: bool) -> None if job_id not in job_specs.keys(): raise click.ClickException("Provide a valid job id") - run_job(job_specs[job_id], dry_run) + _run_job_and_echo_status(job_specs[job_id], dry_run) def _parse_params(pairs: Tuple[str, ...]) -> MutableMapping[Any, Any]: @@ -57,10 +62,10 @@ def run(*, job_type: str, job_id: str, dry_run: bool, pairs: Tuple[str, ...]) -> raise click.ClickException(JOB_SPECIFICATION_ERROR_MSG) job_spec = JobSpec(job_id=job_id, job_type=job_type, params=_parse_params(pairs)) - run_job(job_spec, dry_run) + _run_job_and_echo_status(job_spec, dry_run) @jobs.command() @click.option("--job_id") -def status(*, job_id: str) -> str: +def status(*, job_id: str) -> None: click.echo(get_job_status(job_id)) diff --git a/snuba/manual_jobs/runner.py b/snuba/manual_jobs/runner.py index 901e5e9ad6..dc313bf620 100644 --- a/snuba/manual_jobs/runner.py +++ b/snuba/manual_jobs/runner.py @@ -1,5 +1,5 @@ import os -from enum import Enum +from enum import StrEnum from typing import Any, Mapping, Optional import simplejson @@ -10,10 +10,10 @@ from snuba.redis import RedisClientKey, get_redis_client from snuba.utils.serializable_exception import SerializableException -redis_client = get_redis_client(RedisClientKey.JOB) +_redis_client = get_redis_client(RedisClientKey.MANUAL_JOBS) -class JobStatus(Enum): +class JobStatus(StrEnum): RUNNING = "running" FINISHED = "finished" NOT_STARTED = "not_started" @@ -60,21 +60,28 @@ def _job_status_key(job_id: str) -> str: def _acquire_job_lock(job_id: str) -> bool: - return redis_client.set( - name=_job_lock_key(job_id), value=1, nx=True, ex=(24 * 60 * 60) + return bool( + _redis_client.set( + name=_job_lock_key(job_id), value=1, nx=True, ex=(24 * 60 * 60) + ) ) -def _release_job_lock(job_id: str) -> bool: - redis_client.delete(_job_lock_key(job_id)) +def _release_job_lock(job_id: str) -> None: + _redis_client.delete(_job_lock_key(job_id)) -def _set_job_status(job_id: str, status: JobStatus) -> bool: - return redis_client.set(name=_job_status_key(job_id), value=status.name) +def _set_job_status(job_id: str, status: JobStatus) -> JobStatus: + _redis_client.set(name=_job_status_key(job_id), value=status.value) + return status def get_job_status(job_id: str) -> Optional[JobStatus]: - return redis_client.get(name=_job_status_key(job_id)) + redis_status = _redis_client.get(name=_job_status_key(job_id)) + if redis_status is None: + return redis_status + else: + return JobStatus(redis_status.decode("utf-8")) def list_job_specs( @@ -83,7 +90,7 @@ def list_job_specs( return _read_manifest_from_path(manifest_filename) -def run_job(job_spec: JobSpec, dry_run: bool): +def run_job(job_spec: JobSpec, dry_run: bool) -> JobStatus: current_job_status = get_job_status(job_spec.job_id) if current_job_status is not None and current_job_status != JobStatus.NOT_STARTED: raise SerializableException( @@ -92,18 +99,22 @@ def run_job(job_spec: JobSpec, dry_run: bool): have_lock = _acquire_job_lock(job_spec.job_id) if not have_lock: - raise SerializableException("could not acquire lock, job already running") + raise SerializableException( + "could not acquire lock, another thread is attempting to run job" + ) - _set_job_status(job_spec.job_id, JobStatus.NOT_STARTED) + current_job_status = _set_job_status(job_spec.job_id, JobStatus.NOT_STARTED) job_to_run = _JobLoader.get_job_instance(job_spec, dry_run) try: - _set_job_status(job_spec.job_id, JobStatus.RUNNING) + current_job_status = _set_job_status(job_spec.job_id, JobStatus.RUNNING) job_to_run.execute() - _set_job_status(job_spec.job_id, JobStatus.FINISHED) + current_job_status = _set_job_status(job_spec.job_id, JobStatus.FINISHED) except BaseException: - _set_job_status(job_spec.job_id, JobStatus.FAILED) + current_job_status = _set_job_status(job_spec.job_id, JobStatus.FAILED) capture_exception() finally: _release_job_lock(job_spec.job_id) + + return current_job_status diff --git a/snuba/redis.py b/snuba/redis.py index 771e4aea69..df0c21e98f 100644 --- a/snuba/redis.py +++ b/snuba/redis.py @@ -114,7 +114,7 @@ class RedisClientKey(Enum): DLQ = "dlq" OPTIMIZE = "optimize" ADMIN_AUTH = "admin_auth" - JOB = "job" + MANUAL_JOBS = "manual_jobs" _redis_clients: Mapping[RedisClientKey, RedisClientType] = { @@ -142,8 +142,8 @@ class RedisClientKey(Enum): RedisClientKey.ADMIN_AUTH: _initialize_specialized_redis_cluster( settings.REDIS_CLUSTERS["admin_auth"] ), - RedisClientKey.JOB: _initialize_specialized_redis_cluster( - settings.REDIS_CLUSTERS["job"] + RedisClientKey.MANUAL_JOBS: _initialize_specialized_redis_cluster( + settings.REDIS_CLUSTERS["manual_jobs"] ), } diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index 063219ca84..310587f5d7 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -175,7 +175,7 @@ class RedisClusters(TypedDict): dlq: RedisClusterConfig | None optimize: RedisClusterConfig | None admin_auth: RedisClusterConfig | None - job: RedisClusterConfig | None + manual_jobs: RedisClusterConfig | None REDIS_CLUSTERS: RedisClusters = { @@ -187,7 +187,7 @@ class RedisClusters(TypedDict): "dlq": None, "optimize": None, "admin_auth": None, - "job": None, + "manual_jobs": None, } # Query Recording Options diff --git a/snuba/settings/settings_test.py b/snuba/settings/settings_test.py index c015219b01..70ca081e43 100644 --- a/snuba/settings/settings_test.py +++ b/snuba/settings/settings_test.py @@ -60,7 +60,7 @@ (7, "dlq"), (8, "optimize"), (9, "admin_auth"), - (10, "job"), + (10, "manual_jobs"), ] } VALIDATE_DATASET_YAMLS_ON_STARTUP = True diff --git a/tests/jobs/test_job.py b/tests/jobs/test_job.py index e264e7b9e1..294ec7d2b1 100644 --- a/tests/jobs/test_job.py +++ b/tests/jobs/test_job.py @@ -1,28 +1,19 @@ import pytest -from snuba.manual_jobs import JobSpec, get_job_status -from snuba.manual_jobs.runner import FINISHED, NOT_STARTED -from snuba.manual_jobs.toy_job import ToyJob +from snuba.manual_jobs import JobSpec +from snuba.manual_jobs.runner import JobStatus, get_job_status, run_job JOB_ID = "abc1234" test_job_spec = JobSpec(job_id=JOB_ID, job_type="ToyJob") -# @pytest.fixture(autouse=True) -# def setup_test_job() -> Job: -# return ToyJob(test_job_spec, dry_run=True) - @pytest.mark.redis_db def test_job_status_changes_to_finished() -> None: - # test_job = setup_test_job - test_job = ToyJob(test_job_spec, dry_run=True) - assert get_job_status(JOB_ID) == NOT_STARTED - test_job.execute() - assert get_job_status(JOB_ID) == FINISHED + assert get_job_status(JOB_ID) is None + run_job(test_job_spec, False) + assert get_job_status(JOB_ID) == JobStatus.FINISHED @pytest.mark.redis_db def test_job_status_with_invalid_job_id() -> None: - assert get_job_status(JOB_ID) == NOT_STARTED - with pytest.raises(KeyError): - get_job_status("invalid_job_id") + assert get_job_status("invalid_job_id") is None From 1ba1350aa938223c3a513a10355524e2a26077ac Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 27 Sep 2024 10:52:17 -0700 Subject: [PATCH 04/14] add failure status test --- snuba/manual_jobs/runner.py | 9 ++++++--- tests/jobs/test_job.py | 23 ++++++++++++++++++++++- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/snuba/manual_jobs/runner.py b/snuba/manual_jobs/runner.py index dc313bf620..841f5831f5 100644 --- a/snuba/manual_jobs/runner.py +++ b/snuba/manual_jobs/runner.py @@ -108,11 +108,14 @@ def run_job(job_spec: JobSpec, dry_run: bool) -> JobStatus: job_to_run = _JobLoader.get_job_instance(job_spec, dry_run) try: - current_job_status = _set_job_status(job_spec.job_id, JobStatus.RUNNING) + if not dry_run: + current_job_status = _set_job_status(job_spec.job_id, JobStatus.RUNNING) job_to_run.execute() - current_job_status = _set_job_status(job_spec.job_id, JobStatus.FINISHED) + if not dry_run: + current_job_status = _set_job_status(job_spec.job_id, JobStatus.FINISHED) except BaseException: - current_job_status = _set_job_status(job_spec.job_id, JobStatus.FAILED) + if not dry_run: + current_job_status = _set_job_status(job_spec.job_id, JobStatus.FAILED) capture_exception() finally: _release_job_lock(job_spec.job_id) diff --git a/tests/jobs/test_job.py b/tests/jobs/test_job.py index 294ec7d2b1..051975a487 100644 --- a/tests/jobs/test_job.py +++ b/tests/jobs/test_job.py @@ -1,12 +1,24 @@ +from unittest.mock import patch + import pytest -from snuba.manual_jobs import JobSpec +from snuba.manual_jobs import Job, JobSpec +from snuba.manual_jobs.job_loader import _JobLoader from snuba.manual_jobs.runner import JobStatus, get_job_status, run_job +from snuba.utils.serializable_exception import SerializableException JOB_ID = "abc1234" test_job_spec = JobSpec(job_id=JOB_ID, job_type="ToyJob") +class FailJob(Job): + def __init__(self): + pass + + def execute(self): + raise SerializableException("Intended failure") + + @pytest.mark.redis_db def test_job_status_changes_to_finished() -> None: assert get_job_status(JOB_ID) is None @@ -14,6 +26,15 @@ def test_job_status_changes_to_finished() -> None: assert get_job_status(JOB_ID) == JobStatus.FINISHED +@pytest.mark.redis_db +def test_job_with_exception_causes_failure() -> None: + with patch.object(_JobLoader, "get_job_instance") as MockGetInstace: + MockGetInstace.return_value = FailJob() + assert get_job_status(JOB_ID) is None + run_job(test_job_spec, False) + assert get_job_status(JOB_ID) == JobStatus.FAILED + + @pytest.mark.redis_db def test_job_status_with_invalid_job_id() -> None: assert get_job_status("invalid_job_id") is None From 4c35fb9e516494fb52c88665f44a0d5d2a8fe91b Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 27 Sep 2024 10:53:52 -0700 Subject: [PATCH 05/14] typo --- tests/jobs/test_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/jobs/test_job.py b/tests/jobs/test_job.py index 051975a487..445684506c 100644 --- a/tests/jobs/test_job.py +++ b/tests/jobs/test_job.py @@ -28,8 +28,8 @@ def test_job_status_changes_to_finished() -> None: @pytest.mark.redis_db def test_job_with_exception_causes_failure() -> None: - with patch.object(_JobLoader, "get_job_instance") as MockGetInstace: - MockGetInstace.return_value = FailJob() + with patch.object(_JobLoader, "get_job_instance") as MockGetInstance: + MockGetInstance.return_value = FailJob() assert get_job_status(JOB_ID) is None run_job(test_job_spec, False) assert get_job_status(JOB_ID) == JobStatus.FAILED From d433ef4c1f1655f66d61e289dcdd2c1889e59c8d Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 27 Sep 2024 11:59:32 -0700 Subject: [PATCH 06/14] add test for slower job staying in RUNNING --- tests/jobs/test_job.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/jobs/test_job.py b/tests/jobs/test_job.py index 445684506c..7d695fad40 100644 --- a/tests/jobs/test_job.py +++ b/tests/jobs/test_job.py @@ -1,3 +1,5 @@ +from threading import Thread +from time import sleep from unittest.mock import patch import pytest @@ -19,6 +21,15 @@ def execute(self): raise SerializableException("Intended failure") +class SlowJob(Job): + def __init__(self): + self.stop = False + + def execute(self): + while not self.stop: + sleep(0.005) + + @pytest.mark.redis_db def test_job_status_changes_to_finished() -> None: assert get_job_status(JOB_ID) is None @@ -35,6 +46,21 @@ def test_job_with_exception_causes_failure() -> None: assert get_job_status(JOB_ID) == JobStatus.FAILED +@pytest.mark.redis_db +def test_slow_job_stay_running() -> None: + with patch.object(_JobLoader, "get_job_instance") as MockGetInstance: + job = SlowJob() + MockGetInstance.return_value = job + assert get_job_status(JOB_ID) is None + t = Thread( + target=run_job, name="slow-background-job", args=[test_job_spec, False] + ) + t.start() + sleep(0.1) + assert get_job_status(JOB_ID) == JobStatus.RUNNING + job.stop = True + + @pytest.mark.redis_db def test_job_status_with_invalid_job_id() -> None: assert get_job_status("invalid_job_id") is None From 722a5cc95019575716a7741717eca9516de583c2 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 27 Sep 2024 13:25:20 -0700 Subject: [PATCH 07/14] add test for job lock --- snuba/manual_jobs/runner.py | 9 ++++++--- tests/jobs/test_job.py | 15 ++++++++++++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/snuba/manual_jobs/runner.py b/snuba/manual_jobs/runner.py index 841f5831f5..15e969cd7d 100644 --- a/snuba/manual_jobs/runner.py +++ b/snuba/manual_jobs/runner.py @@ -20,6 +20,11 @@ class JobStatus(StrEnum): FAILED = "failed" +class JobLockedException(SerializableException): + def __init__(self, job_id: str): + super().__init__(f"Job {job_id} lock exists, not able to run") + + MANIFEST_FILENAME = "job_manifest.json" @@ -99,9 +104,7 @@ def run_job(job_spec: JobSpec, dry_run: bool) -> JobStatus: have_lock = _acquire_job_lock(job_spec.job_id) if not have_lock: - raise SerializableException( - "could not acquire lock, another thread is attempting to run job" - ) + raise JobLockedException(job_spec.job_id) current_job_status = _set_job_status(job_spec.job_id, JobStatus.NOT_STARTED) diff --git a/tests/jobs/test_job.py b/tests/jobs/test_job.py index 7d695fad40..25e6a9a2b1 100644 --- a/tests/jobs/test_job.py +++ b/tests/jobs/test_job.py @@ -6,7 +6,13 @@ from snuba.manual_jobs import Job, JobSpec from snuba.manual_jobs.job_loader import _JobLoader -from snuba.manual_jobs.runner import JobStatus, get_job_status, run_job +from snuba.manual_jobs.runner import ( + JobLockedException, + JobStatus, + _acquire_job_lock, + get_job_status, + run_job, +) from snuba.utils.serializable_exception import SerializableException JOB_ID = "abc1234" @@ -64,3 +70,10 @@ def test_slow_job_stay_running() -> None: @pytest.mark.redis_db def test_job_status_with_invalid_job_id() -> None: assert get_job_status("invalid_job_id") is None + + +@pytest.mark.redis_db +def test_job_lock() -> None: + _acquire_job_lock(JOB_ID) + with pytest.raises(JobLockedException): + run_job(test_job_spec, False) From 80cc0f4d862a5a4793574a27d5139b5c50e0a297 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 27 Sep 2024 13:29:08 -0700 Subject: [PATCH 08/14] exception cleanup --- snuba/manual_jobs/runner.py | 13 +++++++++---- tests/jobs/test_job.py | 3 +++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/snuba/manual_jobs/runner.py b/snuba/manual_jobs/runner.py index 15e969cd7d..030d1d8360 100644 --- a/snuba/manual_jobs/runner.py +++ b/snuba/manual_jobs/runner.py @@ -22,7 +22,14 @@ class JobStatus(StrEnum): class JobLockedException(SerializableException): def __init__(self, job_id: str): - super().__init__(f"Job {job_id} lock exists, not able to run") + super().__init__(f"Job {job_id} lock exists, not available to run") + + +class JobStatusException(SerializableException): + def __init__(self, job_id: str, status: JobStatus): + super().__init__( + f"Job {job_id} has run before, status = {status}, not available to run" + ) MANIFEST_FILENAME = "job_manifest.json" @@ -98,9 +105,7 @@ def list_job_specs( def run_job(job_spec: JobSpec, dry_run: bool) -> JobStatus: current_job_status = get_job_status(job_spec.job_id) if current_job_status is not None and current_job_status != JobStatus.NOT_STARTED: - raise SerializableException( - f"attempting to run job that has been started, status = {current_job_status}" - ) + raise JobStatusException(job_id=job_spec.job_id, status=current_job_status) have_lock = _acquire_job_lock(job_spec.job_id) if not have_lock: diff --git a/tests/jobs/test_job.py b/tests/jobs/test_job.py index 25e6a9a2b1..a0cd66367f 100644 --- a/tests/jobs/test_job.py +++ b/tests/jobs/test_job.py @@ -9,6 +9,7 @@ from snuba.manual_jobs.runner import ( JobLockedException, JobStatus, + JobStatusException, _acquire_job_lock, get_job_status, run_job, @@ -41,6 +42,8 @@ def test_job_status_changes_to_finished() -> None: assert get_job_status(JOB_ID) is None run_job(test_job_spec, False) assert get_job_status(JOB_ID) == JobStatus.FINISHED + with pytest.raises(JobStatusException): + run_job(test_job_spec, False) @pytest.mark.redis_db From 610b53587bd9fcb172d0319f3986f41697ef2644 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 27 Sep 2024 13:32:35 -0700 Subject: [PATCH 09/14] rename test directory --- tests/{jobs => manual_jobs}/test_job.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{jobs => manual_jobs}/test_job.py (100%) diff --git a/tests/jobs/test_job.py b/tests/manual_jobs/test_job.py similarity index 100% rename from tests/jobs/test_job.py rename to tests/manual_jobs/test_job.py From ca0cde0e3af9d3a1613f9a10cde82083fed59075 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 27 Sep 2024 13:40:03 -0700 Subject: [PATCH 10/14] typing annotations --- tests/manual_jobs/test_job.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/manual_jobs/test_job.py b/tests/manual_jobs/test_job.py index a0cd66367f..e011eb590c 100644 --- a/tests/manual_jobs/test_job.py +++ b/tests/manual_jobs/test_job.py @@ -21,18 +21,18 @@ class FailJob(Job): - def __init__(self): + def __init__(self) -> None: pass - def execute(self): + def execute(self) -> None: raise SerializableException("Intended failure") class SlowJob(Job): - def __init__(self): + def __init__(self) -> None: self.stop = False - def execute(self): + def execute(self) -> None: while not self.stop: sleep(0.005) From 607e343151fe237406abdf1b4b33efa08be5a496 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Mon, 30 Sep 2024 11:35:59 -0700 Subject: [PATCH 11/14] load redis for all tests that run jobs --- tests/cli/test_jobs.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/cli/test_jobs.py b/tests/cli/test_jobs.py index 6cd04641f5..e37affa10d 100644 --- a/tests/cli/test_jobs.py +++ b/tests/cli/test_jobs.py @@ -4,6 +4,7 @@ from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest, status +@pytest.mark.redis_db def test_cmd_line_valid() -> None: runner = CliRunner() result = runner.invoke( @@ -14,6 +15,7 @@ def test_cmd_line_valid() -> None: assert result.exit_code == 0 +@pytest.mark.redis_db def test_invalid_job_errors() -> None: runner = CliRunner() result = runner.invoke( @@ -33,6 +35,7 @@ def test_invalid_job_errors() -> None: assert result.exit_code == 1 +@pytest.mark.redis_db def test_cmd_line_no_job_specification_errors() -> None: runner = CliRunner() result = runner.invoke(run, ["--dry_run", "True", "k1=v1", "k2=v2"]) @@ -40,6 +43,7 @@ def test_cmd_line_no_job_specification_errors() -> None: assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n" +@pytest.mark.redis_db def test_cmd_line_no_job_id_errors() -> None: runner = CliRunner() result = runner.invoke( @@ -49,6 +53,7 @@ def test_cmd_line_no_job_id_errors() -> None: assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n" +@pytest.mark.redis_db def test_cmd_line_no_job_type_errors() -> None: runner = CliRunner() result = runner.invoke( @@ -58,6 +63,7 @@ def test_cmd_line_no_job_type_errors() -> None: assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n" +@pytest.mark.redis_db def test_json_valid() -> None: runner = CliRunner() result = runner.invoke( From 8cc8ca002c1a6e0531114a6cf1e754302248afd3 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Tue, 1 Oct 2024 09:34:08 -0700 Subject: [PATCH 12/14] method rename --- snuba/manual_jobs/runner.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/snuba/manual_jobs/runner.py b/snuba/manual_jobs/runner.py index 030d1d8360..a6fef33607 100644 --- a/snuba/manual_jobs/runner.py +++ b/snuba/manual_jobs/runner.py @@ -63,33 +63,33 @@ def _build_job_spec_from_entry(content: Any) -> JobSpec: return job_spec -def _job_lock_key(job_id: str) -> str: +def _build_job_lock_key(job_id: str) -> str: return f"snuba:manual_jobs:{job_id}:lock" -def _job_status_key(job_id: str) -> str: +def _build_job_status_key(job_id: str) -> str: return f"snuba:manual_jobs:{job_id}:execution_status" def _acquire_job_lock(job_id: str) -> bool: return bool( _redis_client.set( - name=_job_lock_key(job_id), value=1, nx=True, ex=(24 * 60 * 60) + name=_build_job_lock_key(job_id), value=1, nx=True, ex=(24 * 60 * 60) ) ) def _release_job_lock(job_id: str) -> None: - _redis_client.delete(_job_lock_key(job_id)) + _redis_client.delete(_build_job_lock_key(job_id)) def _set_job_status(job_id: str, status: JobStatus) -> JobStatus: - _redis_client.set(name=_job_status_key(job_id), value=status.value) + _redis_client.set(name=_build_job_status_key(job_id), value=status.value) return status def get_job_status(job_id: str) -> Optional[JobStatus]: - redis_status = _redis_client.get(name=_job_status_key(job_id)) + redis_status = _redis_client.get(name=_build_job_status_key(job_id)) if redis_status is None: return redis_status else: From 5cce7a163c3c71548301207d96b0ec789ac26851 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Tue, 1 Oct 2024 09:45:21 -0700 Subject: [PATCH 13/14] raise exception on set_job_status() if redis set fails --- snuba/manual_jobs/runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/snuba/manual_jobs/runner.py b/snuba/manual_jobs/runner.py index a6fef33607..2c3c9160d2 100644 --- a/snuba/manual_jobs/runner.py +++ b/snuba/manual_jobs/runner.py @@ -84,7 +84,8 @@ def _release_job_lock(job_id: str) -> None: def _set_job_status(job_id: str, status: JobStatus) -> JobStatus: - _redis_client.set(name=_build_job_status_key(job_id), value=status.value) + if not _redis_client.set(name=_build_job_status_key(job_id), value=status.value): + raise SerializableException(f"Failed to set job status {status} on {job_id}") return status From 5b4156d375d798024d92a23808b94999bb5c6452 Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Tue, 1 Oct 2024 10:22:03 -0700 Subject: [PATCH 14/14] style change --- snuba/manual_jobs/runner.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/snuba/manual_jobs/runner.py b/snuba/manual_jobs/runner.py index 2c3c9160d2..b0705c8706 100644 --- a/snuba/manual_jobs/runner.py +++ b/snuba/manual_jobs/runner.py @@ -91,10 +91,7 @@ def _set_job_status(job_id: str, status: JobStatus) -> JobStatus: def get_job_status(job_id: str) -> Optional[JobStatus]: redis_status = _redis_client.get(name=_build_job_status_key(job_id)) - if redis_status is None: - return redis_status - else: - return JobStatus(redis_status.decode("utf-8")) + return JobStatus(redis_status.decode("utf-8")) if redis_status else redis_status def list_job_specs(