New model spec metadata format with runtime and model resource information #268

Merged
9 commits merged on Aug 27, 2023
21 changes: 21 additions & 0 deletions _internal/build_model_spec_metadata.py
@@ -0,0 +1,21 @@
from nos import hub
from nos.common import TaskType
from nos.common.spec import ModelResources, ModelSpec, ModelSpecMetadata, _metadata_path
from nos.logging import logger


spec: ModelSpec = hub.load_spec(task=TaskType.IMAGE_EMBEDDING, model_name="openai/clip")
spec._metadata = ModelSpecMetadata(
name=spec.name,
task=spec.task,
resources={
"cpu": ModelResources(runtime="cpu", device="cpu", device_memory=2 * 1024**3, cpus=2),
"gpu": ModelResources(runtime="gpu", device="cuda", device_memory=2 * 1024**3, cpus=1),
},
)

path = _metadata_path(spec)
if not path.exists():
path.parent.mkdir(parents=True, exist_ok=True)
spec._metadata.to_json(path)
logger.info(f"Saved metadata to {path}")
153 changes: 151 additions & 2 deletions nos/common/spec.py
@@ -1,6 +1,8 @@
import copy
import inspect
from dataclasses import field
import json
from dataclasses import asdict, field
from functools import cached_property
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, get_args, get_origin

from pydantic import validator
@@ -10,6 +12,8 @@
from nos.common.exceptions import NosInputValidationException
from nos.common.tasks import TaskType
from nos.common.types import Batch, EmbeddingSpec, ImageSpec, ImageT, TensorSpec, TensorT # noqa: F401
from nos.constants import NOS_MODELS_DIR
from nos.logging import logger
from nos.protoc import import_module


@@ -135,7 +139,16 @@ class FunctionSignature:

def __repr__(self) -> str:
"""Return the function signature representation."""
return f"FunctionSignature(inputs={self.inputs}, outputs={self.outputs}, func_or_cls={self.func_or_cls}, init_args={self.init_args}, init_kwargs={self.init_kwargs}, method_name={self.method_name})"
inputs_str = "\n".join([f"{k}={v}" for k, v in self.inputs.items()])
outputs_str = "\n".join([f"{k}={v}" for k, v in self.outputs.items()])
return (
f"""FunctionSignature\n"""
f"""\tfunc_or_cls={self.func_or_cls}\n"""
f"""\tinit_args={self.init_args}, init_kwargs={self.init_kwargs}\n"""
f"""\tmethod_name={self.method_name}\n"""
f"""\tinputs={inputs_str}\n"""
f"""\toutputs={outputs_str}\n"""
)

@staticmethod
def validate(inputs: Dict[str, Any], sig: Dict[str, FunctionSignatureType]) -> Dict[str, Any]:
@@ -189,6 +202,119 @@ def from_packages(cls, packages: List[str]) -> Dict[str, Any]:
return cls(conda={"dependencies": ["pip", {"pip": packages}]})


@dataclass
class ModelResources:
"""Model resources (device/host memory etc)."""

runtime: str = "cpu"
"""Runtime type (cpu, gpu, trt-runtime, etc).
See `nos.server._runtime.InferenceServiceRuntime` for the list of supported runtimes.
"""
device: str = "cpu"
"""Device type (cpu, cuda, mps, neuron, etc)."""
device_memory: Union[int, str] = field(default=512 * 1024**2)
"""Device memory (defaults to 512 MB)."""
cpus: float = 0
"""Number of CPUs (defaults to 0 CPUs)."""
memory: Union[int, str] = field(default=256 * 1024**2)
"""Host memory (defaults to 256 MB)"""

@validator("runtime")
def _validate_runtime(cls, runtime: str) -> str:
"""Validate the runtime."""
from nos.server._runtime import InferenceServiceRuntime

# Check that the runtime is one of the supported runtimes.
if runtime not in InferenceServiceRuntime.configs.keys():
raise ValueError(f"Invalid runtime, runtime={runtime}.")
return runtime

@validator("device")
def _validate_device(cls, device: str) -> str:
"""Validate the device."""
from nos.server._runtime import NOS_SUPPORTED_DEVICES

if device not in NOS_SUPPORTED_DEVICES:
err_msg = f"Invalid device provided, device={device}. Use one of {NOS_SUPPORTED_DEVICES}."
logger.error(err_msg)
raise ValueError(err_msg)
return device

@validator("device_memory")
def _validate_device_memory(cls, device_memory: Union[int, str]) -> int:
"""Validate the device memory."""
if isinstance(device_memory, str):
raise NotImplementedError()

if device_memory < 256 * 1024**2 or device_memory > 128 * 1024**3:
err_msg = f"Invalid device memory provided, device_memory={device_memory / 1024**2} MB. Provide a value between 256 MB and 128 GB."
logger.error(err_msg)
raise ValueError(err_msg)
return device_memory

@validator("cpus")
def _validate_cpus(cls, cpus: Union[float, str]) -> float:
"""Validate the number of CPUs."""
if isinstance(cpus, str):
raise NotImplementedError()

if cpus < 0.0 or cpus > 128.0:
err_msg = f"Invalid number of CPUs provided, cpus={cpus}. Provide a value between 0 and 128."
logger.error(err_msg)
raise ValueError(err_msg)
return cpus

@validator("memory")
def _validate_memory(cls, memory: Union[int, str]) -> int:
"""Validate the host memory."""
if isinstance(memory, str):
raise NotImplementedError()

if memory < 256 * 1024**2 or memory > 128 * 1024**3:
err_msg = f"Invalid device memory provided, memory={memory / 1024**2} MB. Provide a value between 256 MB and 128 GB."
logger.error(err_msg)
raise ValueError(err_msg)
return memory

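As a rough illustration of how these validators behave (a sketch, assuming `ModelResources` is a pydantic dataclass so the `@validator` hooks run at construction time):

```python
from nos.common.spec import ModelResources

# Within bounds: GPU runtime, 2 GB of device memory, 1 CPU.
ok = ModelResources(runtime="gpu", device="cuda", device_memory=2 * 1024**3, cpus=1)

# Out of bounds: 64 MB is below the 256 MB floor, so construction is rejected.
try:
    ModelResources(runtime="gpu", device="cuda", device_memory=64 * 1024**2)
except Exception as exc:
    print(f"rejected: {exc}")
```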

@dataclass
class ModelSpecMetadata:
"""Model specification metadata.

The metadata contains the model profiles, metrics, etc.
"""

name: str
"""Model identifier."""
task: TaskType
"""Task type (e.g. image_embedding, image_generation, object_detection_2d, etc)."""
resources: Dict[str, ModelResources] = field(default_factory=dict)
"""Model resource limits (device/host memory, etc)."""
"""Key is the runtime type (cpu, gpu, trt-runtime, etc)."""

def __repr__(self) -> str:
return f"""ModelSpecMetadata(name={self.name}, task={self.task}, """ f"""resources={self.resources})"""

def to_json(self, filename: str) -> Dict[str, Any]:
"""Convert the model spec to json."""
specd = asdict(self)
with open(filename, "w") as f:
json.dump(specd, f, indent=4)
return specd

@classmethod
def from_json(cls, filename: str) -> "ModelSpecMetadata":
"""Convert the model spec from json."""
with open(filename, "r") as f:
specd = json.load(f)
return cls(**specd)


def _metadata_path(spec: "ModelSpec") -> str:
"""Return the metadata path for a model."""
return NOS_MODELS_DIR / f"metadata/{spec.id}/metadata.json"

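A short sketch of the intended save/load round-trip (the temporary path here is illustrative; on the server the path comes from `_metadata_path`):

```python
from nos.common import TaskType
from nos.common.spec import ModelResources, ModelSpecMetadata

metadata = ModelSpecMetadata(
    name="openai/clip",
    task=TaskType.IMAGE_EMBEDDING,
    resources={"cpu": ModelResources(runtime="cpu", device="cpu", cpus=2)},
)
metadata.to_json("/tmp/metadata.json")  # dataclasses.asdict + json.dump
restored = ModelSpecMetadata.from_json("/tmp/metadata.json")
assert restored.name == metadata.name and restored.task == TaskType.IMAGE_EMBEDDING
```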

@dataclass
class ModelSpec:
"""Model specification for the registry.
@@ -205,6 +331,29 @@ class ModelSpec:
"""Model function signature."""
runtime_env: RuntimeEnv = None
"""Runtime environment with custom packages."""
_metadata: ModelSpecMetadata = field(init=False, default=None)
"""Model specification metadata. The contents of the metadata (profiles, metrics, etc)
are specified in a separate file."""

class Config:
"""Custom configuration to enable private attributes."""

underscore_attrs_are_private: bool = True

def __repr__(self):
return f"""ModelSpec(name={self.name}, task={self.task})""" f"""\n {self.signature}"""

@cached_property
def metadata(self) -> ModelSpecMetadata:
try:
path = _metadata_path(self)
if not path.exists():
raise FileNotFoundError(f"Model metadata not found. [path={path}]")
metadata = ModelSpecMetadata.from_json(str(path))
logger.info(f"Loaded model metadata [name={self.name}, path={path}, metadata={metadata}]")
except Exception:
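# Metadata is optional: fall back to None if the file is missing or unreadable.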
metadata = None
return metadata

@staticmethod
def get_id(model_name: str, task: TaskType = None) -> str:
2 changes: 1 addition & 1 deletion nos/common/tasks.py
@@ -1,7 +1,7 @@
from enum import Enum


class TaskType(Enum):
class TaskType(str, Enum):
"""Task types."""

OBJECT_DETECTION_2D = "object_detection_2d"
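Making `TaskType` a `str` mixin is presumably what lets task values pass straight through `json.dump` in `ModelSpecMetadata.to_json`; a quick illustration:

```python
import json

from nos.common.tasks import TaskType

# Members of a (str, Enum) serialize as their plain string values.
print(json.dumps({"task": TaskType.OBJECT_DETECTION_2D}))  # {"task": "object_detection_2d"}
```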
21 changes: 14 additions & 7 deletions nos/managers/model.py
@@ -62,6 +62,12 @@ class ModelHandle:
# Fetch the next result from the queue
>> response = model.get()

# Submit a task to the model handle,
# this will add results to the queue
>> model_handle.submit(**model_inputs)
# Fetch the next result from the queue
>> response = model_handle.get()

# Cleanup model resources
>> model_handle.cleanup()
```
@@ -77,7 +83,6 @@ class ModelHandle:
"""Ray actor pool."""
_results_queue: ModelResultQueue = field(init=False, default_factory=ModelResultQueue)
"""Queue to fetch results from the actor pool."""

_actor_profile: Dict[str, Any] = field(init=False, default=None)
"""Actor profile."""

@@ -113,13 +118,11 @@ def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle":
logger.debug(f"Waiting for pending futures to complete before scaling [name={self.spec.name}].")
while self._actor_pool.has_next():
self._fetch_next()
logger.debug(f"Removing actor pool [replicas={len(self._actors)}].")
del self._actor_pool
self._actor_pool = None
logger.debug(f"Scaling model [name={self.spec.name}].")

if replicas == len(self._actors):
logger.debug(f"Model already scaled appropriately [name={self.spec.name}, replicas={replicas}].")
return self
elif replicas > len(self._actors):
self._actors += [self.get_actor(self.spec) for _ in range(replicas - len(self._actors))]
logger.debug(f"Scaling up model [name={self.spec.name}, replicas={replicas}].")
@@ -135,6 +138,11 @@ def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle":
self.num_replicas = replicas
self._results_queue.resize(self.num_replicas * 2)

# Remove the existing actor pool before re-creating it
logger.debug(f"Removing actor pool [replicas={len(self._actors)}].")
del self._actor_pool
self._actor_pool = None

# Re-create the actor pool
logger.debug(f"Re-creating actor pool [name={self.spec.name}, replicas={replicas}].")
self._actor_pool = ray.util.ActorPool(self._actors)
@@ -173,7 +181,6 @@ def get_actor(cls, spec: ModelSpec) -> Union[ray.remote, ray.actor.ActorHandle]:

# Get the actor options from the model spec
actor_options = cls._actor_options(spec)

actor_cls = ray.remote(**actor_options)(model_cls)

# Check if the model class has the required method
@@ -332,8 +339,8 @@ def __contains__(self, spec: ModelSpec) -> bool:
"""
return spec.id in self.handlers

def get(self, model_spec: ModelSpec) -> ModelHandle:
"""Get a model handle from the manager.
def load(self, model_spec: ModelSpec) -> ModelHandle:
"""Load a model handle from the manager using the model specification.

Create a new model handle if it does not exist,
else return an existing handle.
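With the `get` → `load` rename, call sites (see `_service.py` and the tests below) look roughly like this sketch; `manager` is assumed to be a `ModelManager` instance already in scope (e.g. from a test fixture), and the `images` kwarg follows the model's FunctionSignature:

```python
import numpy as np

from nos import hub
from nos.common import TaskType

spec = hub.load_spec("openai/clip-vit-base-patch32", task=TaskType.IMAGE_EMBEDDING)
handle = manager.load(spec)  # formerly manager.get(spec); creates a single replica if needed

# Direct call on the handle, or use handle.submit(...) / handle.get() for the queued path.
img = np.zeros((224, 224, 3), dtype=np.uint8)
response = handle(images=[img])
```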
2 changes: 2 additions & 0 deletions nos/server/_runtime.py
@@ -34,6 +34,8 @@

NOS_SUPPORTED_DEVICES = ("cpu", "cuda", "mps", "neuron")

NOS_SUPPORTED_DEVICES = ("cpu", "cuda", "mps", "neuron")


@dataclass
class InferenceServiceRuntimeConfig:
2 changes: 1 addition & 1 deletion nos/server/_service.py
@@ -94,7 +94,7 @@ def execute(self, model_name: str, task: TaskType = None, inputs: Dict[str, Any]
# Initialize the model (if not already initialized)
# This call should also evict models and garbage collect if
# too many models are loaded simultaneously.
model_handle: ModelHandle = self.model_manager.get(model_spec)
model_handle: ModelHandle = self.model_manager.load(model_spec)

# Get the model handle and call it remotely (with model spec, actor handle)
st = time.perf_counter()
14 changes: 7 additions & 7 deletions tests/managers/test_model_manager.py
@@ -31,9 +31,9 @@ def test_model_manager(manager): # noqa: F811
# This should not raise any OOM errors as models are evicted
# from the manager's cache.
for idx, spec in enumerate(hub.list()):
# Note: `manager.get()` is a wrapper around `manager.add()`
# Note: `manager.load()` is a wrapper around `manager.add()`
# and creates a single replica of the model.
handler: ModelHandle = manager.get(spec)
handler: ModelHandle = manager.load(spec)
assert handler is not None
assert isinstance(handler, ModelHandle)

@@ -46,7 +46,7 @@

# Test noop with model manager
spec = hub.load_spec("noop/process-images", task=TaskType.CUSTOM)
noop: ModelHandle = manager.get(spec)
noop: ModelHandle = manager.load(spec)
assert noop is not None
assert isinstance(noop, ModelHandle)

@@ -131,7 +131,7 @@ def __call__(self, images: Union[np.ndarray, List[np.ndarray]], n: int = 1) -> n

# Check if the model can be loaded with the ModelManager
# Note: This will be executed as a Ray actor within a custom runtime env.
model_handle = manager.get(spec)
model_handle = manager.load(spec)
assert model_handle is not None
assert isinstance(model_handle, ModelHandle)

@@ -159,7 +159,7 @@ def test_model_manager_noop_inference(manager): # noqa: F811
from nos.common import tqdm

spec = hub.load_spec("noop/process-images", task=TaskType.CUSTOM)
noop: ModelHandle = manager.get(spec)
noop: ModelHandle = manager.load(spec)
assert noop is not None
assert isinstance(noop, ModelHandle)

@@ -203,7 +203,7 @@ def noop_gen(_noop, _pbar, B):
assert len(result) == B

logger.debug(f"NoOp ({replicas}): {noop}")
pbar = tqdm(duration=10, unit_scale=B, desc=f"noop async [B={B}, replicas={noop.num_replicas}]", total=0)
pbar = tqdm(duration=5, unit_scale=B, desc=f"noop async [B={B}, replicas={noop.num_replicas}]", total=0)

# warmup: submit()
for result in noop_gen(noop, tqdm(duration=1, disable=True), B):
@@ -262,7 +262,7 @@ def test_model_manager_inference(manager): # noqa: F811
spec = hub.load_spec(model_name, task=task)

# Add the model to the manager (or via `manager.add()`)
model: ModelHandle = manager.get(spec)
model: ModelHandle = manager.load(spec)
assert model is not None

# Benchmark: for each image-shape and batch-size
4 changes: 2 additions & 2 deletions tests/server/test_inference_service.py
@@ -44,7 +44,7 @@ def test_model_manager(ray_executor: RayExecutor): # noqa: F811
# This should not raise any OOM errors as models are evicted
# from the manager's cache.
for idx, spec in enumerate(hub.list()):
handler: ModelHandle = manager.get(spec)
handler: ModelHandle = manager.load(spec)
assert handler is not None
assert isinstance(handler, ModelHandle)

@@ -64,7 +64,7 @@ def test_model_manager_inference(ray_executor: RayExecutor): # noqa: F811
spec = hub.load_spec("openai/clip-vit-base-patch32", task=TaskType.IMAGE_EMBEDDING)

# Add the model to the manager (or via `manager.add()`)
handle: ModelHandle = manager.get(spec)
handle: ModelHandle = manager.load(spec)
assert handle is not None

img = Image.open(NOS_TEST_IMAGE)