diff --git a/java/beam/pom.xml b/java/beam/pom.xml index 342cae22b2..aea546d239 100644 --- a/java/beam/pom.xml +++ b/java/beam/pom.xml @@ -5,7 +5,7 @@ hsfs-parent com.logicalclocks - 3.8.0-RC0 + 3.8.0-RC1 4.0.0 diff --git a/java/flink/pom.xml b/java/flink/pom.xml index 9b0805db8e..e51122de22 100644 --- a/java/flink/pom.xml +++ b/java/flink/pom.xml @@ -5,7 +5,7 @@ hsfs-parent com.logicalclocks - 3.8.0-RC0 + 3.8.0-RC1 4.0.0 diff --git a/java/hsfs/pom.xml b/java/hsfs/pom.xml index d245d10d3d..c8e99e2a6c 100644 --- a/java/hsfs/pom.xml +++ b/java/hsfs/pom.xml @@ -5,7 +5,7 @@ hsfs-parent com.logicalclocks - 3.8.0-RC0 + 3.8.0-RC1 4.0.0 diff --git a/java/pom.xml b/java/pom.xml index 1baf33cf4e..0cc46465f8 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -7,7 +7,7 @@ com.logicalclocks hsfs-parent pom - 3.8.0-RC0 + 3.8.0-RC1 hsfs spark diff --git a/java/spark/pom.xml b/java/spark/pom.xml index 5d7cba068a..4c4dea00e1 100644 --- a/java/spark/pom.xml +++ b/java/spark/pom.xml @@ -22,7 +22,7 @@ hsfs-parent com.logicalclocks - 3.8.0-RC0 + 3.8.0-RC1 4.0.0 diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index f8dbb7d992..83679af059 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -23,19 +23,23 @@ from functools import wraps from typing import Any, Dict, Optional, Union -import polars as pl import pyarrow import pyarrow._flight import pyarrow.flight from hsfs import client, feature_group, util from hsfs.client.exceptions import FeatureStoreException from hsfs.constructor import query +from hsfs.core.constants import HAS_POLARS, polars_not_installed_message from hsfs.core.variable_api import VariableApi from hsfs.storage_connector import StorageConnector from pyarrow.flight import FlightServerError from retrying import retry +if HAS_POLARS: + import polars as pl + + _logger = logging.getLogger(__name__) @@ -399,6 +403,8 @@ def _get_dataset(self, descriptor, timeout=None, dataframe_type="pandas"): reader = self._connection.do_get(info.endpoints[0].ticket, options) _logger.debug("Dataset fetched. Converting to dataframe %s.", dataframe_type) if dataframe_type.lower() == "polars": + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) return pl.from_arrow(reader.read_all()) else: return reader.read_pandas() diff --git a/python/hsfs/core/constants.py b/python/hsfs/core/constants.py new file mode 100644 index 0000000000..cdff9369fc --- /dev/null +++ b/python/hsfs/core/constants.py @@ -0,0 +1,28 @@ +# +# Copyright 2024 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import importlib.util + + +polars_not_installed_message = ( + "Polars package not found. " + "If you want to use Polars with Hopsworks you can install the corresponding extras " + """`pip install hopsworks[polars]` or `pip install "hopsworks[polars]"` if using zsh. """ + "You can also install polars directly in your environment e.g `pip install polars`. " + "You will need to restart your kernel if applicable." 
+) + +HAS_POLARS: bool = importlib.util.find_spec("polars") is not None diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 9eca5dd3cd..21106e9344 100755 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -20,13 +20,22 @@ from base64 import b64decode from datetime import datetime, timezone from io import BytesIO -from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union +from typing import ( + Any, + Callable, + Dict, + List, + Literal, + Optional, + Set, + Tuple, + Union, +) import avro.io import avro.schema import numpy as np import pandas as pd -import polars as pl from hsfs import ( client, feature_view, @@ -49,6 +58,7 @@ from hsfs.core import ( transformation_function_engine as tf_engine_mod, ) +from hsfs.core.constants import HAS_POLARS, polars_not_installed_message HAS_FASTAVRO = False @@ -59,6 +69,9 @@ except ImportError: from avro.io import BinaryDecoder +if HAS_POLARS: + import polars as pl + _logger = logging.getLogger(__name__) @@ -487,6 +500,9 @@ def handle_feature_vector_return_type( return pandas_df elif return_type.lower() == "polars": _logger.debug("Returning feature vector as polars dataframe") + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) + return pl.DataFrame( feature_vectorz if batch else [feature_vectorz], schema=self._feature_vector_col_name if not inference_helper else None, diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 8e64e6ec95..3132f343c9 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -30,7 +30,16 @@ from datetime import datetime, timezone from io import BytesIO from pathlib import Path -from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union +from typing import ( + Any, + Callable, + Dict, + List, + Literal, + Optional, + Tuple, + Union, +) import avro import boto3 @@ -38,7 +47,6 @@ import hsfs import numpy as np import pandas as pd -import polars as pl import pyarrow as pa import pytz from botocore.response import StreamingBody @@ -70,6 +78,7 @@ transformation_function_engine, variable_api, ) +from hsfs.core.constants import HAS_POLARS, polars_not_installed_message from hsfs.core.vector_db_client import VectorDbClient from hsfs.feature_group import ExternalFeatureGroup, FeatureGroup from hsfs.training_dataset import TrainingDataset @@ -84,6 +93,9 @@ # Disable pyhive INFO logging logging.getLogger("pyhive").setLevel(logging.WARNING) +if HAS_POLARS: + import polars as pl + HAS_FAST = False try: from fastavro import schemaless_writer @@ -206,7 +218,6 @@ def _sql_offline( hive_config: Optional[Dict[str, Any]] = None, arrow_flight_config: Optional[Dict[str, Any]] = None, ) -> Union[pd.DataFrame, pl.DataFrame]: - self._validate_dataframe_type(dataframe_type) if isinstance(sql_query, dict) and "query_string" in sql_query: result_df = util.run_with_loading_animation( @@ -224,6 +235,9 @@ def _sql_offline( with warnings.catch_warnings(): warnings.simplefilter("ignore", UserWarning) if dataframe_type.lower() == "polars": + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) + result_df = util.run_with_loading_animation( "Reading data from Hopsworks, using Hive", pl.read_database, @@ -264,6 +278,9 @@ def _jdbc( if "sqlalchemy" in str(type(mysql_conn)): sql_query = sql.text(sql_query) if dataframe_type.lower() == "polars": + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) + result_df = 
pl.read_database(sql_query, mysql_conn) else: result_df = pd.read_sql(sql_query, mysql_conn) @@ -300,6 +317,8 @@ def read( # Below check performed since some files materialized when creating training data are empty # If empty dataframe is in df_list then polars cannot concatenate df_list due to schema mismatch # However if the entire split contains only empty files which can occur when the data size is very small then one of the empty dataframe is return so that the column names can be accessed. + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) non_empty_df_list = [df for df in df_list if not df.is_empty()] if non_empty_df_list: return self._return_dataframe_type( @@ -329,6 +348,8 @@ def _read_pandas(self, data_format: str, obj: Any) -> pd.DataFrame: ) def _read_polars(self, data_format: str, obj: Any) -> pl.DataFrame: + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) if data_format.lower() == "csv": return pl.read_csv(obj) elif data_format.lower() == "tsv": @@ -510,13 +531,20 @@ def show( sql_query, feature_store, online_conn, "default", read_options or {} ).head(n) - def read_vector_db(self, feature_group: "hsfs.feature_group.FeatureGroup", n: int =None, dataframe_type: str="default") -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[List[Any]]]: + def read_vector_db( + self, + feature_group: "hsfs.feature_group.FeatureGroup", + n: int = None, + dataframe_type: str = "default", + ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[List[Any]]]: dataframe_type = dataframe_type.lower() self._validate_dataframe_type(dataframe_type) results = VectorDbClient.read_feature_group(feature_group, n) feature_names = [f.name for f in feature_group.features] if dataframe_type == "polars": + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) df = pl.DataFrame(results, schema=feature_names) else: df = pd.DataFrame(results, columns=feature_names, index=None) @@ -576,7 +604,9 @@ def profile( exact_uniqueness: bool = True, ) -> str: # TODO: add statistics for correlations, histograms and exact_uniqueness - if isinstance(df, pl.DataFrame) or isinstance(df, pl.dataframe.frame.DataFrame): + if HAS_POLARS and ( + isinstance(df, pl.DataFrame) or isinstance(df, pl.dataframe.frame.DataFrame) + ): arrow_schema = df.to_arrow().schema else: arrow_schema = pa.Schema.from_pandas(df, preserve_index=False) @@ -589,8 +619,9 @@ def profile( or pa.types.is_large_list(field.type) or pa.types.is_struct(field.type) ) and PYARROW_HOPSWORKS_DTYPE_MAPPING[field.type] in ["timestamp", "date"]: - if isinstance(df, pl.DataFrame) or isinstance( - df, pl.dataframe.frame.DataFrame + if HAS_POLARS and ( + isinstance(df, pl.DataFrame) + or isinstance(df, pl.dataframe.frame.DataFrame) ): df = df.with_columns(pl.col(field.name).cast(pl.String)) else: @@ -609,8 +640,9 @@ def profile( stats[col] = df[col].describe().to_dict() final_stats = [] for col in relevant_columns: - if isinstance(df, pl.DataFrame) or isinstance( - df, pl.dataframe.frame.DataFrame + if HAS_POLARS and ( + isinstance(df, pl.DataFrame) + or isinstance(df, pl.dataframe.frame.DataFrame) ): stats[col] = dict(zip(stats["statistic"], stats[col])) # set data type @@ -694,8 +726,9 @@ def validate_with_great_expectations( ) -> ge.core.ExpectationSuiteValidationResult: # This conversion might cause a bottleneck in performance when using polars with greater expectations. 
# This patch is done becuase currently great_expecatations does not support polars, would need to be made proper when support added. - if isinstance(dataframe, pl.DataFrame) or isinstance( - dataframe, pl.dataframe.frame.DataFrame + if HAS_POLARS and ( + isinstance(dataframe, pl.DataFrame) + or isinstance(dataframe, pl.dataframe.frame.DataFrame) ): warnings.warn( "Currently Great Expectations does not support Polars dataframes. This operation will convert to Pandas dataframe that can be slow.", @@ -716,10 +749,12 @@ def set_job_group(self, group_id: str, description: Optional[str]) -> None: def convert_to_default_dataframe( self, dataframe: Union[pd.DataFrame, pl.DataFrame, pl.dataframe.frame.DataFrame] ) -> Optional[pd.DataFrame]: - if ( - isinstance(dataframe, pd.DataFrame) - or isinstance(dataframe, pl.DataFrame) - or isinstance(dataframe, pl.dataframe.frame.DataFrame) + if isinstance(dataframe, pd.DataFrame) or ( + HAS_POLARS + and ( + isinstance(dataframe, pl.DataFrame) + or isinstance(dataframe, pl.dataframe.frame.DataFrame) + ) ): upper_case_features = [ col for col in dataframe.columns if any(re.finditer("[A-Z]", col)) @@ -762,7 +797,7 @@ def convert_to_default_dataframe( dataframe_copy[col].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype ): dataframe_copy[col] = dataframe_copy[col].dt.tz_convert(None) - elif isinstance(dataframe_copy[col].dtype, pl.Datetime): + elif HAS_POLARS and isinstance(dataframe_copy[col].dtype, pl.Datetime): dataframe_copy = dataframe_copy.with_columns( pl.col(col).dt.replace_time_zone(None) ) @@ -782,8 +817,10 @@ def parse_schema_feature_group( ) -> List[feature.Feature]: if isinstance(dataframe, pd.DataFrame): arrow_schema = pa.Schema.from_pandas(dataframe, preserve_index=False) - elif isinstance(dataframe, pl.DataFrame) or isinstance( - dataframe, pl.dataframe.frame.DataFrame + elif ( + HAS_POLARS + and isinstance(dataframe, pl.DataFrame) + or isinstance(dataframe, pl.dataframe.frame.DataFrame) ): arrow_schema = dataframe.to_arrow().schema features = [] @@ -1005,13 +1042,16 @@ def _random_split( groups += [i] * int(df_size * split.percentage) groups += [len(splits) - 1] * (df_size - len(groups)) random.shuffle(groups) - if isinstance(df, pl.DataFrame) or isinstance(df, pl.dataframe.frame.DataFrame): + if HAS_POLARS and ( + isinstance(df, pl.DataFrame) or isinstance(df, pl.dataframe.frame.DataFrame) + ): df = df.with_columns(pl.Series(name=split_column, values=groups)) else: df[split_column] = groups for i, split in enumerate(splits): - if isinstance(df, pl.DataFrame) or isinstance( - df, pl.dataframe.frame.DataFrame + if HAS_POLARS and ( + isinstance(df, pl.DataFrame) + or isinstance(df, pl.dataframe.frame.DataFrame) ): split_df = df.filter(pl.col(split_column) == i).drop(split_column) else: @@ -1237,8 +1277,9 @@ def _apply_transformation_function( feature_name, transformation_fn, ) in transformation_functions.items(): - if isinstance(dataset, pl.DataFrame) or isinstance( - dataset, pl.dataframe.frame.DataFrame + if HAS_POLARS and ( + isinstance(dataset, pl.DataFrame) + or isinstance(dataset, pl.dataframe.frame.DataFrame) ): dataset = dataset.with_columns( pl.col(feature_name).map_elements( @@ -1251,8 +1292,11 @@ def _apply_transformation_function( ) # The below functions is not required for Polars since polars does have object types like pandas if not ( - isinstance(dataset, pl.DataFrame) - or isinstance(dataset, pl.dataframe.frame.DataFrame) + HAS_POLARS + and ( + isinstance(dataset, pl.DataFrame) + or isinstance(dataset, pl.dataframe.frame.DataFrame) + ) 
): offline_type = Engine.convert_spark_type_to_offline_type( transformation_fn.output_type @@ -1439,8 +1483,11 @@ def acked(err: Exception, msg: Any) -> None: elif not isinstance( feature_group, ExternalFeatureGroup ) and self._start_offline_materialization(offline_write_options): - if (not offline_write_options.get("skip_offsets", False) - and self._job_api.last_execution(feature_group.materialization_job)): # always skip offsets if executing job for the first time + if not offline_write_options.get( + "skip_offsets", False + ) and self._job_api.last_execution( + feature_group.materialization_job + ): # always skip offsets if executing job for the first time # don't provide the current offsets (read from where the job last left off) initial_check_point = "" # provide the initial_check_point as it will reduce the read amplification of materialization job @@ -1638,12 +1685,12 @@ def _cast_column_to_offline_type( offline_type = offline_type.lower() if offline_type == "timestamp": # convert (if tz!=UTC) to utc, then make timezone unaware - if isinstance(feature_column, pl.Series): + if HAS_POLARS and isinstance(feature_column, pl.Series): return feature_column.cast(pl.Datetime(time_zone=None)) else: return pd.to_datetime(feature_column, utc=True).dt.tz_localize(None) elif offline_type == "date": - if isinstance(feature_column, pl.Series): + if HAS_POLARS and isinstance(feature_column, pl.Series): return feature_column.cast(pl.Date) else: return pd.to_datetime(feature_column, utc=True).dt.date @@ -1652,7 +1699,7 @@ def _cast_column_to_offline_type( or offline_type.startswith("struct<") or offline_type == "boolean" ): - if isinstance(feature_column, pl.Series): + if HAS_POLARS and isinstance(feature_column, pl.Series): return feature_column.map_elements( lambda x: (ast.literal_eval(x) if isinstance(x, str) else x) if (x is not None and x != "") @@ -1665,14 +1712,14 @@ def _cast_column_to_offline_type( else None ) elif offline_type == "string": - if isinstance(feature_column, pl.Series): + if HAS_POLARS and isinstance(feature_column, pl.Series): return feature_column.map_elements( lambda x: str(x) if x is not None else None ) else: return feature_column.apply(lambda x: str(x) if x is not None else None) elif offline_type.startswith("decimal"): - if isinstance(feature_column, pl.Series): + if HAS_POLARS and isinstance(feature_column, pl.Series): return feature_column.map_elements( lambda x: decimal.Decimal(x) if (x is not None) else None ) @@ -1681,7 +1728,7 @@ def _cast_column_to_offline_type( lambda x: decimal.Decimal(x) if (x is not None) else None ) else: - if isinstance(feature_column, pl.Series): + if HAS_POLARS and isinstance(feature_column, pl.Series): offline_dtype_mapping = { "bigint": pl.Int64, "int": pl.Int32, @@ -1700,7 +1747,7 @@ def _cast_column_to_offline_type( "double": np.dtype("float64"), } if offline_type in offline_dtype_mapping: - if isinstance(feature_column, pl.Series): + if HAS_POLARS and isinstance(feature_column, pl.Series): casted_feature = feature_column.cast( offline_dtype_mapping[offline_type] ) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index de5577417c..00c266eb10 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -29,7 +29,6 @@ import humps import numpy as np import pandas as pd -import polars as pl from hsfs import ( engine, feature, @@ -66,6 +65,7 @@ ) from hsfs.core import feature_monitoring_config as fmc from hsfs.core import feature_monitoring_result as fmr +from hsfs.core.constants import HAS_POLARS 
from hsfs.core.job import Job from hsfs.core.variable_api import VariableApi from hsfs.core.vector_db_client import VectorDbClient @@ -78,6 +78,9 @@ from hsfs.validation_report import ValidationReport +if HAS_POLARS: + import polars as pl + _logger = logging.getLogger(__name__) @@ -543,8 +546,13 @@ def get_storage_connector(self): """ storage_connector_provenance = self.get_storage_connector_provenance() - if storage_connector_provenance.inaccessible or storage_connector_provenance.deleted: - _logger.info("The parent storage connector is deleted or inaccessible. For more details access `get_storage_connector_provenance`") + if ( + storage_connector_provenance.inaccessible + or storage_connector_provenance.deleted + ): + _logger.info( + "The parent storage connector is deleted or inaccessible. For more details access `get_storage_connector_provenance`" + ) if storage_connector_provenance.accessible: return storage_connector_provenance.accessible[0] diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index c8a18dc6c0..d829b93f6d 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -25,7 +25,6 @@ import numpy import numpy as np import pandas as pd -import polars as pl from hsfs import ( expectation_suite, feature, @@ -47,12 +46,17 @@ training_dataset_api, transformation_function_engine, ) +from hsfs.core.constants import HAS_POLARS from hsfs.decorators import typechecked from hsfs.embedding import EmbeddingIndex from hsfs.statistics_config import StatisticsConfig from hsfs.transformation_function import TransformationFunction +if HAS_POLARS: + import polars as pl + + @typechecked class FeatureStore: DEFAULT_VERSION = 1 diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index d4ea5fc51d..32cd3b556b 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -20,12 +20,21 @@ import logging import warnings from datetime import date, datetime -from typing import Any, Dict, List, Literal, Optional, Set, Tuple, TypeVar, Union +from typing import ( + Any, + Dict, + List, + Literal, + Optional, + Set, + Tuple, + TypeVar, + Union, +) import humps import numpy as np import pandas as pd -import polars as pl from hsfs import ( feature_group, storage_connector, @@ -52,6 +61,7 @@ ) from hsfs.core import feature_monitoring_config as fmc from hsfs.core import feature_monitoring_result as fmr +from hsfs.core.constants import HAS_POLARS from hsfs.core.feature_view_api import FeatureViewApi from hsfs.core.vector_db_client import VectorDbClient from hsfs.decorators import typechecked @@ -61,25 +71,29 @@ from hsfs.training_dataset_split import TrainingDatasetSplit -_logger = logging.getLogger(__name__) +if HAS_POLARS: + import polars as pl + + TrainingDatasetDataFrameTypes = Union[ + pd.DataFrame, + TypeVar("pyspark.sql.DataFrame"), # noqa: F821 + TypeVar("pyspark.RDD"), # noqa: F821 + np.ndarray, + List[List[Any]], + pl.DataFrame, + ] -TrainingDatasetDataFrameTypes = Union[ - pd.DataFrame, - TypeVar("pyspark.sql.DataFrame"), # noqa: F821 - TypeVar("pyspark.RDD"), # noqa: F821 - np.ndarray, - List[List[Any]], - pl.DataFrame, -] - -SplineDataFrameTypes = Union[ - pd.DataFrame, - TypeVar("pyspark.sql.DataFrame"), # noqa: F821 - TypeVar("pyspark.RDD"), # noqa: F821 - np.ndarray, - List[List[Any]], - TypeVar("SplineGroup"), # noqa: F821 -] + SplineDataFrameTypes = Union[ + pd.DataFrame, + TypeVar("pyspark.sql.DataFrame"), # noqa: F821 + TypeVar("pyspark.RDD"), # noqa: F821 + np.ndarray, + List[List[Any]], + TypeVar("SplineGroup"), # 
noqa: F821 + ] + + +_logger = logging.getLogger(__name__) @typechecked diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 96596a5b0f..cd5c551149 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -26,11 +26,14 @@ import humps import numpy as np import pandas as pd -import polars as pl from hsfs import client, engine from hsfs.core import storage_connector_api +from hsfs.core.constants import HAS_POLARS +if HAS_POLARS: + import polars as pl + _logger = logging.getLogger(__name__) @@ -211,7 +214,9 @@ def get_feature_groups(self): feature_groups_provenance = self.get_feature_groups_provenance() if feature_groups_provenance.inaccessible or feature_groups_provenance.deleted: - _logger.info("There are deleted or inaccessible feature groups. For more details access `get_feature_groups_provenance`") + _logger.info( + "There are deleted or inaccessible feature groups. For more details access `get_feature_groups_provenance`" + ) if feature_groups_provenance.accessible: return feature_groups_provenance.accessible diff --git a/python/hsfs/version.py b/python/hsfs/version.py index bd2c9c4f99..0a80f3f6ba 100644 --- a/python/hsfs/version.py +++ b/python/hsfs/version.py @@ -14,4 +14,4 @@ # limitations under the License. # -__version__ = "3.8.0rc0" +__version__ = "3.8.0rc1" diff --git a/python/pyproject.toml b/python/pyproject.toml index 0ba86dfc4a..f1bc1f908f 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -46,7 +46,6 @@ dependencies = [ "fsspec", "retrying", "hopsworks_aiomysql[sa]==0.2.1", - "polars>=0.20.18,<=0.21.0", "opensearch-py>=1.1.0,<=2.4.2", ] @@ -81,6 +80,7 @@ python = [ "fastavro>=1.4.11,<=1.8.4", "tqdm", ] +polars=["polars>=0.20.18,<=0.21.0"] [build-system] requires = ["setuptools", "wheel"] diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index 08bc8d52a7..c885f4361d 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -18,7 +18,6 @@ import numpy as np import pandas as pd -import polars as pl import pyarrow as pa import pytest from confluent_kafka.admin import PartitionMetadata, TopicMetadata @@ -35,9 +34,14 @@ from hsfs.constructor import query from hsfs.constructor.hudi_feature_group_alias import HudiFeatureGroupAlias from hsfs.core import inode, job +from hsfs.core.constants import HAS_POLARS from hsfs.engine import python from hsfs.training_dataset_feature import TrainingDatasetFeature -from polars.testing import assert_frame_equal as polars_assert_frame_equal + + +if HAS_POLARS: + import polars as pl + from polars.testing import assert_frame_equal as polars_assert_frame_equal class TestPython: @@ -278,6 +282,10 @@ def test_read_hopsfs_connector_empty_dataframe(self, mocker): assert isinstance(dataframe, pd.DataFrame) assert len(dataframe) == 0 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_read_hopsfs_connector_empty_dataframe_polars(self, mocker): # Arrange @@ -429,6 +437,10 @@ def test_read_pandas_other(self, mocker): assert mock_pandas_read_csv.call_count == 0 assert mock_pandas_read_parquet.call_count == 0 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_read_polars_csv(self, mocker): # Arrange mock_pandas_read_csv = mocker.patch("polars.read_csv") @@ -443,6 +455,10 @@ def test_read_polars_csv(self, mocker): assert mock_pandas_read_csv.call_count == 1 assert mock_pandas_read_parquet.call_count == 0 + @pytest.mark.skipif( + not 
HAS_POLARS, + reason="Polars is not installed.", + ) def test_read_polars_tsv(self, mocker): # Arrange mock_pandas_read_csv = mocker.patch("polars.read_csv") @@ -457,6 +473,10 @@ def test_read_polars_tsv(self, mocker): assert mock_pandas_read_csv.call_count == 1 assert mock_pandas_read_parquet.call_count == 0 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_read_polars_parquet(self, mocker): # Arrange mock_pandas_read_csv = mocker.patch("polars.read_csv") @@ -474,6 +494,10 @@ def test_read_polars_parquet(self, mocker): assert mock_pandas_read_csv.call_count == 0 assert mock_pandas_read_parquet.call_count == 1 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_read_polars_other(self, mocker): # Arrange mock_pandas_read_csv = mocker.patch("polars.read_csv") @@ -1013,6 +1037,10 @@ def test_profile_pandas_with_null_column(self, mocker): ) assert mock_python_engine_convert_pandas_statistics.call_count == 3 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_profile_polars(self, mocker): # Arrange mock_python_engine_convert_pandas_statistics = mocker.patch( @@ -1051,6 +1079,10 @@ def test_profile_polars(self, mocker): ) assert mock_python_engine_convert_pandas_statistics.call_count == 3 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_profile_polars_with_null_column(self, mocker): # Arrange mock_python_engine_convert_pandas_statistics = mocker.patch( @@ -1365,6 +1397,10 @@ def test_convert_to_default_dataframe_pandas_with_spaces(self, mocker): "Feature names are sanitized to use underscore '_' in the feature store." ) + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_convert_to_default_dataframe_polars(self, mocker): # Arrange mock_warnings = mocker.patch("warnings.warn") @@ -1431,6 +1467,10 @@ def test_parse_schema_feature_group_pandas(self, mocker): assert result[0].name == "col1" assert result[1].name == "col2" + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_parse_schema_feature_group_polars(self, mocker): # Arrange mocker.patch("hsfs.engine.python.Engine._convert_pandas_dtype_to_offline_type") @@ -2317,6 +2357,10 @@ def test_split_labels_dataframe_type_pandas(self): assert isinstance(result_df, pd.DataFrame) assert result_df_split is None + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_split_labels_dataframe_type_polars(self): # Arrange python_engine = python.Engine() @@ -2412,6 +2456,10 @@ def test_split_labels_labels_dataframe_type_pandas(self): assert isinstance(result_df, pd.DataFrame) assert isinstance(result_df_split, pd.Series) + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_split_labels_labels_dataframe_type_polars(self): # Arrange python_engine = python.Engine() @@ -3093,6 +3141,10 @@ def test_return_dataframe_type_pandas(self): # Assert assert str(result) == " col1 col2\n0 1 3\n1 2 4" + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_return_dataframe_type_polars(self): # Arrange python_engine = python.Engine() @@ -3269,6 +3321,10 @@ def plus_one(a): assert result["tf_name"][0] == 2 assert result["tf_name"][1] == 3 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_apply_transformation_function_polars(self, mocker): # Arrange mocker.patch("hsfs.client.get_instance") @@ -3763,7 
+3819,7 @@ def test_materialization_kafka_first_job_execution(self, mocker): args="defaults tests_offsets", await_termination=False, ) - + def test_materialization_kafka_skip_offsets(self, mocker): # Arrange mocker.patch("hsfs.engine.python.Engine._get_kafka_config", return_value={}) @@ -3805,7 +3861,10 @@ def test_materialization_kafka_skip_offsets(self, mocker): python_engine._write_dataframe_kafka( feature_group=fg, dataframe=df, - offline_write_options={"start_offline_materialization": True, "skip_offsets": True}, + offline_write_options={ + "start_offline_materialization": True, + "skip_offsets": True, + }, ) # Assert diff --git a/requirements-docs.txt b/requirements-docs.txt index d1499a2625..0b7284da18 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -9,3 +9,4 @@ markdown==3.6 pymdown-extensions==10.7.1 mkdocs-macros-plugin==1.0.4 mkdocs-minify-plugin>=0.2.0 +polars==0.20.31 \ No newline at end of file diff --git a/utils/java/pom.xml b/utils/java/pom.xml index d45916ee87..6366083f53 100644 --- a/utils/java/pom.xml +++ b/utils/java/pom.xml @@ -5,7 +5,7 @@ com.logicalclocks hsfs-utils - 3.8.0-RC0 + 3.8.0-RC1 3.2.0.0-SNAPSHOT
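
Note on the pattern this patch threads through the Python client: polars becomes an optional dependency. The new `hsfs/core/constants.py` detects the package with `importlib.util.find_spec`, each module imports `polars` only behind an `if HAS_POLARS:` guard, and code paths that would actually return a polars DataFrame raise `ModuleNotFoundError(polars_not_installed_message)` when the package is absent. A minimal sketch of the same pattern follows, with a simplified message string and an illustrative `to_polars` helper that is not part of the patch:

```python
import importlib.util

import pyarrow as pa

# Detect the optional dependency once, at import time.
HAS_POLARS: bool = importlib.util.find_spec("polars") is not None

# Simplified stand-in for hsfs.core.constants.polars_not_installed_message.
polars_not_installed_message = (
    "Polars package not found. Install it with `pip install \"hopsworks[polars]\"` "
    "or `pip install polars`."
)

if HAS_POLARS:
    import polars as pl  # imported only when the package is available


def to_polars(table: pa.Table):
    """Illustrative helper: fail only when a polars DataFrame is actually requested."""
    if not HAS_POLARS:
        # Mirrors the guards added in arrow_flight_client, vector_server, and engine/python.
        raise ModuleNotFoundError(polars_not_installed_message)
    return pl.from_arrow(table)
```

On the packaging side, `polars` moves out of the core `dependencies` list into an optional extra (installed via `pip install "hopsworks[polars]"`), and the polars-specific tests are skipped with `@pytest.mark.skipif(not HAS_POLARS, reason="Polars is not installed.")` so the suite still passes in a polars-free environment.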