Skip to content

Commit

Permalink
Add support for Kueue scheduling options to RayJob runner (#24)
Browse files Browse the repository at this point in the history
* feat(rayjob): Support Kueue scheduling options in RayJob runner

* chore: Separate out `util` module into package
  • Loading branch information
AdrianoKF authored Apr 30, 2024
1 parent 214cbce commit de1a4cc
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 126 deletions.
4 changes: 3 additions & 1 deletion src/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
from jobs.assembler.renderers import RENDERERS
from jobs.image import Image
from jobs.types import AnyPath, K8sResourceKind
from jobs.util import remove_none_values, run_command, to_rational
from jobs.utils.helpers import remove_none_values
from jobs.utils.math import to_rational
from jobs.utils.processes import run_command


class BuildMode(enum.Enum):
Expand Down
2 changes: 1 addition & 1 deletion src/jobs/runner/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from jobs import Image, Job
from jobs.job import DockerResourceOptions
from jobs.runner.base import Runner, _make_executor_command
from jobs.util import remove_none_values
from jobs.utils.helpers import remove_none_values


class DockerRunner(Runner):
Expand Down
60 changes: 5 additions & 55 deletions src/jobs/runner/kueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
from jobs import Image, Job
from jobs.runner.base import Runner, _make_executor_command
from jobs.types import K8sResourceKind
from jobs.util import (
KubernetesNamespaceMixin,
remove_none_values,
sanitize_rfc1123_domain_name,
)
from jobs.utils.kubernetes import KubernetesNamespaceMixin, sanitize_rfc1123_domain_name
from jobs.utils.kueue import kueue_scheduling_labels


class KueueRunner(Runner, KubernetesNamespaceMixin):
Expand All @@ -19,61 +16,14 @@ def __init__(self, **kwargs: str) -> None:
self._queue = kwargs.get("local_queue", "user-queue")

def _make_job_crd(self, job: Job, image: Image, namespace: str) -> client.V1Job:
def _assert_kueue_localqueue(name: str) -> bool:
try:
_ = client.CustomObjectsApi().get_namespaced_custom_object(
"kueue.x-k8s.io",
"v1beta1",
namespace,
"localqueues",
name,
)
return True
except client.exceptions.ApiException:
return False

def _assert_kueue_workloadpriorityclass(name: str) -> bool:
try:
_ = client.CustomObjectsApi().get_cluster_custom_object(
"kueue.x-k8s.io",
"v1beta1",
"workloadpriorityclasses",
name,
)
return True
except client.exceptions.ApiException:
return False

if not job.options:
raise ValueError("Job options must be specified")

sched_opts = job.options.scheduling
if sched_opts:
if queue := sched_opts.queue_name:
if not _assert_kueue_localqueue(queue):
raise ValueError(
f"Specified Kueue local queue does not exist: {queue!r}"
)
if pc := sched_opts.priority_class:
if not _assert_kueue_workloadpriorityclass(pc):
raise ValueError(
f"Specified Kueue workload priority class does not exist: {pc!r}"
)
scheduling_labels = kueue_scheduling_labels(job, self.namespace)

metadata = client.V1ObjectMeta(
generate_name=sanitize_rfc1123_domain_name(job.name),
labels=remove_none_values(
{
"kueue.x-k8s.io/queue-name": (
sched_opts.queue_name
if sched_opts and sched_opts.queue_name
else None
),
"kueue.x-k8s.io/priority-class": (
sched_opts.priority_class if sched_opts else None
),
}
),
labels=scheduling_labels,
)

# Job container
Expand All @@ -87,7 +37,7 @@ def _assert_kueue_workloadpriorityclass(name: str) -> bool:
"requests": res.to_kubernetes(kind=K8sResourceKind.REQUESTS),
"limits": res.to_kubernetes(kind=K8sResourceKind.LIMITS),
}
if job.options and (res := job.options.resources)
if (res := job.options.resources)
else None
),
)
Expand Down
9 changes: 6 additions & 3 deletions src/jobs/runner/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from jobs.job import RayResourceOptions
from jobs.runner.base import Runner, _make_executor_command
from jobs.types import K8sResourceKind, NoOptions
from jobs.util import KubernetesNamespaceMixin, sanitize_rfc1123_domain_name
from jobs.utils.kubernetes import KubernetesNamespaceMixin, sanitize_rfc1123_domain_name
from jobs.utils.kueue import kueue_scheduling_labels


class RayClusterRunner(Runner):
Expand Down Expand Up @@ -93,8 +94,7 @@ class RayJobRunner(Runner, KubernetesNamespaceMixin):
def __init__(self, **kwargs):
super().__init__(**kwargs)

@staticmethod
def _create_ray_job(job: Job, image: Image) -> dict:
def _create_ray_job(self, job: Job, image: Image) -> dict:
"""Create a ``RayJob`` Kubernetes resource for the Kuberay operator."""

if job.options is None:
Expand All @@ -104,6 +104,8 @@ def _create_ray_job(job: Job, image: Image) -> dict:
if not res_opts:
raise ValueError("Job resource options must be set")

scheduling_labels = kueue_scheduling_labels(job, self.namespace)

runtime_env = {
"working_dir": "/home/ray/app",
}
Expand All @@ -115,6 +117,7 @@ def _create_ray_job(job: Job, image: Image) -> dict:
"kind": "RayJob",
"metadata": {
"name": sanitize_rfc1123_domain_name(job_id),
"labels": scheduling_labels,
},
"spec": {
"jobId": job_id,
Expand Down
Empty file added src/jobs/utils/__init__.py
Empty file.
11 changes: 11 additions & 0 deletions src/jobs/utils/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from __future__ import annotations

from typing import Any, Mapping, TypeVar, cast

T = TypeVar("T", bound=Mapping[str, Any])


def remove_none_values(d: T) -> T:
    """Return a copy of mapping ``d`` without the entries whose value is ``None``.

    Only ``None`` is filtered out; other falsy values (``0``, ``""``, ``[]``)
    are kept. The result is always a plain ``dict`` -- the ``cast`` only keeps
    the declared return type aligned with the argument type for type checkers.
    """
    kept: dict[str, Any] = {}
    for key, value in d.items():
        if value is None:
            continue
        kept[key] = value
    return cast(T, kept)
26 changes: 26 additions & 0 deletions src/jobs/utils/kubernetes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

import kubernetes


def sanitize_rfc1123_domain_name(s: str) -> str:
    """Sanitize a string for use as an RFC 1123 domain name.

    The input is lower-cased (Kubernetes object names must be lower case), and
    every character other than ``a-z``, ``0-9``, ``.`` and ``-`` is replaced
    with a dash.

    Note: Leading/trailing separators and the overall length are deliberately
    left untouched, because the result is also used as a ``generateName``
    prefix. A name that starts or ends with a dash, or that is too long, can
    therefore still be rejected by the API server.
    """
    # Local import so this block does not depend on the module's import list.
    import re

    return re.sub(r"[^a-z0-9.-]", "-", s.lower())


class KubernetesNamespaceMixin:
    """Determine the desired or current Kubernetes namespace.

    The desired namespace is taken from the ``namespace`` keyword argument;
    when it is absent, the namespace of the active kubeconfig context is used.
    """

    def __init__(self, **kwargs):
        # Loads the Kubernetes client configuration as a side effect of
        # constructing the object.
        kubernetes.config.load_config()
        self._namespace: str | None = kwargs.get("namespace")

    @property
    def namespace(self) -> str:
        """Return the explicitly requested namespace, else the active context's.

        An explicit namespace short-circuits the kubeconfig lookup, so the
        property also works when no kubeconfig contexts can be listed.
        Falls back to ``"default"`` (mirroring kubectl) when the active
        context does not name a namespace, so the result is always a ``str``.
        """
        if self._namespace:
            return self._namespace

        _, active_context = kubernetes.config.list_kube_config_contexts()
        return active_context["context"].get("namespace") or "default"
67 changes: 67 additions & 0 deletions src/jobs/utils/kueue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Mapping, cast

from kubernetes import client

from jobs.job import Job
from jobs.utils.helpers import remove_none_values


def assert_kueue_localqueue(namespace: str, name: str) -> bool:
    """Check the existence of a Kueue `LocalQueue` in a namespace.

    Returns ``True`` if the ``kueue.x-k8s.io/v1beta1`` ``localqueues`` object
    called ``name`` can be fetched from ``namespace``, and ``False`` if the API
    server answers with HTTP 404.

    Raises:
        kubernetes.client.exceptions.ApiException: for any other API failure
            (e.g. missing permissions or a server error), so that such
            problems are not misreported as "queue does not exist".
    """
    try:
        client.CustomObjectsApi().get_namespaced_custom_object(
            "kueue.x-k8s.io",
            "v1beta1",
            namespace,
            "localqueues",
            name,
        )
    except client.exceptions.ApiException as exc:
        if exc.status == 404:
            return False
        raise
    return True


def assert_kueue_workloadpriorityclass(name: str) -> bool:
    """Check the existence of a Kueue `WorkloadPriorityClass` in the cluster.

    Returns ``True`` if the cluster-scoped ``kueue.x-k8s.io/v1beta1``
    ``workloadpriorityclasses`` object called ``name`` can be fetched, and
    ``False`` if the API server answers with HTTP 404.

    Raises:
        kubernetes.client.exceptions.ApiException: for any other API failure
            (e.g. missing permissions or a server error), so that such
            problems are not misreported as "priority class does not exist".
    """
    try:
        client.CustomObjectsApi().get_cluster_custom_object(
            "kueue.x-k8s.io",
            "v1beta1",
            "workloadpriorityclasses",
            name,
        )
    except client.exceptions.ApiException as exc:
        if exc.status == 404:
            return False
        raise
    return True


def kueue_scheduling_labels(job: Job, namespace: str) -> Mapping[str, str]:
    """Build the Kubernetes labels that control Kueue queueing and priority for a job.

    Returns an empty mapping when the job has no options or no scheduling
    options. Otherwise, a configured local queue (looked up in ``namespace``)
    and a configured workload priority class are checked for existence, and a
    label is emitted for each of the two settings that is not ``None``.

    Raises:
        ValueError: if the configured local queue or workload priority class
            does not exist.
    """
    options = job.options
    scheduling = options.scheduling if options else None
    if not scheduling:
        return {}

    queue_name = scheduling.queue_name
    if queue_name and not assert_kueue_localqueue(namespace, queue_name):
        raise ValueError(f"Specified Kueue local queue does not exist: {queue_name!r}")

    priority_class = scheduling.priority_class
    if priority_class and not assert_kueue_workloadpriorityclass(priority_class):
        raise ValueError(
            f"Specified Kueue workload priority class does not exist: {priority_class!r}"
        )

    # Unset (None) entries are dropped so that only configured labels are applied.
    labels = {
        "kueue.x-k8s.io/queue-name": queue_name,
        "kueue.x-k8s.io/priority-class": priority_class,
    }
    return cast(Mapping[str, str], remove_none_values(labels))
33 changes: 33 additions & 0 deletions src/jobs/utils/math.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from __future__ import annotations

import re


def to_rational(s: str) -> float:
    """Convert a number with optional SI/binary unit to floating-point.

    Accepts an optionally signed decimal magnitude (``.`` or ``,`` as decimal
    separator) immediately followed by an optional unit suffix, e.g. ``"500m"``,
    ``"2k"``, ``"1.5Gi"``. Surrounding whitespace is ignored.

    Raises:
        ValueError: if the string is not a number with an optional suffix
            (including any trailing garbage), or if the suffix is unknown.
    """

    # fullmatch: the whole string must be consumed, otherwise inputs such as
    # "10 Mi" or "1.5.3" would silently be truncated to their numeric prefix.
    matches = re.fullmatch(
        r"(?P<magnitude>[+\-]?(?:\d+[.,]?\d*|[.,]\d+))(?P<suffix>[a-zA-Z]*)",
        s.strip(),
    )
    if not matches:
        raise ValueError(f"Could not parse {s}")
    # float() only understands "." as decimal separator.
    magnitude = float(matches.group("magnitude").replace(",", "."))
    suffix = matches.group("suffix")

    factor = {
        # SI / Metric
        "m": 1e-3,
        "k": 1e3,
        "M": 1e6,
        "G": 1e9,
        "T": 1e12,
        # Binary
        "Ki": 2**10,
        "Mi": 2**20,
        "Gi": 2**30,
        "Ti": 2**40,
        # default
        "": 1.0,
    }.get(suffix)
    if factor is None:
        raise ValueError(f"unknown unit suffix: {suffix}")

    return factor * magnitude
66 changes: 1 addition & 65 deletions src/jobs/util.py → src/jobs/utils/processes.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,15 @@
from __future__ import annotations

import re
import shlex
import subprocess
import sys
import threading
import time
from io import TextIOBase
from typing import Any, Iterable, Mapping, TextIO, TypeVar, cast

import kubernetes
from typing import Iterable, Mapping, TextIO

from jobs.types import AnyPath

T = TypeVar("T", bound=Mapping[str, Any])


def to_rational(s: str) -> float:
    """Convert a number with optional SI/binary unit to floating-point.

    Accepts an optionally signed decimal magnitude (``.`` or ``,`` as decimal
    separator) immediately followed by an optional unit suffix, e.g. ``"500m"``,
    ``"2k"``, ``"1.5Gi"``. Surrounding whitespace is ignored.

    Raises:
        ValueError: if the string is not a number with an optional suffix
            (including any trailing garbage), or if the suffix is unknown.
    """

    # fullmatch: the whole string must be consumed, otherwise inputs such as
    # "10 Mi" or "1.5.3" would silently be truncated to their numeric prefix.
    matches = re.fullmatch(
        r"(?P<magnitude>[+\-]?(?:\d+[.,]?\d*|[.,]\d+))(?P<suffix>[a-zA-Z]*)",
        s.strip(),
    )
    if not matches:
        raise ValueError(f"Could not parse {s}")
    # float() only understands "." as decimal separator.
    magnitude = float(matches.group("magnitude").replace(",", "."))
    suffix = matches.group("suffix")

    factor = {
        # SI / Metric
        "m": 1e-3,
        "k": 1e3,
        "M": 1e6,
        "G": 1e9,
        "T": 1e12,
        # Binary
        "Ki": 2**10,
        "Mi": 2**20,
        "Gi": 2**30,
        "Ti": 2**40,
        # default
        "": 1.0,
    }.get(suffix)
    if factor is None:
        raise ValueError(f"unknown unit suffix: {suffix}")

    return factor * magnitude


def remove_none_values(d: T) -> T:
    """Return a copy of mapping ``d`` without the entries whose value is ``None``.

    Only ``None`` is filtered out; other falsy values (``0``, ``""``, ``[]``)
    are kept. The result is always a plain ``dict`` -- the ``cast`` only keeps
    the declared return type aligned with the argument type for type checkers.
    """
    kept: dict[str, Any] = {}
    for key, value in d.items():
        if value is None:
            continue
        kept[key] = value
    return cast(T, kept)


def sanitize_rfc1123_domain_name(s: str) -> str:
    """Sanitize a string for use as an RFC 1123 domain name.

    The input is lower-cased (Kubernetes object names must be lower case), and
    every character other than ``a-z``, ``0-9``, ``.`` and ``-`` is replaced
    with a dash.

    Note: Leading/trailing separators and the overall length are deliberately
    left untouched, because the result is also used as a ``generateName``
    prefix. A name that starts or ends with a dash, or that is too long, can
    therefore still be rejected by the API server.
    """
    # Local import so this block does not depend on the module's import list.
    import re

    return re.sub(r"[^a-z0-9.-]", "-", s.lower())


def run_command(
command: str,
Expand Down Expand Up @@ -157,17 +107,3 @@ def _reader(
read_stderr.join()

return process.returncode, stdout, stderr, output


class KubernetesNamespaceMixin:
    """Determine the desired or current Kubernetes namespace.

    The desired namespace is taken from the ``namespace`` keyword argument;
    when it is absent, the namespace of the active kubeconfig context is used.
    """

    def __init__(self, **kwargs):
        # Loads the Kubernetes client configuration as a side effect of
        # constructing the object.
        kubernetes.config.load_config()
        self._namespace: str | None = kwargs.get("namespace")

    @property
    def namespace(self) -> str:
        """Return the explicitly requested namespace, else the active context's.

        An explicit namespace short-circuits the kubeconfig lookup, so the
        property also works when no kubeconfig contexts can be listed.
        Falls back to ``"default"`` (mirroring kubectl) when the active
        context does not name a namespace, so the result is always a ``str``.
        """
        if self._namespace:
            return self._namespace

        _, active_context = kubernetes.config.list_kube_config_contexts()
        return active_context["context"].get("namespace") or "default"
3 changes: 2 additions & 1 deletion tests/unit/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest

from jobs.util import remove_none_values, to_rational
from jobs.utils.helpers import remove_none_values
from jobs.utils.math import to_rational


@pytest.mark.parametrize(
Expand Down

0 comments on commit de1a4cc

Please sign in to comment.