Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1533] Importing hopsworks fails due to core dump while importing polars #1379

Merged
merged 8 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion java/beam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>3.8.0-RC0</version>
<version>3.8.0-RC1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion java/flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>3.8.0-RC0</version>
<version>3.8.0-RC1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion java/hsfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>3.8.0-RC0</version>
<version>3.8.0-RC1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>com.logicalclocks</groupId>
<artifactId>hsfs-parent</artifactId>
<packaging>pom</packaging>
<version>3.8.0-RC0</version>
<version>3.8.0-RC1</version>
<modules>
<module>hsfs</module>
<module>spark</module>
Expand Down
2 changes: 1 addition & 1 deletion java/spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>3.8.0-RC0</version>
<version>3.8.0-RC1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
8 changes: 7 additions & 1 deletion python/hsfs/core/arrow_flight_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@
from functools import wraps
from typing import Any, Dict, Optional, Union

import polars as pl
import pyarrow
import pyarrow._flight
import pyarrow.flight
from hsfs import client, feature_group, util
from hsfs.client.exceptions import FeatureStoreException
from hsfs.constructor import query
from hsfs.core.constants import HAS_POLARS, polars_not_installed_message
from hsfs.core.variable_api import VariableApi
from hsfs.storage_connector import StorageConnector
from pyarrow.flight import FlightServerError
from retrying import retry


if HAS_POLARS:
import polars as pl


_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -399,6 +403,8 @@ def _get_dataset(self, descriptor, timeout=None, dataframe_type="pandas"):
reader = self._connection.do_get(info.endpoints[0].ticket, options)
_logger.debug("Dataset fetched. Converting to dataframe %s.", dataframe_type)
if dataframe_type.lower() == "polars":
if not HAS_POLARS:
raise ModuleNotFoundError(polars_not_installed_message)
return pl.from_arrow(reader.read_all())
else:
return reader.read_pandas()
Expand Down
28 changes: 28 additions & 0 deletions python/hsfs/core/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# Copyright 2024 Hopsworks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import importlib.util


# True when the optional ``polars`` package can be located in the current
# environment. ``find_spec`` only looks up the top-level package; it does not
# import it, so merely importing this module never loads polars.
HAS_POLARS: bool = importlib.util.find_spec("polars") is not None

# Text for the error raised when a polars result is requested while
# ``HAS_POLARS`` is False. The fragments are joined without a separator, so
# each one carries its own trailing space.
polars_not_installed_message = "".join(
    (
        "Polars package not found. ",
        "If you want to use Polars with Hopsworks you can install the corresponding extras ",
        '`pip install hopsworks[polars]` or `pip install "hopsworks[polars]"` if using zsh. ',
        "You can also install polars directly in your environment e.g `pip install polars`. ",
        "You will need to restart your kernel if applicable.",
    )
)
20 changes: 18 additions & 2 deletions python/hsfs/core/vector_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@
from base64 import b64decode
from datetime import datetime, timezone
from io import BytesIO
from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union
from typing import (
Any,
Callable,
Dict,
List,
Literal,
Optional,
Set,
Tuple,
Union,
)

import avro.io
import avro.schema
import numpy as np
import pandas as pd
import polars as pl
from hsfs import (
client,
feature_view,
Expand All @@ -49,6 +58,7 @@
from hsfs.core import (
transformation_function_engine as tf_engine_mod,
)
from hsfs.core.constants import HAS_POLARS, polars_not_installed_message


HAS_FASTAVRO = False
Expand All @@ -59,6 +69,9 @@
except ImportError:
from avro.io import BinaryDecoder

if HAS_POLARS:
import polars as pl

_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -487,6 +500,9 @@ def handle_feature_vector_return_type(
return pandas_df
elif return_type.lower() == "polars":
_logger.debug("Returning feature vector as polars dataframe")
if not HAS_POLARS:
raise ModuleNotFoundError(polars_not_installed_message)

return pl.DataFrame(
feature_vectorz if batch else [feature_vectorz],
schema=self._feature_vector_col_name if not inference_helper else None,
Expand Down
Loading
Loading