[FSTORE-1574] Transformation functions bug fixes #412

Merged · 4 commits · Nov 28, 2024
2 changes: 1 addition & 1 deletion java/beam/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>4.1.0</version>
<version>4.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

2 changes: 1 addition & 1 deletion java/flink/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>4.1.0</version>
<version>4.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

2 changes: 1 addition & 1 deletion java/hsfs/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>4.1.0</version>
<version>4.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

2 changes: 1 addition & 1 deletion java/pom.xml
@@ -7,7 +7,7 @@
<groupId>com.logicalclocks</groupId>
<artifactId>hsfs-parent</artifactId>
<packaging>pom</packaging>
<version>4.1.0</version>
<version>4.1.1</version>
<modules>
<module>hsfs</module>
<module>spark</module>
2 changes: 1 addition & 1 deletion java/spark/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>4.1.0</version>
<version>4.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

2 changes: 1 addition & 1 deletion python/hopsworks_common/version.py
@@ -14,4 +14,4 @@
# limitations under the License.
#

__version__ = "4.1.0"
__version__ = "4.1.1"
43 changes: 38 additions & 5 deletions python/hsfs/core/feature_group_engine.py
@@ -15,7 +15,7 @@
from __future__ import annotations

import warnings
from typing import List
from typing import List, Union

from hsfs import engine, feature, util
from hsfs import feature_group as fg
@@ -67,7 +67,7 @@ def _update_feature_group_schema_on_demand_transformations(

def save(
self,
feature_group,
feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup],
feature_dataframe,
write_options,
validation_options: dict = None,
@@ -80,6 +80,21 @@
feature_group=feature_group, features=dataframe_features
)
)

# On-demand transformation functions are currently not supported in external feature groups.
if feature_group.transformation_functions:
if not isinstance(feature_group, fg.ExternalFeatureGroup):
feature_dataframe = (
engine.get_instance()._apply_transformation_function(
feature_group.transformation_functions, feature_dataframe
)
)
else:
warnings.warn(
"On-Demand features were not created because On-Demand Transformations are not supported for External Feature Groups.",
stacklevel=1,
)

util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
)
@@ -119,7 +134,7 @@ def save(

def insert(
self,
feature_group,
feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup],
feature_dataframe,
overwrite,
operation,
@@ -132,6 +147,16 @@
feature_group.time_travel_format,
features=feature_group.features,
)

# On-demand transformation functions are currently not supported in external feature groups.
if (
not isinstance(feature_group, fg.ExternalFeatureGroup)
and feature_group.transformation_functions
):
feature_dataframe = engine.get_instance()._apply_transformation_function(
feature_group.transformation_functions, feature_dataframe
)

dataframe_features = (
self._update_feature_group_schema_on_demand_transformations(
feature_group=feature_group, features=dataframe_features
@@ -299,7 +324,9 @@ def append_features(self, feature_group, new_features):
if feature_group.time_travel_format == "DELTA":
engine.get_instance().add_cols_to_delta_table(feature_group, new_features)
else:
engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features)
engine.get_instance().save_empty_dataframe(
feature_group, new_features=new_features
)

def update_description(self, feature_group, description):
"""Updates the description of a feature group."""
@@ -326,7 +353,7 @@ def update_deprecated(self, feature_group, deprecate):

def insert_stream(
self,
feature_group,
feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup],
dataframe,
query_name,
output_mode,
@@ -349,6 +376,12 @@
feature_group=feature_group, features=dataframe_features
)
)

if feature_group.transformation_functions:
dataframe = engine.get_instance()._apply_transformation_function(
feature_group.transformation_functions, dataframe
)

util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
)
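Reviewer note: the guard added to `save()` and `insert()` above applies on-demand transformations only for regular feature groups and merely warns for external ones. A minimal, self-contained sketch of that branch logic follows; the `FeatureGroup`/`ExternalFeatureGroup` dataclasses and the `add_one` transformation are illustrative stand-ins, not the real hsfs classes.

```python
import warnings
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class FeatureGroup:
    transformation_functions: List[Callable] = field(default_factory=list)


@dataclass
class ExternalFeatureGroup(FeatureGroup):
    pass


def apply_on_demand(feature_group: FeatureGroup, rows: List[Dict]) -> List[Dict]:
    """Simplified model of the new branch in FeatureGroupEngine.save()/insert()."""
    if not feature_group.transformation_functions:
        return rows
    if isinstance(feature_group, ExternalFeatureGroup):
        # External feature groups: skip on-demand transformations and warn instead.
        warnings.warn(
            "On-Demand features were not created because On-Demand Transformations "
            "are not supported for External Feature Groups.",
            stacklevel=1,
        )
        return rows
    # Regular feature groups: apply each transformation to the inserted rows.
    for tf in feature_group.transformation_functions:
        rows = [tf(dict(row)) for row in rows]
    return rows


def add_one(row: Dict) -> Dict:
    # Illustrative on-demand transformation deriving a new column.
    row["feature_plus_one"] = row["feature"] + 1
    return row


print(apply_on_demand(FeatureGroup([add_one]), [{"feature": 1}]))          # transformed
print(apply_on_demand(ExternalFeatureGroup([add_one]), [{"feature": 1}]))  # warns, unchanged
```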
9 changes: 9 additions & 0 deletions python/hsfs/core/vector_server.py
@@ -1323,6 +1323,15 @@ def identify_missing_features_pre_fetch(
passed_feature_names = passed_feature_names.union(
vector_db_features.keys()
)
if self._on_demand_feature_names and len(self._on_demand_feature_names) > 0:
# Exclude on-demand features from the validation check since they will be computed at request time.
_logger.debug(
"Appending on_demand_feature_names : %s, to passed_feature_names for pre-fetch missing",
self._on_demand_feature_names,
)
passed_feature_names = passed_feature_names.union(
self._on_demand_feature_names
)
neither_fetched_nor_passed = fetched_features.difference(
passed_feature_names
)
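Reviewer note: the vector_server change above only extends the existing set arithmetic in `identify_missing_features_pre_fetch`. With toy sets (feature names are hypothetical), the effect is that on-demand features no longer end up in `neither_fetched_nor_passed`:

```python
# Toy reproduction of the set arithmetic above; names are made up.
fetched_features = {"age", "salary", "salary_log"}
passed_feature_names = {"age", "salary"}
on_demand_feature_names = {"salary_log"}  # computed at request time, never passed in

# New behaviour: treat on-demand features as if they had been passed.
passed_feature_names = passed_feature_names.union(on_demand_feature_names)

neither_fetched_nor_passed = fetched_features.difference(passed_feature_names)
print(neither_fetched_nor_passed)  # set() -> no spurious missing-feature report
```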
25 changes: 13 additions & 12 deletions python/hsfs/engine/python.py
@@ -808,15 +808,6 @@ def save_dataframe(
online_write_options: Dict[str, Any],
validation_id: Optional[int] = None,
) -> Optional[job.Job]:
# Currently on-demand transformation functions not supported in external feature groups.
if (
not isinstance(feature_group, ExternalFeatureGroup)
and feature_group.transformation_functions
):
dataframe = self._apply_transformation_function(
feature_group.transformation_functions, dataframe
)

if (
hasattr(feature_group, "EXTERNAL_FEATURE_GROUP")
and feature_group.online_enabled
@@ -1298,9 +1289,19 @@ def _apply_transformation_function(
dataset.columns
)
if missing_features:
raise FeatureStoreException(
f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly."
)
if (
tf.transformation_type
== transformation_function.TransformationType.ON_DEMAND
):
# On-demand transformations are applied by the python/spark engine during insertion; the transformations required while retrieving feature vectors are applied in the vector_server.
raise FeatureStoreException(
f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the on-demand transformation function '{hopsworks_udf.function_name}' are not present in the dataframe being inserted into the feature group. "
+ "Please verify that the correct feature names are used in the transformation function and that these features exist in the dataframe being inserted."
)
else:
raise FeatureStoreException(
f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the model-dependent transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please verify that the correct features are specified in the transformation function."
)
if tf.hopsworks_udf.dropped_features:
dropped_features.update(tf.hopsworks_udf.dropped_features)

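Reviewer note: both engines (this file and `spark.py` below) now select the error message based on the transformation type. A compact, standalone sketch of that selection logic, with stand-in names rather than the real hsfs classes:

```python
from enum import Enum


class TransformationType(Enum):
    ON_DEMAND = "on_demand"
    MODEL_DEPENDENT = "model_dependent"


def missing_feature_error(missing, function_name, transformation_type):
    """Stand-in for the branch added to _apply_transformation_function."""
    features = "`, `".join(sorted(missing))
    if transformation_type is TransformationType.ON_DEMAND:
        # On-demand transformations run at insert time, so the missing features
        # must exist in the dataframe being inserted.
        return (
            f"The following feature(s): `{features}`, specified in the on-demand "
            f"transformation function '{function_name}' are not present in the "
            "dataframe being inserted into the feature group."
        )
    # Model-dependent transformations are resolved against the feature view instead.
    return (
        f"The following feature(s): `{features}`, specified in the model-dependent "
        f"transformation function '{function_name}' are not present in the feature view."
    )


print(missing_feature_error({"salary"}, "salary_log", TransformationType.ON_DEMAND))
```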
60 changes: 32 additions & 28 deletions python/hsfs/engine/spark.py
@@ -415,14 +415,6 @@ def save_dataframe(
validation_id=None,
):
try:
# Currently on-demand transformation functions not supported in external feature groups.
if (
not isinstance(feature_group, fg_mod.ExternalFeatureGroup)
and feature_group.transformation_functions
):
dataframe = self._apply_transformation_function(
feature_group.transformation_functions, dataframe
)
if (
isinstance(feature_group, fg_mod.ExternalFeatureGroup)
and feature_group.online_enabled
@@ -467,11 +459,6 @@ def save_stream_dataframe(
checkpoint_dir: Optional[str],
write_options: Optional[Dict[str, Any]],
):
if feature_group.transformation_functions:
dataframe = self._apply_transformation_function(
feature_group.transformation_functions, dataframe
)

write_options = kafka_engine.get_kafka_config(
feature_group.feature_store_id, write_options, engine="spark"
)
@@ -1314,13 +1301,16 @@ def save_empty_dataframe(self, feature_group, new_features=None):

dataframe = self._spark_session.read.format("hudi").load(location)

if (new_features is not None):
if new_features is not None:
if isinstance(new_features, list):
for new_feature in new_features:
dataframe = dataframe.withColumn(new_feature.name, lit(None).cast(new_feature.type))
dataframe = dataframe.withColumn(
new_feature.name, lit(None).cast(new_feature.type)
)
else:
dataframe = dataframe.withColumn(new_features.name, lit(None).cast(new_features.type))

dataframe = dataframe.withColumn(
new_features.name, lit(None).cast(new_features.type)
)

self.save_dataframe(
feature_group,
@@ -1337,18 +1327,22 @@ def add_cols_to_delta_table(self, feature_group, new_features):

dataframe = self._spark_session.read.format("delta").load(location)

if (new_features is not None):
if new_features is not None:
if isinstance(new_features, list):
for new_feature in new_features:
dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type))
dataframe = dataframe.withColumn(
new_feature.name, lit("").cast(new_feature.type)
)
else:
dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type))
dataframe = dataframe.withColumn(
new_features.name, lit("").cast(new_features.type)
)

dataframe.limit(0).write.format("delta").mode(
"append"
).option("mergeSchema", "true").option(
"spark.databricks.delta.schema.autoMerge.enabled", "true"
).save(location)
dataframe.limit(0).write.format("delta").mode("append").option(
"mergeSchema", "true"
).option("spark.databricks.delta.schema.autoMerge.enabled", "true").save(
location
)

def _apply_transformation_function(
self,
Expand Down Expand Up @@ -1378,9 +1372,19 @@ def _apply_transformation_function(
)

if missing_features:
raise FeatureStoreException(
f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly."
)
if (
tf.transformation_type
== transformation_function.TransformationType.ON_DEMAND
):
# On-demand transformations are applied by the python/spark engine during insertion; the transformations required while retrieving feature vectors are applied in the vector_server.
raise FeatureStoreException(
f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the on-demand transformation function '{hopsworks_udf.function_name}' are not present in the dataframe being inserted into the feature group. "
+ "Please verify that the correct feature names are used in the transformation function and that these features exist in the dataframe being inserted."
)
else:
raise FeatureStoreException(
f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the model-dependent transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please verify that the correct features are specified in the transformation function."
)
if tf.hopsworks_udf.dropped_features:
dropped_features.update(hopsworks_udf.dropped_features)

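Reviewer note: the reformatted `save_empty_dataframe` / `add_cols_to_delta_table` keep the existing schema-evolution approach: cast a null/empty literal to the new column's type and write zero rows with schema merging enabled. A hedged PySpark sketch of that pattern (assumes an active `SparkSession` named `spark` with Delta Lake configured and an existing Delta table at `location`, which are not part of this diff):

```python
from pyspark.sql.functions import lit

location = "/tmp/example_delta_table"  # hypothetical table path

# Load the existing table and add a new, empty column of the desired type.
dataframe = spark.read.format("delta").load(location)
dataframe = dataframe.withColumn("new_feature", lit("").cast("string"))

# Writing zero rows with mergeSchema=true updates the table schema only.
dataframe.limit(0).write.format("delta").mode("append").option(
    "mergeSchema", "true"
).option("spark.databricks.delta.schema.autoMerge.enabled", "true").save(location)
```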