
Commit

code changes for streaming dataframe
yogesh266 committed Sep 27, 2023
1 parent a14b4e3 commit 3873b99
Showing 6 changed files with 343 additions and 24 deletions.
29 changes: 29 additions & 0 deletions ads/feature_store/common/enums.py
@@ -67,6 +67,35 @@ class IngestionMode(Enum):
DEFAULT = "DEFAULT"
UPSERT = "UPSERT"

class StreamIngestionMode(Enum):
"""
Enumeration for stream ingestion modes.
- `COMPLETE`: Represents complete stream ingestion where the entire dataset is replaced.
- `APPEND`: Represents appending new data to the existing dataset.
- `UPDATE`: Represents updating existing data in the dataset.
"""
COMPLETE = "COMPLETE"
APPEND = "APPEND"
UPDATE = "UPDATE"

class StreamingIngestionMode(Enum):
"""
An enumeration that represents the ingestion modes supported by the feature store for streaming writes.
Attributes:
APPEND (str): Ingestion mode to append the data in the system.
DEFAULT (str): Default ingestion mode used when none is specified.
UPSERT (str): Ingestion mode to insert and update the data in the system.
"""

APPEND = "APPEND"
DEFAULT = "DEFAULT"
UPSERT = "UPSERT"


class JoinType(Enum):
"""Enumeration of supported SQL join types.
@@ -57,6 +57,18 @@ def write_dataframe_to_delta_lake(
None.
"""
logger.info(f"target table name {target_table_name}")
# query = (
# dataflow_output.writeStream.outputMode("append")
# .format("delta")
# .option(
# "checkpointLocation",
# "/Users/yogeshkumawat/Desktop/Github-Oracle/accelerated-data-science/TestYogi/streaming",
# )
# .toTable(target_table_name)
# )
#
# query.awaitTermination()

if (
self.spark_engine.is_delta_table_exists(target_table_name)
and ingestion_mode.upper() == IngestionMode.UPSERT.value
@@ -341,3 +353,33 @@ def __get_insert_update_query_expression(feature_data_source_columns, table_name

logger.info(f"get_insert_update_query_expression {feature_data_update_set}")
return feature_data_update_set

def write_stream_dataframe_to_delta_lake(
self,
stream_dataframe,
target_table,
output_mode,
query_name,
await_termination,
timeout,
checkpoint_dir,
feature_option_details,
):
"""Write a streaming dataframe to the target delta table and return the streaming query."""
query = (
stream_dataframe.writeStream.outputMode(output_mode)
.format("delta")
.option("checkpointLocation", checkpoint_dir)
.options(**self.get_delta_write_config(feature_option_details))
.queryName(query_name)
.toTable(target_table)
)

if await_termination:
query.awaitTermination(timeout)

return query
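
A rough usage sketch for the new streaming writer (a sketch only; the table name, checkpoint path, and rate source below are placeholder assumptions, not values from this commit):

from pyspark.sql import SparkSession

# Illustrative usage; assumes `delta_lake_service` is an instance of the service class above
# and that a Spark session with Delta Lake support is available.
spark = SparkSession.builder.getOrCreate()
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()  # placeholder source

query = delta_lake_service.write_stream_dataframe_to_delta_lake(
    stream_dataframe=stream_df,
    target_table="feature_db.streaming_feature_group",  # placeholder table name
    output_mode="append",
    query_name="streaming_feature_group_query",         # placeholder query name
    await_termination=False,                             # return immediately; caller manages the query
    timeout=None,
    checkpoint_dir="oci://bucket@namespace/checkpoints/streaming_feature_group",  # placeholder path
    feature_option_details=None,                         # assumes default write options are acceptable
)
query.stop()  # stop the streaming query once finished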
29 changes: 19 additions & 10 deletions ads/feature_store/execution_strategy/engine/spark_engine.py
@@ -8,6 +8,7 @@
from datetime import datetime

from ads.common.decorator.runtime_dependency import OptionalDependency
from ads.feature_store.common.utils.utility import get_schema_from_spark_dataframe, get_schema_from_spark_df

try:
from pyspark.sql import SparkSession
@@ -42,10 +43,10 @@ def __init__(self, metastore_id: str = None, spark_session: SparkSession = None)
)

def get_time_version_data(
self,
delta_table_name: str,
version_number: int = None,
timestamp: datetime = None,
):
split_db_name = delta_table_name.split(".")

@@ -103,10 +104,10 @@ def _read_delta_table(self, delta_table_path: str, read_options: Dict):
return df

def sql(
self,
query: str,
dataframe_type: DataFrameType = DataFrameType.SPARK,
is_online: bool = False,
):
"""Execute SQL command on the offline or online feature store database
@@ -186,19 +187,27 @@ def get_tables_from_database(self, database):

return permanent_tables

def get_columns_from_table(self, table_name: str):
def get_output_columns_from_table_or_dataframe(self, table_name: str = None, dataframe=None):
"""Returns the column(features) along with type from the given table.
Args:
table_name(str): A string specifying the name of table name for which columns should be returned.
dataframe: Dataframe containing the transformed dataframe.
Returns:
List[{"name": "<feature_name>","featureType": "<feature_type>"}]
Returns the List of dictionary of column with name and type from the given table.
"""
if table_name is None and dataframe is None:
raise ValueError("Either 'table_name' or 'dataframe' must be provided to retrieve output columns.")

if dataframe is not None:
feature_data_target = dataframe
else:
feature_data_target = self.spark.sql(f"SELECT * FROM {table_name} LIMIT 1")

target_table_columns = []

for field in feature_data_target.schema.fields:
target_table_columns.append(
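
For reference, the returned structure can be illustrated with a small in-memory dataframe (a sketch assuming an active SparkSession and a `spark_engine` instance of the class above; column names and types are made up):

# Illustrative only: the method inspects dataframe.schema.fields and returns a list of
# {"name": ..., "featureType": ...} dictionaries, one entry per column.
df = spark.createDataFrame([(1, "a")], ["user_id", "segment"])
columns = spark_engine.get_output_columns_from_table_or_dataframe(dataframe=df)
# e.g. [{"name": "user_id", "featureType": <long type>}, {"name": "segment", "featureType": <string type>}]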
13 changes: 13 additions & 0 deletions ads/feature_store/execution_strategy/execution_strategy.py
@@ -42,6 +42,19 @@ def ingest_feature_definition(
"""
pass

@abstractmethod
def ingest_feature_definition_stream(
self,
feature_group,
feature_group_job: FeatureGroupJob,
dataframe,
query_name,
await_termination,
timeout,
checkpoint_dir,
):
"""
Ingests a streaming dataframe for the given feature group definition.
"""
pass
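
For context, a concrete execution engine would be expected to route this call to the streaming delta-lake writer added above; a loose sketch (the attribute name and target-table resolution below are assumptions, not code from this commit):

# Hypothetical implementation sketch of the abstract method in a Spark-based strategy.
def ingest_feature_definition_stream(
    self, feature_group, feature_group_job, dataframe,
    query_name, await_termination, timeout, checkpoint_dir,
):
    return self.delta_lake_service.write_stream_dataframe_to_delta_lake(
        stream_dataframe=dataframe,
        target_table=feature_group.name,   # assumed target-table resolution
        output_mode="append",              # assumed default streaming mode
        query_name=query_name,
        await_termination=await_termination,
        timeout=timeout,
        checkpoint_dir=checkpoint_dir,
        feature_option_details=None,       # assumption; real code would take this from the job
    )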

@abstractmethod
def ingest_dataset(self, dataset, dataset_job: DatasetJob):
"""
