From 38e19d12726a64d3c55dbec6b5b7354f9daf6da5 Mon Sep 17 00:00:00 2001
From: Yogesh Kumawat
Date: Tue, 25 Jul 2023 09:16:24 +0530
Subject: [PATCH] partitioning code changes for the dataset

---
 ads/feature_store/dataset.py                  | 33 +++++++
 ads/feature_store/entity.py                   | 90 ++++++++++---------
 .../delta_lake/delta_lake_service.py          | 35 +++++---
 .../spark/spark_execution.py                  |  1 +
 ads/feature_store/feature_group.py            |  8 +-
 ads/feature_store/validation_output.py        | 12 ++-
 6 files changed, 114 insertions(+), 65 deletions(-)

diff --git a/ads/feature_store/dataset.py b/ads/feature_store/dataset.py
index ac7537d70..2303777bd 100644
--- a/ads/feature_store/dataset.py
+++ b/ads/feature_store/dataset.py
@@ -105,6 +105,7 @@ class Dataset(Builder):
     CONST_DESCRIPTION = "description"
     CONST_FREEFORM_TAG = "freeformTags"
     CONST_DEFINED_TAG = "definedTags"
+    CONST_PARTITION_KEYS = "partitionKeys"
     CONST_OUTPUT_FEATURE_DETAILS = "outputFeatureDetails"
     CONST_EXPECTATION_DETAILS = "expectationDetails"
     CONST_STATISTICS_CONFIG = "statisticsConfig"
@@ -128,6 +129,7 @@ class Dataset(Builder):
         CONST_OUTPUT_FEATURE_DETAILS: "output_feature_details",
         CONST_LIFECYCLE_STATE: "lifecycle_state",
         CONST_MODEL_DETAILS: "model_details",
+        CONST_PARTITION_KEYS: "partition_keys",
     }
 
     def __init__(self, spec: Dict = None, **kwargs) -> None:
@@ -500,6 +502,37 @@ def with_model_details(self, model_details: ModelDetails) -> "Dataset":
         )
         return self.set_spec(self.CONST_MODEL_DETAILS, model_details.to_dict())
 
+    @property
+    def partition_keys(self) -> List[str]:
+        return self.get_spec(self.CONST_PARTITION_KEYS)
+
+    @partition_keys.setter
+    def partition_keys(self, value: List[str]):
+        self.with_partition_keys(value)
+
+    def with_partition_keys(self, partition_keys: List[str]) -> "Dataset":
+        """Sets the partition keys of the dataset.
+
+        Parameters
+        ----------
+        partition_keys: List[str]
+            The list of partition keys for the dataset.
+
+        Returns
+        -------
+        Dataset
+            The Dataset instance (self)
+        """
+        return self.set_spec(
+            self.CONST_PARTITION_KEYS,
+            {
+                self.CONST_ITEMS: [
+                    {self.CONST_NAME: partition_key}
+                    for partition_key in partition_keys or []
+                ]
+            },
+        )
+
     def add_models(self, model_details: ModelDetails) -> "Dataset":
         """Add model details to the dataset, Append to the existing model id list
 
diff --git a/ads/feature_store/entity.py b/ads/feature_store/entity.py
index 3db7e00af..f889e1d60 100644
--- a/ads/feature_store/entity.py
+++ b/ads/feature_store/entity.py
@@ -281,17 +281,17 @@ def create(self, **kwargs) -> "Entity":
         return self
 
     def _build_feature_group(
-            self,
-            primary_keys,
-            partition_keys,
-            input_feature_details,
-            expectation_suite: ExpectationSuite = None,
-            expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
-            statistics_config: Union[StatisticsConfig, bool] = True,
-            transformation_id: str = None,
-            name: str = None,
-            description: str = None,
-            compartment_id: str = None,
+        self,
+        primary_keys,
+        partition_keys,
+        input_feature_details,
+        expectation_suite: ExpectationSuite = None,
+        expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
+        statistics_config: Union[StatisticsConfig, bool] = True,
+        transformation_id: str = None,
+        name: str = None,
+        description: str = None,
+        compartment_id: str = None,
     ):
         feature_group_resource = (
             FeatureGroup()
@@ -316,18 +316,18 @@ def _build_feature_group(
         return feature_group_resource
 
     def create_feature_group(
-            self,
-            primary_keys: List[str],
-            partition_keys: List[str] = None,
-            input_feature_details: List[FeatureDetail] = None,
-            schema_details_dataframe: Union[DataFrame, pd.DataFrame] = None,
-            expectation_suite: ExpectationSuite = None,
-            expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
-            statistics_config: Union[StatisticsConfig, bool] = True,
-            transformation_id: str = None,
-            name: str = None,
-            description: str = None,
-            compartment_id: str = None,
+        self,
+        primary_keys: List[str],
+        partition_keys: List[str] = None,
+        input_feature_details: List[FeatureDetail] = None,
+        schema_details_dataframe: Union[DataFrame, pd.DataFrame] = None,
+        expectation_suite: ExpectationSuite = None,
+        expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
+        statistics_config: Union[StatisticsConfig, bool] = True,
+        transformation_id: str = None,
+        name: str = None,
+        description: str = None,
+        compartment_id: str = None,
     ) -> "FeatureGroup":
         """Creates FeatureGroup resource.
 
@@ -411,7 +411,7 @@ def delete_feature_group(self):
 
     @classmethod
     def list_feature_group(
-            cls, compartment_id: str = None, **kwargs
+        cls, compartment_id: str = None, **kwargs
     ) -> List["FeatureGroup"]:
         """Lists FeatureGroup resources in a given compartment.
 
@@ -432,7 +432,7 @@ def list_feature_group(
 
     @classmethod
     def list_feature_group_df(
-            cls, compartment_id: str = None, **kwargs
+        cls, compartment_id: str = None, **kwargs
     ) -> "pandas.DataFrame":
         """Lists FeatureGroup resources in a given compartment as pandas dataframe.
 
@@ -452,14 +452,15 @@ def list_feature_group_df(
         return FeatureGroup.list_df(compartment_id, **kwargs)
 
     def _build_dataset(
-            self,
-            query: str,
-            name: str = None,
-            description: str = None,
-            compartment_id: str = None,
-            expectation_suite: ExpectationSuite = None,
-            expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
-            statistics_config: Union[StatisticsConfig, bool] = True,
+        self,
+        query: str,
+        name: str = None,
+        description: str = None,
+        compartment_id: str = None,
+        expectation_suite: ExpectationSuite = None,
+        expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
+        statistics_config: Union[StatisticsConfig, bool] = True,
+        partition_keys: List[str] = None,
     ):
         dataset_resource = (
             Dataset()
@@ -472,6 +473,7 @@ def _build_dataset(
                 compartment_id if compartment_id else self.compartment_id
             )
             .with_statistics_config(statistics_config)
+            .with_partition_keys(partition_keys)
         )
 
         if expectation_suite:
@@ -483,14 +485,15 @@ def _build_dataset(
         return dataset_resource
 
     def create_dataset(
-            self,
-            query: str,
-            name: str = None,
-            description: str = None,
-            compartment_id: str = None,
-            expectation_suite: ExpectationSuite = None,
-            expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
-            statistics_config: Union[StatisticsConfig, bool] = True,
+        self,
+        query: str,
+        name: str = None,
+        description: str = None,
+        compartment_id: str = None,
+        expectation_suite: ExpectationSuite = None,
+        expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
+        statistics_config: Union[StatisticsConfig, bool] = True,
+        partition_keys: List[str] = None,
     ) -> "Dataset":
         """Creates Dataset resource.
 
@@ -510,6 +513,8 @@ def create_dataset(
             Type of the expectation.
         statistics_config: StatisticsConfig = None
             Config details for the Statistics.
+        partition_keys: List[str]
+            Partition keys for the dataset.
 
         Returns
         -------
@@ -529,6 +534,7 @@ def create_dataset(
             expectation_suite,
             expectation_type,
             statistics_config,
+            partition_keys,
         )
 
         return self.oci_fs_dataset.create()
@@ -566,7 +572,7 @@ def list_dataset(cls, compartment_id: str = None, **kwargs) -> List["Dataset"]:
 
     @classmethod
     def list_dataset_df(
-            cls, compartment_id: str = None, **kwargs
+        cls, compartment_id: str = None, **kwargs
     ) -> "pandas.DataFrame":
         """Lists Dataset resources in a given compartment as pandas dataframe.
 
diff --git a/ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py b/ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py
index c5f4130b4..361815b6d 100644
--- a/ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py
+++ b/ads/feature_store/execution_strategy/delta_lake/delta_lake_service.py
@@ -56,9 +56,6 @@ def write_dataframe_to_delta_lake(
         Returns:
             None.
""" - - print(delta_table_primary_key, partition_keys, "Aaaya Kya") - logger.info(f"target table name {target_table_name}") if ( self.spark_engine.is_delta_table_exists(target_table_name) @@ -113,7 +110,11 @@ def write_dataframe_to_delta_lake( logger.info(f"Upsert ops for target table {target_table_name} ended") else: self.save_delta_dataframe( - dataflow_output, ingestion_mode, target_table_name, feature_options, partition_keys + dataflow_output, + ingestion_mode, + target_table_name, + feature_options, + partition_keys, ) def __execute_delta_merge_insert_update( @@ -197,7 +198,12 @@ def __execute_delta_merge_insert_update_all( ) def save_delta_dataframe( - self, dataframe, dataframe_ingestion_mode, table_name, feature_options=None, partition_keys=None + self, + dataframe, + dataframe_ingestion_mode, + table_name, + feature_options=None, + partition_keys=None, ): """ Saves a DataFrame to a Delta table with the specified options. @@ -211,9 +217,12 @@ def save_delta_dataframe( """ delta_partition_keys = [] - partition_keys_items = partition_keys["items"] - if partition_keys_items: - delta_partition_keys = [partition_key.get("name") for partition_key in partition_keys_items] + if partition_keys: + partition_keys_items = partition_keys["items"] + if partition_keys_items: + delta_partition_keys = [ + partition_key.get("name") for partition_key in partition_keys_items + ] if feature_options and feature_options.get("featureOptionWriteConfigDetails"): feature_delta_write_option_config = feature_options.get( @@ -226,11 +235,15 @@ def save_delta_dataframe( dataframe.write.format("delta").options( **self.get_delta_write_config(feature_delta_write_option_config) - ).mode(dataframe_ingestion_mode).partitionBy(delta_partition_keys).saveAsTable(table_name) - else: - dataframe.write.format("delta").mode(dataframe_ingestion_mode).partitionBy(delta_partition_keys).saveAsTable( + ).mode(dataframe_ingestion_mode).partitionBy( + delta_partition_keys + ).saveAsTable( table_name ) + else: + dataframe.write.format("delta").mode(dataframe_ingestion_mode).partitionBy( + delta_partition_keys + ).saveAsTable(table_name) def get_delta_write_config(self, feature_delta_write_option_config): """Returns a dictionary containing delta schema configuration options based on a given dictionary of feature diff --git a/ads/feature_store/execution_strategy/spark/spark_execution.py b/ads/feature_store/execution_strategy/spark/spark_execution.py index eeec21701..7453db00f 100644 --- a/ads/feature_store/execution_strategy/spark/spark_execution.py +++ b/ads/feature_store/execution_strategy/spark/spark_execution.py @@ -398,6 +398,7 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob): dataset_job.ingestion_mode, target_table, dataset_job.feature_option_details, + dataset.partition_keys, ) # Get the output features diff --git a/ads/feature_store/feature_group.py b/ads/feature_store/feature_group.py index 89322e027..94ac817da 100644 --- a/ads/feature_store/feature_group.py +++ b/ads/feature_store/feature_group.py @@ -125,7 +125,6 @@ class FeatureGroup(Builder): CONST_FEATURE_STORE_ID = "featureStoreId" CONST_ENTITY_ID = "entityId" CONST_ITEMS = "items" - CONST_PRIMARY_KEY_NAME = "name" CONST_PRIMARY_KEYS = "primaryKeys" CONST_PARTITION_KEYS = "partitionKeys" CONST_EXPECTATION_DETAILS = "expectationDetails" @@ -157,7 +156,7 @@ class FeatureGroup(Builder): CONST_OUTPUT_FEATURE_DETAILS: "output_feature_details", CONST_STATISTICS_CONFIG: "statistics_config", CONST_INFER_SCHEMA: "is_infer_schema", - CONST_PARTITION_KEYS: 
"partition_keys" + CONST_PARTITION_KEYS: "partition_keys", } def __init__(self, spec: Dict = None, **kwargs) -> None: @@ -321,8 +320,7 @@ def with_primary_keys(self, primary_keys: List[str]) -> "FeatureGroup": self.CONST_PRIMARY_KEYS, { self.CONST_ITEMS: [ - {self.CONST_PRIMARY_KEY_NAME: primary_key} - for primary_key in primary_keys + {self.CONST_NAME: primary_key} for primary_key in primary_keys ] }, ) @@ -352,7 +350,7 @@ def with_partition_keys(self, partition_keys: List[str]) -> "FeatureGroup": self.CONST_PARTITION_KEYS, { self.CONST_ITEMS: [ - {self.CONST_PRIMARY_KEY_NAME: partition_key} + {self.CONST_NAME: partition_key} for partition_key in partition_keys or [] ] }, diff --git a/ads/feature_store/validation_output.py b/ads/feature_store/validation_output.py index 13f793646..2ceb17ef9 100644 --- a/ads/feature_store/validation_output.py +++ b/ads/feature_store/validation_output.py @@ -23,10 +23,10 @@ def to_pandas(self) -> pd.DataFrame: The validation output information as a pandas DataFrame. """ if self.content: - validation_output_json = ( - json.loads(self.content) - ) - profile_result = pd.json_normalize(validation_output_json.get("results")).transpose() + validation_output_json = json.loads(self.content) + profile_result = pd.json_normalize( + validation_output_json.get("results") + ).transpose() return profile_result def to_summary(self) -> pd.DataFrame: @@ -39,9 +39,7 @@ def to_summary(self) -> pd.DataFrame: The validation output summary information as a pandas DataFrame. """ if self.content: - validation_output_json = ( - json.loads(self.content) - ) + validation_output_json = json.loads(self.content) profile_result = pd.json_normalize(validation_output_json).transpose() summary_df = profile_result.drop("results") return summary_df