updated the documents
yogesh266 committed Jul 8, 2023
1 parent e18ad22 commit d2646a8
Showing 8 changed files with 68 additions and 60 deletions.
5 changes: 3 additions & 2 deletions ads/feature_store/common/spark_session_singleton.py
@@ -75,8 +75,9 @@ def __init__(self, metastore_id: str = None):
                 "spark.hadoop.oracle.dcat.metastore.id", metastore_id
             ).config(
                 "spark.sql.warehouse.dir", metastore.default_managed_table_location
-            )\
-                .config("spark.driver.memory", "16G")
+            ).config(
+                "spark.driver.memory", "16G"
+            )
 
             if developer_enabled():
                 # Configure spark session with delta jars only in developer mode. In other cases,
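For context, here is a minimal, runnable sketch of the SparkSession builder chaining that this hunk reformats; the app name and settings are illustrative placeholders, not the library's actual defaults.

from pyspark.sql import SparkSession

# Each .config() call returns the builder itself, so the reformatted chain
# above is just a sequence of .config() calls on one builder.
spark = (
    SparkSession.builder.appName("feature-store-sketch")  # hypothetical name
    .config("spark.driver.memory", "16G")
    .getOrCreate()
)
print(spark.conf.get("spark.driver.memory"))  # -> 16G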
1 change: 0 additions & 1 deletion ads/feature_store/common/utils/feature_schema_mapper.py
@@ -241,7 +241,6 @@ def map_feature_type_to_pandas(feature_type):
     raise TypeError(f"Feature Type {feature_type} is not supported for pandas")
 
 
-
 def map_spark_type_to_stats_data_type(spark_type):
     """Maps the spark data types to MLM library data types
     args:
35 changes: 21 additions & 14 deletions ads/feature_store/common/utils/utility.py
@@ -44,7 +44,7 @@
 
 
 def get_execution_engine_type(
-        data_frame: Union[DataFrame, pd.DataFrame]
+    data_frame: Union[DataFrame, pd.DataFrame]
 ) -> ExecutionEngine:
     """
     Determines the execution engine type for a given DataFrame.
@@ -84,7 +84,7 @@ def get_metastore_id(feature_store_id: str):
 
 
 def validate_delta_format_parameters(
-        timestamp: datetime = None, version_number: int = None, is_restore: bool = False
+    timestamp: datetime = None, version_number: int = None, is_restore: bool = False
 ):
     """
     Validate the user input provided as part of preview, restore APIs for ingested data, Ingested data is
@@ -118,9 +118,9 @@
 
 
 def get_features(
-        output_columns: List[dict],
-        parent_id: str,
-        entity_type: EntityType = EntityType.FEATURE_GROUP,
+    output_columns: List[dict],
+    parent_id: str,
+    entity_type: EntityType = EntityType.FEATURE_GROUP,
 ) -> List[Feature]:
     """
     Returns a list of features, given a list of output_columns and a feature_group_id.
@@ -155,7 +155,9 @@ def get_features(
 
 
 def get_schema_from_pandas_df(df: pd.DataFrame, feature_store_id: str):
-    spark = SparkSessionSingleton(get_metastore_id(feature_store_id)).get_spark_session()
+    spark = SparkSessionSingleton(
+        get_metastore_id(feature_store_id)
+    ).get_spark_session()
     converted_df = spark.createDataFrame(df)
     return get_schema_from_spark_df(converted_df)
 
@@ -174,7 +176,9 @@ def get_schema_from_spark_df(df: DataFrame):
     return schema_details
 
 
-def get_schema_from_df(data_frame: Union[DataFrame, pd.DataFrame], feature_store_id: str) -> List[dict]:
+def get_schema_from_df(
+    data_frame: Union[DataFrame, pd.DataFrame], feature_store_id: str
+) -> List[dict]:
     """
     Given a DataFrame, returns a list of dictionaries that describe its schema.
     If the DataFrame is a pandas DataFrame, it uses pandas methods to get the schema.
@@ -187,8 +191,7 @@ def get_schema_from_df(data_frame: Union[DataFrame, pd.DataFrame], feature_store
 
 
 def get_input_features_from_df(
-        data_frame: Union[DataFrame, pd.DataFrame],
-        feature_store_id: str
+    data_frame: Union[DataFrame, pd.DataFrame], feature_store_id: str
 ) -> List[FeatureDetail]:
     """
     Given a DataFrame, returns a list of FeatureDetail objects that represent its input features.
@@ -205,7 +208,7 @@
 
 
 def convert_expectation_suite_to_expectation(
-        expectation_suite: ExpectationSuite, expectation_type: ExpectationType
+    expectation_suite: ExpectationSuite, expectation_type: ExpectationType
 ):
     """
     Convert an ExpectationSuite object to an Expectation object with detailed rule information.
@@ -264,7 +267,7 @@ def largest_matching_subset_of_primary_keys(left_feature_group, right_feature_gr
 
 
 def convert_pandas_datatype_with_schema(
-        raw_feature_details: List[dict], input_df: pd.DataFrame
+    raw_feature_details: List[dict], input_df: pd.DataFrame
 ) -> pd.DataFrame:
     feature_detail_map = {}
     columns_to_remove = []
@@ -281,21 +284,25 @@ def convert_pandas_datatype_with_schema(
                 .where(pd.notnull(input_df[column]), None)
             )
         else:
-            logger.warning("column" + column + "doesn't exist in the input feature details")
+            logger.warning(
+                "column" + column + "doesn't exist in the input feature details"
+            )
             columns_to_remove.append(column)
     return input_df.drop(columns=columns_to_remove)
 
 
 def convert_spark_dataframe_with_schema(
-        raw_feature_details: List[dict], input_df: DataFrame
+    raw_feature_details: List[dict], input_df: DataFrame
 ) -> DataFrame:
     feature_detail_map = {}
     columns_to_remove = []
     for feature_details in raw_feature_details:
         feature_detail_map[feature_details.get("name")] = feature_details
     for column in input_df.columns:
         if column not in feature_detail_map.keys():
-            logger.warning("column" + column + "doesn't exist in the input feature details")
+            logger.warning(
+                "column" + column + "doesn't exist in the input feature details"
+            )
             columns_to_remove.append(column)
 
     return input_df.drop(*columns_to_remove)
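Both convert_* helpers above use the same column-pruning pattern: columns absent from the declared feature details are logged and dropped. A self-contained sketch with hypothetical feature details (not taken from this commit) shows the net effect on a pandas DataFrame.

import pandas as pd

raw_feature_details = [{"name": "age"}, {"name": "income"}]  # hypothetical schema
df = pd.DataFrame({"age": [31, 45], "income": [72000, 51000], "extra": [1, 2]})

# Any input column not declared in the feature details is removed.
declared = {detail["name"] for detail in raw_feature_details}
columns_to_remove = [c for c in df.columns if c not in declared]
print(df.drop(columns=columns_to_remove))  # the undeclared "extra" column is pruned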
4 changes: 3 additions & 1 deletion ads/feature_store/entity.py
@@ -370,7 +370,9 @@ def create_feature_group(
         raw_feature_details = (
             input_feature_details
             if input_feature_details
-            else get_input_features_from_df(schema_details_dataframe, self.feature_store_id)
+            else get_input_features_from_df(
+                schema_details_dataframe, self.feature_store_id
+            )
         )
 
         self.oci_feature_group = self._build_feature_group(
8 changes: 4 additions & 4 deletions ads/feature_store/execution_strategy/spark/spark_execution.py
@@ -38,9 +38,7 @@
 from ads.feature_store.transformation import Transformation
 
 from ads.feature_store.feature_statistics.statistics_service import StatisticsService
-from ads.feature_store.common.utils.utility import (
-    validate_input_feature_details
-)
+from ads.feature_store.common.utils.utility import validate_input_feature_details
 
 logger = logging.getLogger(__name__)
 
@@ -178,7 +176,9 @@ def _save_offline_dataframe(
         self.spark_engine.create_database(database)
 
         if not feature_group.is_infer_schema:
-            data_frame = validate_input_feature_details(feature_group.input_feature_details, data_frame)
+            data_frame = validate_input_feature_details(
+                feature_group.input_feature_details, data_frame
+            )
 
         # TODO: Get event timestamp column and apply filtering basis from and to timestamp
 
5 changes: 3 additions & 2 deletions ads/feature_store/feature_group.py
@@ -476,9 +476,10 @@ def with_input_feature_details(
     def with_schema_details_from_dataframe(
         self, data_frame: Union[DataFrame, pd.DataFrame]
     ) -> "FeatureGroup":
-
         if not self.feature_store_id:
-            raise ValueError("FeatureStore id must be set before calling `with_schema_details_from_dataframe`")
+            raise ValueError(
+                "FeatureStore id must be set before calling `with_schema_details_from_dataframe`"
+            )
 
         schema_details = get_schema_from_df(data_frame, self.feature_store_id)
         feature_details = []
68 changes: 33 additions & 35 deletions ads/feature_store/feature_store_registrar.py
@@ -73,11 +73,11 @@ def __init__(self):
 
 class _ModelBuilderHashDict(Generic[_ModelBuilderT]):
     def __init__(
-            self,
-            builders: Optional[List[_ModelBuilderT]],
-            hash_fn: Callable = lambda model: model.name
-            if model.attribute_map.get("name")
-            else model.display_name,
+        self,
+        builders: Optional[List[_ModelBuilderT]],
+        hash_fn: Callable = lambda model: model.name
+        if model.attribute_map.get("name")
+        else model.display_name,
     ):
         self.__hash_fn = hash_fn
         self._dict: Dict[str, _ModelBuilderT] = {}
@@ -125,10 +125,10 @@ def get(self, key: str) -> Union[None, _ModelBuilderT]:
 
 class _ElementMap(Generic[_ModelBuilderT]):
     def __init__(
-            self,
-            element_type: _ModelBuilderT,
-            element_dict: _ModelBuilderHashDict[_ModelBuilderT],
-            parent_child_map: Dict[str, "_ParentChildMap"] = None,
+        self,
+        element_type: _ModelBuilderT,
+        element_dict: _ModelBuilderHashDict[_ModelBuilderT],
+        parent_child_map: Dict[str, "_ParentChildMap"] = None,
     ):
         self.element_type = element_type
         self.element_dict = element_dict
@@ -140,9 +140,9 @@ def add_element_from_dict(self, config: dict) -> str:
 
 class _ParentChildMap:
     def __init__(
-            self,
-            child_mapping: "_ElementMap",
-            parent_child_hash_map: DefaultDict[str, List[str]],
+        self,
+        child_mapping: "_ElementMap",
+        parent_child_hash_map: DefaultDict[str, List[str]],
     ):
         self.child_mapping = child_mapping
         self.parent_child_hash_map = parent_child_hash_map
@@ -158,12 +158,12 @@ class FeatureStoreRegistrar:
     DATASET_SPEC = "dataset"
 
     def __init__(
-            self,
-            feature_store: FeatureStore = None,
-            entities: List[Entity] = None,
-            datasets: List[Dataset] = None,
-            feature_groups: List[FeatureGroup] = None,
-            transformations: List[Transformation] = None,
+        self,
+        feature_store: FeatureStore = None,
+        entities: List[Entity] = None,
+        datasets: List[Dataset] = None,
+        feature_groups: List[FeatureGroup] = None,
+        transformations: List[Transformation] = None,
     ):
         """
         Initialised feature registrar resource
@@ -189,7 +189,7 @@ def __init__(
         self._entity_map = {}
 
     def create(
-            self,
+        self,
     ) -> Tuple[
         FeatureStore,
         List[Entity],
@@ -248,12 +248,10 @@ def _create_transformations(self) -> List[Transformation]:
         for transformation in self._transformations.get_dict().values():
             transformation.feature_store_id = self._feature_store_id
             transformation.compartment_id = (
-                    transformation.compartment_id or self._root_compartment_id
+                transformation.compartment_id or self._root_compartment_id
             )
             # to encode to base64
-            transformation.source_code_function = (
-                transformation.source_code_function
-            )
+            transformation.source_code_function = transformation.source_code_function
         return self._transformations.create_models(self._progress)
 
     def _create_feature_groups(self) -> List[FeatureGroup]:
@@ -288,7 +286,7 @@ def _create_feature_groups(self) -> List[FeatureGroup]:
             feature_group.primary_keys = feature_group.primary_keys
             feature_group.feature_store_id = self._feature_store_id
             feature_group.compartment_id = (
-                    feature_group.compartment_id or self._root_compartment_id
+                feature_group.compartment_id or self._root_compartment_id
             )
 
         return self._feature_groups.create_models(self._progress)
@@ -394,11 +392,11 @@ def generate_yaml(cls, uri: str = "feature_store.yaml"):
 
     @classmethod
     def from_yaml(
-            cls,
-            yaml_string: str = None,
-            uri: str = None,
-            loader: callable = Loader,
-            **kwargs,
+        cls,
+        yaml_string: str = None,
+        uri: str = None,
+        loader: callable = Loader,
+        **kwargs,
     ) -> "FeatureStoreRegistrar":
         """Creates an object from YAML string provided or from URI location containing YAML string
 
@@ -445,11 +443,11 @@ def _find_yaml_definition_file() -> str:
 
     def _get_progress_steps_count(self) -> int:
         return (
-                self._feature_store.get_count()
-                + self._entities.get_count()
-                + self._transformations.get_count()
-                + self._feature_groups.get_count()
-                + self._datasets.get_count()
+            self._feature_store.get_count()
+            + self._entities.get_count()
+            + self._transformations.get_count()
+            + self._feature_groups.get_count()
+            + self._datasets.get_count()
         )
 
     @staticmethod
@@ -470,7 +468,7 @@ def _read_from_file(uri: str, **kwargs) -> str:
 
     @staticmethod
     def _populate_child_mappings(
-            parent_dict: dict, parent_map: _ElementMap, parent_hash: str
+        parent_dict: dict, parent_map: _ElementMap, parent_hash: str
     ):
         for key, elements in parent_dict.get("spec").items():
             if key in parent_map.parent_child_map:
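The hash_fn default reformatted above keys a model by model.name when its attribute_map declares a "name" attribute, and falls back to model.display_name otherwise. A sketch with a stand-in model class (the real builders come from OCI ADS) illustrates the fallback.

from dataclasses import dataclass, field
from typing import Dict

@dataclass
class StubModel:  # hypothetical stand-in for an ADS model builder
    display_name: str
    name: str = ""
    attribute_map: Dict[str, str] = field(default_factory=dict)

hash_fn = lambda model: (
    model.name if model.attribute_map.get("name") else model.display_name
)
print(hash_fn(StubModel(display_name="sales_entity")))  # -> sales_entity
print(hash_fn(StubModel("x", "sales", {"name": "name"})))  # -> sales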
2 changes: 1 addition & 1 deletion ads/feature_store/transformation.py
@@ -498,4 +498,4 @@ def to_dict(self) -> Dict:
 
     def __repr__(self) -> str:
         """Displays the object as YAML."""
-        return self.to_yaml()
\ No newline at end of file
+        return self.to_yaml()
