-
Notifications
You must be signed in to change notification settings - Fork 44
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Code changes for streaming integration
- Loading branch information
Showing
4 changed files
with
198 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
4 changes: 4 additions & 0 deletions
4
tests/integration/feature_store/test_data/credit_score_batch_1.csv
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
user_id,date,credit_score | ||
c123006815,01/01/22,568 | ||
c123006815,01/01/22,568 | ||
c123006850,05/02/22,740 |
8 changes: 8 additions & 0 deletions
8
tests/integration/feature_store/test_data/credit_score_batch_2.csv
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
user_id,date,credit_score | ||
c123006818,04/01/22,571 | ||
c123006847,02/02/22,800 | ||
c123006820,06/01/22,573 | ||
c123006857,12/02/22,850 | ||
c123006822,08/01/22,575 | ||
c123006823,09/01/22,300 | ||
c123006824,10/01/22,577 |
185 changes: 185 additions & 0 deletions
185
tests/integration/feature_store/test_streaming_dataframe_feature_group.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
import time | ||
|
||
from delta import configure_spark_with_delta_pip | ||
from great_expectations.core import ExpectationSuite, ExpectationConfiguration | ||
from pyspark.sql import SparkSession | ||
from pyspark.sql.types import StructType | ||
|
||
from ads.feature_store.common.enums import TransformationMode, ExpectationType | ||
from ads.feature_store.statistics_config import StatisticsConfig | ||
from tests.integration.feature_store.test_base import FeatureStoreTestCase | ||
|
||
|
||
def get_streaming_df():
    """Build a Spark streaming DataFrame over the credit-score CSV fixtures.

    Configures a Delta-enabled, Hive-backed SparkSession and returns an
    unbounded DataFrame that watches ``test_data/`` for CSV files matching
    the credit-score schema.
    """
    builder = (
        SparkSession.builder.appName("FeatureStore")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        .enableHiveSupport()
    )
    session = configure_spark_with_delta_pip(builder).getOrCreate()

    # Schema of the incoming CSV rows; every column arrives as a raw string.
    schema = (
        StructType()
        .add("user_id", "string")
        .add("date", "string")
        .add("credit_score", "string")
    )

    return (
        session.readStream
        .option("sep", ",")
        .option("header", "true")
        .schema(schema)
        .csv("test_data/")
    )
|
||
|
||
def credit_score_transformation(credit_score):
    """Binarize the ``credit_score`` column: 1 when the score exceeds 500, else 0.

    The identifying columns (``user_id``, ``date``) pass through unchanged;
    the raw string score is cast to int before the threshold comparison.
    """
    from pyspark.sql.functions import col, when

    return credit_score.select(
        "user_id",
        "date",
        when(col("credit_score").cast("int") > 500, 1)
        .otherwise(0)
        .alias("credit_score"),
    )
|
||
|
||
class TestFeatureGroupWithStreamingDataFrame(FeatureStoreTestCase):
    """Integration tests for feature-group materialisation from a streaming DataFrame."""

    def create_transformation_resource_stream(self, feature_store) -> "Transformation":
        """Register the credit-score SPARK transformation with the feature store."""
        return feature_store.create_transformation(
            source_code_func=credit_score_transformation,
            display_name="credit_score_transformation",
            transformation_mode=TransformationMode.SPARK,
        )

    def _run_stream_briefly(self, feature_group, input_df):
        """Start streaming materialisation, let it ingest the fixtures, then stop it."""
        query = feature_group.materialise_stream(
            input_dataframe=input_df,
            checkpoint_dir=f"test_data/checkpoint/{feature_group.name}",
        )
        assert query
        # Give the micro-batches time to drain both fixture CSVs.
        time.sleep(10)
        query.stop()

    def _tear_down_all(self, feature_group, transformation, entity, feature_store):
        """Delete every resource created by a test, innermost first."""
        self.clean_up_feature_group(feature_group)
        self.clean_up_transformation(transformation)
        self.clean_up_entity(entity)
        self.clean_up_feature_store(feature_store)

    def test_feature_group_materialization_with_streaming_data_frame(self):
        fs = self.define_feature_store_resource().create()
        assert fs.oci_fs.id

        entity = self.create_entity_resource(fs)
        assert entity.oci_fs_entity.id

        transformation = self.create_transformation_resource_stream(fs)
        streaming_df = get_streaming_df()

        stats_config = StatisticsConfig().with_is_enabled(False)
        fg = entity.create_feature_group(
            primary_keys=["User_id"],
            schema_details_dataframe=streaming_df,
            statistics_config=stats_config,
            name=self.get_name("streaming_fg_1"),
            transformation_id=transformation.id,
        )
        assert fg.oci_feature_group.id

        self._run_stream_briefly(fg, streaming_df)

        # Both fixture batches together contain 10 data rows.
        assert fg.select().read().count() == 10

        self._tear_down_all(fg, transformation, entity, fs)

    def test_feature_group_materialization_with_streaming_data_frame_and_expectation(self):
        fs = self.define_feature_store_resource().create()
        assert fs.oci_fs.id

        entity = self.create_entity_resource(fs)
        assert entity.oci_fs_entity.id

        transformation = self.create_transformation_resource_stream(fs)
        streaming_df = get_streaming_df()

        stats_config = StatisticsConfig().with_is_enabled(False)
        # Build the expectation suite attached to the feature group.
        # NOTE(review): the suite asserts both NULL and NOT_NULL on "date" —
        # presumably to exercise a failing expectation under LENIENT mode;
        # confirm this contradiction is intentional.
        expectation_suite_trans = ExpectationSuite(expectation_suite_name="feature_definition")
        expectation_suite_trans.add_expectation(
            ExpectationConfiguration(
                expectation_type="EXPECT_COLUMN_VALUES_TO_BE_NULL", kwargs={"column": "date"}
            )
        )
        expectation_suite_trans.add_expectation(
            ExpectationConfiguration(
                expectation_type="EXPECT_COLUMN_VALUES_TO_NOT_BE_NULL",
                kwargs={"column": "date"},
            )
        )

        fg = entity.create_feature_group(
            primary_keys=["User_id"],
            schema_details_dataframe=streaming_df,
            statistics_config=stats_config,
            expectation_suite=expectation_suite_trans,
            expectation_type=ExpectationType.LENIENT,
            name=self.get_name("streaming_fg_2"),
            transformation_id=transformation.id,
        )
        assert fg.oci_feature_group.id

        self._run_stream_briefly(fg, streaming_df)

        # Both fixture batches together contain 10 data rows.
        assert fg.select().read().count() == 10
        # Validation is not produced for streaming ingestion.
        assert fg.get_validation_output().to_pandas() is None

        self._tear_down_all(fg, transformation, entity, fs)

    def test_feature_group_materialization_with_streaming_data_frame_and_stats(self):
        fs = self.define_feature_store_resource().create()
        assert fs.oci_fs.id

        entity = self.create_entity_resource(fs)
        assert entity.oci_fs_entity.id

        transformation = self.create_transformation_resource_stream(fs)
        streaming_df = get_streaming_df()

        # No StatisticsConfig here: the feature group keeps its default
        # statistics behaviour.
        fg = entity.create_feature_group(
            primary_keys=["User_id"],
            schema_details_dataframe=streaming_df,
            name=self.get_name("streaming_fg_3"),
            transformation_id=transformation.id,
        )
        assert fg.oci_feature_group.id

        self._run_stream_briefly(fg, streaming_df)

        # Both fixture batches together contain 10 data rows.
        assert fg.select().read().count() == 10
        # Statistics are not produced for streaming ingestion.
        assert fg.get_statistics().to_pandas() is None

        self._tear_down_all(fg, transformation, entity, fs)