adopting changes in backend for UI
manu-sj committed Jul 8, 2024
1 parent 99001d2 commit c71af3b
Showing 11 changed files with 123 additions and 78 deletions.
7 changes: 4 additions & 3 deletions python/hsfs/engine/python.py
@@ -827,7 +827,8 @@ def parse_schema_feature_group(
on_demand=True,
)
)
dropped_features.extend(tf.hopsworks_udf.dropped_features)
if tf.hopsworks_udf.dropped_features:
dropped_features.extend(tf.hopsworks_udf.dropped_features)
for feat_name in arrow_schema.names:
name = util.autofix_feature_name(feat_name)
try:
@@ -1364,8 +1365,8 @@ def _apply_transformation_function(
raise FeatureStoreException(
f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly."
)

dropped_features.update(tf.hopsworks_udf.dropped_features)
if tf.hopsworks_udf.dropped_features:
dropped_features.update(tf.hopsworks_udf.dropped_features)
dataset = pd.concat(
[
dataset,
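The guard added in both hunks reflects that `dropped_features` on a `HopsworksUdf` can now be `None` rather than an empty list (see the `_validate_and_convert_drop_features` change in hopsworks_udf.py further down), and extending a list with `None` raises a `TypeError`. A minimal standalone sketch of the pattern, using illustrative values rather than hsfs objects; the same guard is applied to the `set.update` call in spark.py below:

```python
# Standalone illustration (not hsfs code): why the `if ...dropped_features:`
# guard is needed once dropped_features may be None instead of [].
from typing import List, Optional, Set

udf_dropped: Optional[List[str]] = None  # a UDF that drops nothing now reports None

dropped_list: List[str] = []
dropped_set: Set[str] = set()

# Unguarded, both of these would raise:
#   dropped_list.extend(udf_dropped)  # TypeError: 'NoneType' object is not iterable
#   dropped_set.update(udf_dropped)   # TypeError: 'NoneType' object is not iterable

if udf_dropped:
    dropped_list.extend(udf_dropped)
    dropped_set.update(udf_dropped)

print(dropped_list, dropped_set)  # [] set() -- nothing collected for this UDF
```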
7 changes: 4 additions & 3 deletions python/hsfs/engine/spark.py
@@ -1145,7 +1145,8 @@ def parse_schema_feature_group(
on_demand=True,
)
)
dropped_features.extend(tf.hopsworks_udf.dropped_features)
if tf.hopsworks_udf.dropped_features:
dropped_features.extend(tf.hopsworks_udf.dropped_features)

using_hudi = time_travel_format == "HUDI"
for feat in dataframe.schema:
@@ -1290,8 +1291,8 @@ def _apply_transformation_function(
raise FeatureStoreException(
f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly."
)

dropped_features.update(tf.hopsworks_udf.dropped_features)
if tf.hopsworks_udf.dropped_features:
dropped_features.update(tf.hopsworks_udf.dropped_features)

pandas_udf = hopsworks_udf.get_udf()
output_col_name = hopsworks_udf.output_column_names[0]
34 changes: 19 additions & 15 deletions python/hsfs/feature_group.py
@@ -2135,21 +2135,25 @@ def __init__(
self._writer: Optional[callable] = None

# On-Demand Transformation Functions
self._transformation_functions: List[TransformationFunction] = (
[
TransformationFunction(
featurestore_id,
hopsworks_udf=transformation_function,
version=1,
transformation_type=UDFType.ON_DEMAND,
)
if not isinstance(transformation_function, TransformationFunction)
else transformation_function
for transformation_function in transformation_functions
]
if transformation_functions
else []
)
self._transformation_functions: List[TransformationFunction] = []

if transformation_functions:
for transformation_function in transformation_functions:
if not isinstance(transformation_function, TransformationFunction):
self._transformation_functions.append(
TransformationFunction(
featurestore_id,
hopsworks_udf=transformation_function,
version=1,
transformation_type=UDFType.ON_DEMAND,
)
)
else:
if not transformation_function.hopsworks_udf.udf_type:
transformation_function.hopsworks_udf.udf_type = (
UDFType.ON_DEMAND
)
self._transformation_functions.append(transformation_function)

if self._transformation_functions:
self._transformation_functions = (
34 changes: 19 additions & 15 deletions python/hsfs/feature_view.py
@@ -120,21 +120,25 @@ def __init__(
training_helper_columns if training_helper_columns else []
)

self._transformation_functions: List[TransformationFunction] = (
[
TransformationFunction(
self.featurestore_id,
hopsworks_udf=transformation_function,
version=1,
transformation_type=UDFType.MODEL_DEPENDENT,
)
if not isinstance(transformation_function, TransformationFunction)
else transformation_function
for transformation_function in transformation_functions
]
if transformation_functions
else []
)
self._transformation_functions: List[TransformationFunction] = []

if transformation_functions:
for transformation_function in transformation_functions:
if not isinstance(transformation_function, TransformationFunction):
self._transformation_functions.append(
TransformationFunction(
self.featurestore_id,
hopsworks_udf=transformation_function,
version=1,
transformation_type=UDFType.MODEL_DEPENDENT,
)
)
else:
if not transformation_function.hopsworks_udf.udf_type:
transformation_function.hopsworks_udf.udf_type = (
UDFType.MODEL_DEPENDENT
)
self._transformation_functions.append(transformation_function)

if self._transformation_functions:
self._transformation_functions = FeatureView._sort_transformation_functions(
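Both constructors replace the previous list comprehension with an explicit loop so that, besides wrapping plain UDFs in a `TransformationFunction`, they can also assign a default UDF type to already-wrapped functions whose type was left unset: `ON_DEMAND` for feature groups, `MODEL_DEPENDENT` for feature views. A hedged sketch of that normalization with hypothetical stand-in classes (not the real hsfs classes, which also take a feature store id and a version):

```python
# Hypothetical stand-ins to illustrate the normalization loop introduced in
# feature_group.py and feature_view.py above.
from enum import Enum
from typing import List, Optional, Union


class UDFType(Enum):
    ON_DEMAND = "on_demand"
    MODEL_DEPENDENT = "model_dependent"


class StubUdf:
    def __init__(self) -> None:
        self.udf_type: Optional[UDFType] = None


class StubTransformationFunction:
    def __init__(self, udf: StubUdf, transformation_type: Optional[UDFType] = None) -> None:
        self.hopsworks_udf = udf
        if transformation_type:
            udf.udf_type = transformation_type


def normalize(
    transformation_functions: Optional[List[Union[StubUdf, StubTransformationFunction]]],
    default_type: UDFType,
) -> List[StubTransformationFunction]:
    """Wrap raw UDFs and default the UDF type when the caller left it unset."""
    normalized: List[StubTransformationFunction] = []
    for tf in transformation_functions or []:
        if not isinstance(tf, StubTransformationFunction):
            # Raw UDF: wrap it and stamp the owner's transformation type.
            normalized.append(StubTransformationFunction(tf, transformation_type=default_type))
        else:
            # Pre-wrapped function: only fill in the type if it is still unset.
            if not tf.hopsworks_udf.udf_type:
                tf.hopsworks_udf.udf_type = default_type
            normalized.append(tf)
    return normalized


# Feature groups default to ON_DEMAND; feature views use MODEL_DEPENDENT.
fns = normalize([StubUdf(), StubTransformationFunction(StubUdf())], UDFType.ON_DEMAND)
assert all(fn.hopsworks_udf.udf_type is UDFType.ON_DEMAND for fn in fns)
```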
85 changes: 59 additions & 26 deletions python/hsfs/hopsworks_udf.py
@@ -75,7 +75,9 @@ def add_one(data1 : pd.Series):
"""

def wrapper(func: Callable) -> HopsworksUdf:
udf = HopsworksUdf(func=func, return_types=return_type, dropped_features=drop)
udf = HopsworksUdf(
func=func, return_types=return_type, dropped_argument_names=drop
)
return udf

return wrapper
@@ -143,7 +145,11 @@ def __init__(
return_types: Union[List[type], type, List[str], str],
name: Optional[str] = None,
transformation_features: Optional[List[TransformationFeature]] = None,
dropped_features: Optional[List[str]] = None,
transformation_function_argument_names: Optional[
List[TransformationFeature]
] = None,
dropped_argument_names: Optional[List[str]] = None,
dropped_feature_names: Optional[List[str]] = None,
feature_name_prefix: Optional[str] = None,
):
self._return_types: List[str] = HopsworksUdf._validate_and_convert_output_types(
@@ -162,24 +168,41 @@ def __init__(
else func
)
if not transformation_features:
# New transformation function being declared so extract source code from function
self._transformation_features: List[TransformationFeature] = (
HopsworksUdf._extract_function_arguments(func)
if not transformation_features
else transformation_features
)

self._transformation_function_argument_names = [
feature.feature_name for feature in self._transformation_features
]

self._dropped_argument_names: List[str] = (
HopsworksUdf._validate_and_convert_drop_features(
dropped_argument_names,
self.transformation_features,
feature_name_prefix,
)
)
self._dropped_features = self._dropped_argument_names
else:
self._transformation_features = transformation_features
self._transformation_function_argument_names = (
transformation_function_argument_names
)
self._dropped_argument_names = dropped_argument_names
self._dropped_features = (
dropped_feature_names
if dropped_feature_names
else dropped_argument_names
)

self._formatted_function_source, self._module_imports = (
HopsworksUdf._format_source_code(self._function_source)
)

self._dropped_features: List[str] = (
HopsworksUdf._validate_and_convert_drop_features(
dropped_features, self.transformation_features, feature_name_prefix
)
)

self._statistics: Optional[TransformationStatistics] = None

self._udf_type: UDFType = None
@@ -201,7 +224,7 @@ def _validate_and_convert_drop_features(
`List[str]`: A list of features to be dropped.
"""
if not dropped_features:
return []
return None

dropped_features = (
[dropped_features]
@@ -554,11 +577,16 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf":
f'Feature names provided must be string "{arg}" is not string'
)
transformation_feature_name = self.transformation_features
index_dropped_features = [
transformation_feature_name.index(dropped_feature)
for dropped_feature in self.dropped_features
]
updated_dropped_features = [features[index] for index in index_dropped_features]
if self.dropped_features:
index_dropped_features = [
transformation_feature_name.index(dropped_feature)
for dropped_feature in self.dropped_features
]
updated_dropped_features = [
features[index] for index in index_dropped_features
]
else:
updated_dropped_features = None

# Create a copy of the UDF to associate it with new feature names.
udf = copy.deepcopy(self)
@@ -601,6 +629,8 @@ def get_udf(self, force_python_udf: bool = False) -> Callable:
# Returns
`Callable`: Pandas UDF in the spark engine otherwise returns a python function for the UDF.
"""
if self.udf_type is None:
raise FeatureStoreException("UDF Type cannot be None")

if engine.get_type() in ["hive", "python", "training"] or force_python_udf:
return self.hopsworksUdf_wrapper()
@@ -623,7 +653,8 @@ def to_dict(self) -> Dict[str, Any]:
"sourceCode": self._function_source,
"outputTypes": self.return_types,
"transformationFeatures": self.transformation_features,
"droppedFeatures": self.dropped_features,
"transformationFunctionArgumentNames": self._transformation_function_argument_names,
"droppedArgumentNames": self._dropped_argument_names,
"statisticsArgumentNames": self._statistics_argument_names
if self.statistics_required
else None,
@@ -663,12 +694,12 @@ def from_response_json(
transformation_features = [
feature.strip() for feature in json_decamelized["transformation_features"]
]
dropped_features = (
dropped_argument_names = (
[
dropped_feature.strip()
for dropped_feature in json_decamelized["dropped_features"]
for dropped_feature in json_decamelized["dropped_argument_names"]
]
if "dropped_features" in json_decamelized
if "dropped_argument_names" in json_decamelized
else None
)
statistics_features = (
@@ -687,11 +718,14 @@ def from_response_json(
arg_list if not transformation_features else transformation_features
)

if dropped_features:
dropped_features = [
transformation_features[arg_list.index(dropped_feature)]
for dropped_feature in dropped_features
dropped_feature_names = (
[
transformation_features[arg_list.index(dropped_argument_name)]
for dropped_argument_name in dropped_argument_names
]
if dropped_argument_names
else None
)

if statistics_features:
transformation_features = [
@@ -714,7 +748,8 @@ def from_response_json(
return_types=output_types,
name=function_name,
transformation_features=transformation_features,
dropped_features=dropped_features,
dropped_argument_names=dropped_argument_names,
dropped_feature_names=dropped_feature_names,
feature_name_prefix=feature_name_prefix,
)

@@ -728,8 +763,6 @@ def _validate_udf_type(self):
# Raises
`hsfs.client.exceptions.FeatureStoreException` : If the UDF Type is None or if statistics or multiple columns has been output by a on-demand transformation function
"""
if self.udf_type is None:
raise FeatureStoreException("UDF Type cannot be None")

if self._udf_type == UDFType.ON_DEMAND:
if len(self.return_types) > 1:
@@ -843,7 +876,7 @@ def dropped_features(self) -> List[str]:
"""
List of features that will be dropped after the UDF is applied.
"""
if self._feature_name_prefix:
if self._feature_name_prefix and self._dropped_features:
return [
self._feature_name_prefix + dropped_feature
for dropped_feature in self._dropped_features
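The core change in this file is splitting what used to be a single `dropped_features` list into `dropped_argument_names` (the names as they appear in the UDF signature, which `to_dict` now serializes as `droppedArgumentNames`) and `dropped_feature_names` (the concrete feature names resolved when the UDF is bound via `__call__` or rebuilt in `from_response_json`). A hedged sketch of how that plays out for a user-defined UDF, modelled on the decorator usage in this module's docstring; the import path and the feature names "col_a"/"col_b" are assumptions:

```python
# Sketch of dropped *argument* names vs. dropped *feature* names (assumed
# import path; "col_a"/"col_b" are made-up feature names for illustration).
import pandas as pd

from hsfs.hopsworks_udf import udf


@udf(float, drop=["data1"])
def add_features(data1: pd.Series, data2: pd.Series) -> pd.Series:
    return data1 + data2


# At declaration time only the signature is known, so the decorator records
# the dropped *argument* name "data1" (serialized as droppedArgumentNames).
# Binding the UDF to concrete features resolves it to a dropped *feature*:
bound = add_features("col_a", "col_b")
print(bound.dropped_features)  # expected: ["col_a"]
```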
4 changes: 3 additions & 1 deletion python/hsfs/transformation_function.py
@@ -241,5 +241,7 @@ def __repr__(self):
return (
f"Model-Dependent Transformation Function : {repr(self.hopsworks_udf)}"
)
else:
elif self.hopsworks_udf._udf_type == UDFType.ON_DEMAND:
return f"On-Demand Transformation Function : {repr(self.hopsworks_udf)}"
else:
return f"Transformation Function : {repr(self.hopsworks_udf)}"
@@ -161,7 +161,7 @@ def test_apply_builtin_minmax_from_backend(self, mocker):
"transformationFeatures": [],
"statisticsArgumentNames": ["feature"],
"name": "min_max_scaler",
"droppedFeatures": ["feature"],
"droppedArgumentNames": ["feature"],
}

tf_fun = HopsworksUdf.from_response_json(udf_response)
@@ -304,7 +304,7 @@ def test_apply_builtin_standard_scaler_from_backend(self, mocker):
"transformationFeatures": [],
"statisticsArgumentNames": ["feature"],
"name": "standard_scaler",
"droppedFeatures": ["feature"],
"droppedArgumentNames": ["feature"],
}

tf_fun = HopsworksUdf.from_response_json(udf_response)
@@ -451,7 +451,7 @@ def test_apply_builtin_robust_scaler_from_backend(self, mocker):
"transformationFeatures": [],
"statisticsArgumentNames": ["feature"],
"name": "robust_scaler",
"droppedFeatures": ["feature"],
"droppedArgumentNames": ["feature"],
}

tf_fun = HopsworksUdf.from_response_json(udf_response)
4 changes: 2 additions & 2 deletions python/tests/fixtures/feature_group_fixtures.json
@@ -695,7 +695,7 @@
"name": "add_two",
"outputTypes":["double"],
"transformationFeatures":["data"],
"dropped_features":["data1"]
"dropped_argument_names":["data1"]
}
},
{
@@ -707,7 +707,7 @@
"name": "add_one_fs",
"outputTypes":["double"],
"transformationFeatures":["col1"],
"dropped_features":["data1"]
"dropped_argument_names":["data1"]
}
}
],
4 changes: 2 additions & 2 deletions python/tests/fixtures/feature_view_fixtures.json
@@ -935,7 +935,7 @@
"outputTypes":["double"],
"transformationFeatures":["data"],
"statisticsArgumentNames":["data1"],
"dropped_features":["data1"]
"dropped_argument_names":["data1"]
}
},
{
Expand All @@ -947,7 +947,7 @@
"name": "add_one_fs",
"outputTypes":["double"],
"transformationFeatures":["col1"],
"dropped_features":["data1"]
"dropped_argument_names":["data1"]
}
}
],
@@ -79,7 +79,7 @@
"name": "add_one_fs",
"outputTypes":["double"],
"transformationFeatures":["col1"],
"dropped_features":["data1"]
"dropped_argument_names":["data1"]
}
},
"featuregroup": {
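All test fixtures switch from the old `dropped_features`/`droppedFeatures` key to `dropped_argument_names`/`droppedArgumentNames`, mirroring the new `to_dict` payload. Restricted to the keys visible in this diff, the serialized form of the `add_features` sketch above would look roughly like this (values illustrative):

```python
# Approximate REST payload for the add_features sketch above, limited to the
# fields shown in the to_dict hunk of hopsworks_udf.py (values illustrative).
expected_payload = {
    "sourceCode": "...",  # full decorated function source, elided here
    "outputTypes": ["double"],
    "transformationFeatures": ["col_a", "col_b"],
    "transformationFunctionArgumentNames": ["data1", "data2"],
    "droppedArgumentNames": ["data1"],
    "statisticsArgumentNames": None,  # no TransformationStatistics arguments used
}
```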