Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1522] [APPEND] Fix hsml deduplication #439

Merged
merged 6 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 17 additions & 19 deletions python/hopsworks_common/core/dataset_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,28 +100,26 @@ def download(
# Build the path to download the file on the local fs and return to the user, it should be absolute for consistency
# Download in CWD if local_path not specified
if local_path is None:
local_path = os.path.join(os.getcwd(), os.path.basename(path))
local_path = os.getcwd()
# If local_path specified, ensure it is absolute
else:
if os.path.isabs(local_path):
local_path = os.path.join(local_path, os.path.basename(path))
else:
local_path = os.path.join(
os.getcwd(), local_path, os.path.basename(path)
)
elif not os.path.isabs(local_path):
local_path = os.path.join(os.getcwd(), local_path)

if os.path.exists(local_path):
if overwrite:
if os.path.isfile:
os.remove(local_path)
else:
shutil.rmtree(local_path)
else:
raise IOError(
"{} already exists, set overwrite=True to overwrite it".format(
local_path
)
# If local_path is a directory, download into the directory
if os.path.isdir(local_path):
local_path = os.path.join(local_path, os.path.basename(path))

if overwrite:
if os.path.isfile(local_path):
os.remove(local_path)
elif os.path.isdir(local_path):
shutil.rmtree(local_path)
elif os.path.exists(local_path):
raise IOError(
"{} already exists, set overwrite=True to overwrite it".format(
local_path
)
)

file_size = int(self._get(path)["attributes"]["size"])
with _client._send_request(
Expand Down
5 changes: 0 additions & 5 deletions python/hopsworks_common/kafka_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ def _validate_topic_config(cls, name, num_replicas, num_partitions):
)
)
num_partitions = KAFKA_TOPIC.NUM_PARTITIONS
else:
if num_replicas is not None or num_partitions is not None:
raise ValueError(
"Number of replicas or partitions cannot be changed in existing kafka topics."
)
elif name is None or name == KAFKA_TOPIC.NONE:
num_replicas = None
num_partitions = None
Expand Down
10 changes: 2 additions & 8 deletions python/hopsworks_common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import time
from datetime import date, datetime, timezone
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Expand All @@ -47,7 +46,7 @@
from six import string_types


if TYPE_CHECKING:
if HAS_PANDAS:
import pandas as pd


Expand All @@ -74,9 +73,6 @@ def convert(self, obj):

import numpy as np

if HAS_PANDAS:
import pandas as pd

def encode_binary(x):
return base64.encodebytes(x).decode("ascii")

Expand All @@ -88,7 +84,7 @@ def encode_binary(x):
else:
return obj.tolist(), True

if isinstance(obj, datetime.date) or (
if isinstance(obj, datetime) or (
HAS_PANDAS and isinstance(obj, pd.Timestamp)
):
return obj.isoformat(), True
Expand Down Expand Up @@ -520,8 +516,6 @@ def _handle_tensor_input(input_tensor):


def _handle_dataframe_input(input_ex):
if HAS_PANDAS:
import pandas as pd
if HAS_PANDAS and isinstance(input_ex, pd.DataFrame):
if not input_ex.empty:
return input_ex.iloc[0].tolist()
Expand Down
18 changes: 0 additions & 18 deletions python/tests/test_kafka_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import copy

import humps
import pytest
from hopsworks_common import kafka_topic
from hopsworks_common.constants import KAFKA_TOPIC

Expand Down Expand Up @@ -160,23 +159,6 @@ def test_validate_topic_config_existing_with_name_only(self, backend_fixtures):
assert num_repl is None
assert num_part is None

def test_validate_topic_config_existing_with_name_and_config(
self, backend_fixtures
):
# Arrange
json = backend_fixtures["kafka_topic"]["get_existing_with_name_and_config"][
"response"
]["kafka_topic_dto"]

# Act
with pytest.raises(ValueError) as e_info:
num_repl, num_part = kafka_topic.KafkaTopic._validate_topic_config(
json["name"], json["num_replicas"], json["num_partitions"]
)

# Assert
assert "Number of replicas or partitions cannot be changed" in str(e_info.value)

def test_validate_topic_config_none(self, backend_fixtures):
# Arrange
json = backend_fixtures["kafka_topic"]["get_none"]["response"][
Expand Down
Loading