diff --git a/python/hopsworks_common/client/__init__.py b/python/hopsworks_common/client/__init__.py
index 952e1c345..faaffe2df 100644
--- a/python/hopsworks_common/client/__init__.py
+++ b/python/hopsworks_common/client/__init__.py
@@ -18,7 +18,8 @@
 
 from typing import Literal, Optional, Union
 
-from hopsworks_common.client import external, hopsworks
+from hopsworks_common.client import external, hopsworks, istio
+from hopsworks_common.constants import HOSTS
 
 
 _client: Union[hopsworks.Client, external.Client, None] = None
@@ -62,9 +63,9 @@ def init(
 
 def get_instance() -> Union[hopsworks.Client, external.Client]:
     global _client
-    if _client:
-        return _client
-    raise Exception("Couldn't find client. Try reconnecting to Hopsworks.")
+    if not _client:
+        raise Exception("Couldn't find client. Try reconnecting to Hopsworks.")
+    return _client
 
 
 def stop() -> None:
@@ -72,3 +73,68 @@ def stop() -> None:
     if _client:
         _client._close()
         _client = None
+    if istio._client:
+        istio._client._close()
+        istio._client = None
+
+
+def is_saas_connection() -> bool:
+    return get_instance()._host == HOSTS.APP_HOST
+
+
+_kserve_installed = None
+
+
+def set_kserve_installed(kserve_installed):
+    global _kserve_installed
+    _kserve_installed = kserve_installed
+
+
+def is_kserve_installed() -> bool:
+    global _kserve_installed
+    return _kserve_installed
+
+
+_serving_resource_limits = None
+
+
+def set_serving_resource_limits(max_resources):
+    global _serving_resource_limits
+    _serving_resource_limits = max_resources
+
+
+def get_serving_resource_limits():
+    global _serving_resource_limits
+    return _serving_resource_limits
+
+
+_serving_num_instances_limits = None
+
+
+def set_serving_num_instances_limits(num_instances_range):
+    global _serving_num_instances_limits
+    _serving_num_instances_limits = num_instances_range
+
+
+def get_serving_num_instances_limits():
+    global _serving_num_instances_limits
+    return _serving_num_instances_limits
+
+
+def is_scale_to_zero_required():
+    # scale-to-zero is required for KServe deployments if the Hopsworks variable `kube_serving_min_num_instances`
+    # is set to 0. Other possible values are -1 (unlimited num instances) or >1 num instances.
+    return get_serving_num_instances_limits()[0] == 0
+
+
+_knative_domain = None
+
+
+def get_knative_domain():
+    global _knative_domain
+    return _knative_domain
+
+
+def set_knative_domain(knative_domain):
+    global _knative_domain
+    _knative_domain = knative_domain
diff --git a/python/hopsworks_common/client/auth.py b/python/hopsworks_common/client/auth.py
index a960a7a8b..77aea23a9 100644
--- a/python/hopsworks_common/client/auth.py
+++ b/python/hopsworks_common/client/auth.py
@@ -16,7 +16,10 @@
 
 from __future__ import annotations
 
+import os
+
 import requests
+from hopsworks_common.client import exceptions
 
 
 class BearerAuth(requests.auth.AuthBase):
@@ -50,3 +53,30 @@ def __init__(self, token):
     def __call__(self, r):
         r.headers["X-API-KEY"] = self._token
         return r
+
+
+def get_api_key(api_key_value, api_key_file):
+    """Return the API key, preferring the explicit value over the key file.
+
+    :param api_key_value: API key given directly; returned as-is when not None
+    :param api_key_file: path of a file holding the API key; read only when
+        api_key_value is None
+    :return: the API key (for a file, its full content, unstripped)
+    :raises IOError: if api_key_file is set but does not exist
+    :raises exceptions.ExternalClientError: if both arguments are None
+    """
+    if api_key_value is not None:
+        return api_key_value
+    if api_key_file is None:
+        raise exceptions.ExternalClientError(
+            "Either api_key_file or api_key_value must be set when connecting to"
+            " hopsworks from an external environment."
+        )
+    if not os.path.exists(api_key_file):
+        raise IOError(
+            "Could not find api key file on path: {}".format(api_key_file)
+        )
+    # `with` closes the file on every path. The previous try/finally called
+    # close() on None when open() itself raised, masking the real error.
+    with open(api_key_file, mode="r") as file:
+        return file.read()
diff --git a/python/hopsworks_common/client/exceptions.py b/python/hopsworks_common/client/exceptions.py
index 6f5b26e40..c4acae20b 100644
--- a/python/hopsworks_common/client/exceptions.py
+++ b/python/hopsworks_common/client/exceptions.py
@@ -25,6 +25,12 @@
 class RestAPIError(Exception):
     """REST Exception encapsulating the response object and url."""
 
+    STATUS_CODE_BAD_REQUEST = 400
+    STATUS_CODE_UNAUTHORIZED = 401
+    STATUS_CODE_FORBIDDEN = 403
+    STATUS_CODE_NOT_FOUND = 404
+    STATUS_CODE_INTERNAL_SERVER_ERROR = 500
+
     class FeatureStoreErrorCode(int, Enum):
         FEATURE_GROUP_COMMIT_NOT_FOUND = 270227
         STATISTICS_NOT_FOUND = 270228
@@ -111,6 +117,17 @@ def __init__(self, missing_argument: str) -> None:
         super().__init__(message)
 
 
+class HopsworksClientError(TypeError):
+    """Raised when hopsworks internal client cannot be initialized due to missing arguments."""
+
+    def __init__(self, missing_argument):
+        message = (
+            "{0} cannot be of type NoneType, {0} is a non-optional "
+            "argument to connect to hopsworks from an internal environment."
+        ).format(missing_argument)
+        super().__init__(message)
+
+
 class GitException(Exception):
     """Generic git exception"""
 
@@ -141,3 +158,17 @@ class OpenSearchException(Exception):
 
 class JobExecutionException(Exception):
     """Generic job executions exception"""
+
+
+class ModelRegistryException(Exception):
+    """Generic model registry exception"""
+
+
+class ModelServingException(Exception):
+    """Generic model serving exception"""
+
+    ERROR_CODE_SERVING_NOT_FOUND = 240000
+    ERROR_CODE_ILLEGAL_ARGUMENT = 240001
+    ERROR_CODE_DUPLICATED_ENTRY = 240011
+
+    ERROR_CODE_DEPLOYMENT_NOT_RUNNING = 250001
diff --git a/python/hopsworks_common/client/istio/__init__.py b/python/hopsworks_common/client/istio/__init__.py
new file mode 100644
index 000000000..6785126ea
--- /dev/null
+++ b/python/hopsworks_common/client/istio/__init__.py
@@ -0,0 +1,41 @@
+#
+# Copyright 2022 Logical Clocks AB
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import annotations
+
+from typing import Union
+
+import hopsworks_common.client as _main
+from hopsworks_common.client.istio import external, hopsworks
+
+
+_client: Union[hopsworks.Client, external.Client, None] = None
+
+
+def init(host, port, project=None, api_key_value=None):
+    global _client
+
+    if _client:
+        return
+    if isinstance(_main._client, _main.hopsworks.Client):
+        _client = hopsworks.Client(host, port)
+    elif isinstance(_main._client, _main.external.Client):  # instance, not module
+        _client = external.Client(host, port, project, api_key_value)
+
+
+def get_instance() -> Union[hopsworks.Client, external.Client, None]:
+    global _client
+    return _client
diff --git a/python/hsml/client/hopsworks/base.py b/python/hopsworks_common/client/istio/base.py
similarity index 62%
rename from python/hsml/client/hopsworks/base.py
rename to python/hopsworks_common/client/istio/base.py
index a0326b2d5..235ea6d83 100644
--- a/python/hsml/client/hopsworks/base.py
+++ b/python/hopsworks_common/client/istio/base.py
@@ -17,31 +17,33 @@
 import os
 from abc import abstractmethod
 
-from hsml.client import auth, base
+from hopsworks_common.client import base
+from hopsworks_common.client.istio.grpc.inference_client import (
+    GRPCInferenceServerClient,
+)
 
 
 class Client(base.Client):
-    TOKEN_FILE = "token.jwt"
-    APIKEY_FILE = "api.key"
-    REST_ENDPOINT = "REST_ENDPOINT"
+    SERVING_API_KEY = "SERVING_API_KEY"
     HOPSWORKS_PUBLIC_HOST = "HOPSWORKS_PUBLIC_HOST"
+    TOKEN_EXPIRED_MAX_RETRIES = 0
 
-    BASE_PATH_PARAMS = ["hopsworks-api", "api"]
+    BASE_PATH_PARAMS = []
 
     @abstractmethod
     def __init__(self):
-        """To be extended by clients."""
+        """To be implemented by clients."""
         pass
 
     def _get_verify(self, verify, trust_store_path):
-        """Get verification method for sending HTTP requests to Hopsworks.
+        """Get verification method for sending inference requests to Istio.
 
         Credit to https://gist.github.com/gdamjan/55a8b9eec6cf7b771f92021d93b87b2c
 
         :param verify: perform hostname verification, 'true' or 'false'
         :type verify: str
         :param trust_store_path: path of the truststore locally if it was uploaded manually to
-            the external environment
+            the external environment such as EKS or AKS
         :type trust_store_path: str
         :return: if verify is true and the truststore is provided, then return the trust store location
                  if verify is true but the truststore wasn't provided, then return true
@@ -56,23 +58,6 @@ def _get_verify(self, verify, trust_store_path):
 
         return False
 
-    def _get_retry(self, request, response):
-        """Get retry method for resending HTTP requests to Hopsworks
-
-        :param request: original HTTP request already sent
-        :type request: requests.Request
-        :param response: response of the original HTTP request
-        :type response: requests.Response
-        """
-        if response.status_code == 401 and self.REST_ENDPOINT in os.environ:
-            # refresh token and retry request - only on hopsworks
-            self._auth = auth.BearerAuth(self._read_jwt())
-            # Update request with the new token
-            request.auth = self._auth
-            # retry request
-            return True
-        return False
-
     def _get_host_port_pair(self):
         """
         Removes "http or https" from the rest endpoint and returns a list
@@ -88,19 +73,6 @@ def _get_host_port_pair(self):
         host, port = endpoint.split(":")
         return host, port
 
-    def _read_jwt(self):
-        """Retrieve jwt from local container."""
-        return self._read_file(self.TOKEN_FILE)
-
-    def _read_apikey(self):
-        """Retrieve apikey from local container."""
-        return self._read_file(self.APIKEY_FILE)
-
-    def _read_file(self, secret_file):
-        """Retrieve secret from local container."""
-        with open(os.path.join(self._secrets_dir, secret_file), "r") as secret:
-            return secret.read()
-
     def _close(self):
         """Closes a client. Can be implemented for clean up purposes, not mandatory."""
         self._connected = False
@@ -109,3 +81,10 @@ def _replace_public_host(self, url):
         """replace hostname to public hostname set in HOPSWORKS_PUBLIC_HOST"""
         ui_url = url._replace(netloc=os.environ[self.HOPSWORKS_PUBLIC_HOST])
         return ui_url
+
+    def _create_grpc_channel(self, service_hostname: str) -> GRPCInferenceServerClient:
+        return GRPCInferenceServerClient(
+            url=self._host + ":" + str(self._port),
+            channel_args=(("grpc.ssl_target_name_override", service_hostname),),
+            serving_api_key=self._auth._token,
+        )
diff --git a/python/hopsworks_common/client/istio/external.py b/python/hopsworks_common/client/istio/external.py
new file mode 100644
index 000000000..87be9ec8d
--- /dev/null
+++ b/python/hopsworks_common/client/istio/external.py
@@ -0,0 +1,56 @@
+#
+# Copyright 2022 Logical Clocks AB
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import requests
+from hopsworks_common.client import auth
+from hopsworks_common.client.istio import base as istio
+
+
+class Client(istio.Client):
+    def __init__(
+        self,
+        host,
+        port,
+        project,
+        api_key_value,
+        hostname_verification=None,
+        trust_store_path=None,
+    ):
+        """Initializes a client in an external environment such as AWS Sagemaker."""
+        self._host = host
+        self._port = port
+        self._base_url = "http://" + self._host + ":" + str(self._port)
+        self._project_name = project
+
+        self._auth = auth.ApiKeyAuth(api_key_value)
+
+        self._session = requests.session()
+        self._connected = True
+        self._verify = self._get_verify(hostname_verification, trust_store_path)
+
+        self._cert_key = None
+
+    def _close(self):
+        """Marks the client as disconnected; the session is not closed here."""
+        self._connected = False
+
+    def _replace_public_host(self, url):
+        """Returns url unchanged, as we are already in an external client."""
+        return url
+
+    @property
+    def host(self):
+        return self._host
diff --git a/python/hsml/client/istio/grpc/proto/__init__.py b/python/hopsworks_common/client/istio/grpc/__init__.py
similarity index 100%
rename from python/hsml/client/istio/grpc/proto/__init__.py
rename to python/hopsworks_common/client/istio/grpc/__init__.py
diff --git a/python/hopsworks_common/client/istio/grpc/errors.py b/python/hopsworks_common/client/istio/grpc/errors.py
new file mode 100644
index 000000000..062630bea
--- /dev/null
+++ b/python/hopsworks_common/client/istio/grpc/errors.py
@@ -0,0 +1,30 @@
+# Copyright 2022 The KServe Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This implementation has been borrowed from the kserve/kserve repository
+# https://github.com/kserve/kserve/blob/release-0.11/python/kserve/kserve/errors.py
+
+
+class InvalidInput(ValueError):
+    """
+    Exception class indicating invalid input arguments.
+    HTTP Servers should return HTTP_400 (Bad Request).
+    """
+
+    def __init__(self, reason):
+        self.reason = reason
+
+    def __str__(self):
+        return self.reason
diff --git a/python/hopsworks_common/client/istio/grpc/exceptions.py b/python/hopsworks_common/client/istio/grpc/exceptions.py
new file mode 100644
index 000000000..6477c9488
--- /dev/null
+++ b/python/hopsworks_common/client/istio/grpc/exceptions.py
@@ -0,0 +1,123 @@
+# Copyright 2023 The KServe Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# coding: utf-8
+
+# This implementation has been borrowed from kserve/kserve repository
+# https://github.com/kserve/kserve/blob/release-0.11/python/kserve/kserve/exceptions.py
+
+import six
+
+
+class OpenApiException(Exception):
+    """The base exception class for all OpenAPIExceptions"""
+
+
+class ApiTypeError(OpenApiException, TypeError):
+    def __init__(self, msg, path_to_item=None, valid_classes=None, key_type=None):
+        """Raises an exception for TypeErrors
+
+        Args:
+            msg (str): the exception message
+
+        Keyword Args:
+            path_to_item (list): a list of keys and indices to get to the
+                                 current_item
+                                 None if unset
+            valid_classes (tuple): the primitive classes that current item
+                                   should be an instance of
+                                   None if unset
+            key_type (bool): False if our value is a value in a dict
+                             True if it is a key in a dict
+                             False if our item is an item in a list
+                             None if unset
+        """
+        self.path_to_item = path_to_item
+        self.valid_classes = valid_classes
+        self.key_type = key_type
+        full_msg = msg
+        if path_to_item:
+            full_msg = "{0} at {1}".format(msg, render_path(path_to_item))
+        super(ApiTypeError, self).__init__(full_msg)
+
+
+class ApiValueError(OpenApiException, ValueError):
+    def __init__(self, msg, path_to_item=None):
+        """
+        Args:
+            msg (str): the exception message
+
+        Keyword Args:
+            path_to_item (list): the path to the exception in the
+                received_data dict. None if unset
+        """
+
+        self.path_to_item = path_to_item
+        full_msg = msg
+        if path_to_item:
+            full_msg = "{0} at {1}".format(msg, render_path(path_to_item))
+        super(ApiValueError, self).__init__(full_msg)
+
+
+class ApiKeyError(OpenApiException, KeyError):
+    def __init__(self, msg, path_to_item=None):
+        """
+        Args:
+            msg (str): the exception message
+
+        Keyword Args:
+            path_to_item (None/list): the path to the exception in the
+                received_data dict
+        """
+        self.path_to_item = path_to_item
+        full_msg = msg
+        if path_to_item:
+            full_msg = "{0} at {1}".format(msg, render_path(path_to_item))
+        super(ApiKeyError, self).__init__(full_msg)
+
+
+class ApiException(OpenApiException):
+    def __init__(self, status=None, reason=None, http_resp=None):
+        if http_resp:
+            self.status = http_resp.status
+            self.reason = http_resp.reason
+            self.body = http_resp.data
+            self.headers = http_resp.getheaders()
+        else:
+            self.status = status
+            self.reason = reason
+            self.body = None
+            self.headers = None
+
+    def __str__(self):
+        """Custom error messages for exception"""
+        error_message = "({0})\n" "Reason: {1}\n".format(self.status, self.reason)
+        if self.headers:
+            error_message += "HTTP response headers: {0}\n".format(self.headers)
+
+        if self.body:
+            error_message += "HTTP response body: {0}\n".format(self.body)
+
+        return error_message
+
+
+def render_path(path_to_item):
+    """Returns a string representation of a path"""
+    result = ""
+    for pth in path_to_item:
+        if isinstance(pth, six.integer_types):
+            result += "[{0}]".format(pth)
+        else:
+            result += "['{0}']".format(pth)
+    return result
diff --git a/python/hopsworks_common/client/istio/grpc/inference_client.py b/python/hopsworks_common/client/istio/grpc/inference_client.py
new file mode 100644
index 000000000..7b03642bc
--- /dev/null
+++ b/python/hopsworks_common/client/istio/grpc/inference_client.py
@@ -0,0 +1,74 @@
+#
+# Copyright 2024 Hopsworks AB
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not
use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc
+from hopsworks_common.client.istio.grpc.proto.grpc_predict_v2_pb2_grpc import (
+    GRPCInferenceServiceStub,
+)
+from hopsworks_common.client.istio.utils.infer_type import InferRequest, InferResponse
+
+
+class GRPCInferenceServerClient:
+    def __init__(
+        self,
+        url,
+        serving_api_key,
+        channel_args=None,
+    ):
+        if channel_args is not None:
+            channel_opt = channel_args
+        else:
+            channel_opt = [
+                ("grpc.max_send_message_length", -1),
+                ("grpc.max_receive_message_length", -1),
+            ]
+
+        # Authentication is done via API Key in the Authorization header
+        self._channel = grpc.insecure_channel(url, options=channel_opt)
+        self._client_stub = GRPCInferenceServiceStub(self._channel)
+        self._serving_api_key = serving_api_key
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.close()
+
+    def __del__(self):
+        """It is called during object garbage collection."""
+        self.close()
+
+    def close(self):
+        """Close the client. Future calls to server will result in an Error."""
+        # __del__ also runs when __init__ failed before _channel was assigned
+        channel = getattr(self, "_channel", None)
+        if channel is not None:
+            channel.close()
+
+    def infer(self, infer_request: InferRequest, headers=None, client_timeout=None):
+        headers = dict(headers or {})  # copy: never mutate the caller's dict
+        headers["authorization"] = "ApiKey " + self._serving_api_key
+        metadata = headers.items()
+
+        # convert the InferRequest to a ModelInferRequest message
+        request = infer_request.to_grpc()
+
+        # send request; a grpc.RpcError propagates to the caller unchanged
+        model_infer_response = self._client_stub.ModelInfer(
+            request=request, metadata=metadata, timeout=client_timeout
+        )
+
+        # convert back the ModelInferResponse message to InferResponse
+        return InferResponse.from_grpc(model_infer_response)
diff --git a/python/hsml/client/hopsworks/__init__.py b/python/hopsworks_common/client/istio/grpc/proto/__init__.py
similarity index 93%
rename from python/hsml/client/hopsworks/__init__.py
rename to python/hopsworks_common/client/istio/grpc/proto/__init__.py
index 7fa8fd556..ff8055b9b 100644
--- a/python/hsml/client/hopsworks/__init__.py
+++ b/python/hopsworks_common/client/istio/grpc/proto/__init__.py
@@ -1,5 +1,5 @@
 #
-# Copyright 2022 Logical Clocks AB
+# Copyright 2024 Hopsworks AB
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/python/hsml/client/istio/grpc/proto/grpc_predict_v2.proto b/python/hopsworks_common/client/istio/grpc/proto/grpc_predict_v2.proto similarity index 100% rename from python/hsml/client/istio/grpc/proto/grpc_predict_v2.proto rename to python/hopsworks_common/client/istio/grpc/proto/grpc_predict_v2.proto diff --git a/python/hsml/client/istio/grpc/proto/grpc_predict_v2_pb2.py b/python/hopsworks_common/client/istio/grpc/proto/grpc_predict_v2_pb2.py similarity index 100% rename from python/hsml/client/istio/grpc/proto/grpc_predict_v2_pb2.py rename to python/hopsworks_common/client/istio/grpc/proto/grpc_predict_v2_pb2.py diff --git a/python/hsml/client/istio/grpc/proto/grpc_predict_v2_pb2.pyi b/python/hopsworks_common/client/istio/grpc/proto/grpc_predict_v2_pb2.pyi similarity index 100% rename from python/hsml/client/istio/grpc/proto/grpc_predict_v2_pb2.pyi rename to python/hopsworks_common/client/istio/grpc/proto/grpc_predict_v2_pb2.pyi diff --git a/python/hsml/client/istio/grpc/proto/grpc_predict_v2_pb2_grpc.py b/python/hopsworks_common/client/istio/grpc/proto/grpc_predict_v2_pb2_grpc.py similarity index 98% rename from python/hsml/client/istio/grpc/proto/grpc_predict_v2_pb2_grpc.py rename to python/hopsworks_common/client/istio/grpc/proto/grpc_predict_v2_pb2_grpc.py index a5f986c20..ab6601923 100644 --- a/python/hsml/client/istio/grpc/proto/grpc_predict_v2_pb2_grpc.py +++ b/python/hopsworks_common/client/istio/grpc/proto/grpc_predict_v2_pb2_grpc.py @@ -15,8 +15,8 @@ # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! 
"""Client and server classes corresponding to protobuf-defined services.""" -import hsml.client.istio.grpc.inference_client as inference_client -import hsml.client.istio.grpc.proto.grpc_predict_v2_pb2 as grpc__predict__v2__pb2 +import hopsworks_common.client.istio.grpc.inference_client as inference_client +import hopsworks_common.client.istio.grpc.proto.grpc_predict_v2_pb2 as grpc__predict__v2__pb2 class GRPCInferenceServiceStub(object): diff --git a/python/hsml/client/istio/internal.py b/python/hopsworks_common/client/istio/hopsworks.py similarity index 98% rename from python/hsml/client/istio/internal.py rename to python/hopsworks_common/client/istio/hopsworks.py index b1befd39d..be00de97c 100644 --- a/python/hsml/client/istio/internal.py +++ b/python/hopsworks_common/client/istio/hopsworks.py @@ -20,8 +20,8 @@ from pathlib import Path import requests -from hsml.client import auth, exceptions -from hsml.client.istio import base as istio +from hopsworks_common.client import auth, exceptions +from hopsworks_common.client.istio import base as istio try: diff --git a/python/hopsworks_common/client/istio/utils/__init__.py b/python/hopsworks_common/client/istio/utils/__init__.py new file mode 100644 index 000000000..ff8055b9b --- /dev/null +++ b/python/hopsworks_common/client/istio/utils/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2024 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# diff --git a/python/hopsworks_common/client/istio/utils/infer_type.py b/python/hopsworks_common/client/istio/utils/infer_type.py new file mode 100644 index 000000000..f06fa1dd1 --- /dev/null +++ b/python/hopsworks_common/client/istio/utils/infer_type.py @@ -0,0 +1,812 @@ +# Copyright 2023 The KServe Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This implementation has been borrowed from kserve/kserve repository +# https://github.com/kserve/kserve/blob/release-0.11/python/kserve/kserve/protocol/infer_type.py + +import struct +from typing import Dict, List, Optional + +import numpy +import numpy as np +import pandas as pd +from hopsworks_common.client.istio.grpc.errors import InvalidInput +from hopsworks_common.client.istio.grpc.proto.grpc_predict_v2_pb2 import ( + InferTensorContents, + ModelInferRequest, + ModelInferResponse, +) +from hopsworks_common.client.istio.utils.numpy_codec import from_np_dtype, to_np_dtype + + +GRPC_CONTENT_DATATYPE_MAPPINGS = { + "BOOL": "bool_contents", + "INT8": "int_contents", + "INT16": "int_contents", + "INT32": "int_contents", + "INT64": "int64_contents", + "UINT8": "uint_contents", + "UINT16": "uint_contents", + "UINT32": "uint_contents", + "UINT64": "uint64_contents", + "FP32": "fp32_contents", + "FP64": "fp64_contents", + "BYTES": "bytes_contents", +} + + +def raise_error(msg): + """ + Raise error with the provided message + """ + raise InferenceServerException(msg=msg) from None + + +def 
serialize_byte_tensor(input_tensor): + """ + Serializes a bytes tensor into a flat numpy array of length prepended + bytes. The numpy array should use dtype of np.object. For np.bytes, + numpy will remove trailing zeros at the end of byte sequence and because + of this it should be avoided. + + Parameters + ---------- + input_tensor : np.array + The bytes tensor to serialize. + + Returns + ------- + serialized_bytes_tensor : np.array + The 1-D numpy array of type uint8 containing the serialized bytes in row-major form. + + Raises + ------ + InferenceServerException + If unable to serialize the given tensor. + """ + + if input_tensor.size == 0: + return np.empty([0], dtype=np.object_) + + # If the input is a tensor of string/bytes objects, then must flatten those into + # a 1-dimensional array containing the 4-byte byte size followed by the + # actual element bytes. All elements are concatenated together in row-major + # order. + + if (input_tensor.dtype != np.object_) and (input_tensor.dtype.type != np.bytes_): + raise_error("cannot serialize bytes tensor: invalid datatype") + + flattened_ls = [] + # 'C' order is row-major. 
+ for obj in np.nditer(input_tensor, flags=["refs_ok"], order="C"): + # If directly passing bytes to BYTES type, + # don't convert it to str as Python will encode the + # bytes which may distort the meaning + if input_tensor.dtype == np.object_: + if isinstance(obj.item(), bytes): + s = obj.item() + else: + s = str(obj.item()).encode("utf-8") + else: + s = obj.item() + flattened_ls.append(struct.pack(" np.ndarray: + dtype = to_np_dtype(self.datatype) + if dtype is None: + raise InvalidInput("invalid datatype in the input") + if self._raw_data is not None: + np_array = np.frombuffer(self._raw_data, dtype=dtype) + return np_array.reshape(self._shape) + else: + np_array = np.array(self._data, dtype=dtype) + return np_array.reshape(self._shape) + + def set_data_from_numpy(self, input_tensor, binary_data=True): + """Set the tensor data from the specified numpy array for + input associated with this object. + Parameters + ---------- + input_tensor : numpy array + The tensor data in numpy array format + binary_data : bool + Indicates whether to set data for the input in binary format + or explicit tensor within JSON. The default value is True, + which means the data will be delivered as binary data in the + HTTP body after the JSON object. + Raises + ------ + InferenceServerException + If failed to set data for the tensor. 
+ """ + if not isinstance(input_tensor, (np.ndarray,)): + raise_error("input_tensor must be a numpy array") + + dtype = from_np_dtype(input_tensor.dtype) + if self._datatype != dtype: + raise_error( + "got unexpected datatype {} from numpy array, expected {}".format( + dtype, self._datatype + ) + ) + valid_shape = True + if len(self._shape) != len(input_tensor.shape): + valid_shape = False + else: + for i in range(len(self._shape)): + if self._shape[i] != input_tensor.shape[i]: + valid_shape = False + if not valid_shape: + raise_error( + "got unexpected numpy array shape [{}], expected [{}]".format( + str(input_tensor.shape)[1:-1], str(self._shape)[1:-1] + ) + ) + + if not binary_data: + self._parameters.pop("binary_data_size", None) + self._raw_data = None + if self._datatype == "BYTES": + self._data = [] + try: + if input_tensor.size > 0: + for obj in np.nditer( + input_tensor, flags=["refs_ok"], order="C" + ): + # We need to convert the object to string using utf-8, + # if we want to use the binary_data=False. JSON requires + # the input to be a UTF-8 string. + if input_tensor.dtype == np.object_: + if isinstance(obj.item(), bytes): + self._data.append(str(obj.item(), encoding="utf-8")) + else: + self._data.append(str(obj.item())) + else: + self._data.append(str(obj.item(), encoding="utf-8")) + except UnicodeDecodeError: + raise_error( + f'Failed to encode "{obj.item()}" using UTF-8. Please use binary_data=True, if' + " you want to pass a byte array." 
def get_content(datatype: str, data: InferTensorContents):
    """Return the typed contents of a gRPC tensor as a plain Python list.

    Parameters
    ----------
    datatype : str
        Datatype name of the tensor (e.g. ``"INT32"``, ``"BYTES"``).
    data : InferTensorContents
        Tensor contents message; only the repeated field matching
        ``datatype`` is read.

    Returns
    -------
    list
        A copy of the matching ``*_contents`` field.

    Raises
    ------
    InvalidInput
        If ``datatype`` has no branch below (note that ``"FP16"`` has none).
    """
    if datatype == "BOOL":
        return list(data.bool_contents)
    elif datatype in ["UINT8", "UINT16", "UINT32"]:
        return list(data.uint_contents)
    elif datatype == "UINT64":
        return list(data.uint64_contents)
    elif datatype in ["INT8", "INT16", "INT32"]:
        return list(data.int_contents)
    elif datatype == "INT64":
        return list(data.int64_contents)
    elif datatype == "FP32":
        return list(data.fp32_contents)
    elif datatype == "FP64":
        return list(data.fp64_contents)
    elif datatype == "BYTES":
        return list(data.bytes_contents)
    else:
        raise InvalidInput("invalid content type")


class InferRequest:
    """InferenceRequest Model

    $inference_request =
    {
      "id" : $string #optional,
      "parameters" : $parameters #optional,
      "inputs" : [ $request_input, ... ],
      "outputs" : [ $request_output, ... ] #optional
    }
    """

    id: Optional[str]
    model_name: str
    parameters: Optional[Dict]
    inputs: List[InferInput]
    from_grpc: bool

    def __init__(
        self,
        model_name: str,
        infer_inputs: List[InferInput],
        request_id=None,
        raw_inputs=None,
        from_grpc=False,
        parameters=None,
    ):
        """Build a request from already constructed input tensors.

        Parameters
        ----------
        model_name : str
            Name of the target model.
        infer_inputs : List[InferInput]
            Input tensors of the request.
        request_id : optional
            Identifier stored as ``id``.
        raw_inputs : optional
            Raw byte payloads; the i-th payload is attached to the i-th input.
        from_grpc : bool
            Flag recording that the request originated from gRPC.
        parameters : dict, optional
            Request-level parameters; defaults to a fresh empty dict.
        """
        if parameters is None:
            parameters = {}
        self.id = request_id
        self.model_name = model_name
        self.inputs = infer_inputs
        self.parameters = parameters
        # NOTE: on instances this attribute shadows the `from_grpc` classmethod
        # defined below; the classmethod stays reachable through the class.
        self.from_grpc = from_grpc
        if raw_inputs:
            for i, raw_input in enumerate(raw_inputs):
                self.inputs[i]._raw_data = raw_input

    @classmethod
    def from_grpc(cls, request: ModelInferRequest):
        """Create an InferRequest from a gRPC ModelInferRequest message."""
        infer_inputs = [
            InferInput(
                name=input_tensor.name,
                shape=list(input_tensor.shape),
                datatype=input_tensor.datatype,
                data=get_content(input_tensor.datatype, input_tensor.contents),
                parameters=input_tensor.parameters,
            )
            for input_tensor in request.inputs
        ]
        return cls(
            request_id=request.id,
            model_name=request.model_name,
            infer_inputs=infer_inputs,
            raw_inputs=request.raw_input_contents,
            from_grpc=True,
            parameters=request.parameters,
        )

    def to_rest(self) -> Dict:
        """Converts the InferRequest object to v2 REST InferenceRequest message"""
        infer_inputs = []
        for infer_input in self.inputs:
            infer_input_dict = {
                "name": infer_input.name,
                "shape": infer_input.shape,
                "datatype": infer_input.datatype,
            }
            if isinstance(infer_input.data, numpy.ndarray):
                # Re-encode the array as JSON-friendly data before reading it back.
                infer_input.set_data_from_numpy(infer_input.data, binary_data=False)
            infer_input_dict["data"] = infer_input.data
            infer_inputs.append(infer_input_dict)
        return {"id": self.id, "inputs": infer_inputs}

    def to_grpc(self) -> ModelInferRequest:
        """Converts the InferRequest object to gRPC ModelInferRequest message

        Raises
        ------
        InvalidInput
            If an input has no raw data and its data is not a list, or its
            datatype has no entry in ``GRPC_CONTENT_DATATYPE_MAPPINGS``.
        """
        infer_inputs = []
        raw_input_contents = []
        for infer_input in self.inputs:
            if isinstance(infer_input.data, numpy.ndarray):
                infer_input.set_data_from_numpy(infer_input.data, binary_data=True)
            infer_input_dict = {
                "name": infer_input.name,
                "shape": infer_input.shape,
                "datatype": infer_input.datatype,
            }
            if infer_input._raw_data is not None:
                # Raw payloads travel next to the tensor metadata, not inside it.
                raw_input_contents.append(infer_input._raw_data)
            else:
                if not isinstance(infer_input.data, List):
                    raise InvalidInput("input data is not a List")
                infer_input_dict["contents"] = {}
                data_key = GRPC_CONTENT_DATATYPE_MAPPINGS.get(
                    infer_input.datatype, None
                )
                if data_key is not None:
                    infer_input._data = [
                        bytes(val, "utf-8") if isinstance(val, str) else val
                        for val in infer_input.data
                    ]  # str to byte conversion for grpc proto
                    infer_input_dict["contents"][data_key] = infer_input.data
                else:
                    raise InvalidInput("invalid input datatype")
            infer_inputs.append(infer_input_dict)

        return ModelInferRequest(
            id=self.id,
            model_name=self.model_name,
            inputs=infer_inputs,
            raw_input_contents=raw_input_contents,
        )

    def as_dataframe(self) -> pd.DataFrame:
        """
        Decode the tensor inputs as pandas dataframe

        Each input becomes one column named after the input; ``bytes`` values
        of ``BYTES`` inputs are decoded as UTF-8.
        """
        dfs = []
        for infer_input in self.inputs:
            input_data = infer_input.data
            if infer_input.datatype == "BYTES":
                input_data = [
                    str(val, "utf-8") if isinstance(val, bytes) else val
                    for val in infer_input.data
                ]
            dfs.append(pd.DataFrame(input_data, columns=[infer_input.name]))
        return pd.concat(dfs, axis=1)


class InferOutput:
    def __init__(self, name, shape, datatype, data=None, parameters=None):
        """An object of InferOutput class is used to describe
        an output tensor of an inference response.
        Parameters
        ----------
        name : str
            The name of the output whose data will be described by this object
        shape : list
            The shape of the associated output.
        datatype : str
            The datatype of the associated output.
        data : Union[List, InferTensorContents]
            The data of the REST/gRPC output. When data is not set, raw_data is used for gRPC for numpy array bytes.
        parameters : dict
            The additional server-specific parameters.
        """
        if parameters is None:
            parameters = {}
        self._name = name
        self._shape = shape
        self._datatype = datatype
        self._parameters = parameters
        self._data = data
        self._raw_data = None

    @property
    def name(self):
        """Get the name of the output associated with this object.
        Returns
        -------
        str
            The name of the output
        """
        return self._name

    @property
    def datatype(self):
        """Get the datatype of the output associated with this object.
        Returns
        -------
        str
            The datatype of the output
        """
        return self._datatype

    @property
    def data(self):
        """Get the data of InferOutput"""
        return self._data

    @property
    def shape(self):
        """Get the shape of the output associated with this object.
        Returns
        -------
        list
            The shape of the output
        """
        return self._shape

    @property
    def parameters(self):
        """Get the parameters of the output associated with this object.
        Returns
        -------
        dict
            The parameters stored for this output
        """
        return self._parameters

    def set_shape(self, shape):
        """Set the shape of the output.
        Parameters
        ----------
        shape : list
            The shape of the associated output.
        """
        self._shape = shape

    def as_numpy(self) -> numpy.ndarray:
        """
        Decode the tensor data as numpy array

        Raw bytes take precedence over the explicit data list; the result is
        reshaped to the stored shape.

        Raises
        ------
        InvalidInput
            If the stored datatype has no numpy equivalent.
        """
        dtype = to_np_dtype(self.datatype)
        if dtype is None:
            raise InvalidInput("invalid datatype in the input")
        if self._raw_data is not None:
            np_array = np.frombuffer(self._raw_data, dtype=dtype)
        else:
            np_array = np.array(self._data, dtype=dtype)
        return np_array.reshape(self._shape)

    def set_data_from_numpy(self, input_tensor, binary_data=True):
        """Set the tensor data from the specified numpy array for
        the output associated with this object.
        Parameters
        ----------
        input_tensor : numpy array
            The tensor data in numpy array format
        binary_data : bool
            Indicates whether to set data for the tensor in binary format
            or explicit tensor within JSON. The default value is True,
            which means the data will be delivered as binary data in the
            HTTP body after the JSON object.
        Raises
        ------
        Whatever ``raise_error`` raises
            If the array type, datatype or shape does not match this tensor,
            or a BYTES element cannot be decoded as UTF-8.
        """
        if not isinstance(input_tensor, (np.ndarray,)):
            raise_error("input_tensor must be a numpy array")

        dtype = from_np_dtype(input_tensor.dtype)
        if self._datatype != dtype:
            raise_error(
                "got unexpected datatype {} from numpy array, expected {}".format(
                    dtype, self._datatype
                )
            )
        # Shapes match when they have the same rank and agree on every axis.
        valid_shape = len(self._shape) == len(input_tensor.shape) and not any(
            expected != actual
            for expected, actual in zip(self._shape, input_tensor.shape)
        )
        if not valid_shape:
            raise_error(
                "got unexpected numpy array shape [{}], expected [{}]".format(
                    str(input_tensor.shape)[1:-1], str(self._shape)[1:-1]
                )
            )

        if not binary_data:
            self._parameters.pop("binary_data_size", None)
            self._raw_data = None
            if self._datatype == "BYTES":
                self._data = []
                try:
                    if input_tensor.size > 0:
                        for obj in np.nditer(
                            input_tensor, flags=["refs_ok"], order="C"
                        ):
                            # We need to convert the object to string using utf-8,
                            # if we want to use the binary_data=False. JSON requires
                            # the input to be a UTF-8 string.
                            if input_tensor.dtype == np.object_:
                                if isinstance(obj.item(), bytes):
                                    self._data.append(str(obj.item(), encoding="utf-8"))
                                else:
                                    self._data.append(str(obj.item()))
                            else:
                                self._data.append(str(obj.item(), encoding="utf-8"))
                except UnicodeDecodeError:
                    raise_error(
                        f'Failed to encode "{obj.item()}" using UTF-8. Please use binary_data=True, if'
                        " you want to pass a byte array."
                    )
            else:
                self._data = [val.item() for val in input_tensor.flatten()]
        else:
            self._data = None
            if self._datatype == "BYTES":
                serialized_output = serialize_byte_tensor(input_tensor)
                if serialized_output.size > 0:
                    self._raw_data = serialized_output.item()
                else:
                    self._raw_data = b""
            else:
                self._raw_data = input_tensor.tobytes()
            self._parameters["binary_data_size"] = len(self._raw_data)


class InferResponse:
    """InferenceResponse

    $inference_response =
    {
      "model_name" : $string,
      "model_version" : $string #optional,
      "id" : $string,
      "parameters" : $parameters #optional,
      "outputs" : [ $response_output, ... ]
    }
    """

    id: str
    model_name: str
    parameters: Optional[Dict]
    outputs: List[InferOutput]
    from_grpc: bool

    def __init__(
        self,
        response_id: str,
        model_name: str,
        infer_outputs: List[InferOutput],
        raw_outputs=None,
        from_grpc=False,
        parameters=None,
    ):
        """Build a response from already constructed output tensors.

        Parameters
        ----------
        response_id : str
            Identifier stored as ``id``.
        model_name : str
            Name of the model that produced the response.
        infer_outputs : List[InferOutput]
            Output tensors of the response.
        raw_outputs : optional
            Raw byte payloads; the i-th payload is attached to the i-th output.
        from_grpc : bool
            Flag recording that the response originated from gRPC.
        parameters : dict, optional
            Response-level parameters; defaults to a fresh empty dict.
        """
        if parameters is None:
            parameters = {}
        self.id = response_id
        self.model_name = model_name
        self.outputs = infer_outputs
        self.parameters = parameters
        # NOTE: on instances this attribute shadows the `from_grpc` classmethod
        # defined below; the classmethod stays reachable through the class.
        self.from_grpc = from_grpc
        if raw_outputs:
            for i, raw_output in enumerate(raw_outputs):
                self.outputs[i]._raw_data = raw_output

    @classmethod
    def from_grpc(cls, response: ModelInferResponse) -> "InferResponse":
        """Create an InferResponse from a gRPC ModelInferResponse message."""
        infer_outputs = [
            InferOutput(
                name=output.name,
                shape=list(output.shape),
                datatype=output.datatype,
                data=get_content(output.datatype, output.contents),
                parameters=output.parameters,
            )
            for output in response.outputs
        ]
        return cls(
            model_name=response.model_name,
            response_id=response.id,
            parameters=response.parameters,
            infer_outputs=infer_outputs,
            raw_outputs=response.raw_output_contents,
            from_grpc=True,
        )

    @classmethod
    def from_rest(cls, model_name: str, response: Dict) -> "InferResponse":
        """Create an InferResponse from a decoded v2 REST response body.

        Each output must carry ``name``, ``shape``, ``datatype`` and ``data``;
        ``id`` and ``parameters`` are optional.
        """
        infer_outputs = [
            InferOutput(
                name=output["name"],
                shape=list(output["shape"]),
                datatype=output["datatype"],
                data=output["data"],
                parameters=output.get("parameters", {}),
            )
            for output in response["outputs"]
        ]
        return cls(
            model_name=model_name,
            response_id=response.get("id", None),
            parameters=response.get("parameters", {}),
            infer_outputs=infer_outputs,
        )

    def to_rest(self) -> Dict:
        """Converts the InferResponse object to v2 REST InferenceResponse message"""
        infer_outputs = []
        for infer_output in self.outputs:
            infer_output_dict = {
                "name": infer_output.name,
                "shape": infer_output.shape,
                "datatype": infer_output.datatype,
            }
            if isinstance(infer_output.data, numpy.ndarray):
                infer_output.set_data_from_numpy(infer_output.data, binary_data=False)
                infer_output_dict["data"] = infer_output.data
            elif isinstance(infer_output._raw_data, bytes):
                # Raw bytes are decoded through numpy to get JSON-friendly lists.
                infer_output_dict["data"] = infer_output.as_numpy().tolist()
            else:
                infer_output_dict["data"] = infer_output.data
            infer_outputs.append(infer_output_dict)
        res = {"id": self.id, "model_name": self.model_name, "outputs": infer_outputs}
        return res

    def to_grpc(self) -> ModelInferResponse:
        """Converts the InferResponse object to gRPC ModelInferResponse message

        Raises
        ------
        InvalidInput
            If an output has no raw data and its data is not a list, or its
            datatype has no entry in ``GRPC_CONTENT_DATATYPE_MAPPINGS``.
        """
        infer_outputs = []
        raw_output_contents = []
        for infer_output in self.outputs:
            if isinstance(infer_output.data, numpy.ndarray):
                infer_output.set_data_from_numpy(infer_output.data, binary_data=True)
            infer_output_dict = {
                "name": infer_output.name,
                "shape": infer_output.shape,
                "datatype": infer_output.datatype,
            }
            if infer_output._raw_data is not None:
                # Raw payloads travel next to the tensor metadata, not inside it.
                raw_output_contents.append(infer_output._raw_data)
            else:
                if not isinstance(infer_output.data, List):
                    raise InvalidInput("output data is not a List")
                infer_output_dict["contents"] = {}
                data_key = GRPC_CONTENT_DATATYPE_MAPPINGS.get(
                    infer_output.datatype, None
                )
                if data_key is not None:
                    infer_output._data = [
                        bytes(val, "utf-8") if isinstance(val, str) else val
                        for val in infer_output.data
                    ]  # str to byte conversion for grpc proto
                    infer_output_dict["contents"][data_key] = infer_output.data
                else:
                    raise InvalidInput("to_grpc: invalid output datatype")
            infer_outputs.append(infer_output_dict)

        return ModelInferResponse(
            id=self.id,
            model_name=self.model_name,
            outputs=infer_outputs,
            raw_output_contents=raw_output_contents,
        )
# This implementation has been borrowed from kserve/kserve repository
# https://github.com/kserve/kserve/blob/release-0.11/python/kserve/kserve/utils/numpy_codec.py

import numpy as np


def to_np_dtype(dtype):
    """Map a v2 inference protocol datatype name to a numpy dtype.

    Parameters
    ----------
    dtype : str
        Datatype name such as ``"INT32"`` or ``"BYTES"``.

    Returns
    -------
    type or None
        The matching numpy scalar type (``bool`` for ``"BOOL"``,
        ``np.object_`` for ``"BYTES"``), or ``None`` for an unknown name.
    """
    dtype_map = {
        "BOOL": bool,
        "INT8": np.int8,
        "INT16": np.int16,
        "INT32": np.int32,
        "INT64": np.int64,
        "UINT8": np.uint8,
        "UINT16": np.uint16,
        "UINT32": np.uint32,
        "UINT64": np.uint64,
        "FP16": np.float16,
        "FP32": np.float32,
        "FP64": np.float64,
        "BYTES": np.object_,
    }
    return dtype_map.get(dtype, None)


def from_np_dtype(np_dtype):
    """Map a numpy dtype to its v2 inference protocol datatype name.

    Parameters
    ----------
    np_dtype : numpy.dtype or type
        Usually ``array.dtype``; the builtin ``bool`` is accepted as well.

    Returns
    -------
    str or None
        The datatype name (object and byte-string dtypes map to
        ``"BYTES"``), or ``None`` when there is no mapping.
    """
    # An identity test alone never matches `array.dtype`: a numpy dtype object
    # is not the builtin `bool`, it only compares equal to the boolean type.
    # Keep the identity case for callers passing `bool` itself.
    if np_dtype is bool or np_dtype == np.bool_:
        return "BOOL"
    elif np_dtype == np.int8:
        return "INT8"
    elif np_dtype == np.int16:
        return "INT16"
    elif np_dtype == np.int32:
        return "INT32"
    elif np_dtype == np.int64:
        return "INT64"
    elif np_dtype == np.uint8:
        return "UINT8"
    elif np_dtype == np.uint16:
        return "UINT16"
    elif np_dtype == np.uint32:
        return "UINT32"
    elif np_dtype == np.uint64:
        return "UINT64"
    elif np_dtype == np.float16:
        return "FP16"
    elif np_dtype == np.float32:
        return "FP32"
    elif np_dtype == np.float64:
        return "FP64"
    elif np_dtype == np.object_ or np_dtype.type == np.bytes_:
        return "BYTES"
    return None
b/python/hsml/client/__init__.py index c54bbfb08..db950979a 100644 --- a/python/hsml/client/__init__.py +++ b/python/hsml/client/__init__.py @@ -1,5 +1,5 @@ # -# Copyright 2022 Logical Clocks AB +# Copyright 2024 Hopsworks AB # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,139 +14,49 @@ # limitations under the License. # -from hsml.client.hopsworks import base as hw_base -from hsml.client.hopsworks import external as hw_external -from hsml.client.hopsworks import internal as hw_internal -from hsml.client.istio import base as ist_base -from hsml.client.istio import external as ist_external -from hsml.client.istio import internal as ist_internal -from hsml.constants import HOSTS - - -_client_type = None -_saas_connection = None - -_hopsworks_client = None -_istio_client = None - -_kserve_installed = None -_serving_resource_limits = None -_serving_num_instances_limits = None -_knative_domain = None - - -def init( - client_type, - host=None, - port=None, - project=None, - hostname_verification=None, - trust_store_path=None, - api_key_file=None, - api_key_value=None, -): - global _client_type - _client_type = client_type - - global _saas_connection - _saas_connection = host == HOSTS.APP_HOST - - global _hopsworks_client - if not _hopsworks_client: - if client_type == "internal": - _hopsworks_client = hw_internal.Client() - elif client_type == "external": - _hopsworks_client = hw_external.Client( - host, - port, - project, - hostname_verification, - trust_store_path, - api_key_file, - api_key_value, - ) - - -def get_instance() -> hw_base.Client: - global _hopsworks_client - if _hopsworks_client: - return _hopsworks_client - raise Exception("Couldn't find client. 
Try reconnecting to Hopsworks.") - - -def set_istio_client(host, port, project=None, api_key_value=None): - global _client_type, _istio_client - - if not _istio_client: - if _client_type == "internal": - _istio_client = ist_internal.Client(host, port) - elif _client_type == "external": - _istio_client = ist_external.Client(host, port, project, api_key_value) - - -def get_istio_instance() -> ist_base.Client: - global _istio_client - return _istio_client - - -def get_client_type() -> str: - global _client_type - return _client_type - - -def is_saas_connection() -> bool: - global _saas_connection - return _saas_connection - - -def set_kserve_installed(kserve_installed): - global _kserve_installed - _kserve_installed = kserve_installed - - -def is_kserve_installed() -> bool: - global _kserve_installed - return _kserve_installed - - -def set_serving_resource_limits(max_resources): - global _serving_resource_limits - _serving_resource_limits = max_resources - - -def get_serving_resource_limits(): - global _serving_resource_limits - return _serving_resource_limits - - -def set_serving_num_instances_limits(num_instances_range): - global _serving_num_instances_limits - _serving_num_instances_limits = num_instances_range - - -def get_serving_num_instances_limits(): - global _serving_num_instances_limits - return _serving_num_instances_limits - - -def is_scale_to_zero_required(): - # scale-to-zero is required for KServe deployments if the Hopsworks variable `kube_serving_min_num_instances` - # is set to 0. Other possible values are -1 (unlimited num instances) or >1 num instances. 
from hopsworks_common.client import (
    auth,
    base,
    exceptions,
    external,
    get_instance,
    get_knative_domain,
    get_serving_num_instances_limits,
    get_serving_resource_limits,
    hopsworks,
    init,
    is_kserve_installed,
    is_saas_connection,
    is_scale_to_zero_required,
    istio,
    online_store_rest_client,
    set_knative_domain,
    set_kserve_installed,
    set_serving_num_instances_limits,
    set_serving_resource_limits,
    stop,
)


# Re-exported API of this compatibility module. `__all__` entries must be
# strings: with module/function objects, `from hsml.client import *` raises
# TypeError.
__all__ = [
    "auth",
    "base",
    "exceptions",
    "external",
    "get_instance",
    "get_knative_domain",
    "get_serving_num_instances_limits",
    "get_serving_resource_limits",
    "hopsworks",
    "init",
    "is_kserve_installed",
    "is_saas_connection",
    "is_scale_to_zero_required",
    "istio",
    "online_store_rest_client",
    "set_knative_domain",
    "set_kserve_installed",
    "set_serving_num_instances_limits",
    "set_serving_resource_limits",
    "stop",
]
from hopsworks_common.client.auth import (
    ApiKeyAuth,
    BearerAuth,
    OnlineStoreKeyAuth,
    get_api_key,
)


# Re-exported names. `__all__` entries must be strings: with class/function
# objects, a star-import of this module raises TypeError.
__all__ = [
    "ApiKeyAuth",
    "BearerAuth",
    "OnlineStoreKeyAuth",
    "get_api_key",
]
from hopsworks_common.client.base import (
    Client,
)


# Re-exported name. `__all__` entries must be strings: with the class object
# itself, a star-import of this module raises TypeError.
__all__ = [
    "Client",
]
from hopsworks_common.client.exceptions import (
    DatasetException,
    DataValidationException,
    EnvironmentException,
    ExternalClientError,
    FeatureStoreException,
    GitException,
    HopsworksClientError,
    JobException,
    JobExecutionException,
    KafkaException,
    ModelRegistryException,
    ModelServingException,
    OpenSearchException,
    ProjectException,
    RestAPIError,
    UnknownSecretStorageError,
    VectorDatabaseException,
)


# Re-exported exception types. `__all__` entries must be strings: with class
# objects, a star-import of this module raises TypeError.
__all__ = [
    "DatasetException",
    "DataValidationException",
    "EnvironmentException",
    "ExternalClientError",
    "FeatureStoreException",
    "GitException",
    "HopsworksClientError",
    "JobException",
    "JobExecutionException",
    "KafkaException",
    "ModelRegistryException",
    "ModelServingException",
    "OpenSearchException",
    "ProjectException",
    "RestAPIError",
    "UnknownSecretStorageError",
    "VectorDatabaseException",
]
# --- python/hsml/client/external.py ---
from hopsworks_common.client.external import (
    Client,
)


# `__all__` entries must be strings: with the class object itself, a
# star-import of this module raises TypeError.
__all__ = [
    "Client",
]


# --- python/hsml/client/hopsworks.py ---
from hopsworks_common.client.hopsworks import (
    Client,
)


# `__all__` entries must be strings: with the class object itself, a
# star-import of this module raises TypeError.
__all__ = [
    "Client",
]
-# - -import requests -from hsml.client import auth, exceptions -from hsml.client.hopsworks import base as hopsworks - - -class Client(hopsworks.Client): - def __init__( - self, - host, - port, - project, - hostname_verification, - trust_store_path, - api_key_file, - api_key_value, - ): - """Initializes a client in an external environment.""" - if not host: - raise exceptions.ExternalClientError( - "host cannot be of type NoneType, host is a non-optional " - "argument to connect to hopsworks from an external environment." - ) - if not project: - raise exceptions.ExternalClientError( - "project cannot be of type NoneType, project is a non-optional " - "argument to connect to hopsworks from an external environment." - ) - - self._host = host - self._port = port - self._base_url = "https://" + self._host + ":" + str(self._port) - self._project_name = project - - api_key = auth.get_api_key(api_key_value, api_key_file) - self._auth = auth.ApiKeyAuth(api_key) - - self._session = requests.session() - self._connected = True - self._verify = self._get_verify(self._host, trust_store_path) - - if self._project_name is not None: - project_info = self._get_project_info(self._project_name) - self._project_id = str(project_info["projectId"]) - else: - self._project_id = None - - self._cert_key = None - - def _close(self): - """Closes a client.""" - self._connected = False - - def _get_project_info(self, project_name): - """Makes a REST call to hopsworks to get all metadata of a project for the provided project. 
- - :param project_name: the name of the project - :type project_name: str - :return: JSON response with project info - :rtype: dict - """ - return self._send_request("GET", ["project", "getProjectInfo", project_name]) - - def _replace_public_host(self, url): - """no need to replace as we are already in external client""" - return url - - @property - def host(self): - return self._host diff --git a/python/hsml/client/hopsworks/internal.py b/python/hsml/client/hopsworks/internal.py deleted file mode 100644 index 760251540..000000000 --- a/python/hsml/client/hopsworks/internal.py +++ /dev/null @@ -1,208 +0,0 @@ -# -# Copyright 2021 Logical Clocks AB -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -import base64 -import os -import textwrap -from pathlib import Path - -import requests -from hsml.client import auth -from hsml.client.hopsworks import base as hopsworks - - -try: - import jks -except ImportError: - pass - - -class Client(hopsworks.Client): - REQUESTS_VERIFY = "REQUESTS_VERIFY" - DOMAIN_CA_TRUSTSTORE_PEM = "DOMAIN_CA_TRUSTSTORE_PEM" - PROJECT_ID = "HOPSWORKS_PROJECT_ID" - PROJECT_NAME = "HOPSWORKS_PROJECT_NAME" - HADOOP_USER_NAME = "HADOOP_USER_NAME" - MATERIAL_DIRECTORY = "MATERIAL_DIRECTORY" - HDFS_USER = "HDFS_USER" - T_CERTIFICATE = "t_certificate" - K_CERTIFICATE = "k_certificate" - TRUSTSTORE_SUFFIX = "__tstore.jks" - KEYSTORE_SUFFIX = "__kstore.jks" - PEM_CA_CHAIN = "ca_chain.pem" - CERT_KEY_SUFFIX = "__cert.key" - MATERIAL_PWD = "material_passwd" - SECRETS_DIR = "SECRETS_DIR" - - def __init__(self): - """Initializes a client being run from a job/notebook directly on Hopsworks.""" - self._base_url = self._get_hopsworks_rest_endpoint() - self._host, self._port = self._get_host_port_pair() - self._secrets_dir = ( - os.environ[self.SECRETS_DIR] if self.SECRETS_DIR in os.environ else "" - ) - self._cert_key = self._get_cert_pw() - trust_store_path = self._get_trust_store_path() - hostname_verification = ( - os.environ[self.REQUESTS_VERIFY] - if self.REQUESTS_VERIFY in os.environ - else "true" - ) - self._project_id = os.environ[self.PROJECT_ID] - self._project_name = self._project_name() - try: - self._auth = auth.BearerAuth(self._read_jwt()) - except FileNotFoundError: - self._auth = auth.ApiKeyAuth(self._read_apikey()) - self._verify = self._get_verify(hostname_verification, trust_store_path) - self._session = requests.session() - - self._connected = True - - def _get_hopsworks_rest_endpoint(self): - """Get the hopsworks REST endpoint for making requests to the REST API.""" - return os.environ[self.REST_ENDPOINT] - - def _get_trust_store_path(self): - """Convert truststore from jks to pem and return the location""" - ca_chain_path = 
Path(self.PEM_CA_CHAIN) - if not ca_chain_path.exists(): - self._write_ca_chain(ca_chain_path) - return str(ca_chain_path) - - def _write_ca_chain(self, ca_chain_path): - """ - Converts JKS trustore file into PEM to be compatible with Python libraries - """ - keystore_pw = self._cert_key - keystore_ca_cert = self._convert_jks_to_pem( - self._get_jks_key_store_path(), keystore_pw - ) - truststore_ca_cert = self._convert_jks_to_pem( - self._get_jks_trust_store_path(), keystore_pw - ) - - with ca_chain_path.open("w") as f: - f.write(keystore_ca_cert + truststore_ca_cert) - - def _convert_jks_to_pem(self, jks_path, keystore_pw): - """ - Converts a keystore JKS that contains client private key, - client certificate and CA certificate that was used to - sign the certificate to PEM format and returns the CA certificate. - Args: - :jks_path: path to the JKS file - :pw: password for decrypting the JKS file - Returns: - strings: (ca_cert) - """ - # load the keystore and decrypt it with password - ks = jks.KeyStore.load(jks_path, keystore_pw, try_decrypt_keys=True) - ca_certs = "" - - # Convert CA Certificates into PEM format and append to string - for _alias, c in ks.certs.items(): - ca_certs = ca_certs + self._bytes_to_pem_str(c.cert, "CERTIFICATE") - return ca_certs - - def _bytes_to_pem_str(self, der_bytes, pem_type): - """ - Utility function for creating PEM files - - Args: - der_bytes: DER encoded bytes - pem_type: type of PEM, e.g Certificate, Private key, or RSA private key - - Returns: - PEM String for a DER-encoded certificate or private key - """ - pem_str = "" - pem_str = pem_str + "-----BEGIN {}-----".format(pem_type) + "\n" - pem_str = ( - pem_str - + "\r\n".join( - textwrap.wrap(base64.b64encode(der_bytes).decode("ascii"), 64) - ) - + "\n" - ) - pem_str = pem_str + "-----END {}-----".format(pem_type) + "\n" - return pem_str - - def _get_jks_trust_store_path(self): - """ - Get truststore location - - Returns: - truststore location - """ - t_certificate = 
Path(self.T_CERTIFICATE) - if t_certificate.exists(): - return str(t_certificate) - else: - username = os.environ[self.HADOOP_USER_NAME] - material_directory = Path(os.environ[self.MATERIAL_DIRECTORY]) - return str(material_directory.joinpath(username + self.TRUSTSTORE_SUFFIX)) - - def _get_jks_key_store_path(self): - """ - Get keystore location - - Returns: - keystore location - """ - k_certificate = Path(self.K_CERTIFICATE) - if k_certificate.exists(): - return str(k_certificate) - else: - username = os.environ[self.HADOOP_USER_NAME] - material_directory = Path(os.environ[self.MATERIAL_DIRECTORY]) - return str(material_directory.joinpath(username + self.KEYSTORE_SUFFIX)) - - def _project_name(self): - try: - return os.environ[self.PROJECT_NAME] - except KeyError: - pass - - hops_user = self._project_user() - hops_user_split = hops_user.split( - "__" - ) # project users have username project__user - project = hops_user_split[0] - return project - - def _project_user(self): - try: - hops_user = os.environ[self.HADOOP_USER_NAME] - except KeyError: - hops_user = os.environ[self.HDFS_USER] - return hops_user - - def _get_cert_pw(self): - """ - Get keystore password from local container - - Returns: - Certificate password - """ - pwd_path = Path(self.MATERIAL_PWD) - if not pwd_path.exists(): - username = os.environ[self.HADOOP_USER_NAME] - material_directory = Path(os.environ[self.MATERIAL_DIRECTORY]) - pwd_path = material_directory.joinpath(username + self.CERT_KEY_SUFFIX) - - with pwd_path.open() as f: - return f.read() diff --git a/python/hsml/client/istio/__init__.py b/python/hsml/client/istio/__init__.py index 7fa8fd556..a99b8ba0f 100644 --- a/python/hsml/client/istio/__init__.py +++ b/python/hsml/client/istio/__init__.py @@ -1,5 +1,5 @@ # -# Copyright 2022 Logical Clocks AB +# Copyright 2024 Hopsworks AB # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -13,3 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +from hopsworks_common.client.istio import ( + base, + external, + get_instance, + grpc, + hopsworks, + init, + utils, +) + + +__all__ = [ + "base", + "external", + "get_instance", + "grpc", + "hopsworks", + "init", + "utils", +] diff --git a/python/hsml/client/istio/base.py b/python/hsml/client/istio/base.py index 9aaab9ba0..049d6daa1 100644 --- a/python/hsml/client/istio/base.py +++ b/python/hsml/client/istio/base.py @@ -1,5 +1,5 @@ # -# Copyright 2022 Logical Clocks AB +# Copyright 2024 Hopsworks AB # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,84 +14,11 @@ # limitations under the License. # -import os -from abc import abstractmethod +from hopsworks_common.client.istio.base import ( + Client, +) -from hsml.client import base -from hsml.client.istio.grpc.inference_client import GRPCInferenceServerClient - -class Client(base.Client): - SERVING_API_KEY = "SERVING_API_KEY" - HOPSWORKS_PUBLIC_HOST = "HOPSWORKS_PUBLIC_HOST" - - BASE_PATH_PARAMS = [] - - @abstractmethod - def __init__(self): - """To be implemented by clients.""" - pass - - def _get_verify(self, verify, trust_store_path): - """Get verification method for sending inference requests to Istio. 
- - Credit to https://gist.github.com/gdamjan/55a8b9eec6cf7b771f92021d93b87b2c - - :param verify: perform hostname verification, 'true' or 'false' - :type verify: str - :param trust_store_path: path of the truststore locally if it was uploaded manually to - the external environment such as EKS or AKS - :type trust_store_path: str - :return: if verify is true and the truststore is provided, then return the trust store location - if verify is true but the truststore wasn't provided, then return true - if verify is false, then return false - :rtype: str or boolean - """ - if verify == "true": - if trust_store_path is not None: - return trust_store_path - else: - return True - - return False - - def _get_retry(self, request, response): - """Get retry method for resending HTTP requests to Istio - - :param request: original HTTP request already sent - :type request: requests.Request - :param response: response of the original HTTP request - :type response: requests.Response - """ - return False - - def _get_host_port_pair(self): - """ - Removes "http or https" from the rest endpoint and returns a list - [endpoint, port], where endpoint is on the format /path.. without http:// - - :return: a list [endpoint, port] - :rtype: list - """ - endpoint = self._base_url - if endpoint.startswith("http"): - last_index = endpoint.rfind("/") - endpoint = endpoint[last_index + 1 :] - host, port = endpoint.split(":") - return host, port - - def _close(self): - """Closes a client. 
Can be implemented for clean up purposes, not mandatory.""" - self._connected = False - - def _replace_public_host(self, url): - """replace hostname to public hostname set in HOPSWORKS_PUBLIC_HOST""" - ui_url = url._replace(netloc=os.environ[self.HOPSWORKS_PUBLIC_HOST]) - return ui_url - - def _create_grpc_channel(self, service_hostname: str) -> GRPCInferenceServerClient: - return GRPCInferenceServerClient( - url=self._host + ":" + str(self._port), - channel_args=(("grpc.ssl_target_name_override", service_hostname),), - serving_api_key=self._auth._token, - ) +__all__ = [ + "Client", +] diff --git a/python/hsml/client/istio/external.py b/python/hsml/client/istio/external.py index c4fd89787..6f1995a9f 100644 --- a/python/hsml/client/istio/external.py +++ b/python/hsml/client/istio/external.py @@ -1,5 +1,5 @@ # -# Copyright 2022 Logical Clocks AB +# Copyright 2024 Hopsworks AB # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,43 +14,11 @@ # limitations under the License. 
# -import requests -from hsml.client import auth -from hsml.client.istio import base as istio +from hopsworks_common.client.istio.external import ( + Client, +) -class Client(istio.Client): - def __init__( - self, - host, - port, - project, - api_key_value, - hostname_verification=None, - trust_store_path=None, - ): - """Initializes a client in an external environment such as AWS Sagemaker.""" - self._host = host - self._port = port - self._base_url = "http://" + self._host + ":" + str(self._port) - self._project_name = project - - self._auth = auth.ApiKeyAuth(api_key_value) - - self._session = requests.session() - self._connected = True - self._verify = self._get_verify(hostname_verification, trust_store_path) - - self._cert_key = None - - def _close(self): - """Closes a client.""" - self._connected = False - - def _replace_public_host(self, url): - """no need to replace as we are already in external client""" - return url - - @property - def host(self): - return self._host +__all__ = [ + "Client", +] diff --git a/python/hsml/client/istio/grpc/errors.py b/python/hsml/client/istio/grpc/errors.py index 062630bea..3a27334b7 100644 --- a/python/hsml/client/istio/grpc/errors.py +++ b/python/hsml/client/istio/grpc/errors.py @@ -1,30 +1,24 @@ -# Copyright 2022 The KServe Authors. # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Copyright 2024 Hopsworks AB # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# This implementation has been borrowed from the kserve/kserve repository -# https://github.com/kserve/kserve/blob/release-0.11/python/kserve/kserve/errors.py - -class InvalidInput(ValueError): - """ - Exception class indicating invalid input arguments. - HTTP Servers should return HTTP_400 (Bad Request). - """ +from hopsworks_common.client.istio.grpc.errors import ( + InvalidInput, +) - def __init__(self, reason): - self.reason = reason - def __str__(self): - return self.reason +__all__ = [ + "InvalidInput", +] diff --git a/python/hsml/client/istio/grpc/exceptions.py b/python/hsml/client/istio/grpc/exceptions.py index 6477c9488..cac5a664c 100644 --- a/python/hsml/client/istio/grpc/exceptions.py +++ b/python/hsml/client/istio/grpc/exceptions.py @@ -1,123 +1,34 @@ -# Copyright 2023 The KServe Authors. # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Copyright 2024 Hopsworks AB # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# coding: utf-8 - -# This implementation has been borrowed from kserve/kserve repository -# https://github.com/kserve/kserve/blob/release-0.11/python/kserve/kserve/exceptions.py - -import six - - -class OpenApiException(Exception): - """The base exception class for all OpenAPIExceptions""" - - -class ApiTypeError(OpenApiException, TypeError): - def __init__(self, msg, path_to_item=None, valid_classes=None, key_type=None): - """Raises an exception for TypeErrors - - Args: - msg (str): the exception message - - Keyword Args: - path_to_item (list): a list of keys an indices to get to the - current_item - None if unset - valid_classes (tuple): the primitive classes that current item - should be an instance of - None if unset - key_type (bool): False if our value is a value in a dict - True if it is a key in a dict - False if our item is an item in a list - None if unset - """ - self.path_to_item = path_to_item - self.valid_classes = valid_classes - self.key_type = key_type - full_msg = msg - if path_to_item: - full_msg = "{0} at {1}".format(msg, render_path(path_to_item)) - super(ApiTypeError, self).__init__(full_msg) - - -class ApiValueError(OpenApiException, ValueError): - def __init__(self, msg, path_to_item=None): - """ - Args: - msg (str): the exception message - - Keyword Args: - path_to_item (list) the path to the exception in the - received_data dict. 
None if unset - """ - - self.path_to_item = path_to_item - full_msg = msg - if path_to_item: - full_msg = "{0} at {1}".format(msg, render_path(path_to_item)) - super(ApiValueError, self).__init__(full_msg) - - -class ApiKeyError(OpenApiException, KeyError): - def __init__(self, msg, path_to_item=None): - """ - Args: - msg (str): the exception message - - Keyword Args: - path_to_item (None/list) the path to the exception in the - received_data dict - """ - self.path_to_item = path_to_item - full_msg = msg - if path_to_item: - full_msg = "{0} at {1}".format(msg, render_path(path_to_item)) - super(ApiKeyError, self).__init__(full_msg) - - -class ApiException(OpenApiException): - def __init__(self, status=None, reason=None, http_resp=None): - if http_resp: - self.status = http_resp.status - self.reason = http_resp.reason - self.body = http_resp.data - self.headers = http_resp.getheaders() - else: - self.status = status - self.reason = reason - self.body = None - self.headers = None - - def __str__(self): - """Custom error messages for exception""" - error_message = "({0})\n" "Reason: {1}\n".format(self.status, self.reason) - if self.headers: - error_message += "HTTP response headers: {0}\n".format(self.headers) - - if self.body: - error_message += "HTTP response body: {0}\n".format(self.body) - - return error_message - -def render_path(path_to_item): - """Returns a string representation of a path""" - result = "" - for pth in path_to_item: - if isinstance(pth, six.integer_types): - result += "[{0}]".format(pth) - else: - result += "['{0}']".format(pth) - return result +from hopsworks_common.client.istio.grpc.exceptions import ( + ApiException, + ApiKeyError, + ApiTypeError, + ApiValueError, + OpenApiException, + render_path, +) + + +__all__ = [ + "ApiException", + "ApiKeyError", + "ApiTypeError", + "ApiValueError", + "OpenApiException", + "render_path", +] diff --git a/python/hsml/client/istio/grpc/inference_client.py b/python/hsml/client/istio/grpc/inference_client.py index 
3cc3164c5..e5de660b0 100644 --- a/python/hsml/client/istio/grpc/inference_client.py +++ b/python/hsml/client/istio/grpc/inference_client.py @@ -12,63 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# -import grpc -from hsml.client.istio.grpc.proto.grpc_predict_v2_pb2_grpc import ( - GRPCInferenceServiceStub, +from hopsworks_common.client.istio.grpc.inference_client import ( + GRPCInferenceServerClient, ) -from hsml.client.istio.utils.infer_type import InferRequest, InferResponse - - -class GRPCInferenceServerClient: - def __init__( - self, - url, - serving_api_key, - channel_args=None, - ): - if channel_args is not None: - channel_opt = channel_args - else: - channel_opt = [ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ] - - # Authentication is done via API Key in the Authorization header - self._channel = grpc.insecure_channel(url, options=channel_opt) - self._client_stub = GRPCInferenceServiceStub(self._channel) - self._serving_api_key = serving_api_key - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - self.close() - - def __del__(self): - """It is called during object garbage collection.""" - self.close() - - def close(self): - """Close the client. 
Future calls to server will result in an Error.""" - self._channel.close() - - def infer(self, infer_request: InferRequest, headers=None, client_timeout=None): - headers = {} if headers is None else headers - headers["authorization"] = "ApiKey " + self._serving_api_key - metadata = headers.items() - - # convert the InferRequest to a ModelInferRequest message - request = infer_request.to_grpc() - try: - # send request - model_infer_response = self._client_stub.ModelInfer( - request=request, metadata=metadata, timeout=client_timeout - ) - except grpc.RpcError as rpc_error: - raise rpc_error - # convert back the ModelInferResponse message to InferResponse - return InferResponse.from_grpc(model_infer_response) +__all__ = [ + "GRPCInferenceServerClient", +] diff --git a/python/hsml/client/istio/hopsworks.py b/python/hsml/client/istio/hopsworks.py new file mode 100644 index 000000000..f84e1cb8b --- /dev/null +++ b/python/hsml/client/istio/hopsworks.py @@ -0,0 +1,24 @@ +# +# Copyright 2024 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from hopsworks_common.client.istio.hopsworks import ( + Client, +) + + +__all__ = [ + "Client", +] diff --git a/python/hsml/client/istio/utils/infer_type.py b/python/hsml/client/istio/utils/infer_type.py index e1dd2ab92..3a80272ba 100644 --- a/python/hsml/client/istio/utils/infer_type.py +++ b/python/hsml/client/istio/utils/infer_type.py @@ -1,812 +1,40 @@ -# Copyright 2023 The KServe Authors. 
# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Copyright 2024 Hopsworks AB # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# This implementation has been borrowed from kserve/kserve repository -# https://github.com/kserve/kserve/blob/release-0.11/python/kserve/kserve/protocol/infer_type.py - -import struct -from typing import Dict, List, Optional -import numpy -import numpy as np -import pandas as pd -from hsml.client.istio.grpc.errors import InvalidInput -from hsml.client.istio.grpc.proto.grpc_predict_v2_pb2 import ( - InferTensorContents, - ModelInferRequest, - ModelInferResponse, +from hopsworks_common.client.istio.utils.infer_type import ( + GRPC_CONTENT_DATATYPE_MAPPINGS, + InferenceServerException, + InferInput, + InferOutput, + InferRequest, + InferResponse, + get_content, + raise_error, + serialize_byte_tensor, ) -from hsml.client.istio.utils.numpy_codec import from_np_dtype, to_np_dtype - - -GRPC_CONTENT_DATATYPE_MAPPINGS = { - "BOOL": "bool_contents", - "INT8": "int_contents", - "INT16": "int_contents", - "INT32": "int_contents", - "INT64": "int64_contents", - "UINT8": "uint_contents", - "UINT16": "uint_contents", - "UINT32": "uint_contents", - "UINT64": "uint64_contents", - "FP32": "fp32_contents", - "FP64": "fp64_contents", - "BYTES": "bytes_contents", -} - - -def raise_error(msg): - """ - Raise error with the provided message - """ - raise InferenceServerException(msg=msg) from None - - -def serialize_byte_tensor(input_tensor): - """ - Serializes a bytes tensor into a flat numpy array of length prepended - bytes. The numpy array should use dtype of np.object. For np.bytes, - numpy will remove trailing zeros at the end of byte sequence and because - of this it should be avoided. - - Parameters - ---------- - input_tensor : np.array - The bytes tensor to serialize. - - Returns - ------- - serialized_bytes_tensor : np.array - The 1-D numpy array of type uint8 containing the serialized bytes in row-major form. - - Raises - ------ - InferenceServerException - If unable to serialize the given tensor. 
- """ - - if input_tensor.size == 0: - return np.empty([0], dtype=np.object_) - - # If the input is a tensor of string/bytes objects, then must flatten those into - # a 1-dimensional array containing the 4-byte byte size followed by the - # actual element bytes. All elements are concatenated together in row-major - # order. - - if (input_tensor.dtype != np.object_) and (input_tensor.dtype.type != np.bytes_): - raise_error("cannot serialize bytes tensor: invalid datatype") - - flattened_ls = [] - # 'C' order is row-major. - for obj in np.nditer(input_tensor, flags=["refs_ok"], order="C"): - # If directly passing bytes to BYTES type, - # don't convert it to str as Python will encode the - # bytes which may distort the meaning - if input_tensor.dtype == np.object_: - if isinstance(obj.item(), bytes): - s = obj.item() - else: - s = str(obj.item()).encode("utf-8") - else: - s = obj.item() - flattened_ls.append(struct.pack(" np.ndarray: - dtype = to_np_dtype(self.datatype) - if dtype is None: - raise InvalidInput("invalid datatype in the input") - if self._raw_data is not None: - np_array = np.frombuffer(self._raw_data, dtype=dtype) - return np_array.reshape(self._shape) - else: - np_array = np.array(self._data, dtype=dtype) - return np_array.reshape(self._shape) - - def set_data_from_numpy(self, input_tensor, binary_data=True): - """Set the tensor data from the specified numpy array for - input associated with this object. - Parameters - ---------- - input_tensor : numpy array - The tensor data in numpy array format - binary_data : bool - Indicates whether to set data for the input in binary format - or explicit tensor within JSON. The default value is True, - which means the data will be delivered as binary data in the - HTTP body after the JSON object. - Raises - ------ - InferenceServerException - If failed to set data for the tensor. 
- """ - if not isinstance(input_tensor, (np.ndarray,)): - raise_error("input_tensor must be a numpy array") - - dtype = from_np_dtype(input_tensor.dtype) - if self._datatype != dtype: - raise_error( - "got unexpected datatype {} from numpy array, expected {}".format( - dtype, self._datatype - ) - ) - valid_shape = True - if len(self._shape) != len(input_tensor.shape): - valid_shape = False - else: - for i in range(len(self._shape)): - if self._shape[i] != input_tensor.shape[i]: - valid_shape = False - if not valid_shape: - raise_error( - "got unexpected numpy array shape [{}], expected [{}]".format( - str(input_tensor.shape)[1:-1], str(self._shape)[1:-1] - ) - ) - - if not binary_data: - self._parameters.pop("binary_data_size", None) - self._raw_data = None - if self._datatype == "BYTES": - self._data = [] - try: - if input_tensor.size > 0: - for obj in np.nditer( - input_tensor, flags=["refs_ok"], order="C" - ): - # We need to convert the object to string using utf-8, - # if we want to use the binary_data=False. JSON requires - # the input to be a UTF-8 string. - if input_tensor.dtype == np.object_: - if isinstance(obj.item(), bytes): - self._data.append(str(obj.item(), encoding="utf-8")) - else: - self._data.append(str(obj.item())) - else: - self._data.append(str(obj.item(), encoding="utf-8")) - except UnicodeDecodeError: - raise_error( - f'Failed to encode "{obj.item()}" using UTF-8. Please use binary_data=True, if' - " you want to pass a byte array." 
- ) - else: - self._data = [val.item() for val in input_tensor.flatten()] - else: - self._data = None - if self._datatype == "BYTES": - serialized_output = serialize_byte_tensor(input_tensor) - if serialized_output.size > 0: - self._raw_data = serialized_output.item() - else: - self._raw_data = b"" - else: - self._raw_data = input_tensor.tobytes() - self._parameters["binary_data_size"] = len(self._raw_data) - - -def get_content(datatype: str, data: InferTensorContents): - if datatype == "BOOL": - return list(data.bool_contents) - elif datatype in ["UINT8", "UINT16", "UINT32"]: - return list(data.uint_contents) - elif datatype == "UINT64": - return list(data.uint64_contents) - elif datatype in ["INT8", "INT16", "INT32"]: - return list(data.int_contents) - elif datatype == "INT64": - return list(data.int64_contents) - elif datatype == "FP32": - return list(data.fp32_contents) - elif datatype == "FP64": - return list(data.fp64_contents) - elif datatype == "BYTES": - return list(data.bytes_contents) - else: - raise InvalidInput("invalid content type") - - -class InferRequest: - """InferenceRequest Model - - $inference_request = - { - "id" : $string #optional, - "parameters" : $parameters #optional, - "inputs" : [ $request_input, ... ], - "outputs" : [ $request_output, ... 
] #optional - } - """ - - id: Optional[str] - model_name: str - parameters: Optional[Dict] - inputs: List[InferInput] - from_grpc: bool - - def __init__( - self, - model_name: str, - infer_inputs: List[InferInput], - request_id=None, - raw_inputs=None, - from_grpc=False, - parameters=None, - ): - if parameters is None: - parameters = {} - self.id = request_id - self.model_name = model_name - self.inputs = infer_inputs - self.parameters = parameters - self.from_grpc = from_grpc - if raw_inputs: - for i, raw_input in enumerate(raw_inputs): - self.inputs[i]._raw_data = raw_input - - @classmethod - def from_grpc(cls, request: ModelInferRequest): - infer_inputs = [ - InferInput( - name=input_tensor.name, - shape=list(input_tensor.shape), - datatype=input_tensor.datatype, - data=get_content(input_tensor.datatype, input_tensor.contents), - parameters=input_tensor.parameters, - ) - for input_tensor in request.inputs - ] - return cls( - request_id=request.id, - model_name=request.model_name, - infer_inputs=infer_inputs, - raw_inputs=request.raw_input_contents, - from_grpc=True, - parameters=request.parameters, - ) - - def to_rest(self) -> Dict: - """Converts the InferRequest object to v2 REST InferenceRequest message""" - infer_inputs = [] - for infer_input in self.inputs: - infer_input_dict = { - "name": infer_input.name, - "shape": infer_input.shape, - "datatype": infer_input.datatype, - } - if isinstance(infer_input.data, numpy.ndarray): - infer_input.set_data_from_numpy(infer_input.data, binary_data=False) - infer_input_dict["data"] = infer_input.data - else: - infer_input_dict["data"] = infer_input.data - infer_inputs.append(infer_input_dict) - return {"id": self.id, "inputs": infer_inputs} - - def to_grpc(self) -> ModelInferRequest: - """Converts the InferRequest object to gRPC ModelInferRequest message""" - infer_inputs = [] - raw_input_contents = [] - for infer_input in self.inputs: - if isinstance(infer_input.data, numpy.ndarray): - 
infer_input.set_data_from_numpy(infer_input.data, binary_data=True) - infer_input_dict = { - "name": infer_input.name, - "shape": infer_input.shape, - "datatype": infer_input.datatype, - } - if infer_input._raw_data is not None: - raw_input_contents.append(infer_input._raw_data) - else: - if not isinstance(infer_input.data, List): - raise InvalidInput("input data is not a List") - infer_input_dict["contents"] = {} - data_key = GRPC_CONTENT_DATATYPE_MAPPINGS.get( - infer_input.datatype, None - ) - if data_key is not None: - infer_input._data = [ - bytes(val, "utf-8") if isinstance(val, str) else val - for val in infer_input.data - ] # str to byte conversion for grpc proto - infer_input_dict["contents"][data_key] = infer_input.data - else: - raise InvalidInput("invalid input datatype") - infer_inputs.append(infer_input_dict) - - return ModelInferRequest( - id=self.id, - model_name=self.model_name, - inputs=infer_inputs, - raw_input_contents=raw_input_contents, - ) - - def as_dataframe(self) -> pd.DataFrame: - """ - Decode the tensor inputs as pandas dataframe - """ - dfs = [] - for input in self.inputs: - input_data = input.data - if input.datatype == "BYTES": - input_data = [ - str(val, "utf-8") if isinstance(val, bytes) else val - for val in input.data - ] - dfs.append(pd.DataFrame(input_data, columns=[input.name])) - return pd.concat(dfs, axis=1) - - -class InferOutput: - def __init__(self, name, shape, datatype, data=None, parameters=None): - """An object of InferOutput class is used to describe - input tensor for an inference request. - Parameters - ---------- - name : str - The name of input whose data will be described by this object - shape : list - The shape of the associated input. - datatype : str - The datatype of the associated input. - data : Union[List, InferTensorContents] - The data of the REST/gRPC input. When data is not set, raw_data is used for gRPC for numpy array bytes. - parameters : dict - The additional server-specific parameters. 
- """ - if parameters is None: - parameters = {} - self._name = name - self._shape = shape - self._datatype = datatype - self._parameters = parameters - self._data = data - self._raw_data = None - - @property - def name(self): - """Get the name of input associated with this object. - Returns - ------- - str - The name of input - """ - return self._name - - @property - def datatype(self): - """Get the datatype of input associated with this object. - Returns - ------- - str - The datatype of input - """ - return self._datatype - - @property - def data(self): - """Get the data of InferOutput""" - return self._data - - @property - def shape(self): - """Get the shape of input associated with this object. - Returns - ------- - list - The shape of input - """ - return self._shape - - @property - def parameters(self): - """Get the parameters of input associated with this object. - Returns - ------- - dict - The key, value pair of string and InferParameter - """ - return self._parameters - - def set_shape(self, shape): - """Set the shape of input. - Parameters - ---------- - shape : list - The shape of the associated input. - """ - self._shape = shape - - def as_numpy(self) -> numpy.ndarray: - """ - Decode the tensor data as numpy array - """ - dtype = to_np_dtype(self.datatype) - if dtype is None: - raise InvalidInput("invalid datatype in the input") - if self._raw_data is not None: - np_array = np.frombuffer(self._raw_data, dtype=dtype) - return np_array.reshape(self._shape) - else: - np_array = np.array(self._data, dtype=dtype) - return np_array.reshape(self._shape) - - def set_data_from_numpy(self, input_tensor, binary_data=True): - """Set the tensor data from the specified numpy array for - input associated with this object. - Parameters - ---------- - input_tensor : numpy array - The tensor data in numpy array format - binary_data : bool - Indicates whether to set data for the input in binary format - or explicit tensor within JSON. 
The default value is True, - which means the data will be delivered as binary data in the - HTTP body after the JSON object. - Raises - ------ - InferenceServerException - If failed to set data for the tensor. - """ - if not isinstance(input_tensor, (np.ndarray,)): - raise_error("input_tensor must be a numpy array") - - dtype = from_np_dtype(input_tensor.dtype) - if self._datatype != dtype: - raise_error( - "got unexpected datatype {} from numpy array, expected {}".format( - dtype, self._datatype - ) - ) - valid_shape = True - if len(self._shape) != len(input_tensor.shape): - valid_shape = False - else: - for i in range(len(self._shape)): - if self._shape[i] != input_tensor.shape[i]: - valid_shape = False - if not valid_shape: - raise_error( - "got unexpected numpy array shape [{}], expected [{}]".format( - str(input_tensor.shape)[1:-1], str(self._shape)[1:-1] - ) - ) - - if not binary_data: - self._parameters.pop("binary_data_size", None) - self._raw_data = None - if self._datatype == "BYTES": - self._data = [] - try: - if input_tensor.size > 0: - for obj in np.nditer( - input_tensor, flags=["refs_ok"], order="C" - ): - # We need to convert the object to string using utf-8, - # if we want to use the binary_data=False. JSON requires - # the input to be a UTF-8 string. - if input_tensor.dtype == np.object_: - if isinstance(obj.item(), bytes): - self._data.append(str(obj.item(), encoding="utf-8")) - else: - self._data.append(str(obj.item())) - else: - self._data.append(str(obj.item(), encoding="utf-8")) - except UnicodeDecodeError: - raise_error( - f'Failed to encode "{obj.item()}" using UTF-8. Please use binary_data=True, if' - " you want to pass a byte array." 
- ) - else: - self._data = [val.item() for val in input_tensor.flatten()] - else: - self._data = None - if self._datatype == "BYTES": - serialized_output = serialize_byte_tensor(input_tensor) - if serialized_output.size > 0: - self._raw_data = serialized_output.item() - else: - self._raw_data = b"" - else: - self._raw_data = input_tensor.tobytes() - self._parameters["binary_data_size"] = len(self._raw_data) - - -class InferResponse: - """InferenceResponse - - $inference_response = - { - "model_name" : $string, - "model_version" : $string #optional, - "id" : $string, - "parameters" : $parameters #optional, - "outputs" : [ $response_output, ... ] - } - """ - - id: str - model_name: str - parameters: Optional[Dict] - outputs: List[InferOutput] - from_grpc: bool - - def __init__( - self, - response_id: str, - model_name: str, - infer_outputs: List[InferOutput], - raw_outputs=None, - from_grpc=False, - parameters=None, - ): - if parameters is None: - parameters = {} - self.id = response_id - self.model_name = model_name - self.outputs = infer_outputs - self.parameters = parameters - self.from_grpc = from_grpc - if raw_outputs: - for i, raw_output in enumerate(raw_outputs): - self.outputs[i]._raw_data = raw_output - - @classmethod - def from_grpc(cls, response: ModelInferResponse) -> "InferResponse": - infer_outputs = [ - InferOutput( - name=output.name, - shape=list(output.shape), - datatype=output.datatype, - data=get_content(output.datatype, output.contents), - parameters=output.parameters, - ) - for output in response.outputs - ] - return cls( - model_name=response.model_name, - response_id=response.id, - parameters=response.parameters, - infer_outputs=infer_outputs, - raw_outputs=response.raw_output_contents, - from_grpc=True, - ) - - @classmethod - def from_rest(cls, model_name: str, response: Dict) -> "InferResponse": - infer_outputs = [ - InferOutput( - name=output["name"], - shape=list(output["shape"]), - datatype=output["datatype"], - data=output["data"], - 
parameters=output.get("parameters", {}), - ) - for output in response["outputs"] - ] - return cls( - model_name=model_name, - response_id=response.get("id", None), - parameters=response.get("parameters", {}), - infer_outputs=infer_outputs, - ) - - def to_rest(self) -> Dict: - """Converts the InferResponse object to v2 REST InferenceRequest message""" - infer_outputs = [] - for infer_output in self.outputs: - infer_output_dict = { - "name": infer_output.name, - "shape": infer_output.shape, - "datatype": infer_output.datatype, - } - if isinstance(infer_output.data, numpy.ndarray): - infer_output.set_data_from_numpy(infer_output.data, binary_data=False) - infer_output_dict["data"] = infer_output.data - elif isinstance(infer_output._raw_data, bytes): - infer_output_dict["data"] = infer_output.as_numpy().tolist() - else: - infer_output_dict["data"] = infer_output.data - infer_outputs.append(infer_output_dict) - res = {"id": self.id, "model_name": self.model_name, "outputs": infer_outputs} - return res - def to_grpc(self) -> ModelInferResponse: - """Converts the InferResponse object to gRPC ModelInferRequest message""" - infer_outputs = [] - raw_output_contents = [] - for infer_output in self.outputs: - if isinstance(infer_output.data, numpy.ndarray): - infer_output.set_data_from_numpy(infer_output.data, binary_data=True) - infer_output_dict = { - "name": infer_output.name, - "shape": infer_output.shape, - "datatype": infer_output.datatype, - } - if infer_output._raw_data is not None: - raw_output_contents.append(infer_output._raw_data) - else: - if not isinstance(infer_output.data, List): - raise InvalidInput("output data is not a List") - infer_output_dict["contents"] = {} - data_key = GRPC_CONTENT_DATATYPE_MAPPINGS.get( - infer_output.datatype, None - ) - if data_key is not None: - infer_output._data = [ - bytes(val, "utf-8") if isinstance(val, str) else val - for val in infer_output.data - ] # str to byte conversion for grpc proto - 
infer_output_dict["contents"][data_key] = infer_output.data - else: - raise InvalidInput("to_grpc: invalid output datatype") - infer_outputs.append(infer_output_dict) - return ModelInferResponse( - id=self.id, - model_name=self.model_name, - outputs=infer_outputs, - raw_output_contents=raw_output_contents, - ) +__all__ = [ + GRPC_CONTENT_DATATYPE_MAPPINGS, + InferenceServerException, + InferInput, + InferOutput, + InferRequest, + InferResponse, + get_content, + raise_error, + serialize_byte_tensor, +] diff --git a/python/hsml/client/istio/utils/numpy_codec.py b/python/hsml/client/istio/utils/numpy_codec.py index bf22bcf34..2ddc39880 100644 --- a/python/hsml/client/istio/utils/numpy_codec.py +++ b/python/hsml/client/istio/utils/numpy_codec.py @@ -1,67 +1,26 @@ -# Copyright 2021 The KServe Authors. # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Copyright 2024 Hopsworks AB # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# This implementation has been borrowed from kserve/kserve repository -# https://github.com/kserve/kserve/blob/release-0.11/python/kserve/kserve/utils/numpy_codec.py - -import numpy as np - -def to_np_dtype(dtype): - dtype_map = { - "BOOL": bool, - "INT8": np.int8, - "INT16": np.int16, - "INT32": np.int32, - "INT64": np.int64, - "UINT8": np.uint8, - "UINT16": np.uint16, - "UINT32": np.uint32, - "UINT64": np.uint64, - "FP16": np.float16, - "FP32": np.float32, - "FP64": np.float64, - "BYTES": np.object_, - } - return dtype_map.get(dtype, None) +from hopsworks_common.client.istio.utils.numpy_codec import ( + from_np_dtype, + to_np_dtype, +) -def from_np_dtype(np_dtype): - if np_dtype is bool: - return "BOOL" - elif np_dtype == np.int8: - return "INT8" - elif np_dtype == np.int16: - return "INT16" - elif np_dtype == np.int32: - return "INT32" - elif np_dtype == np.int64: - return "INT64" - elif np_dtype == np.uint8: - return "UINT8" - elif np_dtype == np.uint16: - return "UINT16" - elif np_dtype == np.uint32: - return "UINT32" - elif np_dtype == np.uint64: - return "UINT64" - elif np_dtype == np.float16: - return "FP16" - elif np_dtype == np.float32: - return "FP32" - elif np_dtype == np.float64: - return "FP64" - elif np_dtype == np.object_ or np_dtype.type == np.bytes_: - return "BYTES" - return None +__all__ = [ + from_np_dtype, + to_np_dtype, +] diff --git a/python/hsml/core/model_serving_api.py b/python/hsml/core/model_serving_api.py index 437327742..1a87b5878 100644 --- a/python/hsml/core/model_serving_api.py +++ b/python/hsml/core/model_serving_api.py @@ -57,7 +57,7 @@ def load_default_configuration(self): client.set_kserve_installed(is_kserve_installed) # istio client - self._set_istio_client_if_available() + self._istio_init_if_available() # resource limits max_resources = self._serving_api.get_resource_limits() @@ -71,26 +71,26 @@ def load_default_configuration(self): knative_domain = self._serving_api.get_knative_domain() 
client.set_knative_domain(knative_domain) - def _set_istio_client_if_available(self): - """Set istio client if available""" + def _istio_init_if_available(self): + """Initialize istio client if available""" if client.is_kserve_installed(): # check existing istio client try: - if client.get_istio_instance() is not None: + if client.istio.get_instance() is not None: return # istio client already set except Exception: pass # setup istio client inference_endpoints = self._serving_api.get_inference_endpoints() - if client.get_client_type() == "internal": + if isinstance(client.get_instance(), client.hopsworks.Client): # if internal, get node port endpoint = get_endpoint_by_type( inference_endpoints, INFERENCE_ENDPOINTS.ENDPOINT_TYPE_NODE ) if endpoint is not None: - client.set_istio_client( + client.istio.init( endpoint.get_any_host(), endpoint.get_port(INFERENCE_ENDPOINTS.PORT_NAME_HTTP).number, ) @@ -107,7 +107,7 @@ def _set_istio_client_if_available(self): if endpoint is not None: # if load balancer (external ip) available _client = client.get_instance() - client.set_istio_client( + client.istio.init( endpoint.get_any_host(), endpoint.get_port(INFERENCE_ENDPOINTS.PORT_NAME_HTTP).number, _client._project_name, @@ -125,7 +125,7 @@ def _set_istio_client_if_available(self): port = endpoint.get_port(INFERENCE_ENDPOINTS.PORT_NAME_HTTP).number if self._is_host_port_open(host, port): # and it is open - client.set_istio_client( + client.istio.init( host, port, _client._project_name, diff --git a/python/hsml/core/serving_api.py b/python/hsml/core/serving_api.py index 45f4e7fcc..c0e2a565d 100644 --- a/python/hsml/core/serving_api.py +++ b/python/hsml/core/serving_api.py @@ -253,7 +253,7 @@ def _send_inference_request_via_rest_protocol( _client._project_id, deployment_instance ) else: - _client = client.get_istio_instance() + _client = client.istio_get_instance() if _client is not None: # use istio client path_params = self._get_istio_inference_path(deployment_instance) @@ 
-302,7 +302,7 @@ def _send_inference_request_via_grpc_protocol( return infer_response.outputs def _create_grpc_channel(self, deployment_name: str): - _client = client.get_istio_instance() + _client = client.istio_get_instance() service_hostname = self._get_inference_request_host_header( _client._project_name, deployment_name,