From 8bcbf56ac14333df4b3b50820bd7d0c92d92cc9b Mon Sep 17 00:00:00 2001 From: Sudeep Pillai Date: Tue, 18 Jul 2023 19:52:21 -0700 Subject: [PATCH 1/9] Improved `ModelHandle` with support for multiple replicas and task execution - rename `remote()` to `__call__()` to keep API consistent with models - added tests for batched execution with new `ModelHandle.submit()` API - added tests for `ModelHandle.scale()` with multiple replicas --- nos/managers/model.py | 70 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 58 insertions(+), 12 deletions(-) diff --git a/nos/managers/model.py b/nos/managers/model.py index a0860f85..4538e004 100644 --- a/nos/managers/model.py +++ b/nos/managers/model.py @@ -54,13 +54,16 @@ class ModelHandle: >> model = ModelHandle(spec, num_replicas=1) # Call the task immediately - >> response = model(**model_inputs) + >> response = model_handle(**model_inputs) + + # Submit a task to the model handle + >> response_ref = model_handle.submit(**model_inputs) # Submit a task to the model handle, # this will add results to the queue - >> model.submit(**model_inputs) + >> model_handle.submit(**model_inputs) # Fetch the next result from the queue - >> response = model.get() + >> response = model_handle.get() # Cleanup model resources >> model_handle.cleanup() @@ -77,7 +80,8 @@ class ModelHandle: """Ray actor pool.""" _results_queue: ModelResultQueue = field(init=False, default_factory=ModelResultQueue) """Queue to fetch results from the actor pool.""" - + _results_queue_size: int = field(init=False, default=None) + """Maximum results queue size.""" _actor_profile: Dict[str, Any] = field(init=False, default=None) """Actor profile.""" @@ -85,13 +89,13 @@ def __post_init__(self): """Initialize the actor handles.""" self._actors = [self.get_actor(self.spec) for _ in range(self.num_replicas)] self._actor_pool = ray.util.ActorPool(self._actors) - self._results_queue.resize(self.num_replicas * 2) + self._results_queue_size = 2 * self.num_replicas def __repr__(self) -> str: assert len(self._actors) == self.num_replicas opts = self._actor_options(self.spec) opts_str = ", ".join([f"{k}={v}" for k, v in opts.items()]) - return f"ModelHandle(name={self.spec.name}, replicas={len(self._actors)}, qsize={self.results._size}, opts=({opts_str}))" + return f"ModelHandle(name={self.spec.name}, replicas={len(self._actors)}, qsize={self._results_queue_size}, opts=({opts_str}))" def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle": """Scale the model handle to a new number of replicas. 
@@ -120,23 +124,27 @@ def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle": if replicas == len(self._actors): logger.debug(f"Model already scaled appropriately [name={self.spec.name}, replicas={replicas}].") + return self elif replicas > len(self._actors): self._actors += [self.get_actor(self.spec) for _ in range(replicas - len(self._actors))] logger.debug(f"Scaling up model [name={self.spec.name}, replicas={replicas}].") else: actors_to_remove = self._actors[replicas:] for actor in actors_to_remove: - ray.kill(actor) + ray.kill(actor.actor) self._actors = self._actors[:replicas] logger.debug(f"Scaling down model [name={self.spec.name}, replicas={replicas}].") # Update repicas and queue size self.num_replicas = replicas - self._results_queue.resize(self.num_replicas * 2) # Re-create the actor pool logger.debug(f"Re-creating actor pool [name={self.spec.name}, replicas={replicas}].") + self._results_queue_size = 2 * self.num_replicas + + # Re-create the actor pool + del self._actor_pool self._actor_pool = ray.util.ActorPool(self._actors) assert len(self._actors) == replicas, "Model scaling failed." gc.collect() @@ -213,12 +221,50 @@ def __call__(self, *args, **kwargs) -> Any: Returns: Model response. """ - assert len(self._actors) >= 1, "Model should have atleast one replica." if self.num_replicas > 1: logger.warning("Model has >1 replicas, use `.submit()` instead to fully utilize them.") - actor_method_func = getattr(self._actors[0], self.spec.signature.method_name) - response_ref: ray.ObjectRef = actor_method_func.remote(**kwargs) - return ray.get(response_ref) + self.submit(*args, **kwargs) + self._fetch_next() + return self.get() + + def submit(self, *args, **kwargs) -> None: + """Submit a task to the actor pool. + + Args: + *args: Model arguments. + **kwargs: Model keyword arguments. + """ + assert not len(self._actor_pool._pending_submits), "Pending submits should be empty." + + # Submit the task to the actor pool, leveraging all replicas + self._actor_pool.submit(lambda a, v: getattr(a, self.spec.signature.method_name).remote(**v), kwargs) + + # If there are pending submissions due to the actor pool being fully utilized, + # fetch the next result from the actor pool and put it in the queue. + if len(self._actor_pool._pending_submits): + self._fetch_next() + assert not len(self._actor_pool._pending_submits), "Pending submits should be empty." + + def has_next(self) -> bool: + """Check if the handle has a result in the queue.""" + return self._actor_pool.has_next() or len(self._results_queue) + + def get(self) -> Any: + """Get the next result from the actor pool queue or by the object reference.""" + if not len(self._results_queue): + self._results_queue.put(self._actor_pool.get_next()) + return self._results_queue.get() + + def full(self) -> bool: + """Check if the results queue is full.""" + return len(self._results_queue) >= self._results_queue_size + + def _fetch_next(self) -> None: + """Fetch results from the actor pool.""" + self._results_queue.put(self._actor_pool.get_next()) + if len(self._results_queue) > self._results_queue_size: + logger.warning("Results queue full, dropping results. Use `.get()` to get results.") + self._results_queue.get() def submit(self, *args, **kwargs) -> str: """Submit a task to the actor pool. 
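The batched-execution API introduced in this patch pairs `submit()` with `get()`: `submit()` round-robins tasks across the Ray actor pool and transparently drains the pool's `_pending_submits` into a results queue capped at twice the replica count. A minimal usage sketch — not part of the patch itself — assuming a running Ray cluster, a registered `noop/process-images` spec, and that `ModelManle`-free imports `ModelManager`/`ModelHandle` resolve from `nos.managers` and `ModelManager()` can be constructed directly (the model name and inputs are illustrative):

    import numpy as np

    from nos import hub
    from nos.common import TaskType
    from nos.managers import ModelHandle, ModelManager

    manager = ModelManager()
    spec = hub.load_spec("noop/process-images", task=TaskType.CUSTOM)
    noop: ModelHandle = manager.get(spec)
    noop.scale(replicas=2)

    # submit() enqueues work across all replicas; if the pool is fully
    # utilized, it first drains one completed result into the queue.
    images = [np.zeros((480, 640, 3), dtype=np.uint8) for _ in range(8)]
    for image in images:
        noop.submit(images=[image])

    # Drain results in completion order.
    while noop.has_next():
        result = noop.get()

Bounding the queue at `2 * num_replicas` applies backpressure: a caller that only submits and never calls `get()` drops the oldest results (with a warning) instead of growing memory without limit.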
From 68fd398017e8ed09d80d03f64f16348599d96a36 Mon Sep 17 00:00:00 2001 From: Sudeep Pillai Date: Wed, 19 Jul 2023 13:25:57 -0700 Subject: [PATCH 2/9] Updated model manager tests for various image-based tasks --- nos/managers/model.py | 14 ++++++++------ nos/server/_runtime.py | 2 ++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/nos/managers/model.py b/nos/managers/model.py index 4538e004..86764979 100644 --- a/nos/managers/model.py +++ b/nos/managers/model.py @@ -54,10 +54,13 @@ class ModelHandle: >> model = ModelHandle(spec, num_replicas=1) # Call the task immediately - >> response = model_handle(**model_inputs) + >> response = model(**model_inputs) - # Submit a task to the model handle - >> response_ref = model_handle.submit(**model_inputs) + # Submit a task to the model handle, + # this will add results to the queue + >> model.submit(**model_inputs) + # Fetch the next result from the queue + >> response = model.get() # Submit a task to the model handle, # this will add results to the queue @@ -89,7 +92,7 @@ def __post_init__(self): """Initialize the actor handles.""" self._actors = [self.get_actor(self.spec) for _ in range(self.num_replicas)] self._actor_pool = ray.util.ActorPool(self._actors) - self._results_queue_size = 2 * self.num_replicas + self._results_queue_size = self.num_replicas def __repr__(self) -> str: assert len(self._actors) == self.num_replicas @@ -131,7 +134,7 @@ def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle": else: actors_to_remove = self._actors[replicas:] for actor in actors_to_remove: - ray.kill(actor.actor) + ray.kill(actor) self._actors = self._actors[:replicas] logger.debug(f"Scaling down model [name={self.spec.name}, replicas={replicas}].") @@ -181,7 +184,6 @@ def get_actor(cls, spec: ModelSpec) -> Union[ray.remote, ray.actor.ActorHandle]: # Get the actor options from the model spec actor_options = cls._actor_options(spec) - actor_cls = ray.remote(**actor_options)(model_cls) # Check if the model class has the required method diff --git a/nos/server/_runtime.py b/nos/server/_runtime.py index 8b2068ce..83dfe4d4 100644 --- a/nos/server/_runtime.py +++ b/nos/server/_runtime.py @@ -34,6 +34,8 @@ NOS_SUPPORTED_DEVICES = ("cpu", "cuda", "mps", "neuron") +NOS_SUPPORTED_DEVICES = ("cpu", "cuda", "mps", "neuron") + @dataclass class InferenceServiceRuntimeConfig: From 00602d1e8acbb5210845ebc68a12ec922c25a063 Mon Sep 17 00:00:00 2001 From: Sudeep Pillai Date: Wed, 19 Jul 2023 13:42:33 -0700 Subject: [PATCH 3/9] `scale()` handles pending futures gracefully, and warns user --- nos/managers/model.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/nos/managers/model.py b/nos/managers/model.py index 86764979..634ad445 100644 --- a/nos/managers/model.py +++ b/nos/managers/model.py @@ -111,6 +111,19 @@ def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle": raise NotImplementedError("Automatic scaling not implemented.") if not isinstance(replicas, int): raise ValueError(f"Invalid replicas: {replicas}") + + # Check if there are any pending submits + # on the actor pool, and wait until they are complete / added + # to the results queue. 
+ if self._actor_pool.has_next() or len(self._actor_pool._pending_submits): + logger.warning(f"Pending futures detected, this may result in dropped queue items [name={self.spec.name}]") + logger.debug(f"Waiting for pending futures to complete before scaling [name={self.spec.name}].") + while self._actor_pool.has_next(): + self._fetch_next() + logger.debug(f"Removing actor pool [replicas={len(self._actors)}].") + del self._actor_pool + self._actor_pool = None + logger.debug(f"Scaling model [name={self.spec.name}].") # Check if there are any pending submits # on the actor pool, and wait until they are complete / added @@ -147,7 +160,7 @@ def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle": self._results_queue_size = 2 * self.num_replicas # Re-create the actor pool - del self._actor_pool + logger.debug(f"Re-creating actor pool [name={self.spec.name}, replicas={replicas}].") self._actor_pool = ray.util.ActorPool(self._actors) assert len(self._actors) == replicas, "Model scaling failed." gc.collect() From 25ce4ff86c9e0400554a7313b53a36611c64b9de Mon Sep 17 00:00:00 2001 From: Sudeep Pillai Date: Wed, 19 Jul 2023 14:11:19 -0700 Subject: [PATCH 4/9] Fix `test_model_manager_errors` that does not raise error any more with `num_replicas>1` --- nos/managers/model.py | 4 ++-- tests/managers/test_model_manager.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nos/managers/model.py b/nos/managers/model.py index 634ad445..1f09d376 100644 --- a/nos/managers/model.py +++ b/nos/managers/model.py @@ -111,10 +111,10 @@ def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle": raise NotImplementedError("Automatic scaling not implemented.") if not isinstance(replicas, int): raise ValueError(f"Invalid replicas: {replicas}") - + # Check if there are any pending submits # on the actor pool, and wait until they are complete / added - # to the results queue. + # to the results queue. 
if self._actor_pool.has_next() or len(self._actor_pool._pending_submits): logger.warning(f"Pending futures detected, this may result in dropped queue items [name={self.spec.name}]") logger.debug(f"Waiting for pending futures to complete before scaling [name={self.spec.name}].") diff --git a/tests/managers/test_model_manager.py b/tests/managers/test_model_manager.py index 9bc01638..bd058801 100644 --- a/tests/managers/test_model_manager.py +++ b/tests/managers/test_model_manager.py @@ -203,7 +203,7 @@ def noop_gen(_noop, _pbar, B): assert len(result) == B logger.debug(f"NoOp ({replicas}): {noop}") - pbar = tqdm(duration=10, unit_scale=B, desc=f"noop async [B={B}, replicas={noop.num_replicas}]", total=0) + pbar = tqdm(duration=5, unit_scale=B, desc=f"noop async [B={B}, replicas={noop.num_replicas}]", total=0) # warmup: submit() for result in noop_gen(noop, tqdm(duration=1, disable=True), B): From e334e83c9adb7b837fd6838bb3b20e9b03f22ccb Mon Sep 17 00:00:00 2001 From: Sudeep Pillai Date: Thu, 20 Jul 2023 15:49:07 -0700 Subject: [PATCH 5/9] Load nos model specification from profile --- nos/common/spec.py | 90 ++++++++++++++++++++++++++++ nos/managers/model.py | 2 + tests/managers/test_model_manager.py | 2 + 3 files changed, 94 insertions(+) diff --git a/nos/common/spec.py b/nos/common/spec.py index b28d93c6..0c6b9e61 100644 --- a/nos/common/spec.py +++ b/nos/common/spec.py @@ -2,10 +2,14 @@ import inspect from dataclasses import field from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, get_args, get_origin +from functools import cached_property +import json from pydantic import validator from pydantic.dataclasses import dataclass +from nos.constants import NOS_PATH +from nos.logging import logger from nos.common.cloudpickle import dumps, loads from nos.common.exceptions import NosInputValidationException from nos.common.tasks import TaskType @@ -189,6 +193,73 @@ def from_packages(cls, packages: List[str]) -> Dict[str, Any]: return cls(conda={"dependencies": ["pip", {"pip": packages}]}) +@dataclass +class ModelResources: + """Model resources (device/host memory etc).""" + device: str = "cpu" + """Device type (cpu, cuda, mps, neuron, etc).""" + device_mem: int = 32 * 1024**2 + """Device memory (defaults to 32 MB).""" + num_cpus: float = 0 + """Number of CPUs (defaults to 0 CPUs).""" + + @validator("device") + def _validate_device(cls, device: str) -> str: + """Validate the device.""" + from nos.server._runtime import NOS_SUPPORTED_DEVICES + + if device not in NOS_SUPPORTED_DEVICES: + err_msg = f"Invalid device provided, device={device}. Use one of {NOS_SUPPORTED_DEVICES}." + logger.error(err_msg) + raise ValueError(err_msg) + return device + + @validator("device_mem") + def _validate_device_mem(cls, device_mem: int) -> int: + """Validate the device memory.""" + if device_mem <= 32 * 1024**2 or device_mem > 128 * 1024**3: + err_msg = f"Invalid device memory provided, device_mem={device_mem / 1024**2} MB. Provide a value between 32 MB and 128 GB." + logger.error(err_msg) + raise ValueError(err_msg) + return device_mem + + @validator("num_cpus") + def _validate_num_cpus(cls, num_cpus: float) -> float: + """Validate the number of CPUs.""" + if num_cpus < 0. or num_cpus > 128.: + err_msg = f"Invalid number of CPUs provided, num_cpus={num_cpus}. Provide a value between 0 and 128." + logger.error(err_msg) + raise ValueError(err_msg) + return num_cpus + + +@dataclass +class ModelSpecMetadata: + """Model specification metadata. 
+ + The metadata contains the model profiles, metrics, etc. + """ + name: str + """Model identifier.""" + task: TaskType + """Task type (e.g. image_embedding, image_generation, object_detection_2d, etc).""" + runtime: str + """Runtime (e.g. cpu, gpu, trt, etc). + See `nos.server._runtime.InferenceServiceRuntime` for the list of supported runtimes. + """ + resources: ModelResources + """Model resources (device/host memory, etc).""" + + @validator("runtime") + def _validate_runtime(cls, runtime: str) -> str: + """Validate the runtime.""" + from nos.server._runtime import InferenceServiceRuntime + + if runtime not in InferenceServiceRuntime.configs: + raise ValueError(f"Invalid runtime, runtime={runtime}.") + return runtime + + @dataclass class ModelSpec: """Model specification for the registry. @@ -205,6 +276,25 @@ class ModelSpec: """Model function signature.""" runtime_env: RuntimeEnv = None """Runtime environment with custom packages.""" + _metadata: ModelSpecMetadata = None + """Model specification metadata. The contents of the metadata (profiles, metrics, etc) + are specified in a separate file.""" + + @cached_property + def metadata(self) -> ModelSpecMetadata: + try: + path = NOS_PATH / f"data/models/{self.id}/metadata.json" + with open(str(path), "r") as f: + metadata = ModelSpecMetadata(**json.load(f)) + logger.info(f"Loaded model metadata [name={self.name}, path={path}, metadata={metadata}]") + except FileNotFoundError: + logger.warning(f"Model metadata not found. [path={path}]") + metadata = None + return metadata + + @property + def task(self) -> TaskType: + return self.metadata.task @staticmethod def get_id(model_name: str, task: TaskType = None) -> str: diff --git a/nos/managers/model.py b/nos/managers/model.py index 1f09d376..23ec1fb3 100644 --- a/nos/managers/model.py +++ b/nos/managers/model.py @@ -14,6 +14,7 @@ from ray.util.queue import Queue from nos.common import ModelSpec +from nos.common.profiler import profile_memory from nos.logging import logger @@ -90,6 +91,7 @@ class ModelHandle: def __post_init__(self): """Initialize the actor handles.""" + import pdb; pdb.set_trace() self._actors = [self.get_actor(self.spec) for _ in range(self.num_replicas)] self._actor_pool = ray.util.ActorPool(self._actors) self._results_queue_size = self.num_replicas diff --git a/tests/managers/test_model_manager.py b/tests/managers/test_model_manager.py index bd058801..80151226 100644 --- a/tests/managers/test_model_manager.py +++ b/tests/managers/test_model_manager.py @@ -230,6 +230,7 @@ def noop_gen(_noop, _pbar, B): "torchvision/fasterrcnn_mobilenet_v3_large_320_fpn", [(640, 480), (1280, 960), (2880, 1620)], ), + ] BENCHMARK_WARMUP_SEC = 2 BENCHMARK_DURATION_SEC = 10 @@ -248,6 +249,7 @@ def test_model_manager_inference(manager): # noqa: F811 pd.set_option("display.max_columns", 1000) from PIL import Image + from itertools import product from nos.common import timer, tqdm from nos.constants import NOS_CACHE_DIR From e2d078575bd608f8b6f51ded1770fe36f6a16760 Mon Sep 17 00:00:00 2001 From: Sudeep Pillai Date: Tue, 25 Jul 2023 09:00:04 +0100 Subject: [PATCH 6/9] Improved model spec metadata loading for profiled models --- nos/common/spec.py | 92 ++++++++++++++++++++-------- nos/data/spec.py | 14 +++++ nos/managers/model.py | 5 +- tests/managers/test_model_manager.py | 1 - 4 files changed, 82 insertions(+), 30 deletions(-) create mode 100644 nos/data/spec.py diff --git a/nos/common/spec.py b/nos/common/spec.py index 0c6b9e61..5e3bc407 100644 --- a/nos/common/spec.py +++ b/nos/common/spec.py @@ 
-5,15 +5,19 @@ from functools import cached_property import json +from dataclasses import field +from functools import cached_property +from typing import Any, Callable, Dict, List, MutableSet, Optional, Tuple, Type, Union, get_args, get_origin + from pydantic import validator from pydantic.dataclasses import dataclass -from nos.constants import NOS_PATH -from nos.logging import logger from nos.common.cloudpickle import dumps, loads from nos.common.exceptions import NosInputValidationException from nos.common.tasks import TaskType from nos.common.types import Batch, EmbeddingSpec, ImageSpec, ImageT, TensorSpec, TensorT # noqa: F401 +from nos.constants import NOS_PATH +from nos.logging import logger from nos.protoc import import_module @@ -139,7 +143,16 @@ class FunctionSignature: def __repr__(self) -> str: """Return the function signature representation.""" - return f"FunctionSignature(inputs={self.inputs}, outputs={self.outputs}, func_or_cls={self.func_or_cls}, init_args={self.init_args}, init_kwargs={self.init_kwargs}, method_name={self.method_name})" + inputs_str = "\n".join([f"{k}={v}" for k, v in self.inputs.items()]) + outputs_str = "\n".join([f"{k}={v}" for k, v in self.outputs.items()]) + return ( + f"""FunctionSignature\n""" + f"""\tfunc_or_cls={self.func_or_cls}\n""" + f"""\tinit_args={self.init_args}, init_kwargs={self.init_kwargs}\n""" + f"""\tmethod_name={self.method_name}\n""" + f"""\tinputs={inputs_str}\n""" + f"""\toutputs={outputs_str}\n""" + ) @staticmethod def validate(inputs: Dict[str, Any], sig: Dict[str, FunctionSignatureType]) -> Dict[str, Any]: @@ -196,12 +209,15 @@ def from_packages(cls, packages: List[str]) -> Dict[str, Any]: @dataclass class ModelResources: """Model resources (device/host memory etc).""" + device: str = "cpu" """Device type (cpu, cuda, mps, neuron, etc).""" - device_mem: int = 32 * 1024**2 - """Device memory (defaults to 32 MB).""" - num_cpus: float = 0 + device_memory: Union[int, str] = field(default=512 * 1024**2) + """Device memory (defaults to 512 MB).""" + cpus: float = 0 """Number of CPUs (defaults to 0 CPUs).""" + memory: Union[int, str] = field(default=32 * 1024**2) + """Host memory (defaults to 32 MB)""" @validator("device") def _validate_device(cls, device: str) -> str: @@ -213,42 +229,61 @@ def _validate_device(cls, device: str) -> str: logger.error(err_msg) raise ValueError(err_msg) return device - - @validator("device_mem") - def _validate_device_mem(cls, device_mem: int) -> int: + + @validator("device_memory") + def _validate_device_memory(cls, device_memory: Union[int, str]) -> int: """Validate the device memory.""" - if device_mem <= 32 * 1024**2 or device_mem > 128 * 1024**3: - err_msg = f"Invalid device memory provided, device_mem={device_mem / 1024**2} MB. Provide a value between 32 MB and 128 GB." + if isinstance(device_memory, str): + raise NotImplementedError() + + if device_memory <= 32 * 1024**2 or device_memory > 128 * 1024**3: + err_msg = f"Invalid device memory provided, device_memory={device_memory / 1024**2} MB. Provide a value between 32 MB and 128 GB." logger.error(err_msg) raise ValueError(err_msg) - return device_mem - - @validator("num_cpus") - def _validate_num_cpus(cls, num_cpus: float) -> float: + return device_memory + + @validator("cpus") + def _validate_cpus(cls, cpus: Union[float, str]) -> float: """Validate the number of CPUs.""" - if num_cpus < 0. or num_cpus > 128.: - err_msg = f"Invalid number of CPUs provided, num_cpus={num_cpus}. Provide a value between 0 and 128." 
+ if isinstance(cpus, str): + raise NotImplementedError() + + if cpus < 0.0 or cpus > 128.0: + err_msg = f"Invalid number of CPUs provided, cpus={cpus}. Provide a value between 0 and 128." + logger.error(err_msg) + raise ValueError(err_msg) + return cpus + + @validator("memory") + def _validate_memory(cls, memory: Union[int, str]) -> int: + """Validate the host memory.""" + if isinstance(memory, str): + raise NotImplementedError() + + if memory <= 32 * 1024**2 or memory > 128 * 1024**3: + err_msg = f"Invalid device memory provided, memory={memory / 1024**2} MB. Provide a value between 32 MB and 128 GB." logger.error(err_msg) raise ValueError(err_msg) - return num_cpus + return memory @dataclass class ModelSpecMetadata: """Model specification metadata. - + The metadata contains the model profiles, metrics, etc. """ + name: str """Model identifier.""" task: TaskType """Task type (e.g. image_embedding, image_generation, object_detection_2d, etc).""" - runtime: str - """Runtime (e.g. cpu, gpu, trt, etc). + runtime: set + """Runtimes supported (e.g. cpu, gpu, trt, etc). See `nos.server._runtime.InferenceServiceRuntime` for the list of supported runtimes. """ resources: ModelResources - """Model resources (device/host memory, etc).""" + """Model resource limits (device/host memory, etc).""" @validator("runtime") def _validate_runtime(cls, runtime: str) -> str: @@ -258,7 +293,7 @@ def _validate_runtime(cls, runtime: str) -> str: if runtime not in InferenceServiceRuntime.configs: raise ValueError(f"Invalid runtime, runtime={runtime}.") return runtime - + @dataclass class ModelSpec: @@ -277,9 +312,12 @@ class ModelSpec: runtime_env: RuntimeEnv = None """Runtime environment with custom packages.""" _metadata: ModelSpecMetadata = None - """Model specification metadata. The contents of the metadata (profiles, metrics, etc) + """Model specification metadata. 
The contents of the metadata (profiles, metrics, etc) are specified in a separate file.""" + def __repr__(self): + return f"""ModeSpec(name={self.name}, task={self.task})""" f"""\n {self.signature}""" + @cached_property def metadata(self) -> ModelSpecMetadata: try: @@ -292,9 +330,9 @@ def metadata(self) -> ModelSpecMetadata: metadata = None return metadata - @property - def task(self) -> TaskType: - return self.metadata.task + # @property + # def task(self) -> TaskType: + # return self.metadata.task @staticmethod def get_id(model_name: str, task: TaskType = None) -> str: diff --git a/nos/data/spec.py b/nos/data/spec.py new file mode 100644 index 00000000..65f34e67 --- /dev/null +++ b/nos/data/spec.py @@ -0,0 +1,14 @@ +from nos import hub +from nos.common import TaskType +from nos.common.spec import ModelResources, ModelSpec, ModelSpecMetadata + + +# path = NOS_PATH / f"data/models/{id}/metadata.json" +spec: ModelSpec = hub.load_spec(task=TaskType.IMAGE_EMBEDDING, model_name="openai/clip") +spec._metadata = ModelSpecMetadata( + name=spec.name, + task=spec.task, + runtime={"cpu", "gpu", "trt"}, + resources=ModelResources(device="cuda", device_memory=2 * 1024**3, cpus=1), +) +import pdb; pdb.set_trace() diff --git a/nos/managers/model.py b/nos/managers/model.py index 23ec1fb3..b93ee837 100644 --- a/nos/managers/model.py +++ b/nos/managers/model.py @@ -14,7 +14,6 @@ from ray.util.queue import Queue from nos.common import ModelSpec -from nos.common.profiler import profile_memory from nos.logging import logger @@ -91,7 +90,9 @@ class ModelHandle: def __post_init__(self): """Initialize the actor handles.""" - import pdb; pdb.set_trace() + import pdb + + pdb.set_trace() self._actors = [self.get_actor(self.spec) for _ in range(self.num_replicas)] self._actor_pool = ray.util.ActorPool(self._actors) self._results_queue_size = self.num_replicas diff --git a/tests/managers/test_model_manager.py b/tests/managers/test_model_manager.py index 80151226..eb718d75 100644 --- a/tests/managers/test_model_manager.py +++ b/tests/managers/test_model_manager.py @@ -230,7 +230,6 @@ def noop_gen(_noop, _pbar, B): "torchvision/fasterrcnn_mobilenet_v3_large_320_fpn", [(640, 480), (1280, 960), (2880, 1620)], ), - ] BENCHMARK_WARMUP_SEC = 2 BENCHMARK_DURATION_SEC = 10 From a3f0aacca30283b405bb2000681a7205da48f7cf Mon Sep 17 00:00:00 2001 From: Sudeep Pillai Date: Fri, 28 Jul 2023 03:36:40 -0700 Subject: [PATCH 7/9] New model spec metadata format with runtime and model resource information --- _internal/build_model_spec_metadata.py | 21 +++++++ nos/common/spec.py | 85 +++++++++++++++++--------- nos/common/tasks.py | 2 +- nos/data/spec.py | 14 ----- nos/managers/model.py | 3 - 5 files changed, 79 insertions(+), 46 deletions(-) create mode 100644 _internal/build_model_spec_metadata.py delete mode 100644 nos/data/spec.py diff --git a/_internal/build_model_spec_metadata.py b/_internal/build_model_spec_metadata.py new file mode 100644 index 00000000..853a015c --- /dev/null +++ b/_internal/build_model_spec_metadata.py @@ -0,0 +1,21 @@ +from nos import hub +from nos.common import TaskType +from nos.common.spec import ModelResources, ModelSpec, ModelSpecMetadata, _metadata_path +from nos.logging import logger + + +spec: ModelSpec = hub.load_spec(task=TaskType.IMAGE_EMBEDDING, model_name="openai/clip") +spec._metadata = ModelSpecMetadata( + name=spec.name, + task=spec.task, + resources={ + "cpu": ModelResources(runtime="cpu", device="cpu", device_memory=2 * 1024**3, cpus=2), + "gpu": ModelResources(runtime="gpu", device="cuda", 
device_memory=2 * 1024**3, cpus=1), + }, +) + +path = _metadata_path(spec) +if not path.exists(): + path.parent.mkdir(parents=True, exist_ok=True) + spec._metadata.to_json(path) + logger.info(f"Saved metadata to {path}") diff --git a/nos/common/spec.py b/nos/common/spec.py index 5e3bc407..eb32ccc6 100644 --- a/nos/common/spec.py +++ b/nos/common/spec.py @@ -5,9 +5,9 @@ from functools import cached_property import json -from dataclasses import field +from dataclasses import asdict, field from functools import cached_property -from typing import Any, Callable, Dict, List, MutableSet, Optional, Tuple, Type, Union, get_args, get_origin +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, get_args, get_origin from pydantic import validator from pydantic.dataclasses import dataclass @@ -16,7 +16,7 @@ from nos.common.exceptions import NosInputValidationException from nos.common.tasks import TaskType from nos.common.types import Batch, EmbeddingSpec, ImageSpec, ImageT, TensorSpec, TensorT # noqa: F401 -from nos.constants import NOS_PATH +from nos.constants import NOS_MODELS_DIR from nos.logging import logger from nos.protoc import import_module @@ -210,14 +210,28 @@ def from_packages(cls, packages: List[str]) -> Dict[str, Any]: class ModelResources: """Model resources (device/host memory etc).""" + runtime: str = "cpu" + """Runtime type (cpu, gpu, trt-runtime, etc). + See `nos.server._runtime.InferenceServiceRuntime` for the list of supported runtimes. + """ device: str = "cpu" """Device type (cpu, cuda, mps, neuron, etc).""" device_memory: Union[int, str] = field(default=512 * 1024**2) """Device memory (defaults to 512 MB).""" cpus: float = 0 """Number of CPUs (defaults to 0 CPUs).""" - memory: Union[int, str] = field(default=32 * 1024**2) - """Host memory (defaults to 32 MB)""" + memory: Union[int, str] = field(default=256 * 1024**2) + """Host memory (defaults to 256 MB)""" + + @validator("runtime") + def _validate_runtime(cls, runtime: str) -> str: + """Validate the runtime.""" + from nos.server._runtime import InferenceServiceRuntime + + # Check if runtime is subset of supported runtimes. + if runtime not in InferenceServiceRuntime.configs.keys(): + raise ValueError(f"Invalid runtime, runtime={runtime}.") + return runtime @validator("device") def _validate_device(cls, device: str) -> str: @@ -236,8 +250,8 @@ def _validate_device_memory(cls, device_memory: Union[int, str]) -> int: if isinstance(device_memory, str): raise NotImplementedError() - if device_memory <= 32 * 1024**2 or device_memory > 128 * 1024**3: - err_msg = f"Invalid device memory provided, device_memory={device_memory / 1024**2} MB. Provide a value between 32 MB and 128 GB." + if device_memory < 256 * 1024**2 or device_memory > 128 * 1024**3: + err_msg = f"Invalid device memory provided, device_memory={device_memory / 1024**2} MB. Provide a value between 256 MB and 128 GB." logger.error(err_msg) raise ValueError(err_msg) return device_memory @@ -260,8 +274,8 @@ def _validate_memory(cls, memory: Union[int, str]) -> int: if isinstance(memory, str): raise NotImplementedError() - if memory <= 32 * 1024**2 or memory > 128 * 1024**3: - err_msg = f"Invalid device memory provided, memory={memory / 1024**2} MB. Provide a value between 32 MB and 128 GB." + if memory < 256 * 1024**2 or memory > 128 * 1024**3: + err_msg = f"Invalid device memory provided, memory={memory / 1024**2} MB. Provide a value between 256 MB and 128 GB." 
logger.error(err_msg) raise ValueError(err_msg) return memory @@ -278,21 +292,31 @@ class ModelSpecMetadata: """Model identifier.""" task: TaskType """Task type (e.g. image_embedding, image_generation, object_detection_2d, etc).""" - runtime: set - """Runtimes supported (e.g. cpu, gpu, trt, etc). - See `nos.server._runtime.InferenceServiceRuntime` for the list of supported runtimes. - """ - resources: ModelResources + resources: Dict[str, ModelResources] = field(default_factory=dict) """Model resource limits (device/host memory, etc).""" + """Key is the runtime type (cpu, gpu, trt-runtime, etc).""" - @validator("runtime") - def _validate_runtime(cls, runtime: str) -> str: - """Validate the runtime.""" - from nos.server._runtime import InferenceServiceRuntime + def __repr__(self) -> str: + return f"""ModelSpecMetadata(name={self.name}, task={self.task}, """ f"""resources={self.resources})""" - if runtime not in InferenceServiceRuntime.configs: - raise ValueError(f"Invalid runtime, runtime={runtime}.") - return runtime + def to_json(self, filename: str) -> Dict[str, Any]: + """Convert the model spec to json.""" + specd = asdict(self) + with open(filename, "w") as f: + json.dump(specd, f, indent=4) + return specd + + @classmethod + def from_json(cls, filename: str) -> "ModelSpecMetadata": + """Convert the model spec from json.""" + with open(filename, "r") as f: + specd = json.load(f) + return cls(**specd) + + +def _metadata_path(spec: "ModelSpec") -> str: + """Return the metadata path for a model.""" + return NOS_MODELS_DIR / f"metadata/{spec.id}/metadata.json" @dataclass @@ -311,22 +335,27 @@ class ModelSpec: """Model function signature.""" runtime_env: RuntimeEnv = None """Runtime environment with custom packages.""" - _metadata: ModelSpecMetadata = None + _metadata: ModelSpecMetadata = field(init=False, default=None) """Model specification metadata. The contents of the metadata (profiles, metrics, etc) are specified in a separate file.""" + class Config: + """Custom configuration to keep _metadata private for now.""" + + underscore_attrs_are_private = True + def __repr__(self): - return f"""ModeSpec(name={self.name}, task={self.task})""" f"""\n {self.signature}""" + return f"""ModelSpec(name={self.name}, task={self.task})""" f"""\n {self.signature}""" @cached_property def metadata(self) -> ModelSpecMetadata: try: - path = NOS_PATH / f"data/models/{self.id}/metadata.json" - with open(str(path), "r") as f: - metadata = ModelSpecMetadata(**json.load(f)) + path = _metadata_path(self) + if not path.exists(): + raise FileNotFoundError(f"Model metadata not found. [path={path}]") + metadata = ModelSpecMetadata.from_json(str(path)) logger.info(f"Loaded model metadata [name={self.name}, path={path}, metadata={metadata}]") - except FileNotFoundError: - logger.warning(f"Model metadata not found. 
[path={path}]") + except Exception: metadata = None return metadata diff --git a/nos/common/tasks.py b/nos/common/tasks.py index 57244d91..09a632be 100644 --- a/nos/common/tasks.py +++ b/nos/common/tasks.py @@ -1,7 +1,7 @@ from enum import Enum -class TaskType(Enum): +class TaskType(str, Enum): """Task types.""" OBJECT_DETECTION_2D = "object_detection_2d" diff --git a/nos/data/spec.py b/nos/data/spec.py deleted file mode 100644 index 65f34e67..00000000 --- a/nos/data/spec.py +++ /dev/null @@ -1,14 +0,0 @@ -from nos import hub -from nos.common import TaskType -from nos.common.spec import ModelResources, ModelSpec, ModelSpecMetadata - - -# path = NOS_PATH / f"data/models/{id}/metadata.json" -spec: ModelSpec = hub.load_spec(task=TaskType.IMAGE_EMBEDDING, model_name="openai/clip") -spec._metadata = ModelSpecMetadata( - name=spec.name, - task=spec.task, - runtime={"cpu", "gpu", "trt"}, - resources=ModelResources(device="cuda", device_memory=2 * 1024**3, cpus=1), -) -import pdb; pdb.set_trace() diff --git a/nos/managers/model.py b/nos/managers/model.py index b93ee837..1f09d376 100644 --- a/nos/managers/model.py +++ b/nos/managers/model.py @@ -90,9 +90,6 @@ class ModelHandle: def __post_init__(self): """Initialize the actor handles.""" - import pdb - - pdb.set_trace() self._actors = [self.get_actor(self.spec) for _ in range(self.num_replicas)] self._actor_pool = ray.util.ActorPool(self._actors) self._results_queue_size = self.num_replicas From ed591a2138eee7bc76a54c989da667dc0512278c Mon Sep 17 00:00:00 2001 From: Sudeep Pillai Date: Tue, 22 Aug 2023 14:36:42 -0700 Subject: [PATCH 8/9] Move to `ModelHandle.load(spec)` instead with minor reverts --- nos/common/spec.py | 4 -- nos/managers/model.py | 78 ++++---------------------- nos/server/_service.py | 2 +- tests/managers/test_model_manager.py | 13 ++--- tests/server/test_inference_service.py | 4 +- 5 files changed, 21 insertions(+), 80 deletions(-) diff --git a/nos/common/spec.py b/nos/common/spec.py index eb32ccc6..ff940797 100644 --- a/nos/common/spec.py +++ b/nos/common/spec.py @@ -1,9 +1,5 @@ import copy import inspect -from dataclasses import field -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, get_args, get_origin -from functools import cached_property - import json from dataclasses import asdict, field from functools import cached_property diff --git a/nos/managers/model.py b/nos/managers/model.py index 1f09d376..5fb05691 100644 --- a/nos/managers/model.py +++ b/nos/managers/model.py @@ -83,8 +83,6 @@ class ModelHandle: """Ray actor pool.""" _results_queue: ModelResultQueue = field(init=False, default_factory=ModelResultQueue) """Queue to fetch results from the actor pool.""" - _results_queue_size: int = field(init=False, default=None) - """Maximum results queue size.""" _actor_profile: Dict[str, Any] = field(init=False, default=None) """Actor profile.""" @@ -92,13 +90,13 @@ def __post_init__(self): """Initialize the actor handles.""" self._actors = [self.get_actor(self.spec) for _ in range(self.num_replicas)] self._actor_pool = ray.util.ActorPool(self._actors) - self._results_queue_size = self.num_replicas + self._results_queue.resize(self.num_replicas * 2) def __repr__(self) -> str: assert len(self._actors) == self.num_replicas opts = self._actor_options(self.spec) opts_str = ", ".join([f"{k}={v}" for k, v in opts.items()]) - return f"ModelHandle(name={self.spec.name}, replicas={len(self._actors)}, qsize={self._results_queue_size}, opts=({opts_str}))" + return f"ModelHandle(name={self.spec.name}, 
replicas={len(self._actors)}, qsize={self.results._size}, opts=({opts_str}))" def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle": """Scale the model handle to a new number of replicas. @@ -120,22 +118,6 @@ def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle": logger.debug(f"Waiting for pending futures to complete before scaling [name={self.spec.name}].") while self._actor_pool.has_next(): self._fetch_next() - logger.debug(f"Removing actor pool [replicas={len(self._actors)}].") - del self._actor_pool - self._actor_pool = None - logger.debug(f"Scaling model [name={self.spec.name}].") - - # Check if there are any pending submits - # on the actor pool, and wait until they are complete / added - # to the results queue. - if self._actor_pool.has_next() or len(self._actor_pool._pending_submits): - logger.warning(f"Pending futures detected, this may result in dropped queue items [name={self.spec.name}]") - logger.debug(f"Waiting for pending futures to complete before scaling [name={self.spec.name}].") - while self._actor_pool.has_next(): - self._fetch_next() - logger.debug(f"Removing actor pool [replicas={len(self._actors)}].") - del self._actor_pool - self._actor_pool = None logger.debug(f"Scaling model [name={self.spec.name}].") if replicas == len(self._actors): @@ -154,10 +136,12 @@ def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle": # Update repicas and queue size self.num_replicas = replicas + self._results_queue.resize(self.num_replicas * 2) # Re-create the actor pool - logger.debug(f"Re-creating actor pool [name={self.spec.name}, replicas={replicas}].") - self._results_queue_size = 2 * self.num_replicas + logger.debug(f"Removing actor pool [replicas={len(self._actors)}].") + del self._actor_pool + self._actor_pool = None # Re-create the actor pool logger.debug(f"Re-creating actor pool [name={self.spec.name}, replicas={replicas}].") @@ -236,50 +220,12 @@ def __call__(self, *args, **kwargs) -> Any: Returns: Model response. """ + assert len(self._actors) >= 1, "Model should have atleast one replica." if self.num_replicas > 1: logger.warning("Model has >1 replicas, use `.submit()` instead to fully utilize them.") - self.submit(*args, **kwargs) - self._fetch_next() - return self.get() - - def submit(self, *args, **kwargs) -> None: - """Submit a task to the actor pool. - - Args: - *args: Model arguments. - **kwargs: Model keyword arguments. - """ - assert not len(self._actor_pool._pending_submits), "Pending submits should be empty." - - # Submit the task to the actor pool, leveraging all replicas - self._actor_pool.submit(lambda a, v: getattr(a, self.spec.signature.method_name).remote(**v), kwargs) - - # If there are pending submissions due to the actor pool being fully utilized, - # fetch the next result from the actor pool and put it in the queue. - if len(self._actor_pool._pending_submits): - self._fetch_next() - assert not len(self._actor_pool._pending_submits), "Pending submits should be empty." 
- - def has_next(self) -> bool: - """Check if the handle has a result in the queue.""" - return self._actor_pool.has_next() or len(self._results_queue) - - def get(self) -> Any: - """Get the next result from the actor pool queue or by the object reference.""" - if not len(self._results_queue): - self._results_queue.put(self._actor_pool.get_next()) - return self._results_queue.get() - - def full(self) -> bool: - """Check if the results queue is full.""" - return len(self._results_queue) >= self._results_queue_size - - def _fetch_next(self) -> None: - """Fetch results from the actor pool.""" - self._results_queue.put(self._actor_pool.get_next()) - if len(self._results_queue) > self._results_queue_size: - logger.warning("Results queue full, dropping results. Use `.get()` to get results.") - self._results_queue.get() + actor_method_func = getattr(self._actors[0], self.spec.signature.method_name) + response_ref: ray.ObjectRef = actor_method_func.remote(**kwargs) + return ray.get(response_ref) def submit(self, *args, **kwargs) -> str: """Submit a task to the actor pool. @@ -393,8 +339,8 @@ def __contains__(self, spec: ModelSpec) -> bool: """ return spec.id in self.handlers - def get(self, model_spec: ModelSpec) -> ModelHandle: - """Get a model handle from the manager. + def load(self, model_spec: ModelSpec) -> ModelHandle: + """Load a model handle from the manager using the model specification. Create a new model handle if it does not exist, else return an existing handle. diff --git a/nos/server/_service.py b/nos/server/_service.py index b8bb64b8..bd66d8e5 100644 --- a/nos/server/_service.py +++ b/nos/server/_service.py @@ -94,7 +94,7 @@ def execute(self, model_name: str, task: TaskType = None, inputs: Dict[str, Any] # Initialize the model (if not already initialized) # This call should also evict models and garbage collect if # too many models are loaded are loaded simultaneously. - model_handle: ModelHandle = self.model_manager.get(model_spec) + model_handle: ModelHandle = self.model_manager.load(model_spec) # Get the model handle and call it remotely (with model spec, actor handle) st = time.perf_counter() diff --git a/tests/managers/test_model_manager.py b/tests/managers/test_model_manager.py index eb718d75..f2f1a850 100644 --- a/tests/managers/test_model_manager.py +++ b/tests/managers/test_model_manager.py @@ -31,9 +31,9 @@ def test_model_manager(manager): # noqa: F811 # This should not raise any OOM errors as models are evicted # from the manager's cache. for idx, spec in enumerate(hub.list()): - # Note: `manager.get()` is a wrapper around `manager.add()` + # Note: `manager.load()` is a wrapper around `manager.add()` # and creates a single replica of the model. - handler: ModelHandle = manager.get(spec) + handler: ModelHandle = manager.load(spec) assert handler is not None assert isinstance(handler, ModelHandle) @@ -46,7 +46,7 @@ def test_model_manager(manager): # noqa: F811 # Test noop with model manager spec = hub.load_spec("noop/process-images", task=TaskType.CUSTOM) - noop: ModelHandle = manager.get(spec) + noop: ModelHandle = manager.load(spec) assert noop is not None assert isinstance(noop, ModelHandle) @@ -131,7 +131,7 @@ def __call__(self, images: Union[np.ndarray, List[np.ndarray]], n: int = 1) -> n # Check if the model can be loaded with the ModelManager # Note: This will be executed as a Ray actor within a custom runtime env. 
- model_handle = manager.get(spec) + model_handle = manager.load(spec) assert model_handle is not None assert isinstance(model_handle, ModelHandle) @@ -159,7 +159,7 @@ def test_model_manager_noop_inference(manager): # noqa: F811 from nos.common import tqdm spec = hub.load_spec("noop/process-images", task=TaskType.CUSTOM) - noop: ModelHandle = manager.get(spec) + noop: ModelHandle = manager.load(spec) assert noop is not None assert isinstance(noop, ModelHandle) @@ -248,7 +248,6 @@ def test_model_manager_inference(manager): # noqa: F811 pd.set_option("display.max_columns", 1000) from PIL import Image - from itertools import product from nos.common import timer, tqdm from nos.constants import NOS_CACHE_DIR @@ -263,7 +262,7 @@ def test_model_manager_inference(manager): # noqa: F811 spec = hub.load_spec(model_name, task=task) # Add the model to the manager (or via `manager.add()`) - model: ModelHandle = manager.get(spec) + model: ModelHandle = manager.load(spec) assert model is not None # Benchmark: for each image-shape and batch-size diff --git a/tests/server/test_inference_service.py b/tests/server/test_inference_service.py index 2118f287..575738be 100644 --- a/tests/server/test_inference_service.py +++ b/tests/server/test_inference_service.py @@ -44,7 +44,7 @@ def test_model_manager(ray_executor: RayExecutor): # noqa: F811 # This should not raise any OOM errors as models are evicted # from the manager's cache. for idx, spec in enumerate(hub.list()): - handler: ModelHandle = manager.get(spec) + handler: ModelHandle = manager.load(spec) assert handler is not None assert isinstance(handler, ModelHandle) @@ -64,7 +64,7 @@ def test_model_manager_inference(ray_executor: RayExecutor): # noqa: F811 spec = hub.load_spec("openai/clip-vit-base-patch32", task=TaskType.IMAGE_EMBEDDING) # Add the model to the manager (or via `manager.add()`) - handle: ModelHandle = manager.get(spec) + handle: ModelHandle = manager.load(spec) assert handle is not None img = Image.open(NOS_TEST_IMAGE) From 3f01c66d15c0bcc172dbc657b3a813de3d9f7c0c Mon Sep 17 00:00:00 2001 From: Sudeep Pillai Date: Sun, 27 Aug 2023 14:13:27 -0700 Subject: [PATCH 9/9] Removed task property until we remove task attribute from `ModelSpec` --- nos/common/spec.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/nos/common/spec.py b/nos/common/spec.py index ff940797..55b9df8c 100644 --- a/nos/common/spec.py +++ b/nos/common/spec.py @@ -336,9 +336,9 @@ class ModelSpec: are specified in a separate file.""" class Config: - """Custom configuration to keep _metadata private for now.""" + """Custom configuration to enable private attributes.""" - underscore_attrs_are_private = True + underscore_attrs_are_private: bool = True def __repr__(self): return f"""ModelSpec(name={self.name}, task={self.task})""" f"""\n {self.signature}""" @@ -355,10 +355,6 @@ def metadata(self) -> ModelSpecMetadata: metadata = None return metadata - # @property - # def task(self) -> TaskType: - # return self.metadata.task - @staticmethod def get_id(model_name: str, task: TaskType = None) -> str: if task is None:
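Taken together, the series lands a profile-driven metadata flow: `ModelResources` validates per-runtime resource limits, `ModelSpecMetadata` serializes them to `NOS_MODELS_DIR/metadata/<model-id>/metadata.json` via `to_json()`/`from_json()`, and the `ModelSpec.metadata` cached property lazily loads that file, returning `None` for models that have not been profiled. A minimal end-to-end sketch, adapted from `_internal/build_model_spec_metadata.py` in patch 7 (the round-trip assertion at the end is an illustrative addition):

    from nos import hub
    from nos.common import TaskType
    from nos.common.spec import ModelResources, ModelSpec, ModelSpecMetadata, _metadata_path

    spec: ModelSpec = hub.load_spec(task=TaskType.IMAGE_EMBEDDING, model_name="openai/clip")
    spec._metadata = ModelSpecMetadata(
        name=spec.name,
        task=spec.task,
        resources={
            # One ModelResources entry per supported runtime.
            "cpu": ModelResources(runtime="cpu", device="cpu", device_memory=2 * 1024**3, cpus=2),
            "gpu": ModelResources(runtime="gpu", device="cuda", device_memory=2 * 1024**3, cpus=1),
        },
    )

    # Persist the profile; ModelSpec.metadata re-loads it from this same path.
    path = _metadata_path(spec)
    if not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        spec._metadata.to_json(str(path))
    assert ModelSpecMetadata.from_json(str(path)).name == spec.name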