Skip to content

Commit

Permalink
fix: add Merlin project as config to observation publisher (#563)
Browse files Browse the repository at this point in the history
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->
# Description
<!-- Briefly describe the motivation for the change. Please include
illustrations where appropriate. -->
This PR addresses two bugs found in the observation publisher:
1. The model id config field in the observation publisher is wrongly assumed
to be in the form of (project)-(model id), instead of just (model id).
This affects the BigQuery table write location for BigQuerySink, and the
Arize model id for ArizeSink.
2. The prediction log from a standard model does not contain the "columns"
field, but this case is not handled by the observation publisher.

# Modifications
<!-- Summarize the key code changes. -->
- The API service will now supply the model project as an input to the deployer.
- The observation publisher will have a new config field, project, which is
also passed on to the observation sinks.
- The observation publisher will now use the feature_orders field in the
inference schema when constructing the pandas dataframe from the
prediction log, if the field value is non-null.

# Tests
<!-- Besides the existing / updated automated tests, what specific
scenarios should be tested? Consider the backward compatibility of the
changes, whether corner cases are covered, etc. Please describe the
tests and check the ones that have been completed. Eg:
- [x] Deploying new and existing standard models
- [ ] Deploying PyFunc models
-->

# Checklist
- [ ] Added PR label
- [ ] Added unit test, integration, and/or e2e tests
- [ ] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduces API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes
<!--
Does this PR introduce a user-facing change?
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note

```
  • Loading branch information
khorshuheng authored Apr 2, 2024
1 parent ce2b447 commit 5f70094
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 21 deletions.
1 change: 1 addition & 0 deletions api/pkg/observability/deployment/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
)

type ConsumerConfig struct {
Project string `yaml:"project"`
ModelID string `yaml:"model_id"`
ModelVersion string `yaml:"model_version"`
InferenceSchema *models.SchemaSpec `yaml:"inference_schema"`
Expand Down
1 change: 1 addition & 0 deletions api/pkg/observability/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func (c *deployer) applySecret(ctx context.Context, data *models.WorkerData) (se

func (c *deployer) createSecretSpec(data *models.WorkerData) (*corev1.Secret, error) {
consumerCfg := &ConsumerConfig{
Project: data.Project,
ModelID: data.ModelName,
ModelVersion: data.ModelVersion,
InferenceSchema: data.ModelSchemaSpec,
Expand Down
12 changes: 6 additions & 6 deletions api/pkg/observability/deployment/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func Test_deployer_Deploy(t *testing.T) {
},
},
StringData: map[string]string{
"config.yaml": "model_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
"config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
},
}}, nil, false)
deploymentAPI := clientSet.AppsV1().Deployments(namespace).(*fakeappsv1.FakeDeployments)
Expand Down Expand Up @@ -365,7 +365,7 @@ func Test_deployer_Deploy(t *testing.T) {
},
},
StringData: map[string]string{
"config.yaml": "model_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
"config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
},
}}, fmt.Errorf("deployment control plane is down"), false)
return clientSet
Expand Down Expand Up @@ -417,7 +417,7 @@ func Test_deployer_Deploy(t *testing.T) {
},
},
StringData: map[string]string{
"config.yaml": "model_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
"config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
},
}}, nil, false)
deploymentAPI := clientSet.AppsV1().Deployments(namespace).(*fakeappsv1.FakeDeployments)
Expand Down Expand Up @@ -485,7 +485,7 @@ func Test_deployer_Deploy(t *testing.T) {
},
},
StringData: map[string]string{
"config.yaml": "model_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
"config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
},
}, nil)
prependUpsertSecretReactor(t, secretAPI, []*corev1.Secret{
Expand All @@ -503,7 +503,7 @@ func Test_deployer_Deploy(t *testing.T) {
},
},
StringData: map[string]string{
"config.yaml": "model_id: model-1\nmodel_version: \"2\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-2-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
"config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"2\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-2-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
},
}}, nil, true)
deploymentAPI := clientSet.AppsV1().Deployments(namespace).(*fakeappsv1.FakeDeployments)
Expand Down Expand Up @@ -591,7 +591,7 @@ func Test_deployer_Deploy(t *testing.T) {
},
},
StringData: map[string]string{
"config.yaml": "model_id: model-1\nmodel_version: \"2\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-2-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
"config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"2\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-2-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n",
},
}, {
ObjectMeta: metav1.ObjectMeta{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
project: "test-project"
model_id: "test-model"
model_version: "0.1.0"
inference_schema:
Expand Down
1 change: 1 addition & 0 deletions python/observation-publisher/publisher/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def start_consumer(cfg: PublisherConfig) -> None:
observation_sinks = [
new_observation_sink(
sink_config=sink_config,
project=cfg.environment.project,
inference_schema=inference_schema,
model_id=cfg.environment.model_id,
model_version=cfg.environment.model_version,
Expand Down
1 change: 1 addition & 0 deletions python/observation-publisher/publisher/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ObservationSourceConfig:

@dataclass
class Environment:
project: str
model_id: str
model_version: str
inference_schema: dict
Expand Down
29 changes: 20 additions & 9 deletions python/observation-publisher/publisher/observation_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ class ObservationSink(abc.ABC):

def __init__(
self,
project: str,
inference_schema: InferenceSchema,
model_id: str,
model_version: str,
):
self._project = project
self._inference_schema = inference_schema
self._model_id = model_id
self._model_version = model_version
Expand Down Expand Up @@ -65,18 +67,20 @@ class ArizeSink(ObservationSink):

def __init__(
self,
project: str,
inference_schema: InferenceSchema,
model_id: str,
model_version: str,
arize_client: ArizeClient,
):
"""
:param project: CaraML project
:param inference_schema: Inference schema for the ingested model
:param model_id: Merlin model id
:param model_version: Merlin model version
:param arize_client: Arize Pandas Logger client
"""
super().__init__(inference_schema, model_id, model_version)
super().__init__(project, inference_schema, model_id, model_version)
self._client = arize_client

def _common_arize_schema_attributes(self) -> dict:
Expand Down Expand Up @@ -121,7 +125,7 @@ def write(self, df: pd.DataFrame):
dataframe=df,
environment=Environments.PRODUCTION,
schema=arize_schema,
model_id=self._model_id,
model_id=f"{self._project}-{self._model_id}",
model_type=model_type,
model_version=self._model_version,
)
Expand Down Expand Up @@ -176,27 +180,26 @@ class BigQuerySink(ObservationSink):

def __init__(
self,
project: str,
inference_schema: InferenceSchema,
model_id: str,
model_version: str,
config: BigQueryConfig,
):
"""
:param project: CaraML project
:param inference_schema: Inference schema for the ingested model
:param model_id: Merlin model id
:param model_version: Merlin model version
:param config: Configuration to write to bigquery sink
"""
super().__init__(inference_schema, model_id, model_version)
super().__init__(project, inference_schema, model_id, model_version)
self._client = BigQueryClient()
self._inference_schema = inference_schema
self._model_id = model_id
self._model_version = model_version
self._config = config
self._table = self.create_or_update_table()

@property
def project(self) -> str:
def bq_project(self) -> str:
return self._config.project

@property
Expand Down Expand Up @@ -275,10 +278,15 @@ def schema_fields(self) -> List[SchemaField]:

@property
def write_location(self) -> str:
table_name = f"prediction_log_{self._model_id}".replace("-", "_").replace(
"""
Returns the BigQuery table location to write the prediction logs, which will be unique
for each CaraML project / model pair. Different versions of a model share the same table.
:return:
"""
table_name = f"prediction_log_{self._project}_{self._model_id}".replace("-", "_").replace(
".", "_"
)
return f"{self.project}.{self.dataset}.{table_name}"
return f"{self.bq_project}.{self.dataset}.{table_name}"

def write(self, dataframe: pd.DataFrame):
for i in range(0, self.retry.retry_attempts + 1):
Expand Down Expand Up @@ -308,6 +316,7 @@ def write(self, dataframe: pd.DataFrame):

def new_observation_sink(
sink_config: ObservationSinkConfig,
project: str,
inference_schema: InferenceSchema,
model_id: str,
model_version: str,
Expand All @@ -317,6 +326,7 @@ def new_observation_sink(
bq_config: BigQueryConfig = BigQueryConfig.from_dict(sink_config.config) # type: ignore[attr-defined]

return BigQuerySink(
project=project,
inference_schema=inference_schema,
model_id=model_id,
model_version=model_version,
Expand All @@ -328,6 +338,7 @@ def new_observation_sink(
space_key=arize_config.space_key, api_key=arize_config.api_key
)
return ArizeSink(
project=project,
inference_schema=inference_schema,
model_id=model_id,
model_version=model_version,
Expand Down
Loading

0 comments on commit 5f70094

Please sign in to comment.