Skip to content

Commit

Permalink
partitioning code changes for the dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
yogesh266 committed Jul 25, 2023
1 parent 8c8dc05 commit 38e19d1
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 65 deletions.
33 changes: 33 additions & 0 deletions ads/feature_store/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class Dataset(Builder):
CONST_DESCRIPTION = "description"
CONST_FREEFORM_TAG = "freeformTags"
CONST_DEFINED_TAG = "definedTags"
CONST_PARTITION_KEYS = "partitionKeys"
CONST_OUTPUT_FEATURE_DETAILS = "outputFeatureDetails"
CONST_EXPECTATION_DETAILS = "expectationDetails"
CONST_STATISTICS_CONFIG = "statisticsConfig"
Expand All @@ -128,6 +129,7 @@ class Dataset(Builder):
CONST_OUTPUT_FEATURE_DETAILS: "output_feature_details",
CONST_LIFECYCLE_STATE: "lifecycle_state",
CONST_MODEL_DETAILS: "model_details",
CONST_PARTITION_KEYS: "partition_keys",
}

def __init__(self, spec: Dict = None, **kwargs) -> None:
Expand Down Expand Up @@ -500,6 +502,37 @@ def with_model_details(self, model_details: ModelDetails) -> "Dataset":
)
return self.set_spec(self.CONST_MODEL_DETAILS, model_details.to_dict())

@property
def partition_keys(self) -> List[str]:
    """Partition keys configured on this dataset.

    Returns the raw value stored in the spec under ``CONST_PARTITION_KEYS``.
    NOTE(review): ``with_partition_keys`` stores a ``{"items": [{"name": ...}]}``
    mapping, so the ``List[str]`` annotation looks inaccurate — confirm against
    ``get_spec`` semantics before relying on it.
    """
    spec_value = self.get_spec(self.CONST_PARTITION_KEYS)
    return spec_value

@partition_keys.setter
def partition_keys(self, value: List[str]):
    # Delegate to the fluent builder method so attribute assignment and the
    # builder-style call share a single code path (and a single spec format).
    self.with_partition_keys(value)

def with_partition_keys(self, partition_keys: List[str]) -> "Dataset":
    """Sets the partition keys of the dataset.

    Parameters
    ----------
    partition_keys: List[str]
        The list of partition key names for the dataset. ``None`` is
        treated the same as an empty list (no partitioning).

    Returns
    -------
    Dataset
        The Dataset instance (self).
    """
    # The backend expects {"items": [{"name": <key>}, ...]} rather than a
    # bare list of strings, mirroring FeatureGroup.with_partition_keys.
    return self.set_spec(
        self.CONST_PARTITION_KEYS,
        {
            self.CONST_ITEMS: [
                {self.CONST_NAME: partition_key}
                for partition_key in partition_keys or []
            ]
        },
    )

def add_models(self, model_details: ModelDetails) -> "Dataset":
"""Add model details to the dataset, Append to the existing model id list
Expand Down
90 changes: 48 additions & 42 deletions ads/feature_store/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,17 @@ def create(self, **kwargs) -> "Entity":
return self

def _build_feature_group(
self,
primary_keys,
partition_keys,
input_feature_details,
expectation_suite: ExpectationSuite = None,
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
statistics_config: Union[StatisticsConfig, bool] = True,
transformation_id: str = None,
name: str = None,
description: str = None,
compartment_id: str = None,
self,
primary_keys,
partition_keys,
input_feature_details,
expectation_suite: ExpectationSuite = None,
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
statistics_config: Union[StatisticsConfig, bool] = True,
transformation_id: str = None,
name: str = None,
description: str = None,
compartment_id: str = None,
):
feature_group_resource = (
FeatureGroup()
Expand All @@ -316,18 +316,18 @@ def _build_feature_group(
return feature_group_resource

def create_feature_group(
self,
primary_keys: List[str],
partition_keys: List[str] = None,
input_feature_details: List[FeatureDetail] = None,
schema_details_dataframe: Union[DataFrame, pd.DataFrame] = None,
expectation_suite: ExpectationSuite = None,
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
statistics_config: Union[StatisticsConfig, bool] = True,
transformation_id: str = None,
name: str = None,
description: str = None,
compartment_id: str = None,
self,
primary_keys: List[str],
partition_keys: List[str] = None,
input_feature_details: List[FeatureDetail] = None,
schema_details_dataframe: Union[DataFrame, pd.DataFrame] = None,
expectation_suite: ExpectationSuite = None,
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
statistics_config: Union[StatisticsConfig, bool] = True,
transformation_id: str = None,
name: str = None,
description: str = None,
compartment_id: str = None,
) -> "FeatureGroup":
"""Creates FeatureGroup resource.
Expand Down Expand Up @@ -411,7 +411,7 @@ def delete_feature_group(self):

@classmethod
def list_feature_group(
cls, compartment_id: str = None, **kwargs
cls, compartment_id: str = None, **kwargs
) -> List["FeatureGroup"]:
"""Lists FeatureGroup resources in a given compartment.
Expand All @@ -432,7 +432,7 @@ def list_feature_group(

@classmethod
def list_feature_group_df(
cls, compartment_id: str = None, **kwargs
cls, compartment_id: str = None, **kwargs
) -> "pandas.DataFrame":
"""Lists FeatureGroup resources in a given compartment as pandas dataframe.
Expand All @@ -452,14 +452,15 @@ def list_feature_group_df(
return FeatureGroup.list_df(compartment_id, **kwargs)

def _build_dataset(
self,
query: str,
name: str = None,
description: str = None,
compartment_id: str = None,
expectation_suite: ExpectationSuite = None,
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
statistics_config: Union[StatisticsConfig, bool] = True,
self,
query: str,
name: str = None,
description: str = None,
compartment_id: str = None,
expectation_suite: ExpectationSuite = None,
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
statistics_config: Union[StatisticsConfig, bool] = True,
partition_keys: List[str] = None,
):
dataset_resource = (
Dataset()
Expand All @@ -472,6 +473,7 @@ def _build_dataset(
compartment_id if compartment_id else self.compartment_id
)
.with_statistics_config(statistics_config)
.with_partition_keys(partition_keys)
)

if expectation_suite:
Expand All @@ -483,14 +485,15 @@ def _build_dataset(
return dataset_resource

def create_dataset(
self,
query: str,
name: str = None,
description: str = None,
compartment_id: str = None,
expectation_suite: ExpectationSuite = None,
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
statistics_config: Union[StatisticsConfig, bool] = True,
self,
query: str,
name: str = None,
description: str = None,
compartment_id: str = None,
expectation_suite: ExpectationSuite = None,
expectation_type: ExpectationType = ExpectationType.NO_EXPECTATION,
statistics_config: Union[StatisticsConfig, bool] = True,
partition_keys: List[str] = None,
) -> "Dataset":
"""Creates Dataset resource.
Expand All @@ -510,6 +513,8 @@ def create_dataset(
Type of the expectation.
statistics_config: StatisticsConfig = None
Config details for the Statistics.
partition_keys: List[str]
Partition keys for the dataset.
Returns
-------
Expand All @@ -529,6 +534,7 @@ def create_dataset(
expectation_suite,
expectation_type,
statistics_config,
partition_keys,
)

return self.oci_fs_dataset.create()
Expand Down Expand Up @@ -566,7 +572,7 @@ def list_dataset(cls, compartment_id: str = None, **kwargs) -> List["Dataset"]:

@classmethod
def list_dataset_df(
cls, compartment_id: str = None, **kwargs
cls, compartment_id: str = None, **kwargs
) -> "pandas.DataFrame":
"""Lists Dataset resources in a given compartment as pandas dataframe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ def write_dataframe_to_delta_lake(
Returns:
None.
"""

print(delta_table_primary_key, partition_keys, "Aaaya Kya")

logger.info(f"target table name {target_table_name}")
if (
self.spark_engine.is_delta_table_exists(target_table_name)
Expand Down Expand Up @@ -113,7 +110,11 @@ def write_dataframe_to_delta_lake(
logger.info(f"Upsert ops for target table {target_table_name} ended")
else:
self.save_delta_dataframe(
dataflow_output, ingestion_mode, target_table_name, feature_options, partition_keys
dataflow_output,
ingestion_mode,
target_table_name,
feature_options,
partition_keys,
)

def __execute_delta_merge_insert_update(
Expand Down Expand Up @@ -197,7 +198,12 @@ def __execute_delta_merge_insert_update_all(
)

def save_delta_dataframe(
self, dataframe, dataframe_ingestion_mode, table_name, feature_options=None, partition_keys=None
self,
dataframe,
dataframe_ingestion_mode,
table_name,
feature_options=None,
partition_keys=None,
):
"""
Saves a DataFrame to a Delta table with the specified options.
Expand All @@ -211,9 +217,12 @@ def save_delta_dataframe(
"""
delta_partition_keys = []

partition_keys_items = partition_keys["items"]
if partition_keys_items:
delta_partition_keys = [partition_key.get("name") for partition_key in partition_keys_items]
if partition_keys:
partition_keys_items = partition_keys["items"]
if partition_keys_items:
delta_partition_keys = [
partition_key.get("name") for partition_key in partition_keys_items
]

if feature_options and feature_options.get("featureOptionWriteConfigDetails"):
feature_delta_write_option_config = feature_options.get(
Expand All @@ -226,11 +235,15 @@ def save_delta_dataframe(

dataframe.write.format("delta").options(
**self.get_delta_write_config(feature_delta_write_option_config)
).mode(dataframe_ingestion_mode).partitionBy(delta_partition_keys).saveAsTable(table_name)
else:
dataframe.write.format("delta").mode(dataframe_ingestion_mode).partitionBy(delta_partition_keys).saveAsTable(
).mode(dataframe_ingestion_mode).partitionBy(
delta_partition_keys
).saveAsTable(
table_name
)
else:
dataframe.write.format("delta").mode(dataframe_ingestion_mode).partitionBy(
delta_partition_keys
).saveAsTable(table_name)

def get_delta_write_config(self, feature_delta_write_option_config):
"""Returns a dictionary containing delta schema configuration options based on a given dictionary of feature
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob):
dataset_job.ingestion_mode,
target_table,
dataset_job.feature_option_details,
dataset.partition_keys,
)

# Get the output features
Expand Down
8 changes: 3 additions & 5 deletions ads/feature_store/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ class FeatureGroup(Builder):
CONST_FEATURE_STORE_ID = "featureStoreId"
CONST_ENTITY_ID = "entityId"
CONST_ITEMS = "items"
CONST_PRIMARY_KEY_NAME = "name"
CONST_PRIMARY_KEYS = "primaryKeys"
CONST_PARTITION_KEYS = "partitionKeys"
CONST_EXPECTATION_DETAILS = "expectationDetails"
Expand Down Expand Up @@ -157,7 +156,7 @@ class FeatureGroup(Builder):
CONST_OUTPUT_FEATURE_DETAILS: "output_feature_details",
CONST_STATISTICS_CONFIG: "statistics_config",
CONST_INFER_SCHEMA: "is_infer_schema",
CONST_PARTITION_KEYS: "partition_keys"
CONST_PARTITION_KEYS: "partition_keys",
}

def __init__(self, spec: Dict = None, **kwargs) -> None:
Expand Down Expand Up @@ -321,8 +320,7 @@ def with_primary_keys(self, primary_keys: List[str]) -> "FeatureGroup":
self.CONST_PRIMARY_KEYS,
{
self.CONST_ITEMS: [
{self.CONST_PRIMARY_KEY_NAME: primary_key}
for primary_key in primary_keys
{self.CONST_NAME: primary_key} for primary_key in primary_keys
]
},
)
Expand Down Expand Up @@ -352,7 +350,7 @@ def with_partition_keys(self, partition_keys: List[str]) -> "FeatureGroup":
self.CONST_PARTITION_KEYS,
{
self.CONST_ITEMS: [
{self.CONST_PRIMARY_KEY_NAME: partition_key}
{self.CONST_NAME: partition_key}
for partition_key in partition_keys or []
]
},
Expand Down
12 changes: 5 additions & 7 deletions ads/feature_store/validation_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ def to_pandas(self) -> pd.DataFrame:
The validation output information as a pandas DataFrame.
"""
if self.content:
validation_output_json = (
json.loads(self.content)
)
profile_result = pd.json_normalize(validation_output_json.get("results")).transpose()
validation_output_json = json.loads(self.content)
profile_result = pd.json_normalize(
validation_output_json.get("results")
).transpose()
return profile_result

def to_summary(self) -> pd.DataFrame:
    """Summarize the validation output as a pandas DataFrame.

    Returns
    -------
    pd.DataFrame
        The validation output summary information as a pandas DataFrame:
        the top-level validation JSON normalized and transposed, with the
        per-expectation ``results`` row dropped. Returns ``None`` when
        ``self.content`` is empty/falsy.
    """
    # Guard clause: nothing to summarize without content.
    if not self.content:
        return None
    validation_output_json = json.loads(self.content)
    profile_result = pd.json_normalize(validation_output_json).transpose()
    # Drop the verbose per-expectation results; keep only summary rows.
    summary_df = profile_result.drop("results")
    return summary_df
return summary_df
Expand Down

0 comments on commit 38e19d1

Please sign in to comment.