on-demand transformations working #2

Closed
58 commits
85a27ac
hopsworks_udf first version
manu-sj Apr 15, 2024
9e4478b
working code for running hopsworks udf without saving in backend usin…
manu-sj Apr 17, 2024
75441a3
removing debugging logs
manu-sj Apr 17, 2024
7af03f2
statistics working with python client
manu-sj Apr 18, 2024
3ac5b26
basic functionality working with backend
manu-sj Apr 25, 2024
df5c969
code with statistics working and saved to backend
manu-sj Apr 25, 2024
2e9aa72
working code for feature vector
manu-sj Apr 29, 2024
fceb9b5
reformatted and documented Hopswork UDF class
manu-sj May 2, 2024
52167f1
unit tests for transformation functions
manu-sj May 3, 2024
a66f9e3
clearning transformations engine and adding unit tests
manu-sj May 3, 2024
853995a
feature view api formated
manu-sj May 3, 2024
b4a37af
reformatting and fixing feature_view_engine
manu-sj May 4, 2024
2a62500
reformatted and added unit tests for feature view
manu-sj May 4, 2024
35d72dc
updating documentation for feature store
manu-sj May 4, 2024
7ca35fd
updating documentation for feature store
manu-sj May 4, 2024
5e377e6
fixed tests for training datatset features
manu-sj May 5, 2024
fa12032
reformatted and added unit tests for python engine
manu-sj May 6, 2024
f79a349
most unit tests fixed
manu-sj May 6, 2024
5608c18
all unit tests working
manu-sj May 13, 2024
3fc94f8
removed print
manu-sj May 13, 2024
2bf5f20
adding test for hopsworks_udf
manu-sj May 13, 2024
594640c
correcting merge for vector server
manu-sj May 13, 2024
f0e9540
reformatting with ruff
manu-sj May 13, 2024
6072642
fixing vector server
manu-sj May 13, 2024
f46f0b7
fixing docs
manu-sj May 13, 2024
07348d5
fixing vector server
manu-sj May 14, 2024
41a02ac
fixing building in transformations
manu-sj May 14, 2024
2215606
correcting get feature vector
manu-sj May 14, 2024
e1d7abe
adding missed changes for build in transformations
manu-sj May 16, 2024
2d0bca3
shallow copying scope dictonary to not overwrite statistics variable …
manu-sj May 17, 2024
37f96fa
adding deep copy to create multiple transfromation functions with dif…
manu-sj May 20, 2024
37a8b23
sorting transformation function to maintain consistent order
manu-sj May 21, 2024
eb77d70
sorting transformation functions in transformation function engine to…
manu-sj May 21, 2024
68c95aa
using feature view transformation functions
manu-sj May 21, 2024
88bff75
addressing review comments
manu-sj May 23, 2024
5ea3e43
using PYARROW_EXTENSION_ENABLE during import rather than as a function
manu-sj May 27, 2024
58678bc
skiping transformation function test in windows spark udf failing due…
manu-sj May 27, 2024
be5036b
changing transformed_feature_vector_col_name to transformed_features …
manu-sj May 27, 2024
3a01ead
adding property transformed_features in feature view to obtain featur…
manu-sj May 27, 2024
2753ec4
updating doc string and adding property decorator missed during rebase
manu-sj May 28, 2024
23c7b8a
refactoring transformation functions to update parsing of statistics …
manu-sj Jun 7, 2024
659f2ab
refactoring transformation functions to update parsing of statistics …
manu-sj Jun 7, 2024
0a22fd7
reformating with ruff
manu-sj Jun 7, 2024
159da54
adding statistics to udf only if required
manu-sj Jun 9, 2024
eef2cb5
convrting extended statistics to dictonary
manu-sj Jun 9, 2024
50e944c
sorting built in label encoder to maintain consistency
manu-sj Jun 9, 2024
7111f86
adding type hints for class TransformationStatistics
manu-sj Jun 13, 2024
114a792
adapating to backend update of reaturning output_types, transformatio…
manu-sj Jun 14, 2024
bd4bb1f
fixing unit tests
manu-sj Jun 14, 2024
64f34cd
removign space in doc string
manu-sj Jun 17, 2024
9891900
replace - from output column names with _
manu-sj Jun 17, 2024
6ebd9f4
revreting unwanted spark test _ replace changes
manu-sj Jun 17, 2024
c020210
on-deamnd tranformations working
manu-sj Jul 1, 2024
e87331e
fixing unit tests
manu-sj Jul 3, 2024
202358d
adding unit tests for on-demand transformation functions
manu-sj Jul 4, 2024
99001d2
adding documentation
manu-sj Jul 4, 2024
c71af3b
adopting changes in backend for UI
manu-sj Jul 8, 2024
4681f33
fixing unit tests
manu-sj Jul 9, 2024
66 changes: 66 additions & 0 deletions python/hsfs/builtin_transformations.py
@@ -0,0 +1,66 @@
+#
+# Copyright 2024 Hopsworks AB
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import numpy as np
+import pandas as pd
+from hsfs.hopsworks_udf import udf
+from hsfs.transformation_statistics import TransformationStatistics
+
+
+feature_statistics = TransformationStatistics("feature")
+
+
+@udf(float, drop=["feature"])
+def min_max_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
+    return (feature - statistics.feature.min) / (
+        statistics.feature.max - statistics.feature.min
+    )
+
+
+@udf(float, drop=["feature"])
+def standard_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
+    return (feature - statistics.feature.mean) / statistics.feature.stddev
+
+
+@udf(float, drop=["feature"])
+def robust_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
+    return (feature - statistics.feature.percentiles[49]) / (
+        statistics.feature.percentiles[74] - statistics.feature.percentiles[24]
+    )
+
+
+@udf(int, drop=["feature"])
+def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
+    unique_data = sorted(
+        [value for value in statistics.feature.extended_statistics["unique_values"]]
+    )
+    value_to_index = {value: index for index, value in enumerate(unique_data)}
+    return pd.Series(
+        [value_to_index[data] if not pd.isna(data) else np.nan for data in feature]
+    )
+
+
+@udf(bool, drop=["feature"])
+def one_hot_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
+    unique_data = [
+        value for value in statistics.feature.extended_statistics["unique_values"]
+    ]
+    one_hot = pd.get_dummies(feature, dtype="bool")
+    for data in unique_data:
+        if data not in one_hot:
+            one_hot[data] = False
+    # Sorting by columns so as to maintain consistency in column order.
+    return one_hot.reindex(sorted(one_hot.columns), axis=1)
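The arithmetic behind the built-in transformations above can be tried without a feature store. The following is a minimal sketch in plain pandas, not the hsfs implementation: `make_stats`, `min_max_scale`, `standard_scale`, `robust_scale` and `one_hot` are hypothetical names, the statistics object is a `SimpleNamespace` stand-in, and it assumes that `percentiles[i]` holds the (i + 1)-th percentile, so that indices 24, 49 and 74 are the quartiles and the median.

```python
from types import SimpleNamespace

import numpy as np
import pandas as pd


def make_stats(values: pd.Series) -> SimpleNamespace:
    """Hypothetical stand-in for the per-feature statistics used in the diff."""
    return SimpleNamespace(
        min=float(values.min()),
        max=float(values.max()),
        mean=float(values.mean()),
        # Assumption: sample standard deviation (pandas default, ddof=1).
        stddev=float(values.std()),
        # Assumption: percentiles[i] is the (i + 1)-th percentile.
        percentiles=[float(np.percentile(values, p)) for p in range(1, 101)],
    )


def min_max_scale(feature: pd.Series, stats: SimpleNamespace) -> pd.Series:
    return (feature - stats.min) / (stats.max - stats.min)


def standard_scale(feature: pd.Series, stats: SimpleNamespace) -> pd.Series:
    return (feature - stats.mean) / stats.stddev


def robust_scale(feature: pd.Series, stats: SimpleNamespace) -> pd.Series:
    # Centre on the median and divide by the interquartile range.
    iqr = stats.percentiles[74] - stats.percentiles[24]
    return (feature - stats.percentiles[49]) / iqr


def one_hot(feature: pd.Series, unique_values: list) -> pd.DataFrame:
    encoded = pd.get_dummies(feature, dtype="bool")
    # Add an all-False column for each known category absent from this batch.
    for value in unique_values:
        if value not in encoded:
            encoded[value] = False
    # Sort the columns so every batch produces the same layout.
    return encoded.reindex(sorted(encoded.columns), axis=1)


values = pd.Series([0.0, 5.0, 10.0])
stats = make_stats(values)
encoded = one_hot(pd.Series(["b", "a"]), ["a", "b", "c"])
print(min_max_scale(values, stats).tolist())
print(list(encoded.columns))
```

Like the functions in the diff, this sketch does not guard against a constant feature, where `max - min`, `stddev` or the interquartile range is zero and the division produces NaN or infinite values.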
2 changes: 1 addition & 1 deletion python/hsfs/constructor/query.py
@@ -59,7 +59,7 @@ def __init__(
             fg_mod.ExternalFeatureGroup,
             fg_mod.SpineGroup,
         ],
-        left_features: List[Union[str, "Feature"]],
+        left_features: List[Union[str, "Feature", Dict]],
         feature_store_name: Optional[str] = None,
         feature_store_id: Optional[int] = None,
         left_feature_group_start_time: Optional[Union[str, int, date, datetime]] = None,
107 changes: 0 additions & 107 deletions python/hsfs/core/builtin_transformation_function.py

This file was deleted.

16 changes: 13 additions & 3 deletions python/hsfs/core/feature_group_api.py
@@ -51,13 +51,17 @@ def save(
             feature_group_instance.feature_store_id,
             "featuregroups",
         ]
+        query_params = {
+            "expand": ["features", "expectationsuite", "transformationfunctions"]
+        }
         headers = {"content-type": "application/json"}
         feature_group_object = feature_group_instance.update_from_response_json(
             _client._send_request(
                 "POST",
                 path_params,
                 headers=headers,
                 data=feature_group_instance.json(),
+                query_params=query_params,
             ),
         )
         return feature_group_object
@@ -93,7 +97,11 @@ def get(
             "featuregroups",
             name,
         ]
-        query_params = None if version is None else {"version": version}
+        query_params = {
+            "expand": ["features", "expectationsuite", "transformationfunctions"]
+        }
+        if version is not None:
+            query_params["version"] = version

         fg_objs = []
         # In principle unique names are enforced across fg type and this should therefore
@@ -157,8 +165,10 @@ def get_by_id(
             "featuregroups",
             feature_group_id,
         ]
-
-        fg_json = _client._send_request("GET", path_params)
+        query_params = {
+            "expand": ["features", "expectationsuite", "transformationfunctions"]
+        }
+        fg_json = _client._send_request("GET", path_params, query_params)
         if (
             fg_json["type"] == FeatureGroupApi.BACKEND_FG_STREAM
             or fg_json["type"] == FeatureGroupApi.BACKEND_FG_BATCH
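The same `expand` list is built in `save`, `get` and `get_by_id`. One way to keep the three call sites in step would be a small helper. This is only a sketch: `build_fg_query_params` and `FG_EXPAND` are hypothetical names that do not appear in the PR.

```python
from typing import Any, Dict, Optional

# Hypothetical constant: the sub-resources requested in the three hunks above.
FG_EXPAND = ["features", "expectationsuite", "transformationfunctions"]


def build_fg_query_params(version: Optional[int] = None) -> Dict[str, Any]:
    """Build feature group query parameters, adding the version only when given."""
    # Copy the list so callers cannot mutate the shared constant.
    params: Dict[str, Any] = {"expand": list(FG_EXPAND)}
    if version is not None:
        params["version"] = version
    return params


print(build_fg_query_params())
print(build_fg_query_params(version=2))
```

Testing `version is not None` rather than truthiness mirrors the diff and keeps a version of `0` from being dropped.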
8 changes: 6 additions & 2 deletions python/hsfs/core/feature_group_engine.py
@@ -88,7 +88,9 @@ def insert(
         validation_options: dict = None,
     ):
         dataframe_features = engine.get_instance().parse_schema_feature_group(
-            feature_dataframe, feature_group.time_travel_format
+            feature_dataframe,
+            feature_group.time_travel_format,
+            feature_group.transformation_functions,
         )
         util.validate_embedding_feature_type(
             feature_group.embedding_index, dataframe_features
@@ -281,7 +283,9 @@ def insert_stream(
             )

         dataframe_features = engine.get_instance().parse_schema_feature_group(
-            dataframe, feature_group.time_travel_format
+            dataframe,
+            feature_group.time_travel_format,
+            feature_group.transformation_functions,
         )
         util.validate_embedding_feature_type(
             feature_group.embedding_index, dataframe_features
55 changes: 35 additions & 20 deletions python/hsfs/core/feature_view_api.py
@@ -17,12 +17,7 @@

 from typing import List, Optional, Union

-from hsfs import (
-    client,
-    feature_view,
-    training_dataset,
-    transformation_function_attached,
-)
+from hsfs import client, feature_view, training_dataset
 from hsfs.client.exceptions import RestAPIError
 from hsfs.constructor import query, serving_prepared_statement
 from hsfs.core import explicit_provenance, job, training_dataset_job_conf
@@ -78,13 +73,28 @@ def update(self, feature_view_obj: feature_view.FeatureView) -> None:
             data=feature_view_obj.json(),
         )

-    def get_by_name(self, name: str) -> feature_view.FeatureView:
+    def get_by_name(self, name: str) -> List[feature_view.FeatureView]:
+        """
+        Get a feature view from the backend using its name.
+
+        # Arguments
+            name `str`: Name of the feature view.
+
+        # Returns
+            `List[FeatureView]`: A list that contains all versions of the feature view.
+
+        # Raises
+            `RestAPIError`: If the feature view cannot be found from the backend.
+            `ValueError`: If the feature group associated with the feature view cannot be found.
+        """
         path = self._base_path + [name]
         try:
             return [
                 feature_view.FeatureView.from_response_json(fv)
                 for fv in self._client._send_request(
-                    self._GET, path, {"expand": ["query", "features"]}
+                    self._GET,
+                    path,
+                    {"expand": ["query", "features", "transformationfunctions"]},
                 )["items"]
             ]
         except RestAPIError as e:
@@ -98,11 +108,27 @@ def get_by_name(self, name: str) -> feature_view.FeatureView:
                 raise e

     def get_by_name_version(self, name: str, version: int) -> feature_view.FeatureView:
+        """
+        Get a feature view from the backend using both name and version.
+
+        # Arguments
+            name `str`: Name of the feature view.
+            version `int`: Version of the feature view.
+
+        # Returns
+            `FeatureView`
+
+        # Raises
+            `RestAPIError`: If the feature view cannot be found from the backend.
+            `ValueError`: If the feature group associated with the feature view cannot be found.
+        """
         path = self._base_path + [name, self._VERSION, version]
         try:
             return feature_view.FeatureView.from_response_json(
                 self._client._send_request(
-                    self._GET, path, {"expand": ["query", "features"]}
+                    self._GET,
+                    path,
+                    {"expand": ["query", "features", "transformationfunctions"]},
                 )
             )
         except RestAPIError as e:
@@ -180,17 +206,6 @@ def get_serving_prepared_statement(
             self._client._send_request("GET", path, query_params, headers=headers)
         )

-    def get_attached_transformation_fn(
-        self, name: str, version: int
-    ) -> Union[
-        "transformation_function_attached.TransformationFunctionAttached",
-        List["transformation_function_attached.TransformationFunctionAttached"],
-    ]:
-        path = self._base_path + [name, self._VERSION, version, self._TRANSFORMATION]
-        return transformation_function_attached.TransformationFunctionAttached.from_response_json(
-            self._client._send_request("GET", path)
-        )
-
     def create_training_dataset(
         self,
         name: str,
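Because `get_by_name` is now annotated and documented as returning every version of a feature view, a caller that wants a single object has to choose one. A minimal sketch of picking the highest version follows. `FeatureViewStub` and `latest_version` are hypothetical and stand in for the real `FeatureView`; the only assumption carried over is a numeric `version` attribute.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FeatureViewStub:
    """Hypothetical stand-in; not the hsfs FeatureView class."""

    name: str
    version: int


def latest_version(feature_views: List[FeatureViewStub]) -> Optional[FeatureViewStub]:
    """Return the entry with the highest version, or None for an empty list."""
    return max(feature_views, key=lambda fv: fv.version, default=None)


views = [FeatureViewStub("fv", 1), FeatureViewStub("fv", 3), FeatureViewStub("fv", 2)]
print(latest_version(views))
```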