diff --git a/notebooks/5_ip_creating_deployments.ipynb b/notebooks/5_ip_creating_deployments.ipynb index 273bfb6..d17919b 100644 --- a/notebooks/5_ip_creating_deployments.ipynb +++ b/notebooks/5_ip_creating_deployments.ipynb @@ -260,7 +260,7 @@ "outputs": [], "source": [ "query_model_deployment = (\n", - " hopsworks_integration.two_tower_serving.HopsworksQueryModel.deploy(project=project)\n", + " hopsworks_integration.two_tower_serving.HopsworksQueryModel.deploy()\n", ")" ] }, diff --git a/notebooks/7_ip_creating_llm_ranking_deployment.ipynb b/notebooks/7_ip_creating_llm_ranking_deployment.ipynb index 28ecc5b..d4f2bab 100644 --- a/notebooks/7_ip_creating_llm_ranking_deployment.ipynb +++ b/notebooks/7_ip_creating_llm_ranking_deployment.ipynb @@ -248,9 +248,7 @@ } ], "source": [ - "ranking_deployment = hopsworks_integration.llm_ranking_serving.HopsworksLLMRankingModel.deploy(\n", - " project=project\n", - ")" + "ranking_deployment = hopsworks_integration.llm_ranking_serving.HopsworksLLMRankingModel.deploy()" ] }, { diff --git a/recsys/config.py b/recsys/config.py index 3031d09..0ca5ebb 100644 --- a/recsys/config.py +++ b/recsys/config.py @@ -43,7 +43,7 @@ class Settings(BaseSettings): RANKING_EARLY_STOPPING_ROUNDS: int = 5 # Inference - RANKING_MODEL_TYPE: str = "llmranking" + RANKING_MODEL_TYPE: str = "ranking" settings = Settings() diff --git a/recsys/hopsworks_integration/feature_store.py b/recsys/hopsworks_integration/feature_store.py index ae65473..85e7485 100644 --- a/recsys/hopsworks_integration/feature_store.py +++ b/recsys/hopsworks_integration/feature_store.py @@ -14,41 +14,12 @@ def get_feature_store(): project = hopsworks.login( api_key_value=settings.HOPSWORKS_API_KEY.get_secret_value() ) - _init_secrets() else: logger.info("Login to Hopsworks using cached API key.") project = hopsworks.login() return project, project.get_feature_store() -def get_secrets_api(): - connection = hopsworks.connection(host="c.app.hopsworks.ai", - hostname_verification=False, - port=443, - api_key_value=settings.HOPSWORKS_API_KEY.get_secret_value() - ) - return connection.get_secrets_api() - - -def _init_secrets(): - secrets_api = get_secrets_api() - secrets = secrets_api.get_secrets() - existing_secret_keys = [secret.name for secret in secrets] - # Create the OPENAI_API_KEY secret if it doesn't exist - if "OPENAI_API_KEY" not in existing_secret_keys: - secrets_api.create_secret( - "OPENAI_API_KEY", - settings.OPENAI_API_KEY.get_secret_value(), - project="recommandersystem" - ) - # Create the RANKING_MODEL_TYPE secret if it doesn't exist - if "RANKING_MODEL_TYPE" not in existing_secret_keys: - secrets_api.create_secret( - "RANKING_MODEL_TYPE", - settings.RANKING_MODEL_TYPE, - project="recommandersystem" - ) - ######################## #### Feature Groups #### ######################## diff --git a/recsys/hopsworks_integration/llm_ranking_serving.py b/recsys/hopsworks_integration/llm_ranking_serving.py index a58a685..a681f02 100644 --- a/recsys/hopsworks_integration/llm_ranking_serving.py +++ b/recsys/hopsworks_integration/llm_ranking_serving.py @@ -1,7 +1,7 @@ import os +import hopsworks from hsml.transformer import Transformer - from recsys.config import settings @@ -18,7 +18,12 @@ def register(cls, mr): ranking_model.save(local_model_path) @classmethod - def deploy(cls, project): + def deploy(cls): + # Prepare secrets used in the deployment + cls._prepare_secrets() + + project = hopsworks.login() + cls._prepare_environment(project) mr = project.get_model_registry() dataset_api = project.get_dataset_api() @@ -71,3 +76,42 @@ def deploy(cls, project): ) return ranking_deployment + + @classmethod + def _prepare_environment(cls, project): + # Upload requirements file to Hopsworks + dataset_api = project.get_dataset_api() + + requirements_path = dataset_api.upload( + str(settings.RECSYS_DIR / "hopsworks_integration" / "llm_ranker" / "requirements.txt"), + "Resources", + overwrite=True, + ) + + # Install the extra requirements in the Python environment on Hopsworks + env_api = project.get_environment_api() + env = env_api.get_environments()[0] + print(env) + env.install_requirements(requirements_path) + @classmethod + def _prepare_secrets(cls): + connection = hopsworks.connection(host="c.app.hopsworks.ai", + hostname_verification=False, + port=443, + api_key_value=settings.HOPSWORKS_API_KEY.get_secret_value() + ) + if not settings.OPENAI_API_KEY: + raise ValueError( + "Missing required secret: 'OPENAI_API_KEY'. Please ensure it is set in the .env file or config.py " + "settings.") + + secrets_api = connection.get_secrets_api() + secrets = secrets_api.get_secrets() + existing_secret_keys = [secret.name for secret in secrets] + # Create the OPENAI_API_KEY secret if it doesn't exist + if "OPENAI_API_KEY" not in existing_secret_keys: + secrets_api.create_secret( + "OPENAI_API_KEY", + settings.OPENAI_API_KEY.get_secret_value(), + project=settings.HOPSWORKS_PROJECT + ) diff --git a/recsys/hopsworks_integration/two_tower_serving.py b/recsys/hopsworks_integration/two_tower_serving.py index c6b90f8..f2a014f 100644 --- a/recsys/hopsworks_integration/two_tower_serving.py +++ b/recsys/hopsworks_integration/two_tower_serving.py @@ -1,5 +1,5 @@ import os - +import hopsworks from loguru import logger import tensorflow as tf from hsml.model_schema import ModelSchema @@ -89,7 +89,11 @@ def register(self, mr, query_df, emb_dim) -> None: mr_query_model.save(local_model_path) # Path to save the model @classmethod - def deploy(cls, project): + def deploy(cls): + # Prepare secrets used in the deployment + cls._prepare_secrets() + + project = hopsworks.login() mr = project.get_model_registry() dataset_api = project.get_dataset_api() @@ -128,6 +132,30 @@ def deploy(cls, project): return query_model_deployment + @classmethod + def _prepare_secrets(cls): + connection = hopsworks.connection(host="c.app.hopsworks.ai", + hostname_verification=False, + port=443, + api_key_value=settings.HOPSWORKS_API_KEY.get_secret_value() + ) + if not settings.RANKING_MODEL_TYPE: + raise ValueError( + "Missing required secret: 'RANKING_MODEL_TYPE'. Please ensure it is set in the .env file or config.py " + "settings.") + + secrets_api = connection.get_secrets_api() + secrets = secrets_api.get_secrets() + existing_secret_keys = [secret.name for secret in secrets] + + # Create the RANKING_MODEL_TYPE secret if it doesn't exist + if "RANKING_MODEL_TYPE" not in existing_secret_keys: + secrets_api.create_secret( + "RANKING_MODEL_TYPE", + settings.RANKING_MODEL_TYPE, + project=settings.HOPSWORKS_PROJECT + ) + class QueryModelModule(tf.Module): def __init__(self, model: QueryTower) -> None: diff --git a/recsys/inference/llm_ranking_predictor.py b/recsys/inference/llm_ranking_predictor.py index 2b70cf7..41d841c 100644 --- a/recsys/inference/llm_ranking_predictor.py +++ b/recsys/inference/llm_ranking_predictor.py @@ -1,4 +1,5 @@ import logging +import hopsworks from langchain import PromptTemplate, LLMChain from langchain_openai import ChatOpenAI from pydantic import BaseModel, ValidationError, Field @@ -46,13 +47,18 @@ def __init__(self): "graphical_appearance_name", "colour_group_name", "perceived_colour_value_name", "perceived_colour_master_name", "department_name", "index_name", "index_group_name", "section_name", "garment_group_name"] + self._retrieve_secrets() self.llm = self._build_lang_chain() + def _retrieve_secrets(self): + secrets_api = hopsworks.connection().get_secrets_api() + self.openai_api_key = secrets_api.get_secret("OPENAI_API_KEY") + def predict(self, inputs): logging.info(f"✅ Inputs: {inputs}") - # Extract ranking features and article IDs from the inputs - # limit to 20 candidates because of OPENAI token requests limitations + # Extract ranking features and article IDs from the inputs limit to 20 candidates because otherwise the + # inference time is over 60 seconds and the predict endpoint closes the socket features = inputs[0].pop("ranking_features")[:20] article_ids = inputs[0].pop("article_ids")[:20] @@ -114,11 +120,10 @@ def _postprocess_output(self, output): def _build_lang_chain(self): - # todo get from secrets api model = ChatOpenAI( model_name='gpt-4o-mini-2024-07-18', temperature=0.7, - openai_api_key="OPENAI_KEY", + openai_api_key=self.openai_api_key, ) prompt = PromptTemplate( input_variables=self.input_variables, diff --git a/recsys/inference/query_transformer.py b/recsys/inference/query_transformer.py index 4640ca7..2fe2517 100644 --- a/recsys/inference/query_transformer.py +++ b/recsys/inference/query_transformer.py @@ -1,9 +1,9 @@ +import logging from datetime import datetime import hopsworks -import numpy as np - import nest_asyncio + nest_asyncio.apply() import pandas as pd @@ -12,6 +12,7 @@ def __init__(self) -> None: # Connect to the Hopsworks project = hopsworks.connection().get_project() ms = project.get_model_serving() + self._retrieve_secrets() # Retrieve the 'customers' feature view fs = project.get_feature_store() @@ -25,8 +26,16 @@ def __init__(self) -> None: self.ranking_fv.init_batch_scoring(1) # Retrieve the ranking deployment - # todo retrieve from secrets ranking or llmranking according to ranker used - self.ranking_server = ms.get_deployment("ranking") + self.ranking_server = ms.get_deployment(self.ranking_model_type) + + def _retrieve_secrets(self): + secrets_api = hopsworks.connection().get_secrets_api() + try: + self.ranking_model_type = secrets_api.get_secret("RANKING_MODEL_TYPE") + except Exception as e: + logging.error(e) + logging.error("Could not retrieve secret RANKING_MODEL_TYPE, defaulting to ranker") + self.ranking_model_type = "ranking" def preprocess(self, inputs): # Check if the input data contains a key named "instances"