diff --git a/_internal/build_model_spec_metadata.py b/_internal/build_model_spec_metadata.py
new file mode 100644
index 00000000..853a015c
--- /dev/null
+++ b/_internal/build_model_spec_metadata.py
@@ -0,0 +1,21 @@
+from nos import hub
+from nos.common import TaskType
+from nos.common.spec import ModelResources, ModelSpec, ModelSpecMetadata, _metadata_path
+from nos.logging import logger
+
+
+spec: ModelSpec = hub.load_spec(task=TaskType.IMAGE_EMBEDDING, model_name="openai/clip")
+spec._metadata = ModelSpecMetadata(
+    name=spec.name,
+    task=spec.task,
+    resources={
+        "cpu": ModelResources(runtime="cpu", device="cpu", device_memory=2 * 1024**3, cpus=2),
+        "gpu": ModelResources(runtime="gpu", device="cuda", device_memory=2 * 1024**3, cpus=1),
+    },
+)
+
+path = _metadata_path(spec)
+if not path.exists():
+    path.parent.mkdir(parents=True, exist_ok=True)
+    spec._metadata.to_json(path)
+    logger.info(f"Saved metadata to {path}")
diff --git a/nos/common/spec.py b/nos/common/spec.py
index b28d93c6..55b9df8c 100644
--- a/nos/common/spec.py
+++ b/nos/common/spec.py
@@ -1,6 +1,8 @@
 import copy
 import inspect
-from dataclasses import field
+import json
+from dataclasses import asdict, field
+from functools import cached_property
 from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, get_args, get_origin
 
 from pydantic import validator
@@ -10,6 +12,8 @@
 from nos.common.exceptions import NosInputValidationException
 from nos.common.tasks import TaskType
 from nos.common.types import Batch, EmbeddingSpec, ImageSpec, ImageT, TensorSpec, TensorT  # noqa: F401
+from nos.constants import NOS_MODELS_DIR
+from nos.logging import logger
 from nos.protoc import import_module
 
 
@@ -135,7 +139,16 @@ class FunctionSignature:
 
     def __repr__(self) -> str:
         """Return the function signature representation."""
-        return f"FunctionSignature(inputs={self.inputs}, outputs={self.outputs}, func_or_cls={self.func_or_cls}, init_args={self.init_args}, init_kwargs={self.init_kwargs}, method_name={self.method_name})"
+        inputs_str = "\n".join([f"{k}={v}" for k, v in self.inputs.items()])
+        outputs_str = "\n".join([f"{k}={v}" for k, v in self.outputs.items()])
+        return (
+            f"""FunctionSignature\n"""
+            f"""\tfunc_or_cls={self.func_or_cls}\n"""
+            f"""\tinit_args={self.init_args}, init_kwargs={self.init_kwargs}\n"""
+            f"""\tmethod_name={self.method_name}\n"""
+            f"""\tinputs={inputs_str}\n"""
+            f"""\toutputs={outputs_str}\n"""
+        )
 
     @staticmethod
     def validate(inputs: Dict[str, Any], sig: Dict[str, FunctionSignatureType]) -> Dict[str, Any]:
@@ -189,6 +202,119 @@ def from_packages(cls, packages: List[str]) -> Dict[str, Any]:
         return cls(conda={"dependencies": ["pip", {"pip": packages}]})
 
 
+@dataclass
+class ModelResources:
+    """Model resources (device/host memory, etc.)."""
+
+    runtime: str = "cpu"
+    """Runtime type (cpu, gpu, trt-runtime, etc).
+    See `nos.server._runtime.InferenceServiceRuntime` for the list of supported runtimes.
+    """
+    device: str = "cpu"
+    """Device type (cpu, cuda, mps, neuron, etc)."""
+    device_memory: Union[int, str] = field(default=512 * 1024**2)
+    """Device memory (defaults to 512 MB)."""
+    cpus: float = 0
+    """Number of CPUs (defaults to 0 CPUs)."""
+    memory: Union[int, str] = field(default=256 * 1024**2)
+    """Host memory (defaults to 256 MB)."""
+
+    @validator("runtime")
+    def _validate_runtime(cls, runtime: str) -> str:
+        """Validate the runtime."""
+        from nos.server._runtime import InferenceServiceRuntime
+
+        # Check that the runtime is one of the supported runtimes.
+        if runtime not in InferenceServiceRuntime.configs.keys():
+            raise ValueError(f"Invalid runtime, runtime={runtime}.")
+        return runtime
+
+    @validator("device")
+    def _validate_device(cls, device: str) -> str:
+        """Validate the device."""
+        from nos.server._runtime import NOS_SUPPORTED_DEVICES
+
+        if device not in NOS_SUPPORTED_DEVICES:
+            err_msg = f"Invalid device provided, device={device}. Use one of {NOS_SUPPORTED_DEVICES}."
+            logger.error(err_msg)
+            raise ValueError(err_msg)
+        return device
+
+    @validator("device_memory")
+    def _validate_device_memory(cls, device_memory: Union[int, str]) -> int:
+        """Validate the device memory."""
+        if isinstance(device_memory, str):
+            raise NotImplementedError()
+
+        if device_memory < 256 * 1024**2 or device_memory > 128 * 1024**3:
+            err_msg = f"Invalid device memory provided, device_memory={device_memory / 1024**2} MB. Provide a value between 256 MB and 128 GB."
+            logger.error(err_msg)
+            raise ValueError(err_msg)
+        return device_memory
+
+    @validator("cpus")
+    def _validate_cpus(cls, cpus: Union[float, str]) -> float:
+        """Validate the number of CPUs."""
+        if isinstance(cpus, str):
+            raise NotImplementedError()
+
+        if cpus < 0.0 or cpus > 128.0:
+            err_msg = f"Invalid number of CPUs provided, cpus={cpus}. Provide a value between 0 and 128."
+            logger.error(err_msg)
+            raise ValueError(err_msg)
+        return cpus
+
+    @validator("memory")
+    def _validate_memory(cls, memory: Union[int, str]) -> int:
+        """Validate the host memory."""
+        if isinstance(memory, str):
+            raise NotImplementedError()
+
+        if memory < 256 * 1024**2 or memory > 128 * 1024**3:
+            err_msg = f"Invalid host memory provided, memory={memory / 1024**2} MB. Provide a value between 256 MB and 128 GB."
+            logger.error(err_msg)
+            raise ValueError(err_msg)
+        return memory
+
+
+@dataclass
+class ModelSpecMetadata:
+    """Model specification metadata.
+
+    The metadata contains the model profiles, metrics, etc.
+    """
+
+    name: str
+    """Model identifier."""
+    task: TaskType
+    """Task type (e.g. image_embedding, image_generation, object_detection_2d, etc)."""
+    resources: Dict[str, ModelResources] = field(default_factory=dict)
+    """Model resource limits (device/host memory, etc.), keyed by
+    runtime type (cpu, gpu, trt-runtime, etc.)."""
+
+    def __repr__(self) -> str:
+        return f"""ModelSpecMetadata(name={self.name}, task={self.task}, """ f"""resources={self.resources})"""
+
+    def to_json(self, filename: str) -> Dict[str, Any]:
+        """Serialize the metadata and write it to a JSON file."""
+        specd = asdict(self)
+        with open(filename, "w") as f:
+            json.dump(specd, f, indent=4)
+        return specd
+
+    @classmethod
+    def from_json(cls, filename: str) -> "ModelSpecMetadata":
+        """Load the metadata from a JSON file."""
+        with open(filename, "r") as f:
+            specd = json.load(f)
+        return cls(**specd)
+
+
+def _metadata_path(spec: "ModelSpec") -> str:
+    """Return the metadata path for a model."""
+    return NOS_MODELS_DIR / f"metadata/{spec.id}/metadata.json"
+
+
 @dataclass
 class ModelSpec:
     """Model specification for the registry.
@@ -205,6 +331,29 @@ class ModelSpec:
     """Model function signature."""
     runtime_env: RuntimeEnv = None
     """Runtime environment with custom packages."""
+    _metadata: ModelSpecMetadata = field(init=False, default=None)
+    """Model specification metadata. The contents of the metadata (profiles, metrics, etc)
+    are specified in a separate file."""
+
+    class Config:
+        """Custom configuration to enable private attributes."""
+
+        underscore_attrs_are_private: bool = True
+
+    def __repr__(self):
+        return f"""ModelSpec(name={self.name}, task={self.task})""" f"""\n  {self.signature}"""
+
+    @cached_property
+    def metadata(self) -> ModelSpecMetadata:
+        try:
+            path = _metadata_path(self)
+            if not path.exists():
+                raise FileNotFoundError(f"Model metadata not found. [path={path}]")
+            metadata = ModelSpecMetadata.from_json(str(path))
+            logger.info(f"Loaded model metadata [name={self.name}, path={path}, metadata={metadata}]")
+        except Exception:
+            metadata = None
+        return metadata
 
     @staticmethod
     def get_id(model_name: str, task: TaskType = None) -> str:
diff --git a/nos/common/tasks.py b/nos/common/tasks.py
index 57244d91..09a632be 100644
--- a/nos/common/tasks.py
+++ b/nos/common/tasks.py
@@ -1,7 +1,7 @@
 from enum import Enum
 
 
-class TaskType(Enum):
+class TaskType(str, Enum):
     """Task types."""
 
     OBJECT_DETECTION_2D = "object_detection_2d"
diff --git a/nos/managers/model.py b/nos/managers/model.py
index a0860f85..5fb05691 100644
--- a/nos/managers/model.py
+++ b/nos/managers/model.py
@@ -62,6 +62,12 @@ class ModelHandle:
         # Fetch the next result from the queue
         >> response = model.get()
 
+        # Submit a task to the model handle,
+        # this will add results to the queue
+        >> model_handle.submit(**model_inputs)
+        # Fetch the next result from the queue
+        >> response = model_handle.get()
+
         # Cleanup model resources
         >> model_handle.cleanup()
         ```
@@ -77,7 +83,6 @@ class ModelHandle:
     """Ray actor pool."""
     _results_queue: ModelResultQueue = field(init=False, default_factory=ModelResultQueue)
     """Queue to fetch results from the actor pool."""
-
     _actor_profile: Dict[str, Any] = field(init=False, default=None)
     """Actor profile."""
 
@@ -113,13 +118,11 @@ def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle":
             logger.debug(f"Waiting for pending futures to complete before scaling [name={self.spec.name}].")
             while self._actor_pool.has_next():
                 self._fetch_next()
-            logger.debug(f"Removing actor pool [replicas={len(self._actors)}].")
-            del self._actor_pool
-            self._actor_pool = None
 
         logger.debug(f"Scaling model [name={self.spec.name}].")
         if replicas == len(self._actors):
             logger.debug(f"Model already scaled appropriately [name={self.spec.name}, replicas={replicas}].")
+            return self
         elif replicas > len(self._actors):
             self._actors += [self.get_actor(self.spec) for _ in range(replicas - len(self._actors))]
             logger.debug(f"Scaling up model [name={self.spec.name}, replicas={replicas}].")
@@ -135,6 +138,11 @@ def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle":
         self.num_replicas = replicas
         self._results_queue.resize(self.num_replicas * 2)
 
+        # Remove the existing actor pool
+        logger.debug(f"Removing actor pool [replicas={len(self._actors)}].")
+        del self._actor_pool
+        self._actor_pool = None
+
         # Re-create the actor pool
         logger.debug(f"Re-creating actor pool [name={self.spec.name}, replicas={replicas}].")
         self._actor_pool = ray.util.ActorPool(self._actors)
@@ -173,7 +181,6 @@ def get_actor(cls, spec: ModelSpec) -> Union[ray.remote, ray.actor.ActorHandle]:
 
         # Get the actor options from the model spec
         actor_options = cls._actor_options(spec)
-
         actor_cls = ray.remote(**actor_options)(model_cls)
 
         # Check if the model class has the required method
@@ -332,8 +339,8 @@ def __contains__(self, spec: ModelSpec) -> bool:
         """
         return spec.id in self.handlers
 
-    def get(self, model_spec: ModelSpec) -> ModelHandle:
-        """Get a model handle from the manager.
+    def load(self, model_spec: ModelSpec) -> ModelHandle:
+        """Load a model handle from the manager using the model specification.
 
         Create a new model handle if it does not exist, else return an existing handle.
diff --git a/nos/server/_runtime.py b/nos/server/_runtime.py
index 8b2068ce..83dfe4d4 100644
--- a/nos/server/_runtime.py
+++ b/nos/server/_runtime.py
@@ -34,6 +34,8 @@
 
 NOS_SUPPORTED_DEVICES = ("cpu", "cuda", "mps", "neuron")
+NOS_SUPPORTED_DEVICES = ("cpu", "cuda", "mps", "neuron")
+
 
 
 @dataclass
 class InferenceServiceRuntimeConfig:
diff --git a/nos/server/_service.py b/nos/server/_service.py
index b8bb64b8..bd66d8e5 100644
--- a/nos/server/_service.py
+++ b/nos/server/_service.py
@@ -94,7 +94,7 @@ def execute(self, model_name: str, task: TaskType = None, inputs: Dict[str, Any]
         # Initialize the model (if not already initialized)
         # This call should also evict models and garbage collect if
         # too many models are loaded are loaded simultaneously.
-        model_handle: ModelHandle = self.model_manager.get(model_spec)
+        model_handle: ModelHandle = self.model_manager.load(model_spec)
 
         # Get the model handle and call it remotely (with model spec, actor handle)
         st = time.perf_counter()
diff --git a/tests/managers/test_model_manager.py b/tests/managers/test_model_manager.py
index 9bc01638..f2f1a850 100644
--- a/tests/managers/test_model_manager.py
+++ b/tests/managers/test_model_manager.py
@@ -31,9 +31,9 @@ def test_model_manager(manager):  # noqa: F811
     # This should not raise any OOM errors as models are evicted
     # from the manager's cache.
     for idx, spec in enumerate(hub.list()):
-        # Note: `manager.get()` is a wrapper around `manager.add()`
+        # Note: `manager.load()` is a wrapper around `manager.add()`
         # and creates a single replica of the model.
-        handler: ModelHandle = manager.get(spec)
+        handler: ModelHandle = manager.load(spec)
         assert handler is not None
         assert isinstance(handler, ModelHandle)
 
@@ -46,7 +46,7 @@ def test_model_manager(manager):  # noqa: F811
 
     # Test noop with model manager
    spec = hub.load_spec("noop/process-images", task=TaskType.CUSTOM)
-    noop: ModelHandle = manager.get(spec)
+    noop: ModelHandle = manager.load(spec)
     assert noop is not None
     assert isinstance(noop, ModelHandle)
 
@@ -131,7 +131,7 @@ def __call__(self, images: Union[np.ndarray, List[np.ndarray]], n: int = 1) -> n
 
     # Check if the model can be loaded with the ModelManager
     # Note: This will be executed as a Ray actor within a custom runtime env.
-    model_handle = manager.get(spec)
+    model_handle = manager.load(spec)
     assert model_handle is not None
     assert isinstance(model_handle, ModelHandle)
 
@@ -159,7 +159,7 @@ def test_model_manager_noop_inference(manager):  # noqa: F811
     from nos.common import tqdm
 
     spec = hub.load_spec("noop/process-images", task=TaskType.CUSTOM)
-    noop: ModelHandle = manager.get(spec)
+    noop: ModelHandle = manager.load(spec)
     assert noop is not None
     assert isinstance(noop, ModelHandle)
 
@@ -203,7 +203,7 @@ def noop_gen(_noop, _pbar, B):
             assert len(result) == B
 
         logger.debug(f"NoOp ({replicas}): {noop}")
-        pbar = tqdm(duration=10, unit_scale=B, desc=f"noop async [B={B}, replicas={noop.num_replicas}]", total=0)
+        pbar = tqdm(duration=5, unit_scale=B, desc=f"noop async [B={B}, replicas={noop.num_replicas}]", total=0)
 
         # warmup: submit()
         for result in noop_gen(noop, tqdm(duration=1, disable=True), B):
@@ -262,7 +262,7 @@ def test_model_manager_inference(manager):  # noqa: F811
     spec = hub.load_spec(model_name, task=task)
 
     # Add the model to the manager (or via `manager.add()`)
-    model: ModelHandle = manager.get(spec)
+    model: ModelHandle = manager.load(spec)
     assert model is not None
 
     # Benchmark: for each image-shape and batch-size
diff --git a/tests/server/test_inference_service.py b/tests/server/test_inference_service.py
index 2118f287..575738be 100644
--- a/tests/server/test_inference_service.py
+++ b/tests/server/test_inference_service.py
@@ -44,7 +44,7 @@ def test_model_manager(ray_executor: RayExecutor):  # noqa: F811
     # This should not raise any OOM errors as models are evicted
     # from the manager's cache.
     for idx, spec in enumerate(hub.list()):
-        handler: ModelHandle = manager.get(spec)
+        handler: ModelHandle = manager.load(spec)
         assert handler is not None
         assert isinstance(handler, ModelHandle)
 
@@ -64,7 +64,7 @@ def test_model_manager_inference(ray_executor: RayExecutor):  # noqa: F811
     spec = hub.load_spec("openai/clip-vit-base-patch32", task=TaskType.IMAGE_EMBEDDING)
 
     # Add the model to the manager (or via `manager.add()`)
-    handle: ModelHandle = manager.get(spec)
+    handle: ModelHandle = manager.load(spec)
     assert handle is not None
 
     img = Image.open(NOS_TEST_IMAGE)
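
A minimal usage sketch of the metadata API introduced above (not part of the patch itself; it assumes the metadata JSON has already been generated, e.g. by running `_internal/build_model_spec_metadata.py`, and uses the same "openai/clip" model id as that script):

from nos import hub
from nos.common import TaskType
from nos.common.spec import ModelSpec, ModelSpecMetadata

# Load a registered spec and read back its persisted metadata.
# ModelSpec.metadata returns None when no metadata.json exists yet.
spec: ModelSpec = hub.load_spec(task=TaskType.IMAGE_EMBEDDING, model_name="openai/clip")
metadata: ModelSpecMetadata = spec.metadata
if metadata is not None:
    # Resources are keyed by runtime ("cpu", "gpu", ...), matching the
    # entries written by the _internal script above.
    print(metadata.name, metadata.task, metadata.resources.get("gpu"))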