Skip to content

Commit

Permalink
feedback fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks committed Dec 20, 2024
1 parent 55c37eb commit 03847e3
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 85 deletions.
12 changes: 7 additions & 5 deletions python/hsfs/core/kafka_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,20 @@ def _init_kafka_resources(

return producer, get_headers(feature_group, num_entries), feature_writers, writer


def get_headers(
feature_group: Union[FeatureGroup, ExternalFeatureGroup],
num_entries: int,
feature_group: Union[FeatureGroup, ExternalFeatureGroup],
num_entries: int,
) -> Dict[str, bytes]:
# setup online ingestion
online_ingestion_instance = online_ingestion.OnlineIngestion(
num_entries=num_entries
)

online_ingestion_instance = online_ingestion_api.OnlineIngestionApi().create_online_ingestion(
feature_group,
online_ingestion_instance
online_ingestion_instance = (
online_ingestion_api.OnlineIngestionApi().create_online_ingestion(
feature_group, online_ingestion_instance
)
)

# custom headers for hopsworks onlineFS
Expand Down
91 changes: 57 additions & 34 deletions python/hsfs/core/online_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,18 @@
import time
import warnings
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from typing import (
Any,
Dict,
List,
Optional,
Union,
)

import humps
from hopsworks_common import util
from hsfs import feature_group as fg_mod
from hsfs.core import online_ingestion_batch_result
from tqdm.auto import tqdm


Expand All @@ -35,77 +43,84 @@ def __init__(
self,
id: Optional[int] = None,
num_entries: int = None,
current_offsets: int = None,
processed_entries: int = None,
inserted_entries: int = None,
aborted_entries: int = None,
batch_results = None,
current_offsets: Optional[str] = None,
processed_entries: Optional[int] = None,
inserted_entries: Optional[int] = None,
aborted_entries: Optional[int] = None,
batch_results: Union[
List[online_ingestion_batch_result.OnlineIngestionBatchResult],
List[Dict[str, Any]],
] = [],

Check failure on line 53 in python/hsfs/core/online_ingestion.py

View workflow job for this annotation

GitHub Actions / Lint and Stylecheck

Ruff (B006)

python/hsfs/core/online_ingestion.py:53:13: B006 Do not use mutable data structures for argument defaults
feature_group: fg_mod.FeatureGroup = None,
**kwargs,
):
self._id = id
self._num_entries = num_entries # specified when inserting
self._num_entries = num_entries # specified when inserting
self._current_offsets = current_offsets
self._processed_entries = processed_entries
self._inserted_entries = inserted_entries
self._aborted_entries = aborted_entries
self._batch_results = batch_results # batch inserts performed by onlinefs
self._batch_results = [
(
online_ingestion_batch_result.OnlineIngestionBatchResult.from_response_json(
batch_result
)
if isinstance(batch_result, dict)
else batch_result
)
for batch_result in batch_results
] # batch inserts performed by onlinefs
self._feature_group = feature_group

@classmethod
def from_response_json(cls, json_dict: Dict[str, Any]) -> "OnlineIngestion":
def from_response_json(
cls, json_dict: Dict[str, Any], feature_group: fg_mod.FeatureGroup = None
) -> OnlineIngestion:
if json_dict is None:
return None

json_decamelized: dict = humps.decamelize(json_dict)

if "count" not in json_decamelized:
return cls(**json_decamelized)
return cls(**json_decamelized, feature_group=feature_group)
elif json_decamelized["count"] == 1:
return cls(**json_decamelized["items"][0])
return cls(**json_decamelized["items"][0], feature_group=feature_group)
elif json_decamelized["count"] > 1:
return [cls(**item) for item in json_decamelized["items"]]
return [
cls(**item, feature_group=feature_group)
for item in json_decamelized["items"]
]
else:
return None

def refresh(self):
from hsfs.core.online_ingestion_api import OnlineIngestionApi

online_ingestion = OnlineIngestionApi().get_online_ingestion(
self.feature_group,
query_params={"filter_by": f"ID:{self.id}"}
self.feature_group, query_params={"filter_by": f"ID:{self.id}"}
)
self.__dict__.update(online_ingestion.__dict__)

def to_dict(self):
return {
"id": self._id,
"numEntries": self._num_entries
}
return {"id": self._id, "numEntries": self._num_entries}

def json(self):
return json.dumps(self, cls=util.Encoder)

@property
def id(self) -> int:
def id(self) -> Optional[int]:
return self._id

@property
def num_entries(self) -> int:
return self._num_entries

@num_entries.setter
def num_entries(self, num_entries: str) -> None:
def num_entries(self, num_entries: int) -> None:
self._num_entries = num_entries

@property
def feature_group(self):
return self._feature_group

@feature_group.setter
def feature_group(self, feature_group) -> None:
self._feature_group = feature_group

@property
def current_offsets(self) -> str:
def current_offsets(self) -> Optional[str]:
return self._current_offsets

@property
Expand All @@ -121,9 +136,15 @@ def aborted_entries(self) -> int:
return 0 if self._aborted_entries is None else self._aborted_entries

@property
def batch_results(self):
def batch_results(
self,
) -> List[online_ingestion_batch_result.OnlineIngestionBatchResult]:
return self._batch_results

@property
def feature_group(self) -> fg_mod.FeatureGroup:
return self._feature_group

def wait_for_completion(self, options: Dict[str, Any] = None):
if options is None:
options = {}
Expand All @@ -132,10 +153,12 @@ def wait_for_completion(self, options: Dict[str, Any] = None):
timeout_delta = timedelta(seconds=options.get("timeout", 60))
timeout_time = datetime.now() + timeout_delta

with tqdm(total=self.num_entries,
bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt}",
desc="Online data ingestion progress",
mininterval=1) as progress_bar:
with tqdm(
total=self.num_entries,
bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt}",
desc="Online data ingestion progress",
mininterval=1,
) as progress_bar:
while True:
if self.aborted_entries:
progress_bar.colour = "RED"
Expand Down
28 changes: 16 additions & 12 deletions python/hsfs/core/online_ingestion_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
#
from __future__ import annotations

from typing import Dict, Optional

from hopsworks_common import client
from hsfs import feature_group as fg_mod
from hsfs.core import online_ingestion


class OnlineIngestionApi:

def create_online_ingestion(
self,
feature_group_instance: fg_mod.FeatureGroup,
online_ingestion_instance: online_ingestion.OnlineIngestion,
):
) -> online_ingestion.OnlineIngestion:
_client = client.get_instance()
path_params = [
"project",
Expand All @@ -39,17 +40,21 @@ def create_online_ingestion(
]

headers = {"content-type": "application/json"}
online_ingestion_instance = online_ingestion.OnlineIngestion.from_response_json(
_client._send_request("POST", path_params, headers=headers, data=online_ingestion_instance.json())
return online_ingestion.OnlineIngestion.from_response_json(
_client._send_request(
"POST",
path_params,
headers=headers,
data=online_ingestion_instance.json(),
),
feature_group=feature_group_instance,
)
online_ingestion_instance.feature_group = feature_group_instance
return online_ingestion_instance

def get_online_ingestion(
self,
feature_group_instance: fg_mod.FeatureGroup,
query_params: None,
):
query_params: Optional[Dict[str, str]] = None,
) -> online_ingestion.OnlineIngestion:
_client = client.get_instance()
path_params = [
"project",
Expand All @@ -61,8 +66,7 @@ def get_online_ingestion(
"online_ingestion",
]

online_ingestion_instance = online_ingestion.OnlineIngestion.from_response_json(
_client._send_request("GET", path_params, query_params)
return online_ingestion.OnlineIngestion.from_response_json(
_client._send_request("GET", path_params, query_params),
feature_group=feature_group_instance,
)
online_ingestion_instance.feature_group = feature_group_instance
return online_ingestion_instance
86 changes: 86 additions & 0 deletions python/hsfs/core/online_ingestion_batch_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#
# Copyright 2024 Hopsworks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

import json
from typing import Any, Dict, Optional

import humps
from hopsworks_common import util


class OnlineIngestionBatchResult:
    """
    Metadata object used to provide Online Ingestion Batch Result information for a feature group.

    Instances are plain value holders: every field is optional, defaults to
    `None`, and is exposed through a read-only property.
    """

    def __init__(
        self,
        id: Optional[int] = None,
        batch_size: Optional[int] = None,
        status: Optional[str] = None,
        info: Optional[str] = None,
        **kwargs,
    ):
        """Store the batch result fields.

        # Arguments
            id: Identifier of the batch result, if any.
            batch_size: Size reported for the batch, if any.
            status: Status reported for the batch, if any.
            info: Additional information reported for the batch, if any.
            **kwargs: Accepted and ignored. Presumably this keeps construction
                from failing when a response carries extra fields - TODO confirm.
        """
        self._id = id
        self._batch_size = batch_size
        self._status = status
        self._info = info

    @classmethod
    def from_response_json(
        cls, json_dict: Optional[Dict[str, Any]]
    ) -> OnlineIngestionBatchResult:
        """Build batch result object(s) from a JSON-like dictionary.

        Keys are decamelized with `humps.decamelize` before being passed to the
        constructor as keyword arguments.

        # Arguments
            json_dict: Either a single object, or an envelope holding a
                `count` and a list of `items`. May be `None`.

        # Returns
            - `None` if `json_dict` is `None` or the envelope `count` is below 1.
            - A single instance if there is no `count` key or `count` equals 1.
            - A list of instances if `count` is greater than 1 (note that this
              case is not reflected in the return annotation).
        """
        if json_dict is None:
            return None

        json_decamelized: dict = humps.decamelize(json_dict)

        # No "count" key: the payload is the object itself, not an envelope.
        if "count" not in json_decamelized:
            return cls(**json_decamelized)

        count = json_decamelized["count"]
        if count == 1:
            return cls(**json_decamelized["items"][0])
        if count > 1:
            return [cls(**item) for item in json_decamelized["items"]]
        return None

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dictionary with camelCase keys."""
        return {
            "id": self._id,
            "batchSize": self._batch_size,
            "status": self._status,
            "info": self._info,
        }

    def json(self) -> str:
        """Serialize this object to a JSON string using `util.Encoder`."""
        # NOTE(review): util.Encoder presumably relies on to_dict() - confirm.
        return json.dumps(self, cls=util.Encoder)

    def __repr__(self) -> str:
        """Return a debug representation listing all four fields."""
        return (
            f"{type(self).__name__}("
            f"id={self._id!r}, "
            f"batch_size={self._batch_size!r}, "
            f"status={self._status!r}, "
            f"info={self._info!r})"
        )

    @property
    def id(self) -> Optional[int]:
        """Identifier of the batch result, or `None` if not set."""
        return self._id

    @property
    def batch_size(self) -> Optional[int]:
        """Size reported for the batch, or `None` if not set."""
        return self._batch_size

    @property
    def status(self) -> Optional[str]:
        """Status reported for the batch, or `None` if not set."""
        return self._status

    @property
    def info(self) -> Optional[str]:
        """Additional information reported for the batch, or `None` if not set."""
        return self._info
8 changes: 6 additions & 2 deletions python/hsfs/engine/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -1560,8 +1560,12 @@ def _write_dataframe_kafka(
)

# wait for online ingestion
if feature_group.online_enabled and offline_write_options.get("wait_for_online_ingestion", False):
feature_group.get_latest_online_ingestion().wait_for_completion()
if feature_group.online_enabled and offline_write_options.get(
"wait_for_online_ingestion", False
):
feature_group.get_latest_online_ingestion().wait_for_completion(
options=offline_write_options.get("online_ingestion_options", {})
)

return feature_group.materialization_job

Expand Down
Loading

0 comments on commit 03847e3

Please sign in to comment.