Skip to content

Commit

Permalink
[FSTORE-1479] Deduplicate hsml client (#243)
Browse files Browse the repository at this point in the history
* Move hsml/client/istio

* Adapt the moved hsml client

* Fix istio.__init__

* Fix client.__init__

* Fix hsml client so that it is possible to do hsml.connection(...)

* Add aliases to hsml.client
  • Loading branch information
aversey authored Jul 26, 2024
1 parent 85105f5 commit 7de0e4f
Show file tree
Hide file tree
Showing 39 changed files with 1,705 additions and 1,891 deletions.
74 changes: 70 additions & 4 deletions python/hopsworks_common/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

from typing import Literal, Optional, Union

from hopsworks_common.client import external, hopsworks
from hopsworks_common.client import external, hopsworks, istio
from hopsworks_common.constants import HOSTS


_client: Union[hopsworks.Client, external.Client, None] = None
Expand Down Expand Up @@ -62,13 +63,78 @@ def init(

def get_instance() -> Union[hopsworks.Client, external.Client]:
global _client
if _client:
return _client
raise Exception("Couldn't find client. Try reconnecting to Hopsworks.")
if not _client:
raise Exception("Couldn't find client. Try reconnecting to Hopsworks.")
return _client


def stop() -> None:
global _client
if _client:
_client._close()
_client = None
if istio._client:
istio._client._close()
istio._client = None


def is_saas_connection() -> bool:
return get_instance()._host == HOSTS.APP_HOST


_kserve_installed = None


def set_kserve_installed(kserve_installed):
global _kserve_installed
_kserve_installed = kserve_installed


def is_kserve_installed() -> bool:
global _kserve_installed
return _kserve_installed


_serving_resource_limits = None


def set_serving_resource_limits(max_resources):
global _serving_resource_limits
_serving_resource_limits = max_resources


def get_serving_resource_limits():
global _serving_resource_limits
return _serving_resource_limits


_serving_num_instances_limits = None


def set_serving_num_instances_limits(num_instances_range):
global _serving_num_instances_limits
_serving_num_instances_limits = num_instances_range


def get_serving_num_instances_limits():
global _serving_num_instances_limits
return _serving_num_instances_limits


def is_scale_to_zero_required():
# scale-to-zero is required for KServe deployments if the Hopsworks variable `kube_serving_min_num_instances`
# is set to 0. Other possible values are -1 (unlimited num instances) or >1 num instances.
return get_serving_num_instances_limits()[0] == 0


_knative_domain = None


def get_knative_domain():
global _knative_domain
return _knative_domain


def set_knative_domain(knative_domain):
global _knative_domain
_knative_domain = knative_domain
25 changes: 25 additions & 0 deletions python/hopsworks_common/client/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

from __future__ import annotations

import os

import requests
from hopsworks_common.client import exceptions


class BearerAuth(requests.auth.AuthBase):
Expand Down Expand Up @@ -50,3 +53,25 @@ def __init__(self, token):
def __call__(self, r):
r.headers["X-API-KEY"] = self._token
return r


def get_api_key(api_key_value, api_key_file):
if api_key_value is not None:
return api_key_value
elif api_key_file is not None:
file = None
if os.path.exists(api_key_file):
try:
file = open(api_key_file, mode="r")
return file.read()
finally:
file.close()
else:
raise IOError(
"Could not find api key file on path: {}".format(api_key_file)
)
else:
raise exceptions.ExternalClientError(
"Either api_key_file or api_key_value must be set when connecting to"
" hopsworks from an external environment."
)
31 changes: 31 additions & 0 deletions python/hopsworks_common/client/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
class RestAPIError(Exception):
"""REST Exception encapsulating the response object and url."""

STATUS_CODE_BAD_REQUEST = 400
STATUS_CODE_UNAUTHORIZED = 401
STATUS_CODE_FORBIDDEN = 403
STATUS_CODE_NOT_FOUND = 404
STATUS_CODE_INTERNAL_SERVER_ERROR = 500

class FeatureStoreErrorCode(int, Enum):
FEATURE_GROUP_COMMIT_NOT_FOUND = 270227
STATISTICS_NOT_FOUND = 270228
Expand Down Expand Up @@ -111,6 +117,17 @@ def __init__(self, missing_argument: str) -> None:
super().__init__(message)


class HopsworksClientError(TypeError):
"""Raised when hopsworks internal client cannot be initialized due to missing arguments."""

def __init__(self, missing_argument):
message = (
"{0} cannot be of type NoneType, {0} is a non-optional "
"argument to connect to hopsworks from an internal environment."
).format(missing_argument)
super().__init__(message)


class GitException(Exception):
"""Generic git exception"""

Expand Down Expand Up @@ -141,3 +158,17 @@ class OpenSearchException(Exception):

class JobExecutionException(Exception):
"""Generic job executions exception"""


class ModelRegistryException(Exception):
"""Generic model registry exception"""


class ModelServingException(Exception):
"""Generic model serving exception"""

ERROR_CODE_SERVING_NOT_FOUND = 240000
ERROR_CODE_ILLEGAL_ARGUMENT = 240001
ERROR_CODE_DUPLICATED_ENTRY = 240011

ERROR_CODE_DEPLOYMENT_NOT_RUNNING = 250001
41 changes: 41 additions & 0 deletions python/hopsworks_common/client/istio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Copyright 2022 Logical Clocks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import annotations

from typing import Union

import hopsworks_common.client as _main
from hopsworks_common.client.istio import external, hopsworks


_client: Union[hopsworks.Client, external.Client, None] = None


def init(host, port, project=None, api_key_value=None):
global _client

if _client:
return
if isinstance(_main._client, _main.hopsworks.Client):
_client = hopsworks.Client(host, port)
elif isinstance(_main, _main.external.Client):
_client = external.Client(host, port, project, api_key_value)


def get_instance() -> Union[hopsworks.Client, external.Client, None]:
global _client
return _client
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,33 @@
import os
from abc import abstractmethod

from hsml.client import auth, base
from hopsworks_common.client import base
from hopsworks_common.client.istio.grpc.inference_client import (
GRPCInferenceServerClient,
)


class Client(base.Client):
TOKEN_FILE = "token.jwt"
APIKEY_FILE = "api.key"
REST_ENDPOINT = "REST_ENDPOINT"
SERVING_API_KEY = "SERVING_API_KEY"
HOPSWORKS_PUBLIC_HOST = "HOPSWORKS_PUBLIC_HOST"
TOKEN_EXPIRED_MAX_RETRIES = 0

BASE_PATH_PARAMS = ["hopsworks-api", "api"]
BASE_PATH_PARAMS = []

@abstractmethod
def __init__(self):
"""To be extended by clients."""
"""To be implemented by clients."""
pass

def _get_verify(self, verify, trust_store_path):
"""Get verification method for sending HTTP requests to Hopsworks.
"""Get verification method for sending inference requests to Istio.
Credit to https://gist.github.com/gdamjan/55a8b9eec6cf7b771f92021d93b87b2c
:param verify: perform hostname verification, 'true' or 'false'
:type verify: str
:param trust_store_path: path of the truststore locally if it was uploaded manually to
the external environment
the external environment such as EKS or AKS
:type trust_store_path: str
:return: if verify is true and the truststore is provided, then return the trust store location
if verify is true but the truststore wasn't provided, then return true
Expand All @@ -56,23 +58,6 @@ def _get_verify(self, verify, trust_store_path):

return False

def _get_retry(self, request, response):
"""Get retry method for resending HTTP requests to Hopsworks
:param request: original HTTP request already sent
:type request: requests.Request
:param response: response of the original HTTP request
:type response: requests.Response
"""
if response.status_code == 401 and self.REST_ENDPOINT in os.environ:
# refresh token and retry request - only on hopsworks
self._auth = auth.BearerAuth(self._read_jwt())
# Update request with the new token
request.auth = self._auth
# retry request
return True
return False

def _get_host_port_pair(self):
"""
Removes "http or https" from the rest endpoint and returns a list
Expand All @@ -88,19 +73,6 @@ def _get_host_port_pair(self):
host, port = endpoint.split(":")
return host, port

def _read_jwt(self):
"""Retrieve jwt from local container."""
return self._read_file(self.TOKEN_FILE)

def _read_apikey(self):
"""Retrieve apikey from local container."""
return self._read_file(self.APIKEY_FILE)

def _read_file(self, secret_file):
"""Retrieve secret from local container."""
with open(os.path.join(self._secrets_dir, secret_file), "r") as secret:
return secret.read()

def _close(self):
"""Closes a client. Can be implemented for clean up purposes, not mandatory."""
self._connected = False
Expand All @@ -109,3 +81,10 @@ def _replace_public_host(self, url):
"""replace hostname to public hostname set in HOPSWORKS_PUBLIC_HOST"""
ui_url = url._replace(netloc=os.environ[self.HOPSWORKS_PUBLIC_HOST])
return ui_url

def _create_grpc_channel(self, service_hostname: str) -> GRPCInferenceServerClient:
return GRPCInferenceServerClient(
url=self._host + ":" + str(self._port),
channel_args=(("grpc.ssl_target_name_override", service_hostname),),
serving_api_key=self._auth._token,
)
56 changes: 56 additions & 0 deletions python/hopsworks_common/client/istio/external.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#
# Copyright 2022 Logical Clocks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import requests
from hopsworks_common.client import auth
from hopsworks_common.client.istio import base as istio


class Client(istio.Client):
def __init__(
self,
host,
port,
project,
api_key_value,
hostname_verification=None,
trust_store_path=None,
):
"""Initializes a client in an external environment such as AWS Sagemaker."""
self._host = host
self._port = port
self._base_url = "http://" + self._host + ":" + str(self._port)
self._project_name = project

self._auth = auth.ApiKeyAuth(api_key_value)

self._session = requests.session()
self._connected = True
self._verify = self._get_verify(hostname_verification, trust_store_path)

self._cert_key = None

def _close(self):
"""Closes a client."""
self._connected = False

def _replace_public_host(self, url):
"""no need to replace as we are already in external client"""
return url

@property
def host(self):
return self._host
Loading

0 comments on commit 7de0e4f

Please sign in to comment.