From 1381a0b915d98b1249b6cfe97bfa9012c46575e7 Mon Sep 17 00:00:00 2001 From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com> Date: Wed, 3 Apr 2024 16:33:56 +0800 Subject: [PATCH] bug: fix publisher failure when prediction output is in one dimensional format (#565) # Description Standard model prediction output can be either in one dimensional or two dimensional format. Currently, the publisher assumes that the output is only in two dimensional format, which is not the case. # Modifications The parser will now parse both one dimensional and two dimensional prediction output correctly. # Tests # Checklist - [ ] Added PR label - [ ] Added unit test, integration, and/or e2e tests - [ ] Tested locally - [ ] Updated documentation - [ ] Update Swagger spec if the PR introduce API changes - [ ] Regenerated Golang and Python client if the PR introduces API changes # Release Notes ```release-note ``` --- .../publisher/prediction_log_consumer.py | 41 ++++++++++++ .../publisher/prediction_log_parser.py | 40 +++++++++++- .../tests/test_prediction_log_consumer.py | 65 ++++++++++++++++++- 3 files changed, 142 insertions(+), 4 deletions(-) diff --git a/python/observation-publisher/publisher/prediction_log_consumer.py b/python/observation-publisher/publisher/prediction_log_consumer.py index a29d639a8..0f9f2a80e 100644 --- a/python/observation-publisher/publisher/prediction_log_consumer.py +++ b/python/observation-publisher/publisher/prediction_log_consumer.py @@ -21,20 +21,35 @@ class PredictionLogConsumer(abc.ABC): + """ + Abstract class for consuming prediction logs from a streaming source, then write to one or multiple sinks + """ def __init__(self, buffer_capacity: int, buffer_max_duration_seconds: int): self.buffer_capacity = buffer_capacity self.buffer_max_duration_seconds = buffer_max_duration_seconds @abc.abstractmethod def poll_new_logs(self) -> List[PredictionLog]: + """ + Poll new logs from the source + :return: + """ raise NotImplementedError 
@abc.abstractmethod def commit(self): + """ + Commit the current offset after the logs have been written to all the sinks + :return: + """ raise NotImplementedError @abc.abstractmethod def close(self): + """ + Clean up the resources when the polling process runs into an error unexpectedly. + :return: + """ raise NotImplementedError def start_polling( @@ -43,6 +58,13 @@ def start_polling( inference_schema: InferenceSchema, model_version: str, ): + """ + Start polling new logs from the source, then write to the sinks. The prediction logs are written to each sink asynchronously. + :param observation_sinks: + :param inference_schema: + :param model_version: + :return: + """ try: buffered_logs = [] buffer_start_time = datetime.now() @@ -155,6 +177,11 @@ def new_consumer(config: ObservationSourceConfig) -> PredictionLogConsumer: def parse_message_to_prediction_log(msg: str) -> PredictionLog: + """ + Parse the message from the Kafka consumer to a PredictionLog object + :param msg: + :return: + """ log = PredictionLog() log.ParseFromString(msg) return log @@ -163,6 +190,13 @@ def parse_message_to_prediction_log(msg: str) -> PredictionLog: def log_to_records( log: PredictionLog, inference_schema: InferenceSchema, model_version: str ) -> Tuple[List[List[np.int64 | np.float64 | np.bool_ | np.str_]], List[str]]: + """ + Convert a PredictionLog object to a list of records and column names + :param log: Prediction log. + :param inference_schema: Inference schema. + :param model_version: Model version. + :return: + """ request_timestamp = log.request_timestamp.ToDatetime() feature_table = PredictionLogFeatureTable.from_struct( log.input.features_table, inference_schema ) @@ -199,6 +233,13 @@ def log_to_records( def log_batch_to_dataframe( logs: List[PredictionLog], inference_schema: InferenceSchema, model_version: str ) -> pd.DataFrame: + """ + Combines several logs into a single DataFrame + :param logs: List of prediction logs. + :param inference_schema: Inference schema.
+ :param model_version: Model version. + :return: + """ combined_records = [] column_names: List[str] = [] for log in logs: diff --git a/python/observation-publisher/publisher/prediction_log_parser.py b/python/observation-publisher/publisher/prediction_log_parser.py index cfdee43ef..12893cb49 100644 --- a/python/observation-publisher/publisher/prediction_log_parser.py +++ b/python/observation-publisher/publisher/prediction_log_parser.py @@ -21,6 +21,12 @@ class PredictionLogFeatureTable: def from_struct( cls, table_struct: Struct, inference_schema: InferenceSchema ) -> Self: + """ + Create a PredictionLogFeatureTable object from a Protobuf Struct object + :param table_struct: A Protobuf Struct object that represents a feature table. + :param inference_schema: Model inference schema. + :return: Instance of PredictionLogFeatureTable. + """ if inference_schema.feature_orders is not None: columns = inference_schema.feature_orders else: @@ -36,6 +42,11 @@ def from_struct( def prediction_columns(inference_schema: InferenceSchema) -> List[str]: + """ + Get the column name for the prediction output + :param inference_schema: Model inference schema + :return: List of column names + """ if isinstance(inference_schema.model_prediction_output, BinaryClassificationOutput): return [inference_schema.model_prediction_output.prediction_score_column] elif isinstance(inference_schema.model_prediction_output, RankingOutput): @@ -56,6 +67,12 @@ class PredictionLogResultsTable: def from_struct( cls, table_struct: Struct, inference_schema: InferenceSchema ) -> Self: + """ + Create a PredictionLogResultsTable object from a Protobuf Struct object + :param table_struct: Protobuf Struct object that represents a prediction result table. + :param inference_schema: Model inference schema. + :return: PredictionLogResultsTable instance.
+ """ if "columns" in table_struct.keys(): assert isinstance(table_struct["columns"], ListValue) columns = list_value_as_string_list(table_struct["columns"]) @@ -102,6 +119,9 @@ def convert_to_numpy_value( def list_value_as_string_list(list_value: ListValue) -> List[str]: + """ + Convert protobuf string list to it's native python type counterpart. + """ string_list: List[str] = [] for v in list_value.items(): assert isinstance(v, str) @@ -110,10 +130,19 @@ def list_value_as_string_list(list_value: ListValue) -> List[str]: def list_value_as_rows(list_value: ListValue) -> List[ListValue]: + """ + Convert a ListValue object to a list of ListValue objects + :param list_value: ListValue object representing a two dimensional matrix or a vector. + :return: List of ListValue objects, representing a two dimensional matrix. + """ rows: List[ListValue] = [] for d in list_value.items(): - assert isinstance(d, ListValue) - rows.append(d) + if isinstance(d, ListValue): + rows.append(d) + else: + nd = ListValue() + nd.append(d) + rows.append(nd) return rows @@ -121,6 +150,13 @@ def list_value_as_rows(list_value: ListValue) -> List[ListValue]: def list_value_as_numpy_list( list_value: ListValue, column_names: List[str], column_types: Dict[str, ValueType] ) -> List[np.int64 | np.float64 | np.bool_ | np.str_]: + """ + Convert a ListValue representing a row, to it's native python type counterpart. + :param list_value: ListValue object representing a row. + :param column_names: Column names corresponds to each column in a row. + :param column_types: Map of column name to type. + :return: List of numpy types. 
+ """ column_values: List[int | str | float | bool | None] = [] for v in list_value.items(): assert isinstance(v, (int, str, float, bool, NoneType)) diff --git a/python/observation-publisher/tests/test_prediction_log_consumer.py b/python/observation-publisher/tests/test_prediction_log_consumer.py index 326a0be79..23d13112d 100644 --- a/python/observation-publisher/tests/test_prediction_log_consumer.py +++ b/python/observation-publisher/tests/test_prediction_log_consumer.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Any, List +from typing import Any, List, Union import numpy as np import pandas as pd @@ -57,7 +57,7 @@ def new_standard_model_log( session_id: str, row_ids: List[str], input_data: List[List[Any]], - output_data: List[List[Any]], + output_data: Union[List[List[Any]], List[Any]], request_timestamp: datetime, ): prediction_log = PredictionLog() @@ -303,3 +303,64 @@ def test_standard_model_log_to_dataframe(): ], ) assert_frame_equal(prediction_logs_df, expected_df, check_like=True) + + +def test_one_dimensional_prediction_output_to_dataframe(): + model_id = "test_model" + model_version = "0.1.0" + inference_schema = InferenceSchema( + feature_types={ + "acceptance_rate": ValueType.FLOAT64, + "minutes_since_last_order": ValueType.INT64, + "service_type": ValueType.STRING, + }, + feature_orders=["acceptance_rate", "minutes_since_last_order", "service_type"], + model_prediction_output=BinaryClassificationOutput( + prediction_score_column="prediction_score", + actual_score_column="actual_score", + positive_class_label="fraud", + negative_class_label="non fraud", + score_threshold=0.5, + ), + session_id_column="order_id", + row_id_column="driver_id" + ) + request_timestamp = datetime(2021, 1, 1, 0, 0, 0) + prediction_logs = [ + new_standard_model_log( + session_id="1234", + model_id=model_id, + model_version=model_version, + input_data=[ + [0.8, 24, "FOOD"], + [0.5, 2, "RIDE"], + ], + output_data=[ + 0.9, + 0.5, + ], + 
request_timestamp=request_timestamp, + row_ids=["a", "b"], + ), + ] + prediction_logs_df = log_batch_to_dataframe( + prediction_logs, inference_schema, model_version + ) + expected_df = pd.DataFrame.from_records( + [ + [0.8, 24, "FOOD", 0.9, "fraud", "1234", "a", request_timestamp, model_version], + [0.5, 2, "RIDE", 0.5, "fraud", "1234", "b", request_timestamp, model_version], + ], + columns=[ + "acceptance_rate", + "minutes_since_last_order", + "service_type", + "prediction_score", + "_prediction_label", + "order_id", + "driver_id", + "request_timestamp", + "model_version", + ], + ) + assert_frame_equal(prediction_logs_df, expected_df, check_like=True)