Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job-runner): enable snuba jobs command to run from JSON manifest #6320

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions snuba/cli/jobs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from typing import Tuple
from typing import Any, MutableMapping, Tuple

import click

from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.job_loader import JobLoader
from snuba.manual_jobs.manifest_reader import _ManifestReader

JOB_SPECIFICATION_ERROR_MSG = "Missing job type and/or job id"


@click.group()
Expand All @@ -11,18 +15,37 @@ def jobs() -> None:


@jobs.command()
@click.argument("job_name")
@click.option("--json_manifest", required=True)
@click.option("--job_id")
@click.option(
"--dry_run",
default=True,
)
@click.argument("pairs", nargs=-1)
def run(*, job_name: str, dry_run: bool, pairs: Tuple[str, ...]) -> None:
def run_from_manifest(*, json_manifest: str, job_id: str, dry_run: bool) -> None:
job_specs = _ManifestReader.read(json_manifest)
if job_id not in job_specs.keys():
raise click.ClickException("Provide a valid job id")

job_to_run = JobLoader.get_job_instance(job_specs[job_id], dry_run)
job_to_run.execute()


def _parse_params(pairs: Tuple[str, ...]) -> MutableMapping[Any, Any]:
return {k: v for k, v in (pair.split("=") for pair in pairs)}


kwargs = {}
for pair in pairs:
k, v = pair.split("=")
kwargs[k] = v
@jobs.command()
@click.option("--job_type")
@click.option("--job_id")
@click.option(
"--dry_run",
default=True,
)
@click.argument("pairs", nargs=-1)
def run(*, job_type: str, job_id: str, dry_run: bool, pairs: Tuple[str, ...]) -> None:
if not job_type or not job_id:
raise click.ClickException(JOB_SPECIFICATION_ERROR_MSG)
job_spec = JobSpec(job_id=job_id, job_type=job_type, params=_parse_params(pairs))

job_to_run = JobLoader.get_job_instance(job_name, dry_run, **kwargs)
job_to_run = JobLoader.get_job_instance(job_spec, dry_run)
job_to_run.execute()
21 changes: 17 additions & 4 deletions snuba/manual_jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
import logging
import os
from abc import ABC, abstractmethod
from typing import Any, cast
from dataclasses import dataclass
from typing import Any, MutableMapping, Optional, cast

from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory

logger = logging.getLogger("snuba_init")


@dataclass
class JobSpec:
job_id: str
job_type: str
params: Optional[MutableMapping[Any, Any]]


class Job(ABC, metaclass=RegisteredClass):
def __init__(self, dry_run: bool, **kwargs: Any) -> None:
def __init__(self, job_spec: JobSpec, dry_run: bool) -> None:
self.job_spec = job_spec
self.dry_run = dry_run
for k, v in kwargs.items():
setattr(self, k, v)
if job_spec.params:
for k, v in job_spec.params.items():
setattr(self, k, v)

@abstractmethod
def execute(self) -> None:
Expand Down
20 changes: 12 additions & 8 deletions snuba/manual_jobs/job_loader.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from typing import Any, cast
from typing import cast

from snuba.manual_jobs import Job
from snuba.manual_jobs import Job, JobSpec
from snuba.utils.serializable_exception import SerializableException


class NonexistentJobException(SerializableException):
def __init__(self, job_type: str):
super().__init__(f"Job does not exist. Did you make a file {job_type}.py yet?")


class JobLoader:
@staticmethod
def get_job_instance(class_name: str, dry_run: bool, **kwargs: Any) -> "Job":
job_type_class = Job.class_from_name(class_name)
def get_job_instance(job_spec: JobSpec, dry_run: bool) -> "Job":
job_type_class = Job.class_from_name(job_spec.job_type)
if job_type_class is None:
raise Exception(
f"Job does not exist. Did you make a file {class_name}.py yet?"
)
raise NonexistentJobException(job_spec.job_type)

return cast("Job", job_type_class(dry_run=dry_run, **kwargs))
return cast("Job", job_type_class(job_spec, dry_run=dry_run))
26 changes: 21 additions & 5 deletions snuba/manual_jobs/manifest_reader.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
import os
from typing import Any, Sequence
from typing import Mapping

import simplejson

from snuba.manual_jobs import JobSpec


class _ManifestReader:
@staticmethod
def read(filename: str) -> Sequence[Any]:
def read(filename: str) -> Mapping[str, JobSpec]:
local_root = os.path.dirname(__file__)

with open(os.path.join(local_root, filename)) as stream:
contents = simplejson.loads(stream.read())
assert isinstance(contents, Sequence)
return contents

job_specs = {}
for content in contents:
job_id = content["id"]
assert isinstance(job_id, str)
job_type = content["job_type"]
assert isinstance(job_type, str)

job_spec = JobSpec(
job_id=job_id,
job_type=job_type,
params=content.get("params"),
)
job_specs[job_id] = job_spec
return job_specs


def read_jobs_manifest() -> Sequence[Any]:
def read_jobs_manifest() -> Mapping[str, JobSpec]:
return _ManifestReader.read("job_manifest.json")
22 changes: 14 additions & 8 deletions snuba/manual_jobs/toy_job.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from typing import Any

import click

from snuba.manual_jobs import Job
from snuba.manual_jobs import Job, JobSpec, logger


class ToyJob(Job):
def __init__(self, dry_run: bool, **kwargs: Any):
super().__init__(dry_run, **kwargs)
def __init__(
self,
job_spec: JobSpec,
dry_run: bool,
):
super().__init__(job_spec, dry_run)

def _build_query(self) -> str:
if self.dry_run:
Expand All @@ -16,4 +16,10 @@ def _build_query(self) -> str:
return "not dry run query"

def execute(self) -> None:
click.echo("executing query `" + self._build_query() + "`")
logger.info(
"executing job "
+ self.job_spec.job_id
+ " with query `"
+ self._build_query()
+ "`"
)
64 changes: 59 additions & 5 deletions tests/cli/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,71 @@
from click.testing import CliRunner

from snuba.cli.jobs import run
from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest


def test_valid_job() -> None:
def test_cmd_line_valid() -> None:
runner = CliRunner()
result = runner.invoke(run, ["ToyJob", "--dry_run", "True", "k1=v1", "k2=v2"])
result = runner.invoke(
run,
["--job_type", "ToyJob", "--job_id", "0001"],
)

assert result.exit_code == 0


def test_invalid_job() -> None:
def test_invalid_job_errors() -> None:
runner = CliRunner()
result = runner.invoke(
run,
[
"--job_type",
"NonexistentJob",
"--job_id",
"0001",
"--dry_run",
"True",
"k1=v1",
"k2=v2",
],
)

assert result.exit_code == 1


def test_cmd_line_no_job_specification_errors() -> None:
runner = CliRunner()
result = runner.invoke(run, ["--dry_run", "True", "k1=v1", "k2=v2"])
assert result.exit_code == 1
assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n"


def test_cmd_line_no_job_id_errors() -> None:
runner = CliRunner()
result = runner.invoke(
run, ["--job_type", "ToyJob", "--dry_run", "True", "k1=v1", "k2=v2"]
)
assert result.exit_code == 1
assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n"


def test_cmd_line_no_job_type_errors() -> None:
runner = CliRunner()
result = runner.invoke(
run, ["SomeJobThatDoesntExist", "--dry_run", "True", "k1=v1", "k2=v2"]
run, ["--job_id", "0001", "--dry_run", "True", "k1=v1", "k2=v2"]
)
assert result.exit_code == 1
assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n"


def test_json_valid() -> None:
runner = CliRunner()
result = runner.invoke(
run_from_manifest,
[
"--json_manifest",
"job_manifest.json",
"--job_id",
"abc1234",
],
)
assert result.exit_code == 0
Loading