Skip to content

Commit

Permalink
feat(job-runner): enable snuba jobs command to run from JSON manifest (
Browse files Browse the repository at this point in the history
…#6320)

#6288

Co-authored-by: Rachel Chen <rachelchen@PL6VFX9HP4.attlocal.net>
  • Loading branch information
xurui-c and Rachel Chen committed Sep 18, 2024
1 parent 8aa02cf commit f030a4d
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 39 deletions.
41 changes: 32 additions & 9 deletions snuba/cli/jobs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from typing import Tuple
from typing import Any, MutableMapping, Tuple

import click

from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.job_loader import JobLoader
from snuba.manual_jobs.manifest_reader import _ManifestReader

JOB_SPECIFICATION_ERROR_MSG = "Missing job type and/or job id"


@click.group()
Expand All @@ -11,18 +15,37 @@ def jobs() -> None:


@jobs.command()
@click.argument("job_name")
@click.option("--json_manifest", required=True)
@click.option("--job_id")
@click.option(
"--dry_run",
default=True,
)
@click.argument("pairs", nargs=-1)
def run(*, job_name: str, dry_run: bool, pairs: Tuple[str, ...]) -> None:
def run_from_manifest(*, json_manifest: str, job_id: str, dry_run: bool) -> None:
job_specs = _ManifestReader.read(json_manifest)
if job_id not in job_specs.keys():
raise click.ClickException("Provide a valid job id")

job_to_run = JobLoader.get_job_instance(job_specs[job_id], dry_run)
job_to_run.execute()


def _parse_params(pairs: Tuple[str, ...]) -> MutableMapping[Any, Any]:
return {k: v for k, v in (pair.split("=") for pair in pairs)}


kwargs = {}
for pair in pairs:
k, v = pair.split("=")
kwargs[k] = v
@jobs.command()
@click.option("--job_type")
@click.option("--job_id")
@click.option(
"--dry_run",
default=True,
)
@click.argument("pairs", nargs=-1)
def run(*, job_type: str, job_id: str, dry_run: bool, pairs: Tuple[str, ...]) -> None:
if not job_type or not job_id:
raise click.ClickException(JOB_SPECIFICATION_ERROR_MSG)
job_spec = JobSpec(job_id=job_id, job_type=job_type, params=_parse_params(pairs))

job_to_run = JobLoader.get_job_instance(job_name, dry_run, **kwargs)
job_to_run = JobLoader.get_job_instance(job_spec, dry_run)
job_to_run.execute()
21 changes: 17 additions & 4 deletions snuba/manual_jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
import logging
import os
from abc import ABC, abstractmethod
from typing import Any, cast
from dataclasses import dataclass
from typing import Any, MutableMapping, Optional, cast

from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory

logger = logging.getLogger("snuba_init")


@dataclass
class JobSpec:
job_id: str
job_type: str
params: Optional[MutableMapping[Any, Any]]


class Job(ABC, metaclass=RegisteredClass):
def __init__(self, dry_run: bool, **kwargs: Any) -> None:
def __init__(self, job_spec: JobSpec, dry_run: bool) -> None:
self.job_spec = job_spec
self.dry_run = dry_run
for k, v in kwargs.items():
setattr(self, k, v)
if job_spec.params:
for k, v in job_spec.params.items():
setattr(self, k, v)

@abstractmethod
def execute(self) -> None:
Expand Down
20 changes: 12 additions & 8 deletions snuba/manual_jobs/job_loader.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from typing import Any, cast
from typing import cast

from snuba.manual_jobs import Job
from snuba.manual_jobs import Job, JobSpec
from snuba.utils.serializable_exception import SerializableException


class NonexistentJobException(SerializableException):
def __init__(self, job_type: str):
super().__init__(f"Job does not exist. Did you make a file {job_type}.py yet?")


class JobLoader:
@staticmethod
def get_job_instance(class_name: str, dry_run: bool, **kwargs: Any) -> "Job":
job_type_class = Job.class_from_name(class_name)
def get_job_instance(job_spec: JobSpec, dry_run: bool) -> "Job":
job_type_class = Job.class_from_name(job_spec.job_type)
if job_type_class is None:
raise Exception(
f"Job does not exist. Did you make a file {class_name}.py yet?"
)
raise NonexistentJobException(job_spec.job_type)

return cast("Job", job_type_class(dry_run=dry_run, **kwargs))
return cast("Job", job_type_class(job_spec, dry_run=dry_run))
26 changes: 21 additions & 5 deletions snuba/manual_jobs/manifest_reader.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
import os
from typing import Any, Sequence
from typing import Mapping

import simplejson

from snuba.manual_jobs import JobSpec


class _ManifestReader:
@staticmethod
def read(filename: str) -> Sequence[Any]:
def read(filename: str) -> Mapping[str, JobSpec]:
local_root = os.path.dirname(__file__)

with open(os.path.join(local_root, filename)) as stream:
contents = simplejson.loads(stream.read())
assert isinstance(contents, Sequence)
return contents

job_specs = {}
for content in contents:
job_id = content["id"]
assert isinstance(job_id, str)
job_type = content["job_type"]
assert isinstance(job_type, str)

job_spec = JobSpec(
job_id=job_id,
job_type=job_type,
params=content.get("params"),
)
job_specs[job_id] = job_spec
return job_specs


def read_jobs_manifest() -> Sequence[Any]:
def read_jobs_manifest() -> Mapping[str, JobSpec]:
return _ManifestReader.read("job_manifest.json")
22 changes: 14 additions & 8 deletions snuba/manual_jobs/toy_job.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from typing import Any

import click

from snuba.manual_jobs import Job
from snuba.manual_jobs import Job, JobSpec, logger


class ToyJob(Job):
def __init__(self, dry_run: bool, **kwargs: Any):
super().__init__(dry_run, **kwargs)
def __init__(
self,
job_spec: JobSpec,
dry_run: bool,
):
super().__init__(job_spec, dry_run)

def _build_query(self) -> str:
if self.dry_run:
Expand All @@ -16,4 +16,10 @@ def _build_query(self) -> str:
return "not dry run query"

def execute(self) -> None:
click.echo("executing query `" + self._build_query() + "`")
logger.info(
"executing job "
+ self.job_spec.job_id
+ " with query `"
+ self._build_query()
+ "`"
)
64 changes: 59 additions & 5 deletions tests/cli/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,71 @@
from click.testing import CliRunner

from snuba.cli.jobs import run
from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest


def test_valid_job() -> None:
def test_cmd_line_valid() -> None:
runner = CliRunner()
result = runner.invoke(run, ["ToyJob", "--dry_run", "True", "k1=v1", "k2=v2"])
result = runner.invoke(
run,
["--job_type", "ToyJob", "--job_id", "0001"],
)

assert result.exit_code == 0


def test_invalid_job() -> None:
def test_invalid_job_errors() -> None:
runner = CliRunner()
result = runner.invoke(
run,
[
"--job_type",
"NonexistentJob",
"--job_id",
"0001",
"--dry_run",
"True",
"k1=v1",
"k2=v2",
],
)

assert result.exit_code == 1


def test_cmd_line_no_job_specification_errors() -> None:
runner = CliRunner()
result = runner.invoke(run, ["--dry_run", "True", "k1=v1", "k2=v2"])
assert result.exit_code == 1
assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n"


def test_cmd_line_no_job_id_errors() -> None:
runner = CliRunner()
result = runner.invoke(
run, ["--job_type", "ToyJob", "--dry_run", "True", "k1=v1", "k2=v2"]
)
assert result.exit_code == 1
assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n"


def test_cmd_line_no_job_type_errors() -> None:
runner = CliRunner()
result = runner.invoke(
run, ["SomeJobThatDoesntExist", "--dry_run", "True", "k1=v1", "k2=v2"]
run, ["--job_id", "0001", "--dry_run", "True", "k1=v1", "k2=v2"]
)
assert result.exit_code == 1
assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n"


def test_json_valid() -> None:
runner = CliRunner()
result = runner.invoke(
run_from_manifest,
[
"--json_manifest",
"job_manifest.json",
"--job_id",
"abc1234",
],
)
assert result.exit_code == 0

0 comments on commit f030a4d

Please sign in to comment.