Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job-runner): surface job status to snuba admin #6361

Merged
merged 22 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions snuba/admin/static/manual_jobs/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ function ViewCustomJobs(props: { api: Client }) {
}, []);

function jobSpecsAsRows() {
return Object.entries(jobSpecs).map(([_, spec]) => {
return Object.entries(jobSpecs).map(([_, job_info]) => {
return [
spec.job_id,
spec.job_type,
JSON.stringify(spec.params),
"TODO",
job_info.spec.job_id,
job_info.spec.job_type,
JSON.stringify(job_info.spec.params),
job_info.status,
"TODO",
];
});
Expand Down
5 changes: 4 additions & 1 deletion snuba/admin/static/manual_jobs/types.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ type JobSpec = {
};

type JobSpecMap = {
[key: string]: JobSpec;
[key: string]: {
spec: JobSpec;
status: string;
};
};
4 changes: 2 additions & 2 deletions snuba/admin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
get_writable_storage,
)
from snuba.datasets.storages.storage_key import StorageKey
from snuba.manual_jobs.runner import list_job_specs
from snuba.manual_jobs.runner import list_job_specs_with_status
from snuba.migrations.connect import check_for_inactive_replicas
from snuba.migrations.errors import InactiveClickhouseReplica, MigrationError
from snuba.migrations.groups import MigrationGroup, get_group_readiness_state
Expand Down Expand Up @@ -1263,7 +1263,7 @@ def deletes_enabled() -> Response:
@application.route("/job-specs", methods=["GET"])
@check_tool_perms(tools=[AdminTools.MANUAL_JOBS])
def get_job_specs() -> Response:
return make_response(jsonify(list_job_specs()), 200)
return make_response(jsonify(list_job_specs_with_status()), 200)


@application.route("/clickhouse_node_info")
Expand Down
31 changes: 28 additions & 3 deletions snuba/manual_jobs/runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from enum import StrEnum
from typing import Any, Mapping, Optional
from typing import Any, Mapping, Sequence, Union

import simplejson
from sentry_sdk import capture_exception
Expand Down Expand Up @@ -89,9 +89,20 @@ def _set_job_status(job_id: str, status: JobStatus) -> JobStatus:
return status


def get_job_status(job_id: str) -> Optional[JobStatus]:
def _get_job_status_multi(job_ids: Sequence[str]) -> Sequence[JobStatus]:
return [
redis_status.decode() if redis_status is not None else JobStatus.NOT_STARTED
for redis_status in _redis_client.mget(job_ids)
]


def get_job_status(job_id: str) -> JobStatus:
redis_status = _redis_client.get(name=_build_job_status_key(job_id))
return JobStatus(redis_status.decode("utf-8")) if redis_status else redis_status
return (
JobStatus(redis_status.decode("utf-8"))
if redis_status
else JobStatus.NOT_STARTED
)


def list_job_specs(
Expand All @@ -100,6 +111,20 @@ def list_job_specs(
return _read_manifest_from_path(manifest_filename)


def list_job_specs_with_status(
manifest_filename: str = MANIFEST_FILENAME,
) -> Mapping[str, Mapping[str, Union[JobSpec, JobStatus]]]:
specs = list_job_specs(manifest_filename)
job_ids = specs.keys()
statuses = _get_job_status_multi(
[_build_job_status_key(job_id) for job_id in job_ids]
)
return {
job_id: {"spec": specs[job_id], "status": statuses[i]}
for i, job_id in enumerate(job_ids)
}


def run_job(job_spec: JobSpec, dry_run: bool) -> JobStatus:
current_job_status = get_job_status(job_spec.job_id)
if current_job_status is not None and current_job_status != JobStatus.NOT_STARTED:
Expand Down
8 changes: 4 additions & 4 deletions tests/manual_jobs/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def execute(self) -> None:

@pytest.mark.redis_db
def test_job_status_changes_to_finished() -> None:
assert get_job_status(JOB_ID) is None
assert get_job_status(JOB_ID) == JobStatus.NOT_STARTED
run_job(test_job_spec, False)
assert get_job_status(JOB_ID) == JobStatus.FINISHED
with pytest.raises(JobStatusException):
Expand All @@ -50,7 +50,7 @@ def test_job_status_changes_to_finished() -> None:
def test_job_with_exception_causes_failure() -> None:
with patch.object(_JobLoader, "get_job_instance") as MockGetInstance:
MockGetInstance.return_value = FailJob()
assert get_job_status(JOB_ID) is None
assert get_job_status(JOB_ID) == JobStatus.NOT_STARTED
run_job(test_job_spec, False)
assert get_job_status(JOB_ID) == JobStatus.FAILED

Expand All @@ -60,7 +60,7 @@ def test_slow_job_stay_running() -> None:
with patch.object(_JobLoader, "get_job_instance") as MockGetInstance:
job = SlowJob()
MockGetInstance.return_value = job
assert get_job_status(JOB_ID) is None
assert get_job_status(JOB_ID) == JobStatus.NOT_STARTED
t = Thread(
target=run_job, name="slow-background-job", args=[test_job_spec, False]
)
Expand All @@ -72,7 +72,7 @@ def test_slow_job_stay_running() -> None:

@pytest.mark.redis_db
def test_job_status_with_invalid_job_id() -> None:
assert get_job_status("invalid_job_id") is None
assert get_job_status("invalid_job_id") == JobStatus.NOT_STARTED


@pytest.mark.redis_db
Expand Down
Loading