diff --git a/THIRD_PARTY_LICENSES.txt b/THIRD_PARTY_LICENSES.txt
index 404fedace..d81f2bdd5 100644
--- a/THIRD_PARTY_LICENSES.txt
+++ b/THIRD_PARTY_LICENSES.txt
@@ -229,6 +229,12 @@ pandavro
 * Source code: https://github.com/ynqa/pandavro
 * Project home: https://github.com/ynqa/pandavro
 
+plotly
+* Copyright (c) 2016-2018 Plotly, Inc
+* License: MIT License
+* Source code: https://github.com/plotly/plotly.py
+* Project home: https://plotly.com/
+
 protobuf
 * Copyright 2008 Google Inc. All rights reserved.
 * License: Google Protobuf License
diff --git a/ads/common/decorator/runtime_dependency.py b/ads/common/decorator/runtime_dependency.py
index 4185a3c68..aaf00dfe7 100644
--- a/ads/common/decorator/runtime_dependency.py
+++ b/ads/common/decorator/runtime_dependency.py
@@ -64,10 +64,9 @@ class OptionalDependency:
     OPTUNA = "oracle-ads[optuna]"
     SPARK = "oracle-ads[spark]"
     HUGGINGFACE = "oracle-ads[huggingface]"
-    GREAT_EXPECTATIONS = "oracle-ads[great-expectations]"
+    FEATURE_STORE = "oracle-ads[feature-store]"
     GRAPHVIZ = "oracle-ads[graphviz]"
     MLM_INSIGHTS = "oracle-ads[mlm_insights]"
-    PYARROW = "oracle-ads[pyarrow]"
 
 
 def runtime_dependency(
diff --git a/ads/feature_store/.readthedocs.yaml b/ads/feature_store/.readthedocs.yaml
new file mode 100644
index 000000000..4b1aaee99
--- /dev/null
+++ b/ads/feature_store/.readthedocs.yaml
@@ -0,0 +1,19 @@
+# Read the Docs configuration file
+
+# Required
+version: 2
+
+# Set the version of Python and other tools you might need
+build:
+  os: ubuntu-22.04
+  tools:
+    python: "3.9"
+
+# Build documentation in the docs/ directory with Sphinx
+sphinx:
+  configuration: ads/feature_store/docs/source/conf.py
+
+# Optionally declare the Python requirements required to build your docs
+python:
+  install:
+    - requirements: ads/feature_store/docs/requirements.txt
diff --git a/ads/feature_store/common/spark_session_singleton.py b/ads/feature_store/common/spark_session_singleton.py
index ac5933f9e..9d3e2716a 100644
--- a/ads/feature_store/common/spark_session_singleton.py
+++ b/ads/feature_store/common/spark_session_singleton.py
@@ -84,6 +84,7 @@ def __init__(self, metastore_id: str = None):
             )
             .enableHiveSupport()
         )
+        _managed_table_location = None
 
         if not developer_enabled() and metastore_id:
             # Get the authentication credentials for the OCI data catalog service
@@ -94,12 +95,11 @@ def __init__(self, metastore_id: str = None):
 
             data_catalog_client = OCIClientFactory(**auth).data_catalog
             metastore = data_catalog_client.get_metastore(metastore_id).data
+            _managed_table_location = metastore.default_managed_table_location
             # Configure the Spark session builder object to use the specified metastore
             spark_builder.config(
                 "spark.hadoop.oracle.dcat.metastore.id", metastore_id
-            ).config(
-                "spark.sql.warehouse.dir", metastore.default_managed_table_location
-            ).config(
+            ).config("spark.sql.warehouse.dir", _managed_table_location).config(
                 "spark.driver.memory", "16G"
             )
 
@@ -114,7 +114,12 @@ def __init__(self, metastore_id: str = None):
 
         self.spark_session.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
         self.spark_session.sparkContext.setLogLevel("OFF")
+        self.managed_table_location = _managed_table_location
 
     def get_spark_session(self):
         """Access method to get the spark session."""
         return self.spark_session
+
+    def get_managed_table_location(self):
+        """Returns the managed table location for the Spark session."""
+        return self.managed_table_location
diff --git a/ads/feature_store/common/utils/transformation_utils.py b/ads/feature_store/common/utils/transformation_utils.py
index 39265fc01..7a6445ccf 100644
--- a/ads/feature_store/common/utils/transformation_utils.py
+++ b/ads/feature_store/common/utils/transformation_utils.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 # -*- coding: utf-8; -*-
 import json
+
 # Copyright (c) 2023 Oracle and/or its affiliates.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
 
@@ -64,9 +65,13 @@ def apply_transformation(
             dataframe.createOrReplaceTempView(temporary_table_view)
 
             transformed_data = spark.sql(
-                transformation_function_caller(temporary_table_view, **transformation_kwargs_dict)
+                transformation_function_caller(
+                    temporary_table_view, **transformation_kwargs_dict
+                )
             )
         elif transformation.transformation_mode == TransformationMode.PANDAS.value:
-            transformed_data = transformation_function_caller(dataframe, **transformation_kwargs_dict)
+            transformed_data = transformation_function_caller(
+                dataframe, **transformation_kwargs_dict
+            )
 
         return transformed_data
diff --git a/ads/feature_store/data_validation/great_expectation.py b/ads/feature_store/data_validation/great_expectation.py
index bcfdf28cd..4d904441d 100644
--- a/ads/feature_store/data_validation/great_expectation.py
+++ b/ads/feature_store/data_validation/great_expectation.py
@@ -21,8 +21,8 @@
     from great_expectations.validator.validator import Validator
 except ModuleNotFoundError:
     raise ModuleNotFoundError(
-        f"The `great-expectations` module was not found. Please run `pip install "
-        f"{OptionalDependency.GREAT_EXPECTATIONS}`."
+        f"The `great_expectations` module was not found. Please run `pip install "
+        f"{OptionalDependency.FEATURE_STORE}`."
     )
 except Exception as e:
     raise
diff --git a/ads/feature_store/dataset.py b/ads/feature_store/dataset.py
index f1ca3478d..e505a8a76 100644
--- a/ads/feature_store/dataset.py
+++ b/ads/feature_store/dataset.py
@@ -1,13 +1,20 @@
 #!/usr/bin/env python
 # -*- coding: utf-8; -*-
-import json
 import logging
 from copy import deepcopy
 from datetime import datetime
 from typing import Dict, List, Union
 
 import pandas
+import pandas as pd
 from great_expectations.core import ExpectationSuite
+
+from ads import deprecated
+from oci.feature_store.models import (
+    DatasetFeatureGroupCollection,
+    DatasetFeatureGroupSummary,
+)
+
 from ads.common import utils
 from ads.common.oci_mixin import OCIModelMixin
 from ads.feature_store.common.enums import (
@@ -15,6 +22,7 @@
     ExpectationType,
     EntityType,
 )
+from ads.feature_store.common.exceptions import NotMaterializedError
 from ads.feature_store.common.utils.utility import (
     get_metastore_id,
     validate_delta_format_parameters,
@@ -26,10 +34,11 @@
     OciExecutionStrategyProvider,
 )
 from ads.feature_store.feature import DatasetFeature
+from ads.feature_store.feature_group import FeatureGroup
 from ads.feature_store.feature_group_expectation import Expectation
 from ads.feature_store.feature_option_details import FeatureOptionDetails
 from ads.feature_store.service.oci_dataset import OCIDataset
-from ads.feature_store.statistics import Statistics
+from ads.feature_store.statistics.statistics import Statistics
 from ads.feature_store.statistics_config import StatisticsConfig
 from ads.feature_store.service.oci_lineage import OCILineage
 from ads.feature_store.model_details import ModelDetails
@@ -113,6 +122,7 @@ class Dataset(Builder):
     CONST_ITEMS = "items"
     CONST_LAST_JOB_ID = "jobId"
     CONST_MODEL_DETAILS = "modelDetails"
+    CONST_FEATURE_GROUP = "datasetFeatureGroups"
 
     attribute_map = {
         CONST_ID: "id",
@@ -130,6 +140,7 @@ class Dataset(Builder):
"lifecycle_state", CONST_MODEL_DETAILS: "model_details", CONST_PARTITION_KEYS: "partition_keys", + CONST_FEATURE_GROUP: "dataset_feature_groups", } def __init__(self, spec: Dict = None, **kwargs) -> None: @@ -148,11 +159,12 @@ def __init__(self, spec: Dict = None, **kwargs) -> None: super().__init__(spec=spec, **deepcopy(kwargs)) # Specify oci Dataset instance self.dataset_job = None + self._is_manual_association: bool = False self._spark_engine = None self.oci_dataset = self._to_oci_dataset(**kwargs) self.lineage = OCILineage(**kwargs) - def _to_oci_dataset(self, **kwargs): + def _to_oci_dataset(self, **kwargs) -> OCIDataset: """Creates an `OCIDataset` instance from the `Dataset`. kwargs @@ -180,6 +192,16 @@ def spark_engine(self): self._spark_engine = SparkEngine(get_metastore_id(self.feature_store_id)) return self._spark_engine + @property + def is_manual_association(self): + collection: DatasetFeatureGroupCollection = self.get_spec( + self.CONST_FEATURE_GROUP + ) + if collection and collection.is_manual_association is not None: + return collection.is_manual_association + else: + return self._is_manual_association + @property def kind(self) -> str: """The kind of the object as showing in a YAML.""" @@ -213,8 +235,8 @@ def name(self) -> str: return self.get_spec(self.CONST_NAME) @name.setter - def name(self, name: str) -> "Dataset": - return self.with_name(name) + def name(self, name: str): + self.with_name(name) def with_name(self, name: str) -> "Dataset": """Sets the name. @@ -242,6 +264,16 @@ def id(self) -> str: """ return self.get_spec(self.CONST_ID) + @property + def features(self) -> List[DatasetFeature]: + return [ + DatasetFeature(**feature_dict) + for feature_dict in self.get_spec(self.CONST_OUTPUT_FEATURE_DETAILS)[ + self.CONST_ITEMS + ] + or [] + ] + def with_id(self, id: str) -> "Dataset": return self.set_spec(self.CONST_ID, id) @@ -475,6 +507,20 @@ def with_statistics_config( self.CONST_STATISTICS_CONFIG, statistics_config_in.to_dict() ) + def target_delta_table(self): + """ + Returns the fully-qualified name of the target table for storing delta data. + + The name of the target table is constructed by concatenating the entity ID + and the name of the table, separated by a dot. The resulting string has the + format 'entity_id.table_name'. + + Returns: + str: The fully-qualified name of the target delta table. + """ + target_table = f"{self.entity_id}.{self.name}" + return target_table + @property def model_details(self) -> "ModelDetails": return self.get_spec(self.CONST_MODEL_DETAILS) @@ -503,6 +549,54 @@ def with_model_details(self, model_details: ModelDetails) -> "Dataset": return self.set_spec(self.CONST_MODEL_DETAILS, model_details.to_dict()) + @property + def feature_groups(self) -> List["FeatureGroup"]: + collection: "DatasetFeatureGroupCollection" = self.get_spec( + self.CONST_FEATURE_GROUP + ) + feature_groups: List["FeatureGroup"] = [] + if collection and collection.items: + for datasetFGSummary in collection.items: + feature_groups.append( + FeatureGroup.from_id(datasetFGSummary.feature_group_id) + ) + + return feature_groups + + @feature_groups.setter + def feature_groups(self, feature_groups: List["FeatureGroup"]): + self.with_feature_groups(feature_groups) + + def with_feature_groups(self, feature_groups: List["FeatureGroup"]) -> "Dataset": + """Sets the model details for the dataset. + + Parameters + ---------- + feature_groups: List of feature groups + Returns + ------- + Dataset + The Dataset instance (self). 
+ + """ + collection: List["DatasetFeatureGroupSummary"] = [] + for group in feature_groups: + collection.append(DatasetFeatureGroupSummary(feature_group_id=group.id)) + + self._is_manual_association = True + return self.set_spec( + self.CONST_FEATURE_GROUP, + DatasetFeatureGroupCollection(items=collection, is_manual_association=True), + ) + + def feature_groups_to_df(self): + return pd.DataFrame.from_records( + [ + feature_group.oci_feature_group.to_df_record() + for feature_group in self.feature_groups + ] + ) + @property def partition_keys(self) -> List[str]: return self.get_spec(self.CONST_PARTITION_KEYS) @@ -560,7 +654,9 @@ def add_models(self, model_details: ModelDetails) -> "Dataset": f"Dataset update Failed with : {type(ex)} with error message: {ex}" ) if existing_model_details: - self.with_model_details(ModelDetails().with_items(existing_model_details["items"])) + self.with_model_details( + ModelDetails().with_items(existing_model_details["items"]) + ) else: self.with_model_details(ModelDetails().with_items([])) return self @@ -612,7 +708,7 @@ def show(self, rankdir: str = GraphOrientation.LEFT_RIGHT) -> None: f"Can't get lineage information for Feature group id {self.id}" ) - def create(self, **kwargs) -> "Dataset": + def create(self, validate_sql=False, **kwargs) -> "Dataset": """Creates dataset resource. !!! note "Lazy" @@ -625,6 +721,8 @@ def create(self, **kwargs) -> "Dataset": kwargs Additional kwargs arguments. Can be any attribute that `oci.feature_store.models.Dataset` accepts. + validate_sql: + Boolean value indicating whether to validate sql before creating dataset Returns ------- @@ -645,6 +743,9 @@ def create(self, **kwargs) -> "Dataset": if self.statistics_config is None: self.statistics_config = StatisticsConfig() + if validate_sql is True: + self.spark_engine.sql(self.get_spec(self.CONST_QUERY)) + payload = deepcopy(self._spec) payload.pop("id", None) logger.debug(f"Creating a dataset resource with payload {payload}") @@ -652,6 +753,7 @@ def create(self, **kwargs) -> "Dataset": # Create dataset logger.info("Saving dataset.") self.oci_dataset = self._to_oci_dataset(**kwargs).create() + self._update_from_oci_dataset_model(self.oci_dataset) self.with_id(self.oci_dataset.id) return self @@ -690,6 +792,28 @@ def delete(self): dataset_execution_strategy.delete_dataset(self, dataset_job) + def get_features(self) -> List[DatasetFeature]: + """ + Returns all the features in the dataset. + + Returns: + List[DatasetFeature] + """ + + return self.features + + def get_features_df(self) -> "pandas.DataFrame": + """ + Returns all the features as pandas dataframe. + + Returns: + pandas.DataFrame + """ + records = [] + for feature in self.features: + records.append({"name": feature.feature_name, "type": feature.feature_type}) + return pandas.DataFrame.from_records(records) + def update(self, **kwargs) -> "Dataset": """Updates Dataset in the feature store. 
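Taken together, the new ``Dataset`` APIs above compose roughly as follows. A minimal sketch, not a verbatim excerpt: the OCIDs are hypothetical placeholders, and ``with_query`` is assumed here as the builder method that populates the ``CONST_QUERY`` spec consumed by ``create``:

.. code-block:: python3

    from ads.feature_store.dataset import Dataset

    dataset = (
        Dataset()
        .with_name("flights_dataset")
        .with_entity_id("<entity_ocid>")                # hypothetical OCID
        .with_feature_store_id("<feature_store_ocid>")  # hypothetical OCID
        .with_query("SELECT col1, col2 FROM flights_feature_group")  # assumed builder method
    )

    # validate_sql=True runs the query through the Spark engine before creating the resource.
    dataset.create(validate_sql=True)

    # Output features and manually associated feature groups as pandas frames.
    print(dataset.get_features_df())
    print(dataset.feature_groups_to_df())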
@@ -733,8 +857,19 @@ def _update_from_oci_dataset_model(self, oci_dataset: OCIDataset) -> "Dataset":
 
         for infra_attr, dsc_attr in self.attribute_map.items():
             if infra_attr in dataset_details:
-                self.set_spec(infra_attr, dataset_details[infra_attr])
-
+                if infra_attr == self.CONST_OUTPUT_FEATURE_DETAILS:
+                    # May not need if we fix the backend and add dataset_id to the output_feature
+                    features_list = []
+                    for output_feature in dataset_details[infra_attr]["items"]:
+                        output_feature["datasetId"] = dataset_details[self.CONST_ID]
+                        features_list.append(output_feature)
+
+                    value = {self.CONST_ITEMS: features_list}
+                elif infra_attr == self.CONST_FEATURE_GROUP:
+                    value = getattr(self.oci_dataset, dsc_attr)
+                else:
+                    value = dataset_details[infra_attr]
+                self.set_spec(infra_attr, value)
         return self
 
     def materialise(
@@ -773,6 +908,34 @@ def materialise(
 
         dataset_execution_strategy.ingest_dataset(self, dataset_job)
 
+    def get_last_job(self) -> "DatasetJob":
+        """Gets the job details of the most recent Dataset job.
+
+        Returns:
+            DatasetJob
+        """
+
+        if not self.id:
+            raise ValueError(
+                "Dataset needs to be saved to the feature store before getting associated jobs."
+            )
+
+        if not self.job_id:
+            ds_job = DatasetJob.list(
+                dataset_id=self.id,
+                compartment_id=self.compartment_id,
+                sort_by="timeCreated",
+                limit="1",
+            )
+            if not ds_job:
+                raise ValueError(
+                    "Unable to retrieve the associated last job. Please make sure you materialized the data."
+                )
+            self.with_job_id(ds_job[0].id)
+            return ds_job[0]
+        return DatasetJob.from_id(self.job_id)
+
+    @deprecated(details="preview functionality is deprecated. Please use as_of.")
     def preview(
         self,
         row_count: int = 10,
@@ -797,6 +960,8 @@ def preview(
         spark dataframe
             The preview result in spark dataframe
         """
+        self.check_resource_materialization()
+
         validate_delta_format_parameters(timestamp, version_number)
 
         target_table = f"{self.entity_id}.{self.name}"
@@ -806,6 +971,43 @@ def preview(
 
         return self.spark_engine.sql(sql_query)
 
+    def check_resource_materialization(self):
+        """Checks whether the target Delta table for this resource has been materialized in Spark.
+        If the target Delta table doesn't exist, raises a NotMaterializedError with the type and name of this resource.
+        """
+        if not self.spark_engine.is_delta_table_exists(self.target_delta_table()):
+            raise NotMaterializedError(self.type, self.name)
+
+    def as_of(
+        self,
+        version_number: int = None,
+        commit_timestamp: datetime = None,
+    ):
+        """Previews the dataset as of a given commit timestamp or version number and returns the result as a dataframe.
+
+        Parameters
+        ----------
+        commit_timestamp: datetime
+            commit date time to preview in format yyyy-MM-dd or yyyy-MM-dd HH:mm:ss
+            commit date time is maintained for every ingestion commit using delta lake
+        version_number: int
+            commit version number for the preview. Version numbers are automatically versioned for every ingestion
+            commit using delta lake
+
+        Returns
+        -------
+        spark dataframe
+            The preview result in spark dataframe
+        """
+        self.check_resource_materialization()
+
+        validate_delta_format_parameters(commit_timestamp, version_number)
+        target_table = self.target_delta_table()
+
+        return self.spark_engine.get_time_version_data(
+            target_table, version_number, commit_timestamp
+        )
+
     def profile(self):
         """Get the dataset profile information and return the response in dataframe.
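A short usage sketch of the time-travel read added here (it assumes the dataset has been materialised at least once; otherwise ``check_resource_materialization`` raises ``NotMaterializedError``):

.. code-block:: python3

    from datetime import datetime

    # Read the dataset as of an ingestion version...
    df_v1 = dataset.as_of(version_number=1)

    # ...or as of a Delta Lake commit timestamp.
    df_aug = dataset.as_of(commit_timestamp=datetime(2023, 8, 1))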
@@ -814,6 +1016,8 @@ def profile(self):
         spark dataframe
             The profile result in spark dataframe
         """
+        self.check_resource_materialization()
+
         target_table = f"{self.entity_id}.{self.name}"
         sql_query = f"DESCRIBE DETAIL {target_table}"
 
@@ -835,6 +1039,8 @@ def restore(self, version_number: int = None, timestamp: datetime = None):
         spark dataframe
             The restore output as spark dataframe
         """
+        self.check_resource_materialization()
+
         validate_delta_format_parameters(timestamp, version_number, True)
         target_table = f"{self.entity_id}.{self.name}"
         if version_number is not None:
@@ -884,14 +1090,8 @@ def get_statistics(self, job_id: str = None) -> "Statistics":
             raise ValueError(
                 "Dataset needs to be saved to the feature store before retrieving the statistics"
             )
-        stat_job_id = job_id
-        if job_id is None:
-            if self.job_id is None:
-                raise ValueError(
-                    "Unable to retrieve the last job,please provide the job id,make sure you materialised the data'"
-                )
-            else:
-                stat_job_id = self.job_id
+
+        stat_job_id = job_id if job_id is not None else self.get_last_job().id
 
         # TODO: take the one in memory or will list down job ids and find the latest
         dataset_job = DatasetJob.from_id(stat_job_id)
@@ -917,14 +1117,8 @@ def get_validation_output(self, job_id: str = None) -> "ValidationOutput":
             raise ValueError(
                 "Dataset needs to be saved to the feature store before retrieving the validation report"
             )
-        validation_job_id = job_id
-        if job_id is None:
-            if self.job_id is None:
-                raise ValueError(
-                    "Unable to retrieve the last job,please provide the job id,make sure you materialised the data'"
-                )
-            else:
-                validation_job_id = self.job_id
+
+        validation_job_id = job_id if job_id is not None else self.get_last_job().id
 
         # retrieve the validation output JSON from data_flow_batch_execution_output
         dataset_job = DatasetJob.from_id(validation_job_id)
@@ -1013,8 +1207,14 @@ def to_dict(self) -> Dict:
         for key, value in spec.items():
             if hasattr(value, "to_dict"):
                 value = value.to_dict()
-            spec[key] = value
-
+            if key == self.CONST_FEATURE_GROUP:
+                spec[
+                    key
+                ] = self.oci_dataset.client.base_client.sanitize_for_serialization(
+                    value
+                )
+            else:
+                spec[key] = value
         return {
             "kind": self.kind,
             "type": self.type,
diff --git a/ads/feature_store/docs/Makefile b/ads/feature_store/docs/Makefile
index f93e7ea53..36284073e 100644
--- a/ads/feature_store/docs/Makefile
+++ b/ads/feature_store/docs/Makefile
@@ -18,7 +18,7 @@ help:
 .PHONY: help Makefile
 
 # Catch-all target: route all unknown targets to Sphinx using the new
-# "make mode" option. $(O) is me`ant as a shortcut for $(SPHINXOPTS).
+# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
 %: Makefile
 	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
diff --git a/ads/feature_store/docs/requirements.txt b/ads/feature_store/docs/requirements.txt
index f1b91c78f..4e340b00a 100644
--- a/ads/feature_store/docs/requirements.txt
+++ b/ads/feature_store/docs/requirements.txt
@@ -1,10 +1,15 @@
-sphinx
-nbsphinx
-pandoc
-ipython
-sphinx_rtd_theme
 autodoc
+nbsphinx
+sphinx
 sphinxcontrib-napoleon
-sphinx_code_tabs
 sphinx_copybutton
+sphinx_code_tabs
 sphinx-autobuild
+sphinx-autorun
+oracle_ads
+furo
+IPython
+pandoc
+rstcheck
+restructuredtext_lint
+doc8
diff --git a/ads/feature_store/docs/source/conf.py b/ads/feature_store/docs/source/conf.py
index 222fb7116..65c948a3e 100644
--- a/ads/feature_store/docs/source/conf.py
+++ b/ads/feature_store/docs/source/conf.py
@@ -9,7 +9,7 @@
 
 sys.path.insert(0, os.path.abspath("../../"))
 
-version = "1.2"
+version = "1.0.2"
 release = version
 
 
@@ -28,10 +28,12 @@
     "sphinx.ext.napoleon",
     "sphinx.ext.autodoc",
     "sphinx.ext.doctest",
+    "sphinx.ext.mathjax",
     "sphinx.ext.ifconfig",
     "sphinx.ext.todo",
     "sphinx.ext.extlinks",
     "sphinx.ext.intersphinx",
+    "sphinx.ext.graphviz",
     "nbsphinx",
     "sphinx_code_tabs",
     "sphinx_copybutton",
@@ -54,7 +57,7 @@
 
 
 # Get version
-version = "1.2"
+version = "1.0.2"
 release = version
 
 # Unless we want to expose real buckets and namespaces
@@ -77,6 +80,9 @@
 html_theme_options = {
     "light_logo": "logo-light-mode.png",
     "dark_logo": "logo-dark-mode.png",
+    "collapse_navigation": True,
+    "sticky_navigation": True,
+    "navigation_depth": 4,
 }
 
 # Add any paths that contain custom static files (such as style sheets) here,
diff --git a/ads/feature_store/docs/source/dataset.rst b/ads/feature_store/docs/source/dataset.rst
index 5275fe20a..0fe9700af 100644
--- a/ads/feature_store/docs/source/dataset.rst
+++ b/ads/feature_store/docs/source/dataset.rst
@@ -220,6 +220,14 @@ The ``get_statistics()`` method takes the following optional parameter:
 
 .. image:: figures/dataset_statistics.png
 
+.. code-block:: python3
+
+  # Fetch and visualize stats for a dataset job
+  df = dataset.get_statistics(job_id).to_viz()
+
+.. image:: figures/dataset_statistics_viz.png
+
+
 .. seealso::
 
    :ref:`Statistics`
@@ -227,27 +235,21 @@ The ``get_statistics()`` method takes the following optional parameter:
 Get features
 ============
 
-You can call the ``get_features_dataframe()`` method of the Dataset instance to fetch features in a dataset.
+You can call the ``get_features_df()`` method of the Dataset instance to fetch features in a dataset.
 
 .. code-block:: python3
 
   # Fetch features for a dataset
-  df = dataset.get_features_dataframe()
+  df = dataset.get_features_df()
   df.show()
 
-Get input schema details
-========================
-You can call the ``get_input_schema_dataframe()`` method of the Dataset instance to fetch input schema details of a dataset.
-
-.. code-block:: python3
-
-  # Fetch input schema details for a dataset
-  df = dataset.get_input_schema_dataframe()
-  df.show()
 
 Preview
 ========
 
+.. deprecated:: 1.0.3
+   Use :func:`as_of` instead.
+
 You can call the ``preview()`` method of the Dataset instance to preview the dataset.
 
 The ``.preview()`` method takes the following optional parameter:
@@ -261,6 +263,21 @@ The ``.preview()`` method takes the following optional parameter:
   df = dataset.preview(row_count=50)
   df.show()
 
+as_of
+=======
+
+You can call the ``as_of()`` method of the Dataset instance to retrieve the dataset as it was at a specified point in time or version (time travel).
+
+The ``.as_of()`` method takes the following optional parameters:
+
+- ``commit_timestamp: date-time``. Commit timestamp for dataset
+- ``version_number: int``. Version number for dataset
+
+.. code-block:: python3
+
+  # as_of dataset
+  df = dataset.as_of(version_number=1)
+
 Restore
 =======
 
diff --git a/ads/feature_store/docs/source/feature_group.rst b/ads/feature_store/docs/source/feature_group.rst
index 999bfdadb..860280336 100644
--- a/ads/feature_store/docs/source/feature_group.rst
+++ b/ads/feature_store/docs/source/feature_group.rst
@@ -18,15 +18,19 @@ A ``FeatureGroup`` instance will be created.
       :caption: Python
 
       from ads.feature_store.feature_group import FeatureGroup
+      # Dictionary of arguments for the feature group's transformation function.
+      transformation_kwargs = {}
 
       feature_group_flights = (
           FeatureGroup()
           .with_feature_store_id(feature_store.id)
           .with_primary_keys(["col1"])
+          .with_partition_keys(["col1", "col2"])
           .with_name("flights_feature_group")
           .with_entity_id("")
           .with_compartment_id("ocid1.compartment..")
           .with_schema_details_from_dataframe(dataframe)
+          .with_transformation_kwargs(transformation_kwargs)
       )
 
   .. code-tab:: Python3
@@ -52,6 +56,9 @@ A ``FeatureGroup`` instance will be created.
         primaryKeys:
           items:
           - name: col1
+        partitionKeys:
+          items:
+          - name: col1
         statisticsConfig:
           isEnabled: true
       type: featureGroup
@@ -247,6 +254,13 @@ The ``get_statistics()`` method takes the following optional parameter:
 
 .. image:: figures/stats_1.png
 
+.. code-block:: python3
+
+  # Fetch and visualize stats for a feature group job
+  df = feature_group.get_statistics(job_id).to_viz()
+
+.. image:: figures/feature_group_statistics_viz.png
+
 .. seealso::
 
    :ref:`Statistics`
@@ -283,9 +297,13 @@ Feature store provides an API similar to Pandas to join feature groups together
   # Filter feature group
   feature_group.filter(feature_group.col1 > 10).show()
 
+
 Preview
 =======
 
+.. deprecated:: 1.0.3
+   Use :func:`as_of` instead.
+
 You can call the ``preview()`` method of the FeatureGroup instance to preview the feature group.
 
 The ``.preview()`` method takes the following optional parameter:
@@ -299,6 +317,21 @@ The ``.preview()`` method takes the following optional parameter:
   # Preview feature group
   df = feature_group.preview(row_count=50)
 
+as_of
+=======
+
+You can call the ``as_of()`` method of the FeatureGroup instance to retrieve the feature group as it was at a specified point in time or version (time travel).
+
+The ``.as_of()`` method takes the following optional parameters:
+
+- ``commit_timestamp: date-time``. Commit timestamp for feature group
+- ``version_number: int``. Version number for feature group
+
+..
 code-block:: python3
+
+  # as_of feature group
+  df = feature_group.as_of(version_number=1)
+
 Restore
 =======
 
diff --git a/ads/feature_store/docs/source/figures/dataset_statistics_viz.png b/ads/feature_store/docs/source/figures/dataset_statistics_viz.png
new file mode 100644
index 000000000..3f79725e2
Binary files /dev/null and b/ads/feature_store/docs/source/figures/dataset_statistics_viz.png differ
diff --git a/ads/feature_store/docs/source/figures/feature_group_statistics_viz.png b/ads/feature_store/docs/source/figures/feature_group_statistics_viz.png
new file mode 100644
index 000000000..29cc74780
Binary files /dev/null and b/ads/feature_store/docs/source/figures/feature_group_statistics_viz.png differ
diff --git a/ads/feature_store/docs/source/index.rst b/ads/feature_store/docs/source/index.rst
index ac55d4021..ac73383b9 100644
--- a/ads/feature_store/docs/source/index.rst
+++ b/ads/feature_store/docs/source/index.rst
@@ -3,7 +3,7 @@ Welcome to oci-feature-store's documentation!
 =============================================
 
 .. toctree::
-   :maxdepth: 2
+   :maxdepth: 3
    :caption: Contents:
 
    overview
diff --git a/ads/feature_store/docs/source/quickstart.rst b/ads/feature_store/docs/source/quickstart.rst
index 4b3655cf7..e7942aa10 100644
--- a/ads/feature_store/docs/source/quickstart.rst
+++ b/ads/feature_store/docs/source/quickstart.rst
@@ -6,8 +6,7 @@ Quick start
 
   .. code-block:: shell
 
-    odsc conda install --uri https://objectstorage.us-ashburn-1.oraclecloud.com/n/bigdatadatasciencelarge/b/service-conda-packs-fs/o/service_pack/cpu/PySpark_3.2_and_Feature_Store/1.0/fspyspark32_p38_cpu_v1#conda
-
+    odsc conda install -s fspyspark32_p38_cpu_v1
 
 3. Download the notebooks from the example notebook section.
 
 .. seealso::
@@ -73,19 +72,24 @@ Background reading to understand the concepts of Feature Store and OCI Data Scie
 
     # step2: Create feature store
 
-    def transactions_df(dataframe):
+    def transactions_df(table_name, **kwargs):
+        columns = kwargs.get('columns', '*')  # Default to select all columns if 'columns' not provided
+        where_clause = kwargs.get('where_clause', '')  # Default to empty where clause if 'where_clause' not provided
+
         sql_query = f"""
             SELECT
-                col1,
-                col2
+                {columns}
             FROM
-                {dataframe}
+                {table_name}
+                {where_clause}
             """
         return sql_query
 
+    transformation = feature_store.create_transformation(
+        transformation_mode=TransformationMode.SQL,
+        source_code_func=transactions_df
+    )
 
-    transformation = feature_store.create_transformation(transformation_mode=TransformationMode.SQL,
-                                                         source_code_func=transactions_df)
 
     # step3: Create expectation
     expectation_suite = ExpectationSuite(expectation_suite_name="feature_definition")
     expectation_suite.add_expectation(
         ExpectationConfiguration(
@@ -102,14 +106,17 @@ Background reading to understand the concepts of Feature Store and OCI Data Scie
     stats_config = StatisticsConfig().with_is_enabled(False)
 
     # step5: Create feature group
+    transformation_args = {"columns": "col1, col2", "where_clause": "col3 > 100"}
     feature_group = entity.create_feature_group(
-        ["name"],
+        primary_keys=["name"],
+        partition_keys=["name"],
-        input_feature_details,
+        input_feature_details=input_feature_details,
-        expectation_suite,
-        ExpectationType.LENIENT,
-        stats_config,
+        expectation_suite=expectation_suite,
+        expectation_type=ExpectationType.LENIENT,
+        statistics_config=stats_config,
         name="",
-        transformation_id=transformation.id
+        transformation_id=transformation.id,
+        transformation_kwargs=transformation_args
     )
 
diff --git a/ads/feature_store/docs/source/release_notes.rst b/ads/feature_store/docs/source/release_notes.rst
index 2b21a683b..66b17e702 100644
---
 a/ads/feature_store/docs/source/release_notes.rst
+++ b/ads/feature_store/docs/source/release_notes.rst
@@ -3,8 +3,35 @@
 =============
 Release Notes
 =============
-1.2
----
+1.0.3
+-----
+.. note::
+
+  .. list-table::
+    :header-rows: 1
+
+    * - Package Name
+      - Latest Version
+      - Notes
+    * - SERVICE_VERSION
+      - 0.1.225.master
+      -
+
+Release notes: August 10, 2023
+
+* [FEATURE] Addition of ``featurestore_dataset`` as optional parameter in GenericModel ``save`` function.
+* [FEATURE] Addition of ``transformation_kwargs`` in ``Transformation`` entity.
+* [FEATURE] Addition of partition keys in ``FeatureGroup`` and ``Dataset``.
+* [FEATURE] OCI Marketplace integration of the stack.
+* [FEATURE] Conda release of feature store pack ``fspyspark32_p38_cpu_v1``.
+* [FIX] Validation of model ids when associated with ``Dataset``.
+* [UI] Stats visualisation of feature group and dataset.
+* [UI] Global search for feature store entities.
+* [UI] Addition of onboarding page for feature store.
+* [UI] Redirection of entities within the lineage tab of feature group and dataset.
+
+1.0.2
+-----
 .. note::
 
   .. list-table::
@@ -32,8 +59,8 @@ Release notes: July 18, 2023
 * [DOCS] For ``ValidationOutput`` instance, addition of ``to_summary()`` method for validation summary details.
 * [DOCS] For ``ValidationOutput`` instance, addition of ``to_pandas()`` method for validation detailed report.
 
-1.1
----
+1.0.1
+-----
 
 .. note::
 
@@ -63,8 +90,8 @@ Release notes: July 5, 2023
 * [DOCS] Data Type update for Offline Feature Type COMPLEX
 * [DOCS] Updated terraform default version as 1.1.x
 
-1.0
----
+1.0.0
+-----
 
 .. note::
 
diff --git a/ads/feature_store/execution_strategy/engine/spark_engine.py b/ads/feature_store/execution_strategy/engine/spark_engine.py
index b0ddc6b13..77b2b3747 100644
--- a/ads/feature_store/execution_strategy/engine/spark_engine.py
+++ b/ads/feature_store/execution_strategy/engine/spark_engine.py
@@ -5,6 +5,7 @@
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
 
 import logging
+from datetime import datetime
 
 from ads.common.decorator.runtime_dependency import OptionalDependency
 
@@ -17,7 +18,7 @@
     )
 except Exception as e:
     raise
-from typing import List
+from typing import List, Dict
 
 from ads.feature_store.common.utils.feature_schema_mapper import (
     map_spark_type_to_feature_type,
@@ -36,6 +37,71 @@ def __init__(self, metastore_id: str = None, spark_session: SparkSession = None)
         else:
             self.spark = SparkSessionSingleton(metastore_id).get_spark_session()
 
+        self.managed_table_location = (
+            SparkSessionSingleton().get_managed_table_location()
+        )
+
+    def get_time_version_data(
+        self,
+        delta_table_name: str,
+        version_number: int = None,
+        timestamp: datetime = None,
+    ):
+        split_db_name = delta_table_name.split(".")
+
+        # Get the Delta table path
+        delta_table_path = (
+            f"{self.managed_table_location}/{split_db_name[0].lower()}.db/{split_db_name[1]}"
+            if self.managed_table_location
+            else self._get_delta_table_path(delta_table_name)
+        )
+
+        # Set read options based on version_number and timestamp
+        read_options = {}
+        if version_number is not None:
+            read_options["versionAsOf"] = version_number
+        if timestamp:
+            read_options["timestampAsOf"] = timestamp
+
+        # Load the data from the Delta table using specified read options
+        df = self._read_delta_table(delta_table_path, read_options)
+        return df
+
+    def _get_delta_table_path(self, delta_table_name: str) -> str:
+        """
+        Get the path of the Delta table using DESCRIBE EXTENDED SQL command.
+ + Args: + delta_table_name (str): The name of the Delta table. + + Returns: + str: The path of the Delta table. + """ + delta_table_path = ( + self.spark.sql(f"DESCRIBE EXTENDED {delta_table_name}") + .filter("col_name = 'Location'") + .collect()[0][1] + ) + return delta_table_path + + def _read_delta_table(self, delta_table_path: str, read_options: Dict): + """ + Read the Delta table using specified read options. + + Args: + delta_table_path (str): The path of the Delta table. + read_options (dict): Dictionary of read options for Delta table. + + Returns: + DataFrame: The loaded DataFrame from the Delta table. + """ + df = ( + self.spark.read.format("delta") + .options(**read_options) + .load(delta_table_path) + ) + return df + def sql( self, query: str, diff --git a/ads/feature_store/execution_strategy/spark/spark_execution.py b/ads/feature_store/execution_strategy/spark/spark_execution.py index 53ba19898..caa74dd46 100644 --- a/ads/feature_store/execution_strategy/spark/spark_execution.py +++ b/ads/feature_store/execution_strategy/spark/spark_execution.py @@ -1,7 +1,5 @@ #!/usr/bin/env python # -*- coding: utf-8; -*- -import json - # Copyright (c) 2023 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ @@ -16,6 +14,7 @@ show_validation_summary, ) from ads.feature_store.execution_strategy.engine.spark_engine import SparkEngine +import traceback try: from pyspark.sql import DataFrame @@ -28,8 +27,6 @@ raise from ads.feature_store.common.enums import ( - FeatureStoreJobType, - LifecycleState, EntityType, ExpectationType, ) @@ -46,6 +43,11 @@ from ads.feature_store.feature_statistics.statistics_service import StatisticsService from ads.feature_store.common.utils.utility import validate_input_feature_details +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ads.feature_store.feature_group import FeatureGroup + from ads.feature_store.dataset import Dataset logger = logging.getLogger(__name__) @@ -75,7 +77,10 @@ def __init__(self, metastore_id: str = None): self._jvm = self._spark_context._jvm def ingest_feature_definition( - self, feature_group, feature_group_job: FeatureGroupJob, dataframe + self, + feature_group: "FeatureGroup", + feature_group_job: FeatureGroupJob, + dataframe, ): try: self._save_offline_dataframe(dataframe, feature_group, feature_group_job) @@ -89,7 +94,7 @@ def ingest_dataset(self, dataset, dataset_job: DatasetJob): raise SparkExecutionException(e).with_traceback(e.__traceback__) def delete_feature_definition( - self, feature_group, feature_group_job: FeatureGroupJob + self, feature_group: "FeatureGroup", feature_group_job: FeatureGroupJob ): """ Deletes a feature definition from the system. @@ -121,7 +126,7 @@ def delete_feature_definition( output_details=output_details, ) - def delete_dataset(self, dataset, dataset_job: DatasetJob): + def delete_dataset(self, dataset: "Dataset", dataset_job: DatasetJob): """ Deletes a dataset from the system. @@ -153,7 +158,7 @@ def delete_dataset(self, dataset, dataset_job: DatasetJob): ) @staticmethod - def _validate_expectation(expectation_type, validation_output): + def _validate_expectation(expectation_type, validation_output: dict): """ Validates the expectation based on the given expectation type and the validation output. 
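For reference, the Delta read that ``get_time_version_data`` assembles is equivalent to the following standalone PySpark snippet — the table path is illustrative, and ``versionAsOf`` / ``timestampAsOf`` are standard Delta Lake read options:

.. code-block:: python3

    df = (
        spark.read.format("delta")
        .option("versionAsOf", 1)  # or: .option("timestampAsOf", "2023-08-01")
        .load("<managed_table_location>/entity_ocid.db/flights_feature_group")  # illustrative path
    )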
@@ -291,8 +296,9 @@ def _save_offline_dataframe(
 
         except Exception as ex:
             error_details = str(ex)
+            tb = traceback.format_exc()
             logger.error(
-                f"FeatureGroup Materialization Failed with : {type(ex)} with error message: {ex}"
+                f"FeatureGroup Materialization Failed with : {type(ex)} with error message: {ex} and stacktrace {tb}",
             )
 
             show_ingestion_summary(
@@ -427,8 +433,9 @@ def _save_dataset_input(self, dataset, dataset_job: DatasetJob):
 
         except Exception as ex:
             error_details = str(ex)
+            tb = traceback.format_exc()
             logger.error(
-                f"Dataset Materialization Failed with : {type(ex)} with error message: {ex}"
+                f"Dataset Materialization Failed with : {type(ex)} with error message: {ex} and stacktrace {tb}"
             )
 
             show_ingestion_summary(
diff --git a/ads/feature_store/feature_group.py b/ads/feature_store/feature_group.py
index 0afd4dcd4..c11fc34a9 100644
--- a/ads/feature_store/feature_group.py
+++ b/ads/feature_store/feature_group.py
@@ -13,6 +13,7 @@
 import pandas as pd
 from great_expectations.core import ExpectationSuite
 
+from ads import deprecated
 from ads.common import utils
 from ads.common.decorator.runtime_dependency import OptionalDependency
 from ads.common.oci_mixin import OCIModelMixin
@@ -43,7 +44,7 @@
 from ads.feature_store.service.oci_feature_group import OCIFeatureGroup
 from ads.feature_store.service.oci_feature_group_job import OCIFeatureGroupJob
 from ads.feature_store.service.oci_lineage import OCILineage
-from ads.feature_store.statistics import Statistics
+from ads.feature_store.statistics.statistics import Statistics
 from ads.feature_store.statistics_config import StatisticsConfig
 from ads.feature_store.validation_output import ValidationOutput
 
@@ -179,7 +180,7 @@ def __init__(self, spec: Dict = None, **kwargs) -> None:
         # Specify oci FeatureGroup instance
         self.feature_group_job = None
         self._spark_engine = None
-        self.oci_feature_group = self._to_oci_feature_group(**kwargs)
+        self.oci_feature_group: OCIFeatureGroup = self._to_oci_feature_group(**kwargs)
         self.dsc_job = OCIFeatureGroupJob()
         self.lineage = OCILineage(**kwargs)
 
@@ -243,8 +244,8 @@ def name(self) -> str:
         return self.get_spec(self.CONST_NAME)
 
     @name.setter
-    def name(self, name: str) -> "FeatureGroup":
-        return self.with_name(name)
+    def name(self, name: str):
+        self.with_name(name)
 
     def with_name(self, name: str) -> "FeatureGroup":
         """Sets the name.
@@ -337,7 +338,7 @@ def transformation_kwargs(self, value: Dict):
         self.with_transformation_kwargs(value)
 
     def with_transformation_kwargs(
-        self, transformation_kwargs: Dict = {}
+        self, transformation_kwargs: Dict = ()
     ) -> "FeatureGroup":
         """Sets the transformation keyword arguments of the feature group.
 
@@ -431,6 +432,11 @@ def with_transformation_id(self, transformation_id: str) -> "FeatureGroup":
         FeatureGroup
             The FeatureGroup instance (self)
         """
+
+        # Initialize an empty dictionary as the transformation arguments if not specified
+        if not self.transformation_kwargs:
+            self.with_transformation_kwargs()
+
         return self.set_spec(self.CONST_TRANSFORMATION_ID, transformation_id)
 
     def _with_lifecycle_state(self, lifecycle_state: str) -> "FeatureGroup":
@@ -598,7 +604,6 @@ def with_statistics_config(
         FeatureGroup
             The FeatureGroup instance (self).
""" - statistics_config_in = None if isinstance(statistics_config, StatisticsConfig): statistics_config_in = statistics_config elif isinstance(statistics_config, bool): @@ -748,7 +753,6 @@ def get_features_df(self) -> "pd.DataFrame": { "name": feature.feature_name, "type": feature.feature_type, - "feature_group_id": feature.feature_group_id, } ) return pd.DataFrame.from_records(records) @@ -921,13 +925,21 @@ def get_last_job(self) -> "FeatureGroupJob": ) if not self.job_id: - raise ValueError( - "Associated jobs cannot be retrieved before calling 'materialise' or 'delete'." + fg_job = FeatureGroupJob.list( + feature_group_id=self.id, + compartment_id=self.compartment_id, + sort_by="timeCreated", + limit="1", ) - + if not fg_job: + raise ValueError( + "Unable to retrieve the associated last job. Please make sure you materialized the data." + ) + self.with_job_id(fg_job[0].id) + return fg_job[0] return FeatureGroupJob.from_id(self.job_id) - def select(self, features: Optional[List[str]] = []) -> Query: + def select(self, features: Optional[List[str]] = ()) -> Query: """ Selects a subset of features from the feature group and returns a Query object that can be used to view the resulting dataframe. @@ -989,6 +1001,7 @@ def filter(self, f: Union[Filter, Logic]): """ return self.select().filter(f) + @deprecated(details="preview functionality is deprecated. Please use as_of.") def preview( self, row_count: int = 10, @@ -1020,10 +1033,41 @@ def preview( if version_number is not None: logger.warning("Time travel queries are not supported in current version") + sql_query = f"select * from {target_table} LIMIT {row_count}" return self.spark_engine.sql(sql_query) + def as_of( + self, + version_number: int = None, + commit_timestamp: datetime = None, + ): + """preview the feature definition and return the response in dataframe. + + Parameters + ---------- + commit_timestamp: datetime + commit date time to preview in format yyyy-MM-dd or yyyy-MM-dd HH:mm:ss + commit date time is maintained for every ingestion commit using delta lake + version_number: int + commit version number for the preview. Version numbers are automatically versioned for every ingestion + commit using delta lake + + Returns + ------- + spark dataframe + The preview result in spark dataframe + """ + self.check_resource_materialization() + + validate_delta_format_parameters(commit_timestamp, version_number) + target_table = self.target_delta_table() + + return self.spark_engine.get_time_version_data( + target_table, version_number, commit_timestamp + ) + def profile(self): """get the profile information for feature definition and return the response in dataframe. @@ -1063,7 +1107,6 @@ def restore(self, version_number: int = None, timestamp: datetime = None): f"RESTORE TABLE {target_table} TO VERSION AS OF {version_number}" ) else: - iso_timestamp = timestamp.isoformat(" ", "seconds").__str__() sql_query = f"RESTORE TABLE {target_table} TO TIMESTAMP AS OF {timestamp}" restore_output = self.spark_engine.sql(sql_query) @@ -1085,7 +1128,6 @@ def check_resource_materialization(self): """Checks whether the target Delta table for this resource has been materialized in Spark. If the target Delta table doesn't exist, raises a NotMaterializedError with the type and name of this resource. 
""" - print(self.target_delta_table()) if not self.spark_engine.is_delta_table_exists(self.target_delta_table()): raise NotMaterializedError(self.type, self.name) @@ -1121,28 +1163,9 @@ def list_df(cls, compartment_id: str = None, **kwargs) -> "pd.DataFrame": for oci_feature_group in OCIFeatureGroup.list_resource( compartment_id, **kwargs ): - records.append( - { - "id": oci_feature_group.id, - "name": oci_feature_group.name, - "description": oci_feature_group.description, - "time_created": oci_feature_group.time_created.strftime( - utils.date_format - ), - "time_updated": oci_feature_group.time_updated.strftime( - utils.date_format - ), - "lifecycle_state": oci_feature_group.lifecycle_state, - "created_by": f"...{oci_feature_group.created_by[-6:]}", - "compartment_id": f"...{oci_feature_group.compartment_id[-6:]}", - "primary_keys": oci_feature_group.primary_keys, - "feature_store_id": oci_feature_group.feature_store_id, - "entity_id": oci_feature_group.entity_id, - "input_feature_details": oci_feature_group.input_feature_details, - "expectation_details": oci_feature_group.expectation_details, - "statistics_config": oci_feature_group.statistics_config, - } - ) + oci_feature_group: OCIFeatureGroup = oci_feature_group + records.append(oci_feature_group.to_df_record()) + return pd.DataFrame.from_records(records) @classmethod @@ -1313,7 +1336,7 @@ def get_statistics(self, job_id: str = None) -> "Statistics": "FeatureGroup needs to be saved to the feature store before retrieving the statistics" ) - stat_job_id = self._get_job_id(job_id) + stat_job_id = job_id if job_id is not None else self.get_last_job().id # TODO: take the one in memory or will list down job ids and find the latest fg_job = FeatureGroupJob.from_id(stat_job_id) @@ -1342,7 +1365,7 @@ def get_validation_output(self, job_id: str = None) -> "ValidationOutput": "FeatureGroup needs to be saved to the feature store before retrieving the validation report" ) - validation_job_id = self._get_job_id(job_id) + validation_job_id = job_id if job_id is not None else self.get_last_job().id # Retrieve the validation output JSON from data_flow_batch_execution_output. fg_job = FeatureGroupJob.from_id(validation_job_id) diff --git a/ads/feature_store/mixin/oci_feature_store.py b/ads/feature_store/mixin/oci_feature_store.py index 813d36017..c78087a1a 100644 --- a/ads/feature_store/mixin/oci_feature_store.py +++ b/ads/feature_store/mixin/oci_feature_store.py @@ -1,23 +1,74 @@ #!/usr/bin/env python # -*- coding: utf-8; -*- +from ads.common.decorator.utils import class_or_instance_method # Copyright (c) 2023 Oracle and/or its affiliates. 
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
+import logging
+import os
+
+logger = logging.getLogger(__name__)
 
 from ads.common.oci_mixin import OCIModelMixin
 
 import oci.feature_store
-import os
+import yaml
+
+
+try:
+    from yaml import CDumper as dumper
+    from yaml import CLoader as loader
+except ImportError:
+    from yaml import Dumper as dumper
+    from yaml import Loader as loader
+
+try:
+    from odsc_cli.utils import user_fs_config_loc, FsTemplate
+except ImportError:
+    pass
 
 
 class OCIFeatureStoreMixin(OCIModelMixin):
+    __mod_time = 0
+    __template: "FsTemplate" = None
+    FS_SERVICE_ENDPOINT = "fs_service_endpoint"
+    SERVICE_ENDPOINT = "service_endpoint"
+
     @classmethod
     def init_client(
         cls, **kwargs
     ) -> oci.feature_store.feature_store_client.FeatureStoreClient:
-        # TODO: Getting the endpoint from authorizer
-        fs_service_endpoint = os.environ.get("OCI_FS_SERVICE_ENDPOINT")
+        default_kwargs: dict = cls._get_auth().get("client_kwargs", {})
+
+        fs_service_endpoint = (
+            kwargs.get(cls.FS_SERVICE_ENDPOINT, None)
+            or kwargs.get(cls.SERVICE_ENDPOINT, None)
+            or default_kwargs.get(cls.FS_SERVICE_ENDPOINT, None)
+        )
+
+        if not fs_service_endpoint:
+            try:
+                mod_time = os.stat(user_fs_config_loc()).st_mtime
+                if mod_time > cls.__mod_time:
+                    with open(user_fs_config_loc()) as ccf:
+                        cls.__template = FsTemplate(yaml.load(ccf, Loader=loader))
+                    cls.__mod_time = mod_time
+            except NameError:
+                logger.info(
+                    "%s",
+                    "Feature store configuration helpers are missing. "
+                    "Support for reading service endpoint from config file is disabled",
+                )
+            except FileNotFoundError:
+                logger.info(
+                    "%s",
+                    "ODSC cli config for feature store was not found",
+                )
+                pass
+            if cls.__template:
+                fs_service_endpoint = cls.__template.service_endpoint
+
         if fs_service_endpoint:
-            kwargs = {"service_endpoint": fs_service_endpoint}
+            kwargs[cls.SERVICE_ENDPOINT] = fs_service_endpoint
 
         client = cls._init_client(
             client=oci.feature_store.feature_store_client.FeatureStoreClient, **kwargs
@@ -27,3 +78,44 @@ def init_client(
     @property
     def client(self) -> oci.feature_store.feature_store_client.FeatureStoreClient:
         return super().client
+
+    @class_or_instance_method
+    def list_resource(
+        cls, compartment_id: str = None, limit: int = 0, **kwargs
+    ) -> list:
+        """Generic method to list OCI resources
+
+        Parameters
+        ----------
+        compartment_id : str
+            Compartment ID of the OCI resources. Defaults to None.
+            If compartment_id is not specified,
+            the value of the NB_SESSION_COMPARTMENT_OCID environment variable will be used.
+        limit : int
+            The maximum number of items to return. Defaults to 0, in which case all items are returned.
+        **kwargs :
+            Additional keyword arguments to filter the resource.
+            The kwargs are passed into OCI API.
+
+        Returns
+        -------
+        list
+            A list of OCI resources
+
+        Raises
+        ------
+        NotImplementedError
+            List method is not supported or implemented.
+
+        """
+        if limit:
+            items = cls._find_oci_method("list")(
+                cls.check_compartment_id(compartment_id), limit=limit, **kwargs
+            ).data.items
+        else:
+            items = oci.pagination.list_call_get_all_results(
+                cls._find_oci_method("list"),
+                cls.check_compartment_id(compartment_id),
+                **kwargs,
+            ).data
+        return [cls.from_oci_model(item) for item in items]
diff --git a/ads/feature_store/service/oci_feature_group.py b/ads/feature_store/service/oci_feature_group.py
index 0e735d35a..f856ed548 100644
--- a/ads/feature_store/service/oci_feature_group.py
+++ b/ads/feature_store/service/oci_feature_group.py
@@ -5,6 +5,8 @@
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
 
 import datetime
+import pandas as pd
+from ads.common import utils
 
 import oci
 from oci.feature_store.models import (
@@ -64,9 +66,9 @@ class OCIFeatureGroup(OCIFeatureStoreMixin, oci.feature_store.models.FeatureGrou
         Gets feature group by OCID.
     Examples
     --------
-    >>> self = OCIFeatureGroup.from_id("")
-    >>> self.description = "A brand new description"
-    >>> self.delete()
+    >>> oci_feature_group = OCIFeatureGroup.from_id("")
+    >>> oci_feature_group.description = "A brand new description"
+    >>> oci_feature_group.delete()
     """
 
     def create(self) -> "OCIFeatureGroup":
@@ -122,6 +124,27 @@ def delete(self):
         """
         self.client.delete_feature_group(self.id)
 
+    def to_df(self):
+        return pd.DataFrame.from_records([self.to_df_record()])
+
+    def to_df_record(self):
+        return {
+            "id": self.id,
+            "name": self.name,
+            "description": self.description,
+            "time_created": self.time_created.strftime(utils.date_format),
+            "time_updated": self.time_updated.strftime(utils.date_format),
+            "lifecycle_state": self.lifecycle_state,
+            "created_by": f"...{self.created_by[-6:]}",
+            "compartment_id": f"...{self.compartment_id[-6:]}",
+            "primary_keys": self.primary_keys,
+            "feature_store_id": self.feature_store_id,
+            "entity_id": self.entity_id,
+            "input_feature_details": self.input_feature_details,
+            "expectation_details": self.expectation_details,
+            "statistics_config": self.statistics_config,
+        }
+
     @classmethod
     def from_id(cls, id: str) -> "OCIFeatureGroup":
         """Gets feature group resource by id.
diff --git a/ads/feature_store/statistics.py b/ads/feature_store/statistics.py
deleted file mode 100644
index 2ce0589c7..000000000
--- a/ads/feature_store/statistics.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import pandas as pd
-from typing import Dict
-from copy import deepcopy
-
-from ads.feature_store.response.response_builder import ResponseBuilder
-from ads.jobs.builders.base import Builder
-from ads.common import utils
-
-
-class Statistics(ResponseBuilder):
-    """
-    Represents statistical information.
-    """
-
-    @property
-    def kind(self) -> str:
-        """
-        Gets the kind of the statistics object.
-
-        Returns
-        -------
-        str
-            The kind of the statistics object, which is always "statistics".
- """ - return "statistics" diff --git a/ads/feature_store/statistics/__init__.py b/ads/feature_store/statistics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ads/feature_store/statistics/charts/__init__.py b/ads/feature_store/statistics/charts/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ads/feature_store/statistics/charts/abstract_feature_stat.py b/ads/feature_store/statistics/charts/abstract_feature_stat.py new file mode 100644 index 000000000..814929b80 --- /dev/null +++ b/ads/feature_store/statistics/charts/abstract_feature_stat.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# -*- coding: utf-8; -*- +# Copyright (c) 2023 Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ +from abc import abstractmethod +from typing import Union + +from ads.common.decorator.runtime_dependency import OptionalDependency + +try: + from plotly.graph_objs import Figure +except ModuleNotFoundError: + raise ModuleNotFoundError( + f"The `plotly` module was not found. Please run `pip install " + f"{OptionalDependency.FEATURE_STORE}`." + ) + + +class AbsFeatureStat: + class ValidationFailedException(Exception): + def __init__(self): + pass + + def __init__(self): + self.__validate__() + + @abstractmethod + def __validate__(self): + pass + + @abstractmethod + def add_to_figure(self, fig: Figure, xaxis: int, yaxis: int): + pass + + @classmethod + @abstractmethod + def __from_json__(cls, json_dict: dict): + pass + + @staticmethod + def get_x_y_str_axes(xaxis: int, yaxis: int) -> (): + return ( + ("xaxis" + str(xaxis + 1)), + ("yaxis" + str(yaxis + 1)), + ("x" + str(xaxis + 1)), + ("y" + str(yaxis + 1)), + ) + + @classmethod + def from_json( + cls, json_dict: dict, ignore_errors: bool = False + ) -> Union["AbsFeatureStat", None]: + try: + return cls.__from_json__(json_dict=json_dict) + except Exception as e: + if ignore_errors: + return None + else: + raise e diff --git a/ads/feature_store/statistics/charts/box_plot.py b/ads/feature_store/statistics/charts/box_plot.py new file mode 100644 index 000000000..0923a8412 --- /dev/null +++ b/ads/feature_store/statistics/charts/box_plot.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python +# -*- coding: utf-8; -*- +# Copyright (c) 2023 Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ +from typing import List + +from ads.common.decorator.runtime_dependency import OptionalDependency +from ads.feature_store.statistics.charts.abstract_feature_stat import AbsFeatureStat +from ads.feature_store.statistics.charts.frequency_distribution import ( + FrequencyDistribution, +) +from ads.feature_store.statistics.generic_feature_value import GenericFeatureValue + +try: + from plotly.graph_objs import Figure +except ModuleNotFoundError: + raise ModuleNotFoundError( + f"The `plotly` module was not found. Please run `pip install " + f"{OptionalDependency.FEATURE_STORE}`." 
+ ) + + +class BoxPlot(AbsFeatureStat): + CONST_MIN = "Min" + CONST_MAX = "Max" + CONST_QUARTILES = "Quartiles" + CONST_SD = "StandardDeviation" + CONST_MEAN = "Mean" + CONST_BOX_PLOT_TITLE = "Box Plot" + CONST_IQR = "IQR" + CONST_BOX_POINTS = "box_points" + + class Quartiles: + CONST_Q1 = "q1" + CONST_Q2 = "q2" + CONST_Q3 = "q3" + + def __init__(self, q1: float, q2: float, q3: float): + self.q1 = q1 + self.q2 = q2 + self.q3 = q3 + + @classmethod + def from_json(cls, json_dict: dict) -> "BoxPlot.Quartiles": + return cls( + json_dict.get(cls.CONST_Q1), + json_dict.get(cls.CONST_Q2), + json_dict.get(cls.CONST_Q3), + ) + + def __init__( + self, + mean: float, + median: float, + sd: float, + q1: float, + q3: float, + box_points: List[float], + ): + self.mean = mean + self.median = median + self.q1 = q1 + self.q3 = q3 + self.sd = sd + self.iqr = self.q3 - self.q1 + self.box_points = box_points + super().__init__() + + def __validate__(self): + if ( + self.q1 is None + or self.q3 is None + or self.iqr is None + or type(self.box_points) is not list + or len(self.box_points) == 0 + ): + return self.ValidationFailedException() + + def add_to_figure(self, fig: Figure, xaxis: int, yaxis: int): + xaxis_str, yaxis_str, x_str, y_str = self.get_x_y_str_axes(xaxis, yaxis) + fig.add_box( + notched=False, + boxmean=False, + mean=[self.mean], + median=[self.median], + q1=[self.q1], + q3=[self.q3], + sd=[self.sd], + y=[self.box_points], + upperfence=[self.q3 + 1.5 * self.iqr], + lowerfence=[self.q1 - 1.5 * self.iqr], + xaxis=x_str, + yaxis=y_str, + name="", + jitter=0, + ) + fig.layout.annotations[xaxis].text = self.CONST_BOX_PLOT_TITLE + fig.layout[yaxis_str]["title"] = "Values" + + @staticmethod + def get_box_points_from_frequency_distribution( + frequency_distribution: FrequencyDistribution, + ) -> List[float]: + # box_points = [] + if ( + frequency_distribution is not None + and frequency_distribution.frequency is not None + and frequency_distribution.bins is not None + ): + return [ + bin_dist + for frequency, bin_dist in zip( + frequency_distribution.frequency, frequency_distribution.bins + ) + if frequency > 0 + ] + else: + return [] + + @classmethod + def __from_json__(cls, json_dict: dict) -> "BoxPlot": + quartiles = cls.Quartiles.from_json(json_dict.get(cls.CONST_QUARTILES)) + return cls( + mean=GenericFeatureValue.from_json(json_dict.get(cls.CONST_MEAN)).val, + median=quartiles.q2, + sd=GenericFeatureValue.from_json(json_dict.get(cls.CONST_SD)).val, + q1=quartiles.q1, + q3=quartiles.q3, + box_points=json_dict.get(cls.CONST_BOX_POINTS), + ) diff --git a/ads/feature_store/statistics/charts/frequency_distribution.py b/ads/feature_store/statistics/charts/frequency_distribution.py new file mode 100644 index 000000000..697c4ff72 --- /dev/null +++ b/ads/feature_store/statistics/charts/frequency_distribution.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python +# -*- coding: utf-8; -*- +# Copyright (c) 2023 Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ + +from typing import List +from ads.common.decorator.runtime_dependency import OptionalDependency +from ads.feature_store.statistics.charts.abstract_feature_stat import AbsFeatureStat + +try: + from plotly.graph_objs import Figure +except ModuleNotFoundError: + raise ModuleNotFoundError( + f"The `plotly` module was not found. Please run `pip install " + f"{OptionalDependency.FEATURE_STORE}`." 
diff --git a/ads/feature_store/statistics/charts/frequency_distribution.py b/ads/feature_store/statistics/charts/frequency_distribution.py
new file mode 100644
index 000000000..697c4ff72
--- /dev/null
+++ b/ads/feature_store/statistics/charts/frequency_distribution.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+# -*- coding: utf-8; -*-
+# Copyright (c) 2023 Oracle and/or its affiliates.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
+
+from typing import List
+from ads.common.decorator.runtime_dependency import OptionalDependency
+from ads.feature_store.statistics.charts.abstract_feature_stat import AbsFeatureStat
+
+try:
+    from plotly.graph_objs import Figure
+except ModuleNotFoundError:
+    raise ModuleNotFoundError(
+        f"The `plotly` module was not found. Please run `pip install "
+        f"{OptionalDependency.FEATURE_STORE}`."
+    )
+
+
+class FrequencyDistribution(AbsFeatureStat):
+    CONST_FREQUENCY = "frequency"
+    CONST_BINS = "bins"
+    CONST_FREQUENCY_DISTRIBUTION_TITLE = "Frequency Distribution"
+
+    def __init__(self, frequency: List, bins: List):
+        self.frequency = frequency
+        self.bins = bins
+        super().__init__()
+
+    def __validate__(self):
+        # Both fields must be non-empty lists of equal length.
+        if not (
+            isinstance(self.frequency, list)
+            and isinstance(self.bins, list)
+            and len(self.frequency) == len(self.bins) > 0
+        ):
+            raise self.ValidationFailedException()
+
+    @classmethod
+    def __from_json__(cls, json_dict: dict) -> "FrequencyDistribution":
+        return FrequencyDistribution(
+            frequency=json_dict.get(cls.CONST_FREQUENCY),
+            bins=json_dict.get(cls.CONST_BINS),
+        )
+
+    def add_to_figure(self, fig: Figure, xaxis: int, yaxis: int):
+        # Instances are validated at construction time, so the fields can be
+        # used directly here.
+        xaxis_str, yaxis_str, x_str, y_str = self.get_x_y_str_axes(xaxis, yaxis)
+        fig.add_bar(
+            x=self.bins, y=self.frequency, xaxis=x_str, yaxis=y_str, name=""
+        )
+        fig.layout.annotations[xaxis].text = self.CONST_FREQUENCY_DISTRIBUTION_TITLE
+        fig.layout[xaxis_str]["title"] = "Bins"
+        fig.layout[yaxis_str]["title"] = "Frequency"
diff --git a/ads/feature_store/statistics/charts/probability_distribution.py b/ads/feature_store/statistics/charts/probability_distribution.py
new file mode 100644
index 000000000..d64be76fa
--- /dev/null
+++ b/ads/feature_store/statistics/charts/probability_distribution.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+# -*- coding: utf-8; -*-
+# Copyright (c) 2023 Oracle and/or its affiliates.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
+from typing import List
+
+from ads.common.decorator.runtime_dependency import OptionalDependency
+from ads.feature_store.statistics.charts.abstract_feature_stat import AbsFeatureStat
+
+try:
+    from plotly.graph_objs import Figure
+except ModuleNotFoundError:
+    raise ModuleNotFoundError(
+        f"The `plotly` module was not found. Please run `pip install "
+        f"{OptionalDependency.FEATURE_STORE}`."
+    )
+
+
+class ProbabilityDistribution(AbsFeatureStat):
+    CONST_DENSITY = "density"
+    CONST_BINS = "bins"
+    CONST_PROBABILITY_DISTRIBUTION_TITLE = "Probability Distribution"
+
+    def __init__(self, density: List, bins: List):
+        self.density = density
+        self.bins = bins
+        super().__init__()
+
+    def __validate__(self):
+        # Both fields must be non-empty lists of equal length.
+        if not (
+            isinstance(self.density, list)
+            and isinstance(self.bins, list)
+            and len(self.density) == len(self.bins) > 0
+        ):
+            raise self.ValidationFailedException()
+
+    @classmethod
+    def __from_json__(cls, json_dict: dict) -> "ProbabilityDistribution":
+        return cls(
+            density=json_dict.get(ProbabilityDistribution.CONST_DENSITY),
+            bins=json_dict.get(ProbabilityDistribution.CONST_BINS),
+        )
+
+    def add_to_figure(self, fig: Figure, xaxis: int, yaxis: int):
+        # Instances are validated at construction time, so the fields can be
+        # used directly here.
+        xaxis_str, yaxis_str, x_str, y_str = self.get_x_y_str_axes(xaxis, yaxis)
+        fig.add_bar(
+            x=self.bins,
+            y=self.density,
+            xaxis=x_str,
+            yaxis=y_str,
+            name="",
+        )
+        fig.layout.annotations[xaxis].text = self.CONST_PROBABILITY_DISTRIBUTION_TITLE
+        fig.layout[xaxis_str]["title"] = "Bins"
+        fig.layout[yaxis_str]["title"] = "Density"
diff --git a/ads/feature_store/statistics/charts/top_k_frequent_elements.py b/ads/feature_store/statistics/charts/top_k_frequent_elements.py
new file mode 100644
index 000000000..d68840890
--- /dev/null
+++ b/ads/feature_store/statistics/charts/top_k_frequent_elements.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python
+# -*- coding: utf-8; -*-
+# Copyright (c) 2023 Oracle and/or its affiliates.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
+from typing import List
+from ads.common.decorator.runtime_dependency import OptionalDependency
+
+from ads.feature_store.statistics.charts.abstract_feature_stat import AbsFeatureStat
+
+try:
+    from plotly.graph_objs import Figure
+except ModuleNotFoundError:
+    raise ModuleNotFoundError(
+        f"The `plotly` module was not found. Please run `pip install "
+        f"{OptionalDependency.FEATURE_STORE}`."
+    )
+
+
+class TopKFrequentElements(AbsFeatureStat):
+    CONST_VALUE = "value"
+    CONST_TOP_K_FREQUENT_TITLE = "Top K Frequent Elements"
+
+    class TopKFrequentElement:
+        CONST_VALUE = "value"
+        CONST_ESTIMATE = "estimate"
+        CONST_LOWER_BOUND = "lower_bound"
+        CONST_UPPER_BOUND = "upper_bound"
+
+        def __init__(
+            self, value: str, estimate: int, lower_bound: int, upper_bound: int
+        ):
+            self.value = value
+            self.estimate = estimate
+            self.lower_bound = lower_bound
+            self.upper_bound = upper_bound
+
+        @classmethod
+        def from_json(
+            cls, json_dict: dict
+        ) -> "TopKFrequentElements.TopKFrequentElement":
+            return cls(
+                value=json_dict.get(cls.CONST_VALUE),
+                estimate=json_dict.get(cls.CONST_ESTIMATE),
+                lower_bound=json_dict.get(cls.CONST_LOWER_BOUND),
+                upper_bound=json_dict.get(cls.CONST_UPPER_BOUND),
+            )
+
+    def __init__(self, elements: List[TopKFrequentElement]):
+        self.elements = elements
+        super().__init__()
+
+    def __validate__(self):
+        if not (isinstance(self.elements, list) and len(self.elements) > 0):
+            raise self.ValidationFailedException()
+
+    @classmethod
+    def __from_json__(cls, json_dict: dict) -> "TopKFrequentElements":
+        elements = json_dict.get(cls.CONST_VALUE)
+        return cls(
+            [cls.TopKFrequentElement.from_json(element) for element in elements]
+        )
+
+    def add_to_figure(self, fig: Figure, xaxis: int, yaxis: int):
+        # Instances are validated at construction time; draw one horizontal bar
+        # per element with its estimated count.
+        xaxis_str, yaxis_str, x_str, y_str = self.get_x_y_str_axes(xaxis, yaxis)
+        y_axis = [element.value for element in self.elements]
+        x_axis = [element.estimate for element in self.elements]
+        fig.add_bar(
+            x=x_axis, y=y_axis, xaxis=x_str, yaxis=y_str, name="", orientation="h"
+        )
+        fig.layout.annotations[xaxis].text = self.CONST_TOP_K_FREQUENT_TITLE
+        fig.layout[yaxis_str]["title"] = "Element"
+        fig.layout[xaxis_str]["title"] = "Count"
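
A small sketch, not part of the patch, of the `ignore_errors` contract inherited from AbsFeatureStat: a malformed payload yields None instead of an exception, which is what lets FeatureStatistics (below) silently skip charts it cannot build. Payloads are illustrative.

good = {
    "value": [
        {"value": "cat", "estimate": 42, "lower_bound": 40, "upper_bound": 44}
    ]
}
bad = {"value": []}  # fails __validate__: the element list is empty

assert TopKFrequentElements.from_json(good, ignore_errors=True) is not None
assert TopKFrequentElements.from_json(bad, ignore_errors=True) is None
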
diff --git a/ads/feature_store/statistics/feature_stat.py b/ads/feature_store/statistics/feature_stat.py
new file mode 100644
index 000000000..27a7e9647
--- /dev/null
+++ b/ads/feature_store/statistics/feature_stat.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python
+# -*- coding: utf-8; -*-
+# Copyright (c) 2023 Oracle and/or its affiliates.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
+
+from typing import List
+
+from ads.common.decorator.runtime_dependency import OptionalDependency
+from ads.feature_store.statistics.charts.abstract_feature_stat import AbsFeatureStat
+from ads.feature_store.statistics.charts.box_plot import BoxPlot
+from ads.feature_store.statistics.charts.frequency_distribution import (
+    FrequencyDistribution,
+)
+from ads.feature_store.statistics.charts.probability_distribution import (
+    ProbabilityDistribution,
+)
+from ads.feature_store.statistics.charts.top_k_frequent_elements import (
+    TopKFrequentElements,
+)
+
+try:
+    import plotly
+    from plotly.subplots import make_subplots
+except ModuleNotFoundError:
+    raise ModuleNotFoundError(
+        f"The `plotly` module was not found. Please run `pip install "
+        f"{OptionalDependency.FEATURE_STORE}`."
+    )
+
+
+class FeatureStatistics:
+    CONST_FREQUENCY_DISTRIBUTION = "FrequencyDistribution"
+    CONST_TITLE_FORMAT = "{}"
+    CONST_PLOT_FORMAT = "{}_plot"
+    CONST_PROBABILITY_DISTRIBUTION = "ProbabilityDistribution"
+    CONST_TOP_K_FREQUENT = "TopKFrequentElements"
+
+    def __init__(
+        self,
+        feature_name: str,
+        top_k_frequent_elements: TopKFrequentElements = None,
+        frequency_distribution: FrequencyDistribution = None,
+        probability_distribution: ProbabilityDistribution = None,
+        box_plot: BoxPlot = None,
+    ):
+        self.feature_name: str = feature_name
+        self.top_k_frequent_elements = top_k_frequent_elements
+        self.frequency_distribution = frequency_distribution
+        self.probability_distribution = probability_distribution
+        self.box_plot = box_plot
+
+    @classmethod
+    def from_json(cls, feature_name: str, json_dict: dict) -> "FeatureStatistics":
+        if json_dict is not None:
+            frequency_distribution = FrequencyDistribution.from_json(
+                json_dict.get(cls.CONST_FREQUENCY_DISTRIBUTION), ignore_errors=True
+            )
+
+            # Inject box points derived from the frequency distribution so that
+            # a box plot can be built from the same payload.
+            json_dict[
+                BoxPlot.CONST_BOX_POINTS
+            ] = BoxPlot.get_box_points_from_frequency_distribution(
+                frequency_distribution
+            )
+            return cls(
+                feature_name,
+                TopKFrequentElements.from_json(
+                    json_dict.get(cls.CONST_TOP_K_FREQUENT), ignore_errors=True
+                ),
+                frequency_distribution,
+                ProbabilityDistribution.from_json(
+                    json_dict.get(cls.CONST_PROBABILITY_DISTRIBUTION),
+                    ignore_errors=True,
+                ),
+                BoxPlot.from_json(json_dict, ignore_errors=True),
+            )
+        else:
+            return cls(feature_name)
+
+    @property
+    def __feature_stat_objects__(self) -> List[AbsFeatureStat]:
+        return [
+            stat
+            for stat in [
+                self.box_plot,
+                self.top_k_frequent_elements,
+                self.frequency_distribution,
+                self.probability_distribution,
+            ]
+            if stat is not None
+        ]
+
+    def to_viz(self):
+        # TODO: make it generic
+        def next_graph_position_generator():
+            # Positions 1, 0, 2 place the first available chart (the box plot,
+            # when present) in the middle column of the three-column grid.
+            yield 1
+            yield 0
+            yield 2
+
+        if len(self.__feature_stat_objects__) > 0:
+            fig = make_subplots(cols=3, column_titles=[" "] * 3)
+            for idx, stat in zip(
+                next_graph_position_generator(), self.__feature_stat_objects__
+            ):
+                stat.add_to_figure(fig, idx, idx)
+
+            fig.layout.title = self.CONST_TITLE_FORMAT.format(self.feature_name)
+            fig.update_layout(title_font_size=20)
+            fig.update_layout(title_x=0.5)
+            fig.update_layout(showlegend=False)
+            plotly.offline.iplot(
+                fig,
+                filename=self.CONST_PLOT_FORMAT.format(self.feature_name),
+            )
+        else:
+            print(
+                f"No statistical information for feature {self.feature_name} can be visualised"
+            )
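
A minimal sketch, not part of the patch, of how FeatureStatistics ties the charts together. The payload keys follow the CONST_* constants above; values are illustrative, and rendering via plotly.offline assumes a notebook session. With these three charts present, to_viz draws the box plot in the middle column, flanked by the frequency and probability distributions.

stats_json = {
    "TopKFrequentElements": None,  # missing chart; from_json returns None for it
    "FrequencyDistribution": {"frequency": [5, 3], "bins": [1.0, 2.0]},
    "ProbabilityDistribution": {"density": [0.6, 0.4], "bins": [1.0, 2.0]},
    "Mean": {"value": 1.4},
    "StandardDeviation": {"value": 0.5},
    "Quartiles": {"q1": 1.0, "q2": 1.5, "q3": 2.0},
}
FeatureStatistics.from_json("age", stats_json).to_viz()
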
diff --git a/ads/feature_store/statistics/generic_feature_value.py b/ads/feature_store/statistics/generic_feature_value.py
new file mode 100644
index 000000000..6b52e3001
--- /dev/null
+++ b/ads/feature_store/statistics/generic_feature_value.py
@@ -0,0 +1,17 @@
+#!/usr/bin/env python
+# -*- coding: utf-8; -*-
+# Copyright (c) 2023 Oracle and/or its affiliates.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
+
+
+class GenericFeatureValue:
+    CONST_VALUE = "value"
+
+    def __init__(self, val: any):
+        self.val = val
+
+    @classmethod
+    def from_json(cls, json_dict: dict) -> "GenericFeatureValue":
+        return GenericFeatureValue(
+            val=json_dict.get(cls.CONST_VALUE),
+        )
diff --git a/ads/feature_store/statistics/statistics.py b/ads/feature_store/statistics/statistics.py
new file mode 100644
index 000000000..f8b09ff0c
--- /dev/null
+++ b/ads/feature_store/statistics/statistics.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+# -*- coding: utf-8; -*-
+# Copyright (c) 2023 Oracle and/or its affiliates.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
+
+import json
+from typing import List
+
+from ads.feature_store.response.response_builder import ResponseBuilder
+from ads.feature_store.statistics.feature_stat import FeatureStatistics
+
+
+class Statistics(ResponseBuilder):
+    """
+    Represents statistical information.
+    """
+
+    @property
+    def kind(self) -> str:
+        """
+        Gets the kind of the statistics object.
+
+        Returns
+        -------
+        str
+            The kind of the statistics object, which is always "statistics".
+        """
+        return "statistics"
+
+    def to_viz(self, feature_list: List[str] = None):
+        """Visualises statistics inside a notebook.
+
+        Parameters
+        ----------
+        feature_list: (List[str], optional). Defaults to `None`.
+            The specific features of the FeatureGroup or Dataset to visualise.
+        """
+        if self.content is not None:
+            for feature, stat in json.loads(self.content).items():
+                if feature_list is None or feature in feature_list:
+                    FeatureStatistics.from_json(feature, stat).to_viz()
diff --git a/setup.py b/setup.py
index 4e825f6c2..379f97f01 100644
--- a/setup.py
+++ b/setup.py
@@ -72,9 +72,14 @@
     "bds": ["ibis-framework[impala]", "hdfs[kerberos]", "sqlalchemy"],
     "spark": ["pyspark>=3.0.0", "delta-spark"],
     "huggingface": ["transformers"],
-    "great-expectations": ["great-expectations==0.15.39"],
+    "feature-store": [
+        "pyspark>=3.0.0",
+        "delta-spark",
+        "great-expectations==0.15.39",
+        "pyarrow",
+        "plotly",
+    ],
     "mlm_insights": ["mlm_insights==0.1.0.dev1"],
-    "pyarrow": ["pyarrow"],
 }
 
 this_directory = Path(__file__).parent
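
The former `great-expectations` and `pyarrow` extras are folded into the single `feature-store` extra alongside Spark and plotly, so one install covers everything the statistics visualisation needs. A minimal sketch, assuming a materialised feature group handle `fg` exposing `get_statistics()` (the handle and feature names are illustrative, not part of this patch):

# pip install "oracle-ads[feature-store]"
fg.get_statistics().to_viz(feature_list=["Age", "Score"])

diff --git a/tests/integration/feature_store/test_as_of_for_featuregroup_and_dataset.py b/tests/integration/feature_store/test_as_of_for_featuregroup_and_dataset.py
new file mode 100644
index 000000000..2f3b0ede8
--- /dev/null
+++ b/tests/integration/feature_store/test_as_of_for_featuregroup_and_dataset.py
@@ -0,0 +1,211 @@
+import ast
+import copy
+import json
+import random
+import string
+
+import pandas as pd
+import pytest
+
+from ads.feature_store.common.enums import IngestionMode
+from ads.feature_store.common.exceptions import NotMaterializedError
+from ads.feature_store.dataset import Dataset
+from ads.feature_store.feature_option_details import FeatureOptionDetails
+from ads.feature_store.statistics_config import StatisticsConfig
+from tests.integration.feature_store.test_base import FeatureStoreTestCase
+from ads.feature_store.feature_group import FeatureGroup
+
+
+class TestAsOfForFeatureGroupAndDataset(FeatureStoreTestCase):
+    """Contains integration tests for as_of support for feature groups and datasets"""
+
+    # Generate random data
+    test_as_of_data = {
+        'Name': [random.choice(['Alice', 'Bob', 'Charlie', 'David']) for _ in range(10)],
+        'Age': [random.randint(20, 40) for _ in range(10)],
+        'Score':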
[round(random.uniform(0, 100), 2) for _ in range(10)], + 'City': [random.choice(['New York', 'Los Angeles', 'Chicago', 'Houston']) for _ in range(10)], + 'Gender': [random.choice(['Male', 'Female']) for _ in range(10)], + 'ID': [''.join(random.choices(string.ascii_uppercase, k=5)) for _ in range(10)] + } + + as_of_data_frame = pd.DataFrame(test_as_of_data) + + def define_feature_group_resource( + self, entity_id, feature_store_id + ) -> "FeatureGroup": + feature_group_resource = ( + FeatureGroup() + .with_description("feature group with statistics disabled") + .with_compartment_id(self.COMPARTMENT_ID) + .with_name(self.get_name("petals2")) + .with_entity_id(entity_id) + .with_feature_store_id(feature_store_id) + .with_primary_keys(['ID']) + .with_schema_details_from_dataframe(self.as_of_data_frame) + .with_statistics_config(False) + ) + return feature_group_resource + + def define_dataset_resource( + self, entity_id, feature_store_id, feature_group_name + ) -> "Dataset": + name = self.get_name("petals_ds") + dataset_resource = ( + Dataset() + .with_description("dataset description") + .with_compartment_id(self.COMPARTMENT_ID) + .with_name(name) + .with_entity_id(entity_id) + .with_feature_store_id(feature_store_id) + .with_query(f"SELECT * FROM `{entity_id}`.{feature_group_name}") + .with_statistics_config( + StatisticsConfig(True, columns=["sepal_length", "petal_width"]) + ) + ) + return dataset_resource + + def test_as_of_for_non_materialized_feature_group(self): + fs = self.define_feature_store_resource().create() + assert fs.oci_fs.id + + entity = self.create_entity_resource(fs) + assert entity.oci_fs_entity.id + + fg = self.define_feature_group_resource( + entity.oci_fs_entity.id, fs.oci_fs.id + ).create() + assert fg.oci_feature_group.id + + with pytest.raises(NotMaterializedError): + fg.as_of() + + self.clean_up_feature_group(fg) + self.clean_up_entity(entity) + self.clean_up_feature_store(fs) + + def test_as_of_for_non_materialized_dataset(self): + fs = self.define_feature_store_resource().create() + assert fs.oci_fs.id + + entity = self.create_entity_resource(fs) + assert entity.oci_fs_entity.id + + fg = self.define_feature_group_resource( + entity.oci_fs_entity.id, fs.oci_fs.id + ).create() + assert fg.oci_feature_group.id + + fg.materialise(self.as_of_data_frame) + + dataset = self.define_dataset_resource( + entity.oci_fs_entity.id, fs.oci_fs.id, fg.name + ).create() + assert fg.oci_feature_group.id + + with pytest.raises(NotMaterializedError): + dataset.as_of() + + self.clean_up_dataset(dataset) + self.clean_up_feature_group(fg) + self.clean_up_entity(entity) + self.clean_up_feature_store(fs) + + def test_as_of_for_materialized_feature_group_for_only_one_version(self): + fs = self.define_feature_store_resource().create() + assert fs.oci_fs.id + + entity = self.create_entity_resource(fs) + assert entity.oci_fs_entity.id + + fg = self.define_feature_group_resource( + entity.oci_fs_entity.id, fs.oci_fs.id + ).create() + assert fg.oci_feature_group.id + + fg.materialise(self.as_of_data_frame) + df = fg.as_of(version_number=0) + + assert df + assert len(df.columns) == len(self.test_as_of_data) + + self.clean_up_feature_group(fg) + self.clean_up_entity(entity) + self.clean_up_feature_store(fs) + + def test_as_of_for_materialized_feature_group_for_multiple_version(self): + fs = self.define_feature_store_resource().create() + assert fs.oci_fs.id + + entity = self.create_entity_resource(fs) + assert entity.oci_fs_entity.id + + fg = self.define_feature_group_resource( + 
entity.oci_fs_entity.id, fs.oci_fs.id + ).create() + assert fg.oci_feature_group.id + + fg.materialise(self.as_of_data_frame) + + # Now Update the Payload and materialize again + new_data = copy.deepcopy(self.test_as_of_data) + new_data.pop("Score") + df = pd.DataFrame(new_data) + + fg.with_schema_details_from_dataframe(df) + fg.update() + + feature_option_write_config_details = ( + FeatureOptionDetails().with_feature_option_write_config_details( + overwrite_schema=True + ) + ) + fg.materialise( + input_dataframe=df, + feature_option_details=feature_option_write_config_details, + ingestion_mode=IngestionMode.OVERWRITE, + ) + df = fg.as_of(version_number=0) + + assert df + assert len(df.columns) == len(self.test_as_of_data) + + df = fg.as_of(version_number=1) + + assert df + assert len(df.columns) == len(new_data) + + self.clean_up_feature_group(fg) + self.clean_up_entity(entity) + self.clean_up_feature_store(fs) + + def test_as_of_for_materialized_dataset(self): + fs = self.define_feature_store_resource().create() + assert fs.oci_fs.id + + entity = self.create_entity_resource(fs) + assert entity.oci_fs_entity.id + + fg = self.define_feature_group_resource( + entity.oci_fs_entity.id, fs.oci_fs.id + ).create() + assert fg.oci_feature_group.id + + fg.materialise(self.as_of_data_frame) + + dataset = self.define_dataset_resource( + entity.oci_fs_entity.id, fs.oci_fs.id, fg.name + ).create() + assert fg.oci_feature_group.id + + dataset.materialise() + + df = dataset.as_of(version_number=0) + + assert df + assert len(df.columns) == len(self.test_as_of_data) + + self.clean_up_dataset(dataset) + self.clean_up_feature_group(fg) + self.clean_up_entity(entity) + self.clean_up_feature_store(fs) \ No newline at end of file diff --git a/tests/integration/feature_store/test_base.py b/tests/integration/feature_store/test_base.py index 1871907b6..6ea257e86 100644 --- a/tests/integration/feature_store/test_base.py +++ b/tests/integration/feature_store/test_base.py @@ -10,6 +10,7 @@ import oci import pandas as pd +from ads.feature_store.entity import Entity from great_expectations.core import ExpectationSuite, ExpectationConfiguration import ads import os @@ -21,8 +22,8 @@ client_kwargs = dict( - retry_strategy=oci.retry.NoneRetryStrategy, - service_endpoint=os.getenv("service_endpoint"), + retry_strategy=oci.retry.NoneRetryStrategy(), + fs_service_endpoint=os.getenv("service_endpoint"), ) ads.set_auth(client_kwargs=client_kwargs) @@ -36,12 +37,16 @@ def transformation_with_kwargs(data_frame, **kwargs): - is_area_enabled = kwargs.get('is_area_enabled') + is_area_enabled = kwargs.get("is_area_enabled") if is_area_enabled: # Calculate petal area and sepal area - data_frame["petal_area"] = data_frame["petal_length"] * data_frame["petal_width"] - data_frame["sepal_area"] = data_frame["sepal_length"] * data_frame["sepal_width"] + data_frame["petal_area"] = ( + data_frame["petal_length"] * data_frame["petal_width"] + ) + data_frame["sepal_area"] = ( + data_frame["sepal_length"] * data_frame["sepal_width"] + ) # Return the updated DataFrame return data_frame @@ -49,7 +54,9 @@ def transformation_with_kwargs(data_frame, **kwargs): class FeatureStoreTestCase: # networks compartment in feature store - TIME_NOW = str.format("{}_{}",datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S"),int(random()*1000)) + TIME_NOW = str.format( + "{}_{}", datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S"), int(random() * 1000) + ) TENANCY_ID = "ocid1.tenancy.oc1..aaaaaaaa462hfhplpx652b32ix62xrdijppq2c7okwcqjlgrbknhgtj2kofa" COMPARTMENT_ID = 
"ocid1.tenancy.oc1..aaaaaaaa462hfhplpx652b32ix62xrdijppq2c7okwcqjlgrbknhgtj2kofa" METASTORE_ID = "ocid1.datacatalogmetastore.oc1.iad.amaaaaaabiudgxyap7tizm4gscwz7amu7dixz7ml3mtesqzzwwg3urvvdgua" @@ -376,9 +383,11 @@ def create_entity_resource(self, feature_store) -> "Entity": return entity def create_transformation_resource(self, feature_store) -> "Transformation": - transformation = feature_store.create_transformation(source_code_func=transformation_with_kwargs, - display_name="transformation_with_kwargs", - transformation_mode=TransformationMode.PANDAS) + transformation = feature_store.create_transformation( + source_code_func=transformation_with_kwargs, + display_name="transformation_with_kwargs", + transformation_mode=TransformationMode.PANDAS, + ) return transformation def define_feature_group_resource( diff --git a/tests/integration/feature_store/test_dataset_complex.py b/tests/integration/feature_store/test_dataset_complex.py new file mode 100644 index 000000000..315a87ffa --- /dev/null +++ b/tests/integration/feature_store/test_dataset_complex.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*-- + +# Copyright (c) 2023 Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ + +import pytest +import unittest + +from ads.feature_store.dataset import Dataset + +from ads.feature_store.entity import Entity + +from ads.feature_store.feature_store import FeatureStore + +from ads.feature_store.feature_group import FeatureGroup + +from ads.feature_store.feature_option_details import FeatureOptionDetails +from tests.integration.feature_store.test_base import FeatureStoreTestCase + + +class TestDatasetComplex(FeatureStoreTestCase): + """Contains integration tests for Dataset Delta changes.""" + + @pytest.fixture() + def feature_store(self) -> FeatureStore: + feature_store = self.define_feature_store_resource().create() + yield feature_store + self.clean_up_feature_store(feature_store) + + @pytest.fixture() + def entity(self, feature_store: FeatureStore): + entity = self.create_entity_resource(feature_store) + yield entity + self.clean_up_entity(entity) + + @pytest.fixture() + def feature_group(self, entity, feature_store) -> "FeatureGroup": + feature_group = self.define_feature_group_resource( + entity.oci_fs_entity.id, feature_store.oci_fs.id + ).create() + yield feature_group + self.clean_up_feature_group(feature_group) + + def test_manual_dataset( + self, + feature_store: FeatureStore, + entity: Entity, + feature_group: FeatureGroup, + ): + query = """ + (SELECT + name, games, goals + FROM tblMadrid WHERE name = 'ronaldo') + UNION + (SELECT + name, games, goals + FROM tblBarcelona WHERE name = 'messi') + ORDER BY goals""" + name = self.get_name("fireside_football_debate") + dataset_resource = ( + Dataset() + .with_description("dataset description") + .with_compartment_id(self.COMPARTMENT_ID) + .with_name(name) + .with_entity_id(entity_id=entity.id) + .with_feature_store_id(feature_store_id=feature_store.id) + .with_query(query) + .with_feature_groups([feature_group]) + ).create() + assert len(dataset_resource.feature_groups) == 1 + assert dataset_resource.feature_groups[0].id == feature_group.id + assert dataset_resource.is_manual_association + dataset_resource.delete() + return dataset_resource diff --git a/tests/integration/feature_store/test_dataset_delta.py b/tests/integration/feature_store/test_dataset_delta.py index 5923f69a2..dee95b421 100644 --- 
a/tests/integration/feature_store/test_dataset_delta.py
+++ b/tests/integration/feature_store/test_dataset_delta.py
@@ -7,6 +7,8 @@
 import pytest
 import unittest
 
+from ads.feature_store.dataset import Dataset
+
 from ads.feature_store.feature_option_details import FeatureOptionDetails
 from tests.integration.feature_store.test_base import FeatureStoreTestCase
 
@@ -125,6 +127,9 @@ def test_dataset_materialise_overwrite(self, feature_group, dataset):
         dataset.materialise(ingestion_mode=IngestionMode.OVERWRITE)
 
         df = dataset.preview(row_count=50)
+        assert (
+            dataset.get_spec(Dataset.CONST_FEATURE_GROUP).is_manual_association is False
+        )
 
         assert df.count() == 14
         assert len(df.columns) == 6
diff --git a/tests/integration/feature_store/test_feature_group_dataset_listing.py b/tests/integration/feature_store/test_feature_group_dataset_listing.py
new file mode 100644
index 000000000..c7aeddd4d
--- /dev/null
+++ b/tests/integration/feature_store/test_feature_group_dataset_listing.py
@@ -0,0 +1,191 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*--
+
+# Copyright (c) 2023 Oracle and/or its affiliates.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
+
+import pytest
+from ads.feature_store.feature_group_job import FeatureGroupJob
+
+from ads.feature_store.dataset import Dataset
+from ads.feature_store.feature_group import FeatureGroup
+from tests.integration.feature_store.test_base import FeatureStoreTestCase
+
+
+class TestFeatureGroupDatasetListing(FeatureStoreTestCase):
+    """Contains integration tests for Feature Group and Dataset Listing."""
+
+    def define_feature_group_resource_with_default_config(
+        self, entity_id, feature_store_id
+    ) -> "FeatureGroup":
+        feature_group_resource = (
+            FeatureGroup()
+            .with_description("feature group with default stats config")
+            .with_compartment_id(self.COMPARTMENT_ID)
+            .with_name(self.get_name("petals3"))
+            .with_entity_id(entity_id)
+            .with_feature_store_id(feature_store_id)
+            .with_primary_keys([])
+            .with_input_feature_details(self.INPUT_FEATURE_DETAILS)
+        )
+        return feature_group_resource
+
+    def define_feature_group_resource_with_stats_disabled(
+        self, entity_id, feature_store_id
+    ) -> "FeatureGroup":
+        feature_group_resource = (
+            FeatureGroup()
+            .with_description("feature group with statistics disabled")
+            .with_compartment_id(self.COMPARTMENT_ID)
+            .with_name(self.get_name("petals2"))
+            .with_entity_id(entity_id)
+            .with_feature_store_id(feature_store_id)
+            .with_primary_keys([])
+            .with_input_feature_details(self.INPUT_FEATURE_DETAILS)
+            .with_statistics_config(False)
+        )
+        return feature_group_resource
+
+    def define_dataset_resource_with_default_config(
+        self, entity_id, feature_store_id, feature_group_name
+    ) -> "Dataset":
+        dataset_resource = (
+            Dataset()
+            .with_description("dataset with default statistics configuration")
+            .with_compartment_id(self.COMPARTMENT_ID)
+            .with_name(self.get_name("petals_ds_default_stat"))
+            .with_entity_id(entity_id)
+            .with_feature_store_id(feature_store_id)
+            .with_query(f"SELECT * FROM `{entity_id}`.{feature_group_name}")
+        )
+        return dataset_resource
+
+    def define_dataset_resource_with_stats_disabled(
+        self, entity_id, feature_store_id, feature_group_name
+    ) -> "Dataset":
+        dataset_resource = (
+            Dataset()
+            .with_description("dataset with statistics disabled")
+            .with_compartment_id(self.COMPARTMENT_ID)
+            .with_name(self.get_name("petals_ds_stat_disabled"))
+            .with_entity_id(entity_id)
+            .with_feature_store_id(feature_store_id)
+            .with_query(f"SELECT * FROM `{entity_id}`.{feature_group_name}")
+            .with_statistics_config(False)
+        )
+        return dataset_resource
+
+    def test_feature_group_listing_without_limit(self):
+        """Tests listing of feature group resources without any limit."""
+        fs = self.define_feature_store_resource().create()
+        assert fs.oci_fs.id
+
+        entity = self.create_entity_resource(fs)
+        assert entity.oci_fs_entity.id
+
+        fg1 = self.define_feature_group_resource_with_default_config(
+            entity.oci_fs_entity.id, fs.oci_fs.id
+        ).create()
+        assert fg1.oci_feature_group.id
+        fg1.materialise(self.data)
+        fg1.materialise(self.data2)
+
+        fg1_job_list = FeatureGroupJob.list(
+            compartment_id=self.COMPARTMENT_ID, feature_group_id=fg1.id
+        )
+        assert fg1_job_list is not None
+        assert len(fg1_job_list) == 2
+
+        fg2 = self.define_feature_group_resource_with_stats_disabled(
+            entity.oci_fs_entity.id, fs.oci_fs.id
+        ).create()
+        assert fg2.oci_feature_group.id
+        fg2.materialise(self.data3)
+
+        fg_list = FeatureGroup.list(
+            compartment_id=self.COMPARTMENT_ID, feature_store_id=fs.id
+        )
+        assert fg_list is not None
+        assert len(fg_list) == 2
+
+        self.clean_up_feature_group(fg1)
+        self.clean_up_feature_group(fg2)
+        self.clean_up_entity(entity)
+        self.clean_up_feature_store(fs)
+
+    def test_feature_group_listing_with_limit(self):
+        """Tests listing of feature group resources with user defined limit."""
+        fs = self.define_feature_store_resource().create()
+        assert fs.oci_fs.id
+
+        entity = self.create_entity_resource(fs)
+        assert entity.oci_fs_entity.id
+
+        fg1 = self.define_feature_group_resource_with_default_config(
+            entity.oci_fs_entity.id, fs.oci_fs.id
+        ).create()
+        assert fg1.oci_feature_group.id
+        fg1.materialise(self.data)
+        fg1.materialise(self.data2)
+
+        fg1_job_list = FeatureGroupJob.list(
+            compartment_id=self.COMPARTMENT_ID,
+            feature_group_id=fg1.id,
+            sort_by="timeCreated",
+            limit="1",
+        )
+        assert fg1_job_list is not None
+        assert len(fg1_job_list) == 1
+
+        fg2 = self.define_feature_group_resource_with_stats_disabled(
+            entity.oci_fs_entity.id, fs.oci_fs.id
+        ).create()
+        assert fg2.oci_feature_group.id
+        fg2.materialise(self.data3)
+
+        fg_list = FeatureGroup.list(
+            compartment_id=self.COMPARTMENT_ID,
+            sort_by="timeCreated",
+            limit="1",
+        )
+        assert fg_list is not None
+        assert len(fg_list) == 1
+
+        self.clean_up_feature_group(fg1)
+        self.clean_up_feature_group(fg2)
+        self.clean_up_entity(entity)
+        self.clean_up_feature_store(fs)
+
+    def test_dataset_listing_without_limit(self):
+        """Tests listing of dataset resources without any limit."""
+        fs = self.define_feature_store_resource().create()
+        assert fs.oci_fs.id
+
+        entity = self.create_entity_resource(fs)
+        assert entity.oci_fs_entity.id
+
+        fg = self.define_feature_group_resource(
+            entity.oci_fs_entity.id, fs.oci_fs.id
+        ).create()
+        assert fg.oci_feature_group.id
+
+        fg.materialise(self.data)
+
+        dataset = self.define_dataset_resource(
+            entity.oci_fs_entity.id, fs.oci_fs.id, fg.oci_feature_group.name
+        ).create()
+        assert dataset.oci_dataset.id
+
+        dataset.materialise()
+        ds_list = Dataset.list(
+            compartment_id=self.COMPARTMENT_ID, feature_store_id=fs.id
+        )
+        assert ds_list is not None
+        assert len(ds_list) == 1
+
+        self.clean_up_dataset(dataset)
+        self.clean_up_feature_group(fg)
+        self.clean_up_entity(entity)
+        self.clean_up_feature_store(fs)
diff --git a/tests/unitary/with_extras/feature_store/test_dataset.py b/tests/unitary/with_extras/feature_store/test_dataset.py
index 09b2cb27e..a5586d0ed 100644 --- a/tests/unitary/with_extras/feature_store/test_dataset.py +++ b/tests/unitary/with_extras/feature_store/test_dataset.py @@ -43,6 +43,14 @@ "ingestionMode": "OVERWRITE", } +DATASET_JOB_RESPONSE_PAYLOAD = { + "compartmentId": "ocid1.compartment.oc1.iad.xxx", + "datasetId": "861AA4E9C8E811A79D74C464A01CDF42", + "id": "d40265b7-d66e-49a3-ae26-699012e0df5d", + "ingestionMode": "OVERWRITE", + "lifecycleState": "SUCCEEDED", +} + @pytest.fixture def dataframe_fixture_basic(): @@ -259,12 +267,12 @@ def test__to_oci_fs_entity(self, mock_load_key_file, mock_config_from_file): @patch.object(SparkSessionSingleton, "__init__", return_value=None) @patch.object(SparkSessionSingleton, "get_spark_session") def test_materialise(self, spark, get_spark_session, mock_update): - with patch.object(DatasetJob, "create") as mock_dataset_job: - with patch.object(FeatureStore, "from_id"): - with patch.object(DatasetJob, "_mark_job_complete"): - mock_dataset_job.return_value = self.mock_dsc_dataset_job - self.mock_dsc_dataset.with_id(DATASET_OCID) - self.mock_dsc_dataset.materialise() + with patch.object(DatasetJob, "create") as mock_dataset_job: + with patch.object(FeatureStore, "from_id"): + with patch.object(DatasetJob, "_mark_job_complete"): + mock_dataset_job.return_value = self.mock_dsc_dataset_job + self.mock_dsc_dataset.with_id(DATASET_OCID) + self.mock_dsc_dataset.materialise() @patch.object(SparkSessionSingleton, "__init__", return_value=None) @patch.object(SparkSessionSingleton, "get_spark_session") @@ -306,3 +314,13 @@ def test_restore(self, spark, get_spark_session, feature_store, mock_update): self.mock_dsc_dataset.with_id(DATASET_OCID) self.mock_dsc_dataset.restore(1) mock_execution_strategy.assert_called_once() + + def test_get_last_job(self): + """Tests getting most recent dataset job for a dataset.""" + with patch.object(DatasetJob, "list") as mock_dataset_job: + self.mock_dsc_dataset.with_id(DATASET_OCID) + mock_dataset_job.return_value = [ + DatasetJob.from_dict({"spec": DATASET_JOB_RESPONSE_PAYLOAD}) + ] + ds_job = self.mock_dsc_dataset.get_last_job() + assert ds_job is not None diff --git a/tests/unitary/with_extras/feature_store/test_feature_group.py b/tests/unitary/with_extras/feature_store/test_feature_group.py index e3e78522f..5cff8eb35 100644 --- a/tests/unitary/with_extras/feature_store/test_feature_group.py +++ b/tests/unitary/with_extras/feature_store/test_feature_group.py @@ -20,6 +20,7 @@ from ads.feature_store.feature_store import FeatureStore from ads.feature_store.input_feature_detail import FeatureDetail, FeatureType from ads.feature_store.service.oci_feature_group import OCIFeatureGroup +from ads.feature_store.service.oci_feature_group_job import OCIFeatureGroupJob from ads.feature_store.service.oci_feature_store import OCIFeatureStore from tests.unitary.with_extras.feature_store.test_feature_group_job import ( FEATURE_GROUP_JOB_PAYLOAD, @@ -44,6 +45,14 @@ "isInferSchema": False, } +FEATURE_GROUP_JOB_RESPONSE_PAYLOAD = { + "compartmentId": "ocid1.compartment.oc1.iad.xxx", + "featureGroupId": "861AA4E9C8E811A79D74C464A01CDF42", + "id": "d40265b7-d66e-49a3-ae26-699012e0df5d", + "ingestionMode": "OVERWRITE", + "lifecycleState": "SUCCEEDED", +} + @pytest.fixture(autouse=True) def dataframe_fixture_basic(): @@ -211,14 +220,14 @@ def test_from_id(self, mock_oci_from_id, mock__update_from_oci_fs_model): @patch.object(OCIFeatureGroup, "create") def test_create_success( - self, - mock_oci_dsc_model_create, + self, + mock_oci_dsc_model_create, ): 
"""Tests creating datascience feature_group.""" oci_dsc_model = OCIFeatureGroup(**FEATURE_GROUP_PAYLOAD) mock_oci_dsc_model_create.return_value = oci_dsc_model - # to check rundom display name + # to check random display name self.mock_dsc_feature_group.with_name("") result = self.mock_dsc_feature_group.create() mock_oci_dsc_model_create.assert_called() @@ -281,7 +290,7 @@ def test__to_oci_fs_entity(self, mock_load_key_file, mock_config_from_file): @patch.object(SparkSessionSingleton, "__init__", return_value=None) @patch.object(SparkSessionSingleton, "get_spark_session") def test_materialise( - self, spark_session, get_spark_session, mocke_update, dataframe_fixture_basic + self, spark_session, get_spark_session, mocke_update, dataframe_fixture_basic ): with patch.object(FeatureGroupJob, "create") as mock_feature_group_job: with patch.object(FeatureStore, "from_id"): @@ -324,10 +333,20 @@ def test_history(self, feature_store, spark_session, get_spark_session): @patch.object(SparkSessionSingleton, "get_spark_session") @patch.object(OCIFeatureStore, "from_id") def test_restore( - self, feature_store, spark_session, get_spark_session, mock_update + self, feature_store, spark_session, get_spark_session, mock_update ): with patch.object(SparkEngine, "sql") as mock_execution_strategy: mock_execution_strategy.return_value = None self.mock_dsc_feature_group.with_id(FEATURE_GROUP_OCID) self.mock_dsc_feature_group.restore(1) mock_execution_strategy.assert_called_once() + + def test_get_last_job(self): + """Tests getting most recent feature group job for a feature group.""" + with patch.object(FeatureGroupJob, "list") as mock_feature_group_job: + self.mock_dsc_feature_group.with_id(FEATURE_GROUP_OCID) + mock_feature_group_job.return_value = [ + FeatureGroupJob.from_dict({"spec": FEATURE_GROUP_JOB_RESPONSE_PAYLOAD}) + ] + fg_job = self.mock_dsc_feature_group.get_last_job() + assert fg_job is not None diff --git a/tests/unitary/with_extras/feature_store/test_mixin_endpoints.py b/tests/unitary/with_extras/feature_store/test_mixin_endpoints.py new file mode 100644 index 000000000..e04972259 --- /dev/null +++ b/tests/unitary/with_extras/feature_store/test_mixin_endpoints.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +# -*- coding: utf-8; -*- + +# Copyright (c) 2023 Oracle and/or its affiliates. 
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ + +import ads +from ads.feature_store.mixin.oci_feature_store import OCIFeatureStoreMixin +from ads.common.auth import AuthType + +TEST_URL_1 = "https://test1.com" +TEST_URL_2 = "https://test2.com" +TEST_URL_3 = "https://test3.com" +TEST_URL_4 = "https://test4.com" + + +def test_global_service_endpoint(): + ads.set_auth(auth=AuthType.API_KEY, client_kwargs={"service_endpoint": TEST_URL_1}) + client = OCIFeatureStoreMixin.init_client() + assert client.base_client.endpoint == f"{TEST_URL_1}/20230101" + + +def test_global_service_and_fs_endpoints(): + ads.set_auth( + auth=AuthType.API_KEY, + client_kwargs={ + "fs_service_endpoint": TEST_URL_1, + "service_endpoint": TEST_URL_2, + }, + ) + client = OCIFeatureStoreMixin.init_client() + assert client.base_client.endpoint == f"{TEST_URL_1}/20230101" + + +def test_override_service_endpoint(): + ads.set_auth(auth=AuthType.API_KEY) + client = OCIFeatureStoreMixin.init_client(service_endpoint=TEST_URL_1) + assert client.base_client.endpoint == f"{TEST_URL_1}/20230101" + + +def test_override_service_and_fs_endpoints(): + ads.set_auth(auth=AuthType.API_KEY) + client = OCIFeatureStoreMixin.init_client( + service_endpoint=TEST_URL_1, fs_service_endpoint=TEST_URL_2 + ) + assert client.base_client.endpoint == f"{TEST_URL_2}/20230101" + + +def test_override_service_and_fs_endpoints_with_global_service_and_fs_endpoints(): + ads.set_auth( + auth=AuthType.API_KEY, + client_kwargs={ + "fs_service_endpoint": TEST_URL_3, + "service_endpoint": TEST_URL_4, + }, + ) + client = OCIFeatureStoreMixin.init_client( + service_endpoint=TEST_URL_1, fs_service_endpoint=TEST_URL_2 + ) + assert client.base_client.endpoint == f"{TEST_URL_2}/20230101"
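
Taken together, these tests pin down the endpoint precedence rules: `fs_service_endpoint` wins over `service_endpoint` for the feature store client, and arguments passed directly to `init_client` win over the globals set via `ads.set_auth`. A minimal configuration sketch (the endpoint URLs are placeholders):

import ads

# Route feature store traffic to a dedicated endpoint while other OCI clients
# keep using the generic one; fs_service_endpoint takes precedence for the
# feature store client when both are supplied.
ads.set_auth(
    auth="api_key",
    client_kwargs={
        "service_endpoint": "https://generic-endpoint.example.com",
        "fs_service_endpoint": "https://feature-store-endpoint.example.com",
    },
)
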