Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachel Chen authored and Rachel Chen committed Sep 20, 2024
1 parent f030a4d commit c0270df
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 5 deletions.
8 changes: 7 additions & 1 deletion snuba/cli/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import click

from snuba.manual_jobs import JobSpec
from snuba.manual_jobs import JobSpec, get_job_status
from snuba.manual_jobs.job_loader import JobLoader
from snuba.manual_jobs.manifest_reader import _ManifestReader

Expand Down Expand Up @@ -49,3 +49,9 @@ def run(*, job_type: str, job_id: str, dry_run: bool, pairs: Tuple[str, ...]) ->

job_to_run = JobLoader.get_job_instance(job_spec, dry_run)
job_to_run.execute()


@jobs.command()
@click.option("--job_id")
def status(*, job_id: str) -> str:
return get_job_status(job_id)
29 changes: 27 additions & 2 deletions snuba/manual_jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
import logging
import os
import typing
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, MutableMapping, Optional, cast

from snuba.redis import RedisClientKey, get_redis_client
from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory

logger = logging.getLogger("snuba_init")
redis_client = get_redis_client(RedisClientKey.JOB)
job_status_hash = "snuba-job-status-hash"

RUNNING = "running"
FINISHED = "finished"
NOT_STARTED = "not started"


@dataclass
class JobSpec:
job_id: str
job_type: str
params: Optional[MutableMapping[Any, Any]]
params: Optional[MutableMapping[Any, Any]] = None


class Job(ABC, metaclass=RegisteredClass):
Expand All @@ -23,9 +31,15 @@ def __init__(self, job_spec: JobSpec, dry_run: bool) -> None:
if job_spec.params:
for k, v in job_spec.params.items():
setattr(self, k, v)
self._set_status(NOT_STARTED)

@abstractmethod
def execute(self) -> None:
self._set_status(RUNNING)
self._execute()
self._set_status(FINISHED)

@abstractmethod
def _execute(self) -> None:
pass

@classmethod
Expand All @@ -36,6 +50,17 @@ def config_key(cls) -> str:
def get_from_name(cls, name: str) -> "Job":
return cast("Job", cls.class_from_name(name))

def _set_status(self, status: str) -> None:
redis_client.hset(job_status_hash, self.job_spec.job_id, status)


def get_job_status(job_id: str) -> str:
status = redis_client.hget(job_status_hash, job_id)
if status is None:
raise KeyError
else:
return typing.cast(str, status.decode("utf-8"))


import_submodules_in_directory(
os.path.dirname(os.path.realpath(__file__)), "snuba.manual_jobs"
Expand Down
2 changes: 1 addition & 1 deletion snuba/manual_jobs/toy_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def _build_query(self) -> str:
else:
return "not dry run query"

def execute(self) -> None:
def _execute(self) -> None:
logger.info(
"executing job "
+ self.job_spec.job_id
Expand Down
4 changes: 4 additions & 0 deletions snuba/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class RedisClientKey(Enum):
DLQ = "dlq"
OPTIMIZE = "optimize"
ADMIN_AUTH = "admin_auth"
JOB = "job"


_redis_clients: Mapping[RedisClientKey, RedisClientType] = {
Expand Down Expand Up @@ -141,6 +142,9 @@ class RedisClientKey(Enum):
RedisClientKey.ADMIN_AUTH: _initialize_specialized_redis_cluster(
settings.REDIS_CLUSTERS["admin_auth"]
),
RedisClientKey.JOB: _initialize_specialized_redis_cluster(
settings.REDIS_CLUSTERS["job"]
),
}


Expand Down
2 changes: 2 additions & 0 deletions snuba/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class RedisClusters(TypedDict):
dlq: RedisClusterConfig | None
optimize: RedisClusterConfig | None
admin_auth: RedisClusterConfig | None
job: RedisClusterConfig | None


REDIS_CLUSTERS: RedisClusters = {
Expand All @@ -186,6 +187,7 @@ class RedisClusters(TypedDict):
"dlq": None,
"optimize": None,
"admin_auth": None,
"job": None,
}

# Query Recording Options
Expand Down
1 change: 1 addition & 0 deletions snuba/settings/settings_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
(7, "dlq"),
(8, "optimize"),
(9, "admin_auth"),
(10, "job"),
]
}
VALIDATE_DATASET_YAMLS_ON_STARTUP = True
19 changes: 18 additions & 1 deletion tests/cli/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from click.testing import CliRunner

from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest
from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest, status


def test_cmd_line_valid() -> None:
Expand Down Expand Up @@ -69,3 +70,19 @@ def test_json_valid() -> None:
],
)
assert result.exit_code == 0


@pytest.mark.redis_db
def test_jobs_status() -> None:
runner = CliRunner()
runner.invoke(
run_from_manifest,
[
"--json_manifest",
"job_manifest.json",
"--job_id",
"abc1234",
],
)
result = runner.invoke(status, ["--job_id", "abc1234"])
assert result.exit_code == 0
27 changes: 27 additions & 0 deletions tests/jobs/test_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import pytest

from snuba.manual_jobs import FINISHED, NOT_STARTED, JobSpec, get_job_status
from snuba.manual_jobs.toy_job import ToyJob

JOB_ID = "abc1234"
test_job_spec = JobSpec(job_id=JOB_ID, job_type="ToyJob")

# @pytest.fixture(autouse=True)
# def setup_test_job() -> Job:
# return ToyJob(test_job_spec, dry_run=True)


@pytest.mark.redis_db
def test_job_status_changes_to_finished() -> None:
# test_job = setup_test_job
test_job = ToyJob(test_job_spec, dry_run=True)
assert get_job_status(JOB_ID) == NOT_STARTED
test_job.execute()
assert get_job_status(JOB_ID) == FINISHED


@pytest.mark.redis_db
def test_job_status_with_invalid_job_id() -> None:
assert get_job_status(JOB_ID) == NOT_STARTED
with pytest.raises(KeyError):
get_job_status("invalid_job_id")

0 comments on commit c0270df

Please sign in to comment.