Skip to content

Commit

Permalink
Fix circular dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
aversey committed Jul 8, 2024
1 parent 28c0fd4 commit 92c4b98
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 26 deletions.
6 changes: 3 additions & 3 deletions python/hsfs/core/feature_store_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

from typing import Union

import hsfs.feature_store
from hsfs import client
from hsfs.feature_store import FeatureStore


class FeatureStoreApi:
def get(self, identifier: Union[int, str]) -> FeatureStore:
def get(self, identifier: Union[int, str]) -> hsfs.feature_store.FeatureStore:
"""Get feature store with specific id or name.
:param identifier: id or name of the feature store
Expand All @@ -32,6 +32,6 @@ def get(self, identifier: Union[int, str]) -> FeatureStore:
"""
_client = client.get_instance()
path_params = ["project", _client._project_id, "featurestores", identifier]
return FeatureStore.from_response_json(
return hsfs.feature_store.FeatureStore.from_response_json(
_client._send_request("GET", path_params)
)
65 changes: 42 additions & 23 deletions python/hsfs/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import avro.schema
import confluent_kafka
import hsfs.expectation_suite
import humps
import numpy as np
import pandas as pd
Expand Down Expand Up @@ -89,7 +90,6 @@
# if great_expectations is not installed, we will default to using native Hopsworks class as return values
from hsfs.decorators import typechecked, uses_great_expectations
from hsfs.embedding import EmbeddingIndex
from hsfs.expectation_suite import ExpectationSuite
from hsfs.ge_validation_result import ValidationResult
from hsfs.statistics import Statistics
from hsfs.statistics_config import StatisticsConfig
Expand Down Expand Up @@ -117,7 +117,7 @@ def __init__(
embedding_index: Optional[EmbeddingIndex] = None,
expectation_suite: Optional[
Union[
ExpectationSuite,
hsfs.expectation_suite.ExpectationSuite,
great_expectations.core.ExpectationSuite,
Dict[str, Any],
]
Expand Down Expand Up @@ -910,7 +910,11 @@ def append_features(

def get_expectation_suite(
self, ge_type: bool = HAS_GREAT_EXPECTATIONS
) -> Union[ExpectationSuite, great_expectations.core.ExpectationSuite, None]:
) -> Union[
hsfs.expectation_suite.ExpectationSuite,
great_expectations.core.ExpectationSuite,
None,
]:
"""Return the expectation suite attached to the feature group if it exists.
!!! example
Expand Down Expand Up @@ -948,12 +952,16 @@ def get_expectation_suite(
def save_expectation_suite(
self,
expectation_suite: Union[
ExpectationSuite, great_expectations.core.ExpectationSuite
hsfs.expectation_suite.ExpectationSuite,
great_expectations.core.ExpectationSuite,
],
run_validation: bool = True,
validation_ingestion_policy: Literal["always", "strict"] = "always",
overwrite: bool = False,
) -> Union[ExpectationSuite, great_expectations.core.ExpectationSuite]:
) -> Union[
hsfs.expectation_suite.ExpectationSuite,
great_expectations.core.ExpectationSuite,
]:
"""Attach an expectation suite to a feature group and saves it for future use. If an expectation
suite is already attached, it is replaced. Note that the provided expectation suite is modified
inplace to include expectationId fields.
Expand Down Expand Up @@ -984,18 +992,22 @@ def save_expectation_suite(
if HAS_GREAT_EXPECTATIONS and isinstance(
expectation_suite, great_expectations.core.ExpectationSuite
):
tmp_expectation_suite = ExpectationSuite.from_ge_type(
ge_expectation_suite=expectation_suite,
run_validation=run_validation,
validation_ingestion_policy=validation_ingestion_policy,
feature_store_id=self._feature_store_id,
feature_group_id=self._id,
tmp_expectation_suite = (
hsfs.expectation_suite.ExpectationSuite.from_ge_type(
ge_expectation_suite=expectation_suite,
run_validation=run_validation,
validation_ingestion_policy=validation_ingestion_policy,
feature_store_id=self._feature_store_id,
feature_group_id=self._id,
)
)
elif isinstance(expectation_suite, ExpectationSuite):
elif isinstance(expectation_suite, hsfs.expectation_suite.ExpectationSuite):
tmp_expectation_suite = expectation_suite.to_json_dict(decamelize=True)
tmp_expectation_suite["feature_group_id"] = self._id
tmp_expectation_suite["feature_store_id"] = self._feature_store_id
tmp_expectation_suite = ExpectationSuite(**tmp_expectation_suite)
tmp_expectation_suite = hsfs.expectation_suite.ExpectationSuite(
**tmp_expectation_suite
)
else:
raise TypeError(
"The provided expectation suite type `{}` is not supported. Use Great Expectation `ExpectationSuite` or HSFS' own `ExpectationSuite` object.".format(
Expand Down Expand Up @@ -1242,7 +1254,7 @@ def validate(
dataframe: Optional[
Union[pd.DataFrame, TypeVar("pyspark.sql.DataFrame")] # noqa: F821
] = None,
expectation_suite: Optional[ExpectationSuite] = None,
expectation_suite: Optional[hsfs.expectation_suite.ExpectationSuite] = None,
save_report: Optional[bool] = False,
validation_options: Optional[Dict[str, Any]] = None,
ingestion_result: Literal[
Expand Down Expand Up @@ -1858,7 +1870,7 @@ def location(self) -> Optional[str]:
@property
def expectation_suite(
self,
) -> Optional[ExpectationSuite]:
) -> Optional[hsfs.expectation_suite.ExpectationSuite]:
"""Expectation Suite configuration object defining the settings for
data validation of the feature group."""
return self._expectation_suite
Expand All @@ -1867,22 +1879,24 @@ def expectation_suite(
def expectation_suite(
self,
expectation_suite: Union[
ExpectationSuite,
hsfs.expectation_suite.ExpectationSuite,
great_expectations.core.ExpectationSuite,
Dict[str, Any],
None,
],
) -> None:
if isinstance(expectation_suite, ExpectationSuite):
if isinstance(expectation_suite, hsfs.expectation_suite.ExpectationSuite):
tmp_expectation_suite = expectation_suite.to_json_dict(decamelize=True)
tmp_expectation_suite["feature_group_id"] = self._id
tmp_expectation_suite["feature_store_id"] = self._feature_store_id
self._expectation_suite = ExpectationSuite(**tmp_expectation_suite)
self._expectation_suite = hsfs.expectation_suite.ExpectationSuite(
**tmp_expectation_suite
)
elif HAS_GREAT_EXPECTATIONS and isinstance(
expectation_suite,
great_expectations.core.expectation_suite.ExpectationSuite,
):
self._expectation_suite = ExpectationSuite(
self._expectation_suite = hsfs.expectation_suite.ExpectationSuite(
**expectation_suite.to_json_dict(),
feature_store_id=self._feature_store_id,
feature_group_id=self._id,
Expand All @@ -1891,7 +1905,9 @@ def expectation_suite(
tmp_expectation_suite = expectation_suite.copy()
tmp_expectation_suite["feature_store_id"] = self._feature_store_id
tmp_expectation_suite["feature_group_id"] = self._id
self._expectation_suite = ExpectationSuite(**tmp_expectation_suite)
self._expectation_suite = hsfs.expectation_suite.ExpectationSuite(
**tmp_expectation_suite
)
elif expectation_suite is None:
self._expectation_suite = None
else:
Expand Down Expand Up @@ -2076,7 +2092,7 @@ def __init__(
expectation_suite: Optional[
Union[
great_expectations.core.ExpectationSuite,
ExpectationSuite,
hsfs.expectation_suite.ExpectationSuite,
Dict[str, Any],
]
] = None,
Expand Down Expand Up @@ -3497,7 +3513,7 @@ def __init__(
event_time: Optional[str] = None,
expectation_suite: Optional[
Union[
ExpectationSuite,
hsfs.expectation_suite.ExpectationSuite,
great_expectations.core.ExpectationSuite,
Dict[str, Any],
]
Expand Down Expand Up @@ -4037,7 +4053,10 @@ def __init__(
statistics_config: Optional[StatisticsConfig] = None,
event_time: Optional[str] = None,
expectation_suite: Optional[
Union[ExpectationSuite, great_expectations.core.ExpectationSuite]
Union[
hsfs.expectation_suite.ExpectationSuite,
great_expectations.core.ExpectationSuite,
]
] = None,
online_enabled: bool = False,
href: Optional[str] = None,
Expand Down

0 comments on commit 92c4b98

Please sign in to comment.