From c71af3bb16db654876a54da7530a448b3c07dc20 Mon Sep 17 00:00:00 2001 From: manu-sj Date: Mon, 8 Jul 2024 11:09:40 +0200 Subject: [PATCH] adopting changes in backend for UI --- python/hsfs/engine/python.py | 7 +- python/hsfs/engine/spark.py | 7 +- python/hsfs/feature_group.py | 34 ++++---- python/hsfs/feature_view.py | 34 ++++---- python/hsfs/hopsworks_udf.py | 85 +++++++++++++------ python/hsfs/transformation_function.py | 4 +- ...t_python_spark_transformation_functions.py | 6 +- .../fixtures/feature_group_fixtures.json | 4 +- .../tests/fixtures/feature_view_fixtures.json | 4 +- .../training_dataset_feature_fixtures.json | 2 +- .../transformation_function_fixtures.json | 14 +-- 11 files changed, 123 insertions(+), 78 deletions(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 9c2a4ca279..b0efd7be0e 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -827,7 +827,8 @@ def parse_schema_feature_group( on_demand=True, ) ) - dropped_features.extend(tf.hopsworks_udf.dropped_features) + if tf.hopsworks_udf.dropped_features: + dropped_features.extend(tf.hopsworks_udf.dropped_features) for feat_name in arrow_schema.names: name = util.autofix_feature_name(feat_name) try: @@ -1364,8 +1365,8 @@ def _apply_transformation_function( raise FeatureStoreException( f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly." ) - - dropped_features.update(tf.hopsworks_udf.dropped_features) + if tf.hopsworks_udf.dropped_features: + dropped_features.update(tf.hopsworks_udf.dropped_features) dataset = pd.concat( [ dataset, diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 60f5f14854..322e9e993a 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -1145,7 +1145,8 @@ def parse_schema_feature_group( on_demand=True, ) ) - dropped_features.extend(tf.hopsworks_udf.dropped_features) + if tf.hopsworks_udf.dropped_features: + dropped_features.extend(tf.hopsworks_udf.dropped_features) using_hudi = time_travel_format == "HUDI" for feat in dataframe.schema: @@ -1290,8 +1291,8 @@ def _apply_transformation_function( raise FeatureStoreException( f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly." ) - - dropped_features.update(tf.hopsworks_udf.dropped_features) + if tf.hopsworks_udf.dropped_features: + dropped_features.update(tf.hopsworks_udf.dropped_features) pandas_udf = hopsworks_udf.get_udf() output_col_name = hopsworks_udf.output_column_names[0] diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 0bbeb26552..8240f115e9 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2135,21 +2135,25 @@ def __init__( self._writer: Optional[callable] = None # On-Demand Transformation Functions - self._transformation_functions: List[TransformationFunction] = ( - [ - TransformationFunction( - featurestore_id, - hopsworks_udf=transformation_function, - version=1, - transformation_type=UDFType.ON_DEMAND, - ) - if not isinstance(transformation_function, TransformationFunction) - else transformation_function - for transformation_function in transformation_functions - ] - if transformation_functions - else [] - ) + self._transformation_functions: List[TransformationFunction] = [] + + if transformation_functions: + for transformation_function in transformation_functions: + if not isinstance(transformation_function, TransformationFunction): + self._transformation_functions.append( + TransformationFunction( + featurestore_id, + hopsworks_udf=transformation_function, + version=1, + transformation_type=UDFType.ON_DEMAND, + ) + ) + else: + if not transformation_function.hopsworks_udf.udf_type: + transformation_function.hopsworks_udf.udf_type = ( + UDFType.ON_DEMAND + ) + self._transformation_functions.append(transformation_function) if self._transformation_functions: self._transformation_functions = ( diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index 0045ecd713..fc9151ae94 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -120,21 +120,25 @@ def __init__( training_helper_columns if training_helper_columns else [] ) - self._transformation_functions: List[TransformationFunction] = ( - [ - TransformationFunction( - self.featurestore_id, - hopsworks_udf=transformation_function, - version=1, - transformation_type=UDFType.MODEL_DEPENDENT, - ) - if not isinstance(transformation_function, TransformationFunction) - else transformation_function - for transformation_function in transformation_functions - ] - if transformation_functions - else [] - ) + self._transformation_functions: List[TransformationFunction] = [] + + if transformation_functions: + for transformation_function in transformation_functions: + if not isinstance(transformation_function, TransformationFunction): + self._transformation_functions.append( + TransformationFunction( + self.featurestore_id, + hopsworks_udf=transformation_function, + version=1, + transformation_type=UDFType.MODEL_DEPENDENT, + ) + ) + else: + if not transformation_function.hopsworks_udf.udf_type: + transformation_function.hopsworks_udf.udf_type = ( + UDFType.MODEL_DEPENDENT + ) + self._transformation_functions.append(transformation_function) if self._transformation_functions: self._transformation_functions = FeatureView._sort_transformation_functions( diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py index f75c9f861e..697eb06f38 100644 --- a/python/hsfs/hopsworks_udf.py +++ b/python/hsfs/hopsworks_udf.py @@ -75,7 +75,9 @@ def add_one(data1 : pd.Series): """ def wrapper(func: Callable) -> HopsworksUdf: - udf = HopsworksUdf(func=func, return_types=return_type, dropped_features=drop) + udf = HopsworksUdf( + func=func, return_types=return_type, dropped_argument_names=drop + ) return udf return wrapper @@ -143,7 +145,11 @@ def __init__( return_types: Union[List[type], type, List[str], str], name: Optional[str] = None, transformation_features: Optional[List[TransformationFeature]] = None, - dropped_features: Optional[List[str]] = None, + transformation_function_argument_names: Optional[ + List[TransformationFeature] + ] = None, + dropped_argument_names: Optional[List[str]] = None, + dropped_feature_names: Optional[List[str]] = None, feature_name_prefix: Optional[str] = None, ): self._return_types: List[str] = HopsworksUdf._validate_and_convert_output_types( @@ -162,24 +168,41 @@ def __init__( else func ) if not transformation_features: + # New transformation function being declared so extract source code from function self._transformation_features: List[TransformationFeature] = ( HopsworksUdf._extract_function_arguments(func) if not transformation_features else transformation_features ) + + self._transformation_function_argument_names = [ + feature.feature_name for feature in self._transformation_features + ] + + self._dropped_argument_names: List[str] = ( + HopsworksUdf._validate_and_convert_drop_features( + dropped_argument_names, + self.transformation_features, + feature_name_prefix, + ) + ) + self._dropped_features = self._dropped_argument_names else: self._transformation_features = transformation_features + self._transformation_function_argument_names = ( + transformation_function_argument_names + ) + self._dropped_argument_names = dropped_argument_names + self._dropped_features = ( + dropped_feature_names + if dropped_feature_names + else dropped_argument_names + ) self._formatted_function_source, self._module_imports = ( HopsworksUdf._format_source_code(self._function_source) ) - self._dropped_features: List[str] = ( - HopsworksUdf._validate_and_convert_drop_features( - dropped_features, self.transformation_features, feature_name_prefix - ) - ) - self._statistics: Optional[TransformationStatistics] = None self._udf_type: UDFType = None @@ -201,7 +224,7 @@ def _validate_and_convert_drop_features( `List[str]`: A list of features to be dropped. """ if not dropped_features: - return [] + return None dropped_features = ( [dropped_features] @@ -554,11 +577,16 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf": f'Feature names provided must be string "{arg}" is not string' ) transformation_feature_name = self.transformation_features - index_dropped_features = [ - transformation_feature_name.index(dropped_feature) - for dropped_feature in self.dropped_features - ] - updated_dropped_features = [features[index] for index in index_dropped_features] + if self.dropped_features: + index_dropped_features = [ + transformation_feature_name.index(dropped_feature) + for dropped_feature in self.dropped_features + ] + updated_dropped_features = [ + features[index] for index in index_dropped_features + ] + else: + updated_dropped_features = None # Create a copy of the UDF to associate it with new feature names. udf = copy.deepcopy(self) @@ -601,6 +629,8 @@ def get_udf(self, force_python_udf: bool = False) -> Callable: # Returns `Callable`: Pandas UDF in the spark engine otherwise returns a python function for the UDF. """ + if self.udf_type is None: + raise FeatureStoreException("UDF Type cannot be None") if engine.get_type() in ["hive", "python", "training"] or force_python_udf: return self.hopsworksUdf_wrapper() @@ -623,7 +653,8 @@ def to_dict(self) -> Dict[str, Any]: "sourceCode": self._function_source, "outputTypes": self.return_types, "transformationFeatures": self.transformation_features, - "droppedFeatures": self.dropped_features, + "transformationFunctionArgumentNames": self._transformation_function_argument_names, + "droppedArgumentNames": self._dropped_argument_names, "statisticsArgumentNames": self._statistics_argument_names if self.statistics_required else None, @@ -663,12 +694,12 @@ def from_response_json( transformation_features = [ feature.strip() for feature in json_decamelized["transformation_features"] ] - dropped_features = ( + dropped_argument_names = ( [ dropped_feature.strip() - for dropped_feature in json_decamelized["dropped_features"] + for dropped_feature in json_decamelized["dropped_argument_names"] ] - if "dropped_features" in json_decamelized + if "dropped_argument_names" in json_decamelized else None ) statistics_features = ( @@ -687,11 +718,14 @@ def from_response_json( arg_list if not transformation_features else transformation_features ) - if dropped_features: - dropped_features = [ - transformation_features[arg_list.index(dropped_feature)] - for dropped_feature in dropped_features + dropped_feature_names = ( + [ + transformation_features[arg_list.index(dropped_argument_name)] + for dropped_argument_name in dropped_argument_names ] + if dropped_argument_names + else None + ) if statistics_features: transformation_features = [ @@ -714,7 +748,8 @@ def from_response_json( return_types=output_types, name=function_name, transformation_features=transformation_features, - dropped_features=dropped_features, + dropped_argument_names=dropped_argument_names, + dropped_feature_names=dropped_feature_names, feature_name_prefix=feature_name_prefix, ) @@ -728,8 +763,6 @@ def _validate_udf_type(self): # Raises `hsfs.client.exceptions.FeatureStoreException` : If the UDF Type is None or if statistics or multiple columns has been output by a on-demand transformation function """ - if self.udf_type is None: - raise FeatureStoreException("UDF Type cannot be None") if self._udf_type == UDFType.ON_DEMAND: if len(self.return_types) > 1: @@ -843,7 +876,7 @@ def dropped_features(self) -> List[str]: """ List of features that will be dropped after the UDF is applied. """ - if self._feature_name_prefix: + if self._feature_name_prefix and self._dropped_features: return [ self._feature_name_prefix + dropped_feature for dropped_feature in self._dropped_features diff --git a/python/hsfs/transformation_function.py b/python/hsfs/transformation_function.py index 65535aa539..fe30047384 100644 --- a/python/hsfs/transformation_function.py +++ b/python/hsfs/transformation_function.py @@ -241,5 +241,7 @@ def __repr__(self): return ( f"Model-Dependent Transformation Function : {repr(self.hopsworks_udf)}" ) - else: + elif self.hopsworks_udf._udf_type == UDFType.ON_DEMAND: return f"On-Demand Transformation Function : {repr(self.hopsworks_udf)}" + else: + return f"Transformation Function : {repr(self.hopsworks_udf)}" diff --git a/python/tests/engine/test_python_spark_transformation_functions.py b/python/tests/engine/test_python_spark_transformation_functions.py index 71bb48cd05..8c29128641 100644 --- a/python/tests/engine/test_python_spark_transformation_functions.py +++ b/python/tests/engine/test_python_spark_transformation_functions.py @@ -161,7 +161,7 @@ def test_apply_builtin_minmax_from_backend(self, mocker): "transformationFeatures": [], "statisticsArgumentNames": ["feature"], "name": "min_max_scaler", - "droppedFeatures": ["feature"], + "droppedArgumentNames": ["feature"], } tf_fun = HopsworksUdf.from_response_json(udf_response) @@ -304,7 +304,7 @@ def test_apply_builtin_standard_scaler_from_backend(self, mocker): "transformationFeatures": [], "statisticsArgumentNames": ["feature"], "name": "standard_scaler", - "droppedFeatures": ["feature"], + "droppedArgumentNames": ["feature"], } tf_fun = HopsworksUdf.from_response_json(udf_response) @@ -451,7 +451,7 @@ def test_apply_builtin_robust_scaler_from_backend(self, mocker): "transformationFeatures": [], "statisticsArgumentNames": ["feature"], "name": "robust_scaler", - "droppedFeatures": ["feature"], + "droppedArgumentNames": ["feature"], } tf_fun = HopsworksUdf.from_response_json(udf_response) diff --git a/python/tests/fixtures/feature_group_fixtures.json b/python/tests/fixtures/feature_group_fixtures.json index c2394ed4cb..bc967508b0 100644 --- a/python/tests/fixtures/feature_group_fixtures.json +++ b/python/tests/fixtures/feature_group_fixtures.json @@ -695,7 +695,7 @@ "name": "add_two", "outputTypes":["double"], "transformationFeatures":["data"], - "dropped_features":["data1"] + "dropped_argument_names":["data1"] } }, { @@ -707,7 +707,7 @@ "name": "add_one_fs", "outputTypes":["double"], "transformationFeatures":["col1"], - "dropped_features":["data1"] + "dropped_argument_names":["data1"] } } ], diff --git a/python/tests/fixtures/feature_view_fixtures.json b/python/tests/fixtures/feature_view_fixtures.json index 1ad25dea36..260cffd0c9 100644 --- a/python/tests/fixtures/feature_view_fixtures.json +++ b/python/tests/fixtures/feature_view_fixtures.json @@ -935,7 +935,7 @@ "outputTypes":["double"], "transformationFeatures":["data"], "statisticsArgumentNames":["data1"], - "dropped_features":["data1"] + "dropped_argument_names":["data1"] } }, { @@ -947,7 +947,7 @@ "name": "add_one_fs", "outputTypes":["double"], "transformationFeatures":["col1"], - "dropped_features":["data1"] + "dropped_argument_names":["data1"] } } ], diff --git a/python/tests/fixtures/training_dataset_feature_fixtures.json b/python/tests/fixtures/training_dataset_feature_fixtures.json index 27cd07f302..0ca85653c8 100644 --- a/python/tests/fixtures/training_dataset_feature_fixtures.json +++ b/python/tests/fixtures/training_dataset_feature_fixtures.json @@ -79,7 +79,7 @@ "name": "add_one_fs", "outputTypes":["double"], "transformationFeatures":["col1"], - "dropped_features":["data1"] + "dropped_argument_names":["data1"] } }, "featuregroup": { diff --git a/python/tests/fixtures/transformation_function_fixtures.json b/python/tests/fixtures/transformation_function_fixtures.json index 036eb2fac7..2604d5d75e 100644 --- a/python/tests/fixtures/transformation_function_fixtures.json +++ b/python/tests/fixtures/transformation_function_fixtures.json @@ -9,7 +9,7 @@ "name": "add_one_fs", "outputTypes":["double"], "transformationFeatures":["col1"], - "dropped_features":["data1"] + "dropped_argument_names":["data1"] } } }, @@ -24,7 +24,7 @@ "outputTypes":["double"], "transformationFeatures":["data"], "statisticsArgumentNames":["data1"], - "dropped_features":["data1"] + "dropped_argument_names":["data1"] } } }, @@ -39,7 +39,7 @@ "outputTypes":["string"], "transformationFeatures":["feature1", "feature2", "feature3"], "statisticsArgumentNames":["data1", "data2"], - "dropped_features":["data1", "data2", "data3"] + "dropped_argument_names":["data1", "data2", "data3"] } } }, @@ -54,7 +54,7 @@ "outputTypes":["string", "double"], "transformationFeatures":["feature1", "feature2", "feature3"], "statisticsArgumentNames":["data1", "data2"], - "dropped_features":["data1", "data2", "data3"] + "dropped_argument_names":["data1", "data2", "data3"] } } }, @@ -72,7 +72,7 @@ "outputTypes":["double"], "transformationFeatures":["data"], "statisticsArgumentNames":["data1"], - "dropped_features":["data1"] + "dropped_argument_names":["data1"] } }, { @@ -84,7 +84,7 @@ "name": "add_one_fs", "outputTypes":["double"], "transformationFeatures":["col1"], - "dropped_features":["data1"] + "dropped_argument_names":["data1"] } } ] @@ -104,7 +104,7 @@ "outputTypes":["double"], "transformationFeatures":["data"], "statisticsArgumentNames":["data1"], - "dropped_features":["data1"] + "dropped_argument_names":["data1"] } } ]