Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job-runner): surface job status to snuba admin #6361

Merged
merged 22 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions snuba/admin/static/manual_jobs/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ function ViewCustomJobs(props: { api: Client }) {
}, []);

function jobSpecsAsRows() {
return Object.entries(jobSpecs).map(([_, spec]) => {
return Object.entries(jobSpecs).map(([_, job_info]) => {
return [
spec.job_id,
spec.job_type,
JSON.stringify(spec.params),
"TODO",
job_info.spec.job_id,
job_info.spec.job_type,
JSON.stringify(job_info.spec.params),
job_info.status,
"TODO",
];
});
Expand Down
5 changes: 4 additions & 1 deletion snuba/admin/static/manual_jobs/types.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ type JobSpec = {
};

type JobSpecMap = {
[key: string]: JobSpec;
[key: string]: {
spec: JobSpec;
status: string;
};
};
4 changes: 2 additions & 2 deletions snuba/admin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
get_writable_storage,
)
from snuba.datasets.storages.storage_key import StorageKey
from snuba.manual_jobs.runner import list_job_specs
from snuba.manual_jobs.runner import list_job_specs_with_status
from snuba.migrations.connect import check_for_inactive_replicas
from snuba.migrations.errors import InactiveClickhouseReplica, MigrationError
from snuba.migrations.groups import MigrationGroup, get_group_readiness_state
Expand Down Expand Up @@ -1263,7 +1263,7 @@ def deletes_enabled() -> Response:
@application.route("/job-specs", methods=["GET"])
@check_tool_perms(tools=[AdminTools.MANUAL_JOBS])
def get_job_specs() -> Response:
return make_response(jsonify(list_job_specs()), 200)
return make_response(jsonify(list_job_specs_with_status()), 200)


@application.route("/clickhouse_node_info")
Expand Down
34 changes: 27 additions & 7 deletions snuba/cli/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
import click

from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.job_loader import JobLoader
from snuba.manual_jobs.runner import list_job_specs
from snuba.manual_jobs.runner import (
MANIFEST_FILENAME,
get_job_status,
list_job_specs,
run_job,
)

JOB_SPECIFICATION_ERROR_MSG = "Missing job type and/or job id"

Expand All @@ -15,7 +19,19 @@ def jobs() -> None:


@jobs.command()
@click.option("--json_manifest", required=True)
@click.option("--json_manifest", default=MANIFEST_FILENAME)
def list(*, json_manifest: str) -> None:
job_specs = list_job_specs(json_manifest)
click.echo(job_specs)


def _run_job_and_echo_status(job_spec: JobSpec, dry_run: bool) -> None:
    # Shared tail of the `run` and `run_from_manifest` commands: hand the spec
    # to the runner, then print whatever status the runner reports back.
    click.echo(f"resulting job status = {run_job(job_spec, dry_run)}")


@jobs.command()
@click.option("--json_manifest", default=MANIFEST_FILENAME)
@click.option("--job_id")
@click.option(
"--dry_run",
Expand All @@ -26,8 +42,7 @@ def run_from_manifest(*, json_manifest: str, job_id: str, dry_run: bool) -> None
if job_id not in job_specs.keys():
raise click.ClickException("Provide a valid job id")

job_to_run = JobLoader.get_job_instance(job_specs[job_id], dry_run)
job_to_run.execute()
_run_job_and_echo_status(job_specs[job_id], dry_run)


def _parse_params(pairs: Tuple[str, ...]) -> MutableMapping[Any, Any]:
Expand All @@ -47,5 +62,10 @@ def run(*, job_type: str, job_id: str, dry_run: bool, pairs: Tuple[str, ...]) ->
raise click.ClickException(JOB_SPECIFICATION_ERROR_MSG)
job_spec = JobSpec(job_id=job_id, job_type=job_type, params=_parse_params(pairs))

job_to_run = JobLoader.get_job_instance(job_spec, dry_run)
job_to_run.execute()
_run_job_and_echo_status(job_spec, dry_run)


@jobs.command()
@click.option("--job_id", required=True)
def status(*, job_id: str) -> None:
    """Print the stored execution status of the job given by --job_id."""
    # --job_id is required: without it click passes None, and the status
    # lookup would be made for the literal id "None", printing a misleading
    # "not_started" instead of reporting the missing argument.
    click.echo(get_job_status(job_id))
6 changes: 3 additions & 3 deletions snuba/manual_jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory

logger = logging.getLogger("snuba_init")
logger = logging.getLogger("snuba.manual_jobs")


@dataclass
class JobSpec:
job_id: str
job_type: str
params: Optional[MutableMapping[Any, Any]]
params: Optional[MutableMapping[Any, Any]] = None


class Job(ABC, metaclass=RegisteredClass):
Expand All @@ -26,7 +26,7 @@ def __init__(self, job_spec: JobSpec, dry_run: bool) -> None:

@abstractmethod
def execute(self) -> None:
pass
raise NotImplementedError

@classmethod
def config_key(cls) -> str:
Expand Down
2 changes: 1 addition & 1 deletion snuba/manual_jobs/job_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def __init__(self, job_type: str):
super().__init__(f"Job does not exist. Did you make a file {job_type}.py yet?")


class JobLoader:
class _JobLoader:
@staticmethod
def get_job_instance(job_spec: JobSpec, dry_run: bool) -> "Job":
job_type_class = Job.class_from_name(job_spec.job_type)
Expand Down
118 changes: 114 additions & 4 deletions snuba/manual_jobs/runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,41 @@
import os
from typing import Any, Mapping
from enum import StrEnum
from typing import Any, Mapping, Sequence, Union

import simplejson
from sentry_sdk import capture_exception

from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.job_loader import _JobLoader
from snuba.redis import RedisClientKey, get_redis_client
from snuba.utils.serializable_exception import SerializableException

_redis_client = get_redis_client(RedisClientKey.MANUAL_JOBS)

def _read_from_path(filename: str) -> Mapping[str, JobSpec]:

class JobStatus(StrEnum):
RUNNING = "running"
FINISHED = "finished"
NOT_STARTED = "not_started"
FAILED = "failed"


class JobLockedException(SerializableException):
    """Raised by ``run_job`` when the lock for a job could not be acquired."""

    def __init__(self, job_id: str):
        message = f"Job {job_id} lock exists, not available to run"
        super().__init__(message)


class JobStatusException(SerializableException):
    """Raised by ``run_job`` when a job's stored status is not NOT_STARTED."""

    def __init__(self, job_id: str, status: JobStatus):
        message = (
            f"Job {job_id} has run before, status = {status}, not available to run"
        )
        super().__init__(message)


MANIFEST_FILENAME = "job_manifest.json"


def _read_manifest_from_path(filename: str) -> Mapping[str, JobSpec]:
local_root = os.path.dirname(__file__)

with open(os.path.join(local_root, filename)) as stream:
Expand Down Expand Up @@ -34,7 +63,88 @@ def _build_job_spec_from_entry(content: Any) -> JobSpec:
return job_spec


def _job_lock_key(job_id: str) -> str:
return f"snuba:manual_jobs:{job_id}:lock"


def _job_status_key(job_id: str) -> str:
return f"snuba:manual_jobs:{job_id}:execution_status"


def _acquire_job_lock(job_id: str) -> bool:
    """Try to take the lock for ``job_id``; return whether the SET succeeded.

    The lock key is written with ``nx=True`` and an ``ex`` of one day's worth
    of seconds.
    NOTE(review): presumably redis-py semantics (set only if absent, expire
    after ``ex`` seconds) -- confirm against the client in use.
    """
    one_day_in_seconds = 24 * 60 * 60
    set_result = _redis_client.set(
        name=_job_lock_key(job_id),
        value=1,
        nx=True,
        ex=one_day_in_seconds,
    )
    return bool(set_result)


def _release_job_lock(job_id: str) -> None:
    """Delete the lock key of ``job_id`` from redis."""
    lock_key = _job_lock_key(job_id)
    _redis_client.delete(lock_key)


def _set_job_status(job_id: str, status: JobStatus) -> JobStatus:
    """Write ``status`` for ``job_id`` to redis and return that same status.

    Returning the status lets callers update their local variable and the
    stored value in a single assignment.
    """
    status_key = _job_status_key(job_id)
    _redis_client.set(name=status_key, value=status.value)
    return status


def _get_job_status_multi(job_ids: Sequence[str]) -> Sequence[JobStatus]:
    """Fetch the statuses stored under several redis keys in one round trip.

    Despite the parameter name, the values are passed to ``mget`` unchanged,
    so callers must supply full redis status keys (see ``_job_status_key``),
    not bare job ids.

    Returns one ``JobStatus`` per input key, in input order; keys with no
    stored value map to ``JobStatus.NOT_STARTED``.

    Raises ``ValueError`` if a stored value is not a valid ``JobStatus``,
    matching what ``get_job_status`` does for a single job.
    """
    # MGET needs at least one key; an empty manifest must not become an error.
    if not job_ids:
        return []

    return [
        # Convert to the enum so the result matches the declared return type
        # and the single-key lookup, instead of leaking raw decoded strings.
        JobStatus(redis_status.decode())
        if redis_status is not None
        else JobStatus.NOT_STARTED
        for redis_status in _redis_client.mget(job_ids)
    ]


def get_job_status(job_id: str) -> JobStatus:
    """Look up the stored status of ``job_id``.

    A job with no stored value is reported as ``JobStatus.NOT_STARTED``.
    """
    raw_status = _redis_client.get(name=_job_status_key(job_id))
    return (
        JobStatus.NOT_STARTED if raw_status is None else JobStatus(raw_status.decode())
    )


def list_job_specs(
jobs_filename: str = "job_manifest.json",
manifest_filename: str = MANIFEST_FILENAME,
) -> Mapping[str, JobSpec]:
return _read_from_path(jobs_filename)
return _read_manifest_from_path(manifest_filename)


def list_job_specs_with_status(
    manifest_filename: str = MANIFEST_FILENAME,
) -> Mapping[str, Mapping[str, Union[JobSpec, JobStatus]]]:
    """Pair every job spec from the manifest with its stored status.

    Returns a mapping of job id to ``{"spec": <JobSpec>, "status": <status>}``.
    All statuses are fetched with a single multi-key lookup.
    """
    specs = list_job_specs(manifest_filename)
    job_ids = specs.keys()
    status_keys = [_job_status_key(job_id) for job_id in job_ids]
    statuses = _get_job_status_multi(status_keys)

    specs_with_status = {}
    # statuses is positionally aligned with job_ids (same iteration order).
    for position, job_id in enumerate(job_ids):
        specs_with_status[job_id] = {
            "spec": specs[job_id],
            "status": statuses[position],
        }
    return specs_with_status


def run_job(job_spec: JobSpec, dry_run: bool) -> JobStatus:
    """Run a manual job once, tracking its lock and status in redis.

    Args:
        job_spec: the job to run; ``job_spec.job_id`` keys the lock and status.
        dry_run: forwarded to the job instance. In dry-run mode the
            RUNNING / FINISHED / FAILED transitions are not recorded.

    Returns:
        The last status recorded by this call (``NOT_STARTED`` for dry runs).

    Raises:
        JobStatusException: the job already has a status other than
            ``NOT_STARTED``.
        JobLockedException: the job's lock could not be acquired.
        Any exception raised while loading the job instance or writing the
        initial status propagates to the caller (after the lock is released).
        Exceptions raised by the job's ``execute()`` are reported through
        ``capture_exception`` and reflected in the returned status instead.
    """
    # get_job_status never returns None (missing values become NOT_STARTED),
    # so a single comparison is enough.
    current_job_status = get_job_status(job_spec.job_id)
    if current_job_status != JobStatus.NOT_STARTED:
        raise JobStatusException(job_id=job_spec.job_id, status=current_job_status)

    if not _acquire_job_lock(job_spec.job_id):
        raise JobLockedException(job_spec.job_id)

    # Everything after acquiring the lock lives inside try/finally so that the
    # lock is released even when the initial status write or the job lookup
    # fails; otherwise the job would stay blocked until the lock key expires.
    try:
        current_job_status = _set_job_status(job_spec.job_id, JobStatus.NOT_STARTED)
        job_to_run = _JobLoader.get_job_instance(job_spec, dry_run)

        try:
            if not dry_run:
                current_job_status = _set_job_status(
                    job_spec.job_id, JobStatus.RUNNING
                )
            job_to_run.execute()
            if not dry_run:
                current_job_status = _set_job_status(
                    job_spec.job_id, JobStatus.FINISHED
                )
        except BaseException:
            # Deliberately broad: any failure of the job itself (including
            # interrupts) is recorded as FAILED and reported, not re-raised.
            if not dry_run:
                current_job_status = _set_job_status(
                    job_spec.job_id, JobStatus.FAILED
                )
            capture_exception()
    finally:
        _release_job_lock(job_spec.job_id)

    return current_job_status
4 changes: 4 additions & 0 deletions snuba/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class RedisClientKey(Enum):
DLQ = "dlq"
OPTIMIZE = "optimize"
ADMIN_AUTH = "admin_auth"
MANUAL_JOBS = "manual_jobs"


_redis_clients: Mapping[RedisClientKey, RedisClientType] = {
Expand Down Expand Up @@ -141,6 +142,9 @@ class RedisClientKey(Enum):
RedisClientKey.ADMIN_AUTH: _initialize_specialized_redis_cluster(
settings.REDIS_CLUSTERS["admin_auth"]
),
RedisClientKey.MANUAL_JOBS: _initialize_specialized_redis_cluster(
settings.REDIS_CLUSTERS["manual_jobs"]
),
}


Expand Down
2 changes: 2 additions & 0 deletions snuba/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class RedisClusters(TypedDict):
dlq: RedisClusterConfig | None
optimize: RedisClusterConfig | None
admin_auth: RedisClusterConfig | None
manual_jobs: RedisClusterConfig | None


REDIS_CLUSTERS: RedisClusters = {
Expand All @@ -186,6 +187,7 @@ class RedisClusters(TypedDict):
"dlq": None,
"optimize": None,
"admin_auth": None,
"manual_jobs": None,
}

# Query Recording Options
Expand Down
1 change: 1 addition & 0 deletions snuba/settings/settings_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
(7, "dlq"),
(8, "optimize"),
(9, "admin_auth"),
(10, "manual_jobs"),
]
}
VALIDATE_DATASET_YAMLS_ON_STARTUP = True
19 changes: 18 additions & 1 deletion tests/cli/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from click.testing import CliRunner

from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest
from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest, status


def test_cmd_line_valid() -> None:
Expand Down Expand Up @@ -69,3 +70,19 @@ def test_json_valid() -> None:
],
)
assert result.exit_code == 0


@pytest.mark.redis_db
def test_jobs_status() -> None:
    # Run a job from the manifest first, then check that the `status` command
    # for the same job id exits cleanly.
    cli_runner = CliRunner()
    run_args = ["--json_manifest", "job_manifest.json", "--job_id", "abc1234"]
    cli_runner.invoke(run_from_manifest, run_args)

    status_result = cli_runner.invoke(status, ["--job_id", "abc1234"])
    assert status_result.exit_code == 0
Loading
Loading