diff --git a/ads/feature_store/common/enums.py b/ads/feature_store/common/enums.py
index b3f55c19a..58c329151 100644
--- a/ads/feature_store/common/enums.py
+++ b/ads/feature_store/common/enums.py
@@ -67,6 +67,32 @@ class IngestionMode(Enum):
     DEFAULT = "DEFAULT"
     UPSERT = "UPSERT"

+class StreamIngestionMode(Enum):
+    """
+    Enumeration of the supported stream ingestion (output) modes.
+
+    - `COMPLETE`: Replace the entire target dataset with the streaming result on every trigger.
+    - `APPEND`: Append only the newly arrived rows to the existing dataset.
+    - `UPDATE`: Write only the rows that were updated in the streaming result.
+    """
+    COMPLETE = "COMPLETE"
+    APPEND = "APPEND"
+    UPDATE = "UPDATE"
+
+class StreamingIngestionMode(Enum):
+    """
+    An enumeration that represents the ingestion modes supported by the feature store.
+
+    Attributes:
+        APPEND (str): Ingestion mode to append the data in the system.
+        DEFAULT (str): Default ingestion mode of the system.
+        UPSERT (str): Ingestion mode to insert and update the data in the system.
+    """
+
+    APPEND = "APPEND"
+    DEFAULT = "DEFAULT"
+    UPSERT = "UPSERT"
+

 class JoinType(Enum):
     """Enumeration of supported SQL join types.
diff --git a/ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py b/ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py
index 361815b6d..e47912a38 100644
--- a/ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py
+++ b/ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py
@@ -341,3 +341,37 @@ def __get_insert_update_query_expression(feature_data_source_columns, table_name
         logger.info(f"get_insert_update_query_expression {feature_data_update_set}")

         return feature_data_update_set
+
+    def write_stream_dataframe_to_delta_lake(
+        self,
+        stream_dataframe,
+        target_table,
+        output_mode,
+        query_name,
+        await_termination,
+        timeout,
+        checkpoint_dir,
+        feature_option_details,
+    ):
+        """Writes a streaming dataframe to the Delta table using Spark Structured Streaming.
+
+        Returns the started StreamingQuery. When `await_termination` is True, the call
+        blocks until the query terminates or `timeout` seconds elapse.
+        """
+        query = (
+            stream_dataframe.writeStream
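+            # Standard Spark Structured Streaming write: `output_mode` controls whether
+            # each micro-batch is appended, fully rewritten or merged into the table, and
+            # `checkpointLocation` allows the query to resume after a restart.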
+            .outputMode(output_mode)
+            .format("delta")
+            .option("checkpointLocation", checkpoint_dir)
+            .options(**self.get_delta_write_config(feature_option_details))
+            .queryName(query_name)
+            .toTable(target_table)
+        )
+
+        if await_termination:
+            query.awaitTermination(timeout)
+
+        return query
diff --git a/ads/feature_store/execution_strategy/engine/spark_engine.py b/ads/feature_store/execution_strategy/engine/spark_engine.py
index 77b2b3747..0a540bdb8 100644
--- a/ads/feature_store/execution_strategy/engine/spark_engine.py
+++ b/ads/feature_store/execution_strategy/engine/spark_engine.py
@@ -8,6 +8,7 @@
 from datetime import datetime

 from ads.common.decorator.runtime_dependency import OptionalDependency
+from ads.feature_store.common.utils.utility import get_schema_from_spark_dataframe, get_schema_from_spark_df

 try:
     from pyspark.sql import SparkSession
@@ -42,10 +43,10 @@ def __init__(self, metastore_id: str = None, spark_session: SparkSession = None)
         )

     def get_time_version_data(
-            self,
-            delta_table_name: str,
-            version_number: int = None,
-            timestamp: datetime = None,
+        self,
+        delta_table_name: str,
+        version_number: int = None,
+        timestamp: datetime = None,
     ):
         split_db_name = delta_table_name.split(".")

@@ -103,10 +104,10 @@ def _read_delta_table(self, delta_table_path: str, read_options: Dict):
         return df

     def sql(
-            self,
-            query: str,
-            dataframe_type: DataFrameType = DataFrameType.SPARK,
-            is_online: bool = False,
+        self,
+        query: str,
+        dataframe_type: DataFrameType = DataFrameType.SPARK,
+        is_online: bool = False,
     ):
         """Execute SQL command on the offline or online feature store database

@@ -186,19 +187,27 @@ def get_tables_from_database(self, database):

         return permanent_tables

-    def get_columns_from_table(self, table_name: str):
+    def get_output_columns_from_table_or_dataframe(self, table_name: str = None, dataframe=None):
         """Returns the column(features) along with type from the given table.

         Args:
             table_name(str): A string specifying the name of table name for which columns should be returned.
+            dataframe: Spark DataFrame whose schema is used to derive the columns when a table name is not provided.

         Returns:
             List[{"name": "","featureType": ""}]
             Returns the List of dictionary of column with name and type from the given table.
+ """ + if table_name is None and dataframe is None: + raise ValueError("Either 'table_name' or 'dataframe' must be provided to retrieve output columns.") + + if dataframe is not None: + feature_data_target = dataframe + else: + feature_data_target = self.spark.sql(f"SELECT * FROM {table_name} LIMIT 1") target_table_columns = [] - feature_data_target = self.spark.sql(f"SELECT * FROM {table_name} LIMIT 1") for field in feature_data_target.schema.fields: target_table_columns.append( diff --git a/ads/feature_store/execution_strategy/execution_strategy.py b/ads/feature_store/execution_strategy/execution_strategy.py index 66650e58e..7d9c5b7e5 100644 --- a/ads/feature_store/execution_strategy/execution_strategy.py +++ b/ads/feature_store/execution_strategy/execution_strategy.py @@ -42,6 +42,19 @@ def ingest_feature_definition( """ pass + @abstractmethod + def ingest_feature_definition_stream( + self, + feature_group, + feature_group_job: FeatureGroupJob, + dataframe, + query_name, + await_termination, + timeout, + checkpoint_dir, + ): + pass + @abstractmethod def ingest_dataset(self, dataset, dataset_job: DatasetJob): """ diff --git a/ads/feature_store/execution_strategy/spark/spark_execution.py b/ads/feature_store/execution_strategy/spark/spark_execution.py index caa74dd46..1e88de6e6 100644 --- a/ads/feature_store/execution_strategy/spark/spark_execution.py +++ b/ads/feature_store/execution_strategy/spark/spark_execution.py @@ -77,16 +77,39 @@ def __init__(self, metastore_id: str = None): self._jvm = self._spark_context._jvm def ingest_feature_definition( - self, - feature_group: "FeatureGroup", - feature_group_job: FeatureGroupJob, - dataframe, + self, + feature_group: "FeatureGroup", + feature_group_job: FeatureGroupJob, + dataframe, ): try: self._save_offline_dataframe(dataframe, feature_group, feature_group_job) except Exception as e: raise SparkExecutionException(e).with_traceback(e.__traceback__) + def ingest_feature_definition_stream( + self, + feature_group, + feature_group_job: FeatureGroupJob, + dataframe, + query_name, + await_termination, + timeout, + checkpoint_dir, + ): + try: + self._save_offline_dataframe_stream( + dataframe, + feature_group, + feature_group_job, + query_name, + await_termination, + timeout, + checkpoint_dir, + ) + except Exception as e: + raise SparkExecutionException(e).with_traceback(e.__traceback__) + def ingest_dataset(self, dataset, dataset_job: DatasetJob): try: self._save_dataset_input(dataset, dataset_job) @@ -94,7 +117,7 @@ def ingest_dataset(self, dataset, dataset_job: DatasetJob): raise SparkExecutionException(e).with_traceback(e.__traceback__) def delete_feature_definition( - self, feature_group: "FeatureGroup", feature_group_job: FeatureGroupJob + self, feature_group: "FeatureGroup", feature_group_job: FeatureGroupJob ): """ Deletes a feature definition from the system. @@ -187,8 +210,28 @@ def _validate_expectation(expectation_type, validation_output: dict): if error_message: raise Exception(error_message) + @classmethod + def is_streaming_dataframe(cls, data_frame): + """ + Check if the provided DataFrame is a Spark Streaming DataFrame. + + Args: + data_frame (DataFrame): The DataFrame to check. + + Returns: + bool: True if it's a Spark Streaming DataFrame, False otherwise. + """ + if isinstance(data_frame, pd.DataFrame): + return False + elif isinstance(data_frame, DataFrame): + return data_frame.isStreaming + else: + raise ValueError( + "Invalid DataFrame type. Expected Pandas or Spark DataFrame." 
+ ) + def _save_offline_dataframe( - self, data_frame, feature_group, feature_group_job: FeatureGroupJob + self, data_frame, feature_group, feature_group_job: FeatureGroupJob ): """Ingest dataframe to the feature store system. as now this handles both spark dataframe and pandas dataframe. in case of pandas after transformation we convert it to spark and write to the delta. @@ -225,7 +268,9 @@ def _save_offline_dataframe( # TODO: Get event timestamp column and apply filtering basis from and to timestamp if feature_group.expectation_details: - expectation_type = feature_group.expectation_details["expectationType"] + expectation_type = feature_group.expectation_details[ + "expectationType" + ] logger.info(f"Validation expectation type: {expectation_type}") # Apply validations @@ -289,10 +334,15 @@ def _save_offline_dataframe( logger.info(f"output features for the FeatureGroup: {output_features}") # Compute Feature Statistics - feature_statistics = StatisticsService.compute_stats_with_mlm( - statistics_config=feature_group.oci_feature_group.statistics_config, - input_df=featured_data, - ) + if self.is_streaming_dataframe(data_frame): + logger.warning( + "Stats skipped: Streaming DataFrames are not supported for statistics." + ) + else: + feature_statistics = StatisticsService.compute_stats_with_mlm( + statistics_config=feature_group.oci_feature_group.statistics_config, + input_df=featured_data, + ) except Exception as ex: error_details = str(ex) @@ -419,7 +469,7 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob): # Get the output features output_features = get_features( - output_columns=self.spark_engine.get_columns_from_table(target_table), + output_columns=self.spark_engine.get_output_columns_from_table_or_dataframe(table_name=target_table), parent_id=dataset.id, entity_type=EntityType.DATASET, ) @@ -460,7 +510,7 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob): @staticmethod def _update_job_and_parent_details( - parent_entity, job_entity, output_features=None, output_details=None + parent_entity, job_entity, output_features=None, output_details=None ): """ Updates the parent and job entities with relevant details. @@ -484,3 +534,92 @@ def _update_job_and_parent_details( # Update both the parent and job entities. parent_entity.update() + + def _save_offline_dataframe_stream( + self, + dataframe, + feature_group, + feature_group_job, + query_name, + await_termination, + timeout, + checkpoint_dir, + ): + output_features = [] + output_details = { + "error_details": None, + "validation_output": None, + "commit_id": "commit_id", + "feature_statistics": None, + } + + try: + # Create database in hive metastore if not exist + database = feature_group.entity_id + self.spark_engine.create_database(database) + + target_table = f"{database}.{feature_group.name}" + + # Apply the transformation + if feature_group.transformation_id: + logger.info("Dataframe is transformation enabled.") + + # Get the Transformation Arguments if exists and pass to the transformation function. 
+                transformation_kwargs = Base64EncoderDecoder.decode(
+                    feature_group.transformation_kwargs
+                )
+
+                # Loads the transformation resource
+                transformation = Transformation.from_id(feature_group.transformation_id)
+
+                featured_data = TransformationUtils.apply_transformation(
+                    self._spark_session,
+                    dataframe,
+                    transformation,
+                    transformation_kwargs,
+                )
+            else:
+                logger.info("Transformation not defined.")
+                featured_data = dataframe
+
+            # Get the output features
+            output_features = get_features(
+                self.spark_engine.get_output_columns_from_table_or_dataframe(dataframe=featured_data), feature_group.id
+            )
+
+            self._update_job_and_parent_details(
+                parent_entity=feature_group,
+                job_entity=feature_group_job,
+                output_features=output_features,
+                output_details=output_details,
+            )
+
+            streaming_query = self.delta_lake_service.write_stream_dataframe_to_delta_lake(
+                featured_data,
+                target_table,
+                feature_group_job.ingestion_mode,
+                query_name,
+                await_termination,
+                timeout,
+                checkpoint_dir,
+                feature_group_job.feature_option_details,
+            )
+
+            return streaming_query
+
+        except Exception as ex:
+            # Update the job with a failed status
+            error_details = str(ex)
+            tb = traceback.format_exc()
+            logger.error(
+                f"FeatureGroup stream materialization failed with {type(ex)}, error message: {ex} and stacktrace {tb}",
+            )
+
+            output_details["error_details"] = error_details
+
+            self._update_job_and_parent_details(
+                parent_entity=feature_group,
+                job_entity=feature_group_job,
+                output_features=output_features,
+                output_details=output_details,
+            )
diff --git a/ads/feature_store/feature_group.py b/ads/feature_store/feature_group.py
index 888c7cafb..b9d3491da 100644
--- a/ads/feature_store/feature_group.py
+++ b/ads/feature_store/feature_group.py
@@ -17,7 +17,7 @@
 from ads.common import utils
 from ads.common.decorator.runtime_dependency import OptionalDependency
 from ads.common.oci_mixin import OCIModelMixin
-from ads.feature_store.common.enums import ExpectationType, EntityType
+from ads.feature_store.common.enums import ExpectationType, EntityType, StreamIngestionMode
 from ads.feature_store.common.exceptions import (
     NotMaterializedError,
 )
@@ -913,6 +913,93 @@ def materialise(
             self, feature_group_job, input_dataframe
         )

+    def materialise_stream(
+        self,
+        input_dataframe: DataFrame,
+        query_name: Optional[str] = None,
+        ingestion_mode: StreamIngestionMode = StreamIngestionMode.APPEND,
+        await_termination: Optional[bool] = False,
+        timeout: Optional[int] = None,
+        checkpoint_dir: Optional[str] = None,
+        feature_option_details: FeatureOptionDetails = None,
+    ):
+        """Ingest a Spark Structured Streaming DataFrame to the feature store.
+
+        This method starts a long-running Spark Structured Streaming query; its
+        termination can be controlled through the arguments below.
+
+        The returned query can be stopped with `.stop()` and its status checked
+        with `.isActive`.
+
+        !!! warning "Engine Support"
+            **Spark only**
+
+            Stream ingestion with Pandas/Python as the engine is currently not
+            supported, since Pandas has no notion of streaming.
+
+        !!! warning "Data Validation Support"
+            `materialise_stream` does not perform any data validation using Great Expectations
+            even when an expectation suite is attached.
+
+        # Arguments
+            input_dataframe: Streaming DataFrame containing the features to be saved.
+            query_name: Optional name for the query, to make it easier to recognise
+                in the Spark UI. Defaults to `None`.
+            ingestion_mode: Specifies how data of a streaming DataFrame/Dataset is
+                written to the streaming sink. (1) `StreamIngestionMode.APPEND`: only
+                the new rows in the streaming DataFrame/Dataset are written to the
+                sink. (2) `StreamIngestionMode.COMPLETE`: all the rows in the streaming
+                DataFrame/Dataset are written to the sink every time there is an
+                update. (3) `StreamIngestionMode.UPDATE`: only the rows that were
+                updated in the streaming DataFrame/Dataset are written to the sink on
+                every update; if the query does not contain aggregations, this is
+                equivalent to append mode. Defaults to `StreamIngestionMode.APPEND`.
+            await_termination: Waits for the termination of this query, either by
+                `query.stop()` or by an exception. If the query has terminated with an
+                exception, the exception is re-raised. If `timeout` is set, it returns
+                whether the query has terminated or not within `timeout` seconds.
+                Defaults to `False`.
+            timeout: Only relevant in combination with `await_termination=True`.
+                Defaults to `None`.
+            checkpoint_dir: Checkpoint directory location. Spark uses it to track the
+                streaming progress and to resume the query from where it left off
+                after a restart. Defaults to `None`.
+            feature_option_details: Additional write options for the Delta writer,
+                as a `FeatureOptionDetails` object. Defaults to `None`.
+
+        # Returns
+            `StreamingQuery`: Spark Structured Streaming Query object.
+        """
+
+        # Create the feature definition job and persist it
+        feature_group_job = self._build_feature_group_job(
+            ingestion_mode=ingestion_mode,
+            feature_option_details=feature_option_details,
+        )
+
+        # Create the Job
+        feature_group_job.create()
+
+        # Update the feature group with the corresponding job id so that users can see the details about the job
+        self.with_job_id(feature_group_job.id)
+
+        feature_group_execution_strategy = (
+            OciExecutionStrategyProvider.provide_execution_strategy(
+                execution_engine=get_execution_engine_type(input_dataframe),
+                metastore_id=get_metastore_id(self.feature_store_id),
+            )
+        )
+
+        return feature_group_execution_strategy.ingest_feature_definition_stream(
+            self,
+            feature_group_job,
+            input_dataframe,
+            query_name,
+            await_termination,
+            timeout,
+            checkpoint_dir,
+        )
+
     def get_last_job(self) -> "FeatureGroupJob":
         """Gets the Job details for the last running job.
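For reference, a minimal usage sketch of the streaming ingestion path introduced by this diff. The feature group OCID, source table path, and checkpoint location are placeholders, and the `FeatureGroup.from_id` lookup is assumed; only `materialise_stream` and `StreamIngestionMode` come from this change set:

```python
from pyspark.sql import SparkSession

from ads.feature_store.common.enums import StreamIngestionMode
from ads.feature_store.feature_group import FeatureGroup

# Assumes a SparkSession already configured for the feature store metastore.
spark = SparkSession.builder.getOrCreate()

# Hypothetical streaming source: read a Delta table as a stream.
streaming_df = spark.readStream.format("delta").load("<source_delta_table_path>")

# `from_id` is assumed here as the way to load an existing feature group.
feature_group = FeatureGroup.from_id("<feature_group_ocid>")

query = feature_group.materialise_stream(
    input_dataframe=streaming_df,
    query_name="transactions_stream",           # visible in the Spark UI
    ingestion_mode=StreamIngestionMode.APPEND,  # only new rows are written
    checkpoint_dir="oci://bucket@namespace/checkpoints/transactions",
    await_termination=False,
)

# The returned object behaves like a Spark StreamingQuery.
print(query.isActive)
query.stop()
```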