From f936bc69f53bd7ed9fac387f0a7ff3f8e83b9876 Mon Sep 17 00:00:00 2001 From: bubriks Date: Tue, 3 Dec 2024 17:32:47 +0200 Subject: [PATCH 01/25] temp --- python/hsfs/core/online_ingestion.py | 142 +++++++++++++++++++++++ python/hsfs/core/online_ingestion_api.py | 68 +++++++++++ python/hsfs/engine/python.py | 13 +++ python/hsfs/feature_group.py | 5 + 4 files changed, 228 insertions(+) create mode 100644 python/hsfs/core/online_ingestion.py create mode 100644 python/hsfs/core/online_ingestion_api.py diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py new file mode 100644 index 000000000..a9960b644 --- /dev/null +++ b/python/hsfs/core/online_ingestion.py @@ -0,0 +1,142 @@ +# +# Copyright 2024 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +import json +import time +from typing import Any, Dict, Optional + +import humps +from hopsworks_common import util +from tqdm.auto import tqdm + + +class OnlineIngestion: + """ + Metadata object used to provide Online Ingestion information for a feature group. + """ + + def __init__( + self, + id: Optional[int] = None, + num_entries: int = None, + current_offsets: int = None, + processed_entries: int = None, + inserted_entries: int = None, + aborted_entries: int = None, + batch_results = None, + **kwargs, + ): + self._id = id + self._num_entries = num_entries # specified when inserting + self._current_offsets = current_offsets + self._processed_entries = processed_entries + self._inserted_entries = inserted_entries + self._aborted_entries = aborted_entries + self._batch_results = batch_results # batch inserts performed by onlinefs + + @classmethod + def from_response_json(cls, json_dict: Dict[str, Any]) -> "OnlineIngestion": + if json_dict is None: + return None + + json_decamelized: dict = humps.decamelize(json_dict) + + if "count" not in json_decamelized: + return cls(**json_decamelized) + elif json_decamelized["count"] == 1: + return cls(**json_decamelized["items"][0]) + elif json_decamelized["count"] > 1: + return [cls(**item) for item in json_decamelized["items"]] + else: + return None + + def refresh(self): + from hsfs.core.online_ingestion_api import OnlineIngestionApi + + online_ingestion = OnlineIngestionApi().get_online_ingestion( + self.feature_group, + query_params={"filter_by": f"ID:{self.id}"} + ) + self.__dict__.update(online_ingestion.__dict__) + + def to_dict(self): + return { + "id": self._id, + "numEntries": self._num_entries + } + + def json(self): + return json.dumps(self, cls=util.Encoder) + + @property + def id(self) -> int: + return self._id + + @property + def num_entries(self) -> int: + return self._num_entries + + @num_entries.setter + def num_entries(self, num_entries: str) -> None: + self._num_entries = num_entries + + @property + def feature_group(self): + return self._feature_group + + @feature_group.setter + def feature_group(self, feature_group) -> None: + self._feature_group = feature_group + + @property + def current_offsets(self) -> 
str: + return self._current_offsets + + @property + def processed_entries(self) -> int: + return 0 if self._processed_entries is None else self._processed_entries + + @property + def inserted_entries(self) -> int: + return 0 if self._inserted_entries is None else self._inserted_entries + + @property + def aborted_entries(self) -> int: + return 0 if self._aborted_entries is None else self._aborted_entries + + @property + def batch_results(self): + return self._batch_results + + def wait_for_completion(self): + with tqdm(total=self.num_entries, + bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt}", + desc="Data processing progress", + mininterval=1) as progress_bar: + while True: + if self.aborted_entries: + progress_bar.colour = "RED" + + progress_bar.n = self.processed_entries + progress_bar.refresh() + + if self.processed_entries >= self.num_entries: + break + + time.sleep(1) + + self.refresh() \ No newline at end of file diff --git a/python/hsfs/core/online_ingestion_api.py b/python/hsfs/core/online_ingestion_api.py new file mode 100644 index 000000000..a8bb8f5bd --- /dev/null +++ b/python/hsfs/core/online_ingestion_api.py @@ -0,0 +1,68 @@ +# +# Copyright 2024 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +from hopsworks_common import client +from hsfs import feature_group as fg_mod +from hsfs.core import online_ingestion + + +class OnlineIngestionApi: + + def create_online_ingestion( + self, + feature_group_instance: fg_mod.FeatureGroup, + online_ingestion_instance: online_ingestion.OnlineIngestion, + ): + _client = client.get_instance() + path_params = [ + "project", + _client._project_id, + "featurestores", + feature_group_instance.feature_store_id, + "featuregroups", + feature_group_instance.id, + "online_ingestion", + ] + + headers = {"content-type": "application/json"} + online_ingestion_instance = online_ingestion.OnlineIngestion.from_response_json( + _client._send_request("POST", path_params, headers=headers, data=online_ingestion_instance.json()) + ) + online_ingestion_instance.feature_group = feature_group_instance + return online_ingestion_instance + + def get_online_ingestion( + self, + feature_group_instance: fg_mod.FeatureGroup, + query_params: None, + ): + _client = client.get_instance() + path_params = [ + "project", + _client._project_id, + "featurestores", + feature_group_instance.feature_store_id, + "featuregroups", + feature_group_instance.id, + "online_ingestion", + ] + + online_ingestion_instance = online_ingestion.OnlineIngestion.from_response_json( + _client._send_request("GET", path_params, query_params) + ) + online_ingestion_instance.feature_group = feature_group_instance + return online_ingestion_instance \ No newline at end of file diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index a6de77364..32639358b 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -73,6 +73,8 @@ job, job_api, kafka_engine, + online_ingestion, + 
online_ingestion_api, statistics_api, storage_connector_api, training_dataset_api, @@ -1448,6 +1450,17 @@ def _write_dataframe_kafka( offline_write_options, project_id=client.get_instance()._project_id, ) + + online_ingestion_instance = online_ingestion.OnlineIngestion( + num_entries=len(dataframe) + ) + + online_ingestion_instance = online_ingestion_api.OnlineIngestionApi().create_online_ingestion( + feature_group, + online_ingestion_instance + ) + + headers["onlineIngestionId"] = str(online_ingestion_instance.id).encode("utf8") if not feature_group._multi_part_insert: # set initial_check_point to the current offset initial_check_point = kafka_engine.kafka_get_offsets( diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index e2c42f1a3..b6afc1ee5 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -65,6 +65,8 @@ feature_store_api, great_expectation_engine, job_api, + online_ingestion, + online_ingestion_api, spine_group_engine, statistics_engine, validation_report_engine, @@ -1786,6 +1788,9 @@ def statistics_config( type(statistics_config) ) ) + + def get_latest_online_ingestion(self) -> online_ingestion.OnlineIngestion: + return online_ingestion_api.OnlineIngestionApi().get_online_ingestion(self, query_params={"filter_by": "LATEST"}) @property def feature_store_id(self) -> Optional[int]: From 72fbbf9a632761ccaeb7c9b65bbaf7b77dfa8e30 Mon Sep 17 00:00:00 2001 From: bubriks Date: Tue, 10 Dec 2024 15:52:44 +0200 Subject: [PATCH 02/25] standardize headers --- python/hsfs/core/kafka_engine.py | 30 ++++++++--- python/hsfs/core/online_ingestion.py | 6 +-- python/hsfs/core/online_ingestion_api.py | 2 +- python/hsfs/engine/python.py | 14 +---- python/hsfs/engine/spark.py | 65 ++++++++++-------------- python/hsfs/feature_group.py | 2 +- python/tests/client/test_base_client.py | 1 - 7 files changed, 55 insertions(+), 65 deletions(-) diff --git a/python/hsfs/core/kafka_engine.py b/python/hsfs/core/kafka_engine.py index ee9e892be..df6cd1fda 100644 --- a/python/hsfs/core/kafka_engine.py +++ b/python/hsfs/core/kafka_engine.py @@ -23,7 +23,7 @@ import pandas as pd from hopsworks_common import client from hopsworks_common.core.constants import HAS_NUMPY -from hsfs.core import storage_connector_api +from hsfs.core import online_ingestion, online_ingestion_api, storage_connector_api from hsfs.core.constants import HAS_AVRO, HAS_CONFLUENT_KAFKA, HAS_FAST_AVRO from tqdm import tqdm @@ -61,7 +61,7 @@ def init_kafka_consumer( def init_kafka_resources( feature_group: Union[FeatureGroup, ExternalFeatureGroup], offline_write_options: Dict[str, Any], - project_id: int, + num_entries: int, ) -> Tuple[ Producer, Dict[str, bytes], Dict[str, Callable[..., bytes]], Callable[..., bytes] : ]: @@ -74,7 +74,7 @@ def init_kafka_resources( feature_group._writer, ) producer, headers, feature_writers, writer = _init_kafka_resources( - feature_group, offline_write_options, project_id + feature_group, offline_write_options, num_entries ) if feature_group._multi_part_insert: feature_group._kafka_producer = producer @@ -87,7 +87,7 @@ def init_kafka_resources( def _init_kafka_resources( feature_group: Union[FeatureGroup, ExternalFeatureGroup], offline_write_options: Dict[str, Any], - project_id: int, + num_entries: int, ) -> Tuple[ Producer, Dict[str, bytes], Dict[str, Callable[..., bytes]], Callable[..., bytes] : ]: @@ -103,13 +103,29 @@ def _init_kafka_resources( # setup row writer function writer = get_encoder_func(feature_group._get_encoded_avro_schema()) + return producer, 
get_headers(feature_group, num_entries), feature_writers, writer + +def get_headers( + feature_group: Union[FeatureGroup, ExternalFeatureGroup], + num_entries: int, +) -> Dict[str, bytes]: + # setup online ingestion + online_ingestion_instance = online_ingestion.OnlineIngestion( + num_entries=num_entries + ) + + online_ingestion_instance = online_ingestion_api.OnlineIngestionApi().create_online_ingestion( + feature_group, + online_ingestion_instance + ) + # custom headers for hopsworks onlineFS - headers = { - "projectId": str(project_id).encode("utf8"), + return { + "projectId": str(feature_group.feature_store.project_id).encode("utf8"), "featureGroupId": str(feature_group._id).encode("utf8"), "subjectId": str(feature_group.subject["id"]).encode("utf8"), + "onlineIngestionId": str(online_ingestion_instance.id).encode("utf8"), } - return producer, headers, feature_writers, writer def init_kafka_producer( diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py index a9960b644..c1eae20bd 100644 --- a/python/hsfs/core/online_ingestion.py +++ b/python/hsfs/core/online_ingestion.py @@ -109,11 +109,11 @@ def current_offsets(self) -> str: @property def processed_entries(self) -> int: return 0 if self._processed_entries is None else self._processed_entries - + @property def inserted_entries(self) -> int: return 0 if self._inserted_entries is None else self._inserted_entries - + @property def aborted_entries(self) -> int: return 0 if self._aborted_entries is None else self._aborted_entries @@ -139,4 +139,4 @@ def wait_for_completion(self): time.sleep(1) - self.refresh() \ No newline at end of file + self.refresh() diff --git a/python/hsfs/core/online_ingestion_api.py b/python/hsfs/core/online_ingestion_api.py index a8bb8f5bd..f8cd541f6 100644 --- a/python/hsfs/core/online_ingestion_api.py +++ b/python/hsfs/core/online_ingestion_api.py @@ -65,4 +65,4 @@ def get_online_ingestion( _client._send_request("GET", path_params, query_params) ) online_ingestion_instance.feature_group = feature_group_instance - return online_ingestion_instance \ No newline at end of file + return online_ingestion_instance diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 32639358b..fe3432ff8 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -73,8 +73,6 @@ job, job_api, kafka_engine, - online_ingestion, - online_ingestion_api, statistics_api, storage_connector_api, training_dataset_api, @@ -1448,19 +1446,9 @@ def _write_dataframe_kafka( producer, headers, feature_writers, writer = kafka_engine.init_kafka_resources( feature_group, offline_write_options, - project_id=client.get_instance()._project_id, + num_entries=len(dataframe), ) - online_ingestion_instance = online_ingestion.OnlineIngestion( - num_entries=len(dataframe) - ) - - online_ingestion_instance = online_ingestion_api.OnlineIngestionApi().create_online_ingestion( - feature_group, - online_ingestion_instance - ) - - headers["onlineIngestionId"] = str(online_ingestion_instance.id).encode("utf8") if not feature_group._multi_part_insert: # set initial_check_point to the current offset initial_check_point = kafka_engine.kafka_get_offsets( diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 67e15468b..cafa77451 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -489,34 +489,17 @@ def save_stream_dataframe( ) serialized_df = self._serialize_to_avro(feature_group, dataframe) - project_id = str(feature_group.feature_store.project_id) 
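[editor's note] The deletions in this hunk drop the per-call, inline header construction in favor of the shared kafka_engine.get_headers(), which also registers an OnlineIngestion entity server-side and stamps its id onto every produced record. For reference, all four headers are plain utf-8 byte strings, so a downstream consumer can recover them in a few lines. A minimal sketch, assuming a confluent_kafka Message object `msg` read from the feature group's topic (the helper name is illustrative, not part of this PR):

    def decode_hopsworks_headers(msg):
        # confluent_kafka.Message.headers() yields (key, value-bytes) tuples, or None.
        headers = dict(msg.headers() or [])
        # Expected keys after this change: projectId, featureGroupId,
        # subjectId and onlineIngestionId, all utf-8 encoded.
        return {key: value.decode("utf-8") for key, value in headers.items()}
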
- feature_group_id = str(feature_group._id) - subject_id = str(feature_group.subject["id"]).encode("utf8") - if query_name is None: query_name = ( - f"insert_stream_{project_id}_{feature_group_id}" + f"insert_stream_{feature_group.feature_store.project_id}_{feature_group._id}" f"_{feature_group.name}_{feature_group.version}_onlinefs" ) query = ( - serialized_df.withColumn( - "headers", - array( - struct( - lit("projectId").alias("key"), - lit(project_id.encode("utf8")).alias("value"), - ), - struct( - lit("featureGroupId").alias("key"), - lit(feature_group_id.encode("utf8")).alias("value"), - ), - struct( - lit("subjectId").alias("key"), lit(subject_id).alias("value") - ), - ), - ) - .writeStream.outputMode(output_mode) + serialized_df + .withColumn("headers", self._get_headers(feature_group, dataframe)) + .writeStream + .outputMode(output_mode) .format(self.KAFKA_FORMAT) .option( "checkpointLocation", @@ -582,23 +565,27 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options): serialized_df = self._serialize_to_avro(feature_group, dataframe) - project_id = str(feature_group.feature_store.project_id).encode("utf8") - feature_group_id = str(feature_group._id).encode("utf8") - subject_id = str(feature_group.subject["id"]).encode("utf8") - - serialized_df.withColumn( - "headers", - array( - struct(lit("projectId").alias("key"), lit(project_id).alias("value")), - struct( - lit("featureGroupId").alias("key"), - lit(feature_group_id).alias("value"), - ), - struct(lit("subjectId").alias("key"), lit(subject_id).alias("value")), - ), - ).write.format(self.KAFKA_FORMAT).options(**write_options).option( - "topic", feature_group._online_topic_name - ).save() + ( + serialized_df + .withColumn("headers", self._get_headers(feature_group, dataframe)) + .write + .format(self.KAFKA_FORMAT) + .options(**write_options) + .option("topic", feature_group._online_topic_name) + .save() + ) + + def _get_headers( + self, + feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup], + dataframe: Union[RDD, DataFrame], + ): + return array( + *[ + struct(lit(key).alias("key"), lit(value).alias("value")) + for key, value in kafka_engine.get_headers(feature_group, dataframe.count()).items() + ] + ) def _serialize_to_avro( self, diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index b6afc1ee5..09cfe9510 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -1788,7 +1788,7 @@ def statistics_config( type(statistics_config) ) ) - + def get_latest_online_ingestion(self) -> online_ingestion.OnlineIngestion: return online_ingestion_api.OnlineIngestionApi().get_online_ingestion(self, query_params={"filter_by": "LATEST"}) diff --git a/python/tests/client/test_base_client.py b/python/tests/client/test_base_client.py index 2a4e5a3e1..6ea90a7e8 100644 --- a/python/tests/client/test_base_client.py +++ b/python/tests/client/test_base_client.py @@ -20,7 +20,6 @@ import requests from hsfs.client.base import Client from hsfs.client.exceptions import RestAPIError - from tests.util import changes_environ From 1a5c1feeeee2b96e732f9f0603e46cbd7022a16b Mon Sep 17 00:00:00 2001 From: bubriks Date: Tue, 10 Dec 2024 15:54:25 +0200 Subject: [PATCH 03/25] lint --- python/tests/client/test_base_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/tests/client/test_base_client.py b/python/tests/client/test_base_client.py index 6ea90a7e8..2a4e5a3e1 100644 --- a/python/tests/client/test_base_client.py +++ b/python/tests/client/test_base_client.py @@ -20,6 
+20,7 @@ import requests from hsfs.client.base import Client from hsfs.client.exceptions import RestAPIError + from tests.util import changes_environ From 946b80c115342afe4ef9b6658938f5041192c61e Mon Sep 17 00:00:00 2001 From: bubriks Date: Tue, 10 Dec 2024 16:15:50 +0200 Subject: [PATCH 04/25] some test fix --- python/tests/engine/test_python.py | 26 ++++++++++++++++++++++- python/tests/engine/test_python_writer.py | 7 ++++++ python/tests/engine/test_spark.py | 20 +++++++++++++---- 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index ea83f618f..07ed7db3b 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -33,7 +33,7 @@ from hsfs.client import exceptions from hsfs.constructor import query from hsfs.constructor.hudi_feature_group_alias import HudiFeatureGroupAlias -from hsfs.core import inode, job +from hsfs.core import inode, job, online_ingestion from hsfs.core.constants import HAS_GREAT_EXPECTATIONS from hsfs.engine import python from hsfs.expectation_suite import ExpectationSuite @@ -3520,6 +3520,10 @@ def test_materialization_kafka(self, mocker): mocker.patch("hopsworks_common.client.get_instance") mocker.patch("hopsworks_common.client.get_instance") + mocker.patch( + "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", + return_value= online_ingestion.OnlineIngestion(id=123), + ) python_engine = python.Engine() @@ -3533,6 +3537,8 @@ def test_materialization_kafka(self, mocker): stream=False, time_travel_format="HUDI", ) + fg.feature_store = mocker.Mock() + fg.feature_store.project_id = 234 mocker.patch.object(fg, "commit_details", return_value={"commit1": 1}) @@ -3578,6 +3584,10 @@ def test_materialization_kafka_first_job_execution(self, mocker): mocker.patch("hopsworks_common.client.get_instance") mocker.patch("hopsworks_common.client.get_instance") + mocker.patch( + "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", + return_value= online_ingestion.OnlineIngestion(id=123), + ) python_engine = python.Engine() @@ -3591,6 +3601,8 @@ def test_materialization_kafka_first_job_execution(self, mocker): stream=False, time_travel_format="HUDI", ) + fg.feature_store = mocker.Mock() + fg.feature_store.project_id = 234 mocker.patch.object(fg, "commit_details", return_value={"commit1": 1}) @@ -3632,6 +3644,10 @@ def test_materialization_kafka_skip_offsets(self, mocker): mocker.patch("hopsworks_common.client.get_instance") mocker.patch("hopsworks_common.client.get_instance") + mocker.patch( + "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", + return_value= online_ingestion.OnlineIngestion(id=123), + ) python_engine = python.Engine() @@ -3645,6 +3661,8 @@ def test_materialization_kafka_skip_offsets(self, mocker): stream=False, time_travel_format="HUDI", ) + fg.feature_store = mocker.Mock() + fg.feature_store.project_id = 234 mocker.patch.object(fg, "commit_details", return_value={"commit1": 1}) @@ -3689,6 +3707,10 @@ def test_materialization_kafka_topic_doesnt_exist(self, mocker): mocker.patch("hopsworks_common.client.get_instance") mocker.patch("hopsworks_common.client.get_instance") + mocker.patch( + "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", + return_value= online_ingestion.OnlineIngestion(id=123), + ) python_engine = python.Engine() @@ -3702,6 +3724,8 @@ def test_materialization_kafka_topic_doesnt_exist(self, mocker): stream=False, time_travel_format="HUDI", 
) + fg.feature_store = mocker.Mock() + fg.feature_store.project_id = 234 mocker.patch.object(fg, "commit_details", return_value={"commit1": 1}) diff --git a/python/tests/engine/test_python_writer.py b/python/tests/engine/test_python_writer.py index 5157e2b92..24a2b3b92 100644 --- a/python/tests/engine/test_python_writer.py +++ b/python/tests/engine/test_python_writer.py @@ -21,6 +21,7 @@ import fastavro from confluent_kafka.admin import TopicMetadata from hsfs import feature_group +from hsfs.core import online_ingestion from hsfs.engine import python @@ -33,6 +34,10 @@ def test_write_dataframe_kafka(self, mocker, dataframe_fixture_times): avro_schema_mock = mocker.patch( "hsfs.feature_group.FeatureGroup._get_encoded_avro_schema" ) + mocker.patch( + "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", + return_value= online_ingestion.OnlineIngestion(id=123), + ) avro_schema = ( '{"type":"record","name":"test_fg","namespace":"test_featurestore.db","fields":' '[{"name":"primary_key","type":["null","long"]},{"name":"event_date","type":' @@ -69,6 +74,8 @@ def test_write_dataframe_kafka(self, mocker, dataframe_fixture_times): id=10, stream=False, ) + fg.feature_store = mocker.Mock() + fg.feature_store.project_id = 234 mocker.patch.object(fg, "commit_details", return_value={"commit1": 1}) fg._online_topic_name = topic_name diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index da3449270..bc4292a9a 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -894,10 +894,13 @@ def test_save_stream_dataframe(self, mocker, backend_fixtures): mock_common_client_get_instance.return_value._project_name = "test_project_name" + df = pd.DataFrame(data={"col_0": [1, 2], "col_1": ["test_1", "test_2"]}) + spark_df = spark_engine._spark_session.createDataFrame(df) + # Act spark_engine.save_stream_dataframe( feature_group=fg, - dataframe=None, + dataframe=spark_df, query_name=None, output_mode="test_mode", await_termination=None, @@ -1012,10 +1015,13 @@ def test_save_stream_dataframe_query_name(self, mocker, backend_fixtures): mock_common_client_get_instance.return_value._project_name = "test_project_name" + df = pd.DataFrame(data={"col_0": [1, 2], "col_1": ["test_1", "test_2"]}) + spark_df = spark_engine._spark_session.createDataFrame(df) + # Act spark_engine.save_stream_dataframe( feature_group=fg, - dataframe=None, + dataframe=spark_df, query_name="test_query_name", output_mode="test_mode", await_termination=None, @@ -1138,10 +1144,13 @@ def test_save_stream_dataframe_checkpoint_dir(self, mocker, backend_fixtures): mock_common_client_get_instance.return_value._project_name = "test_project_name" + df = pd.DataFrame(data={"col_0": [1, 2], "col_1": ["test_1", "test_2"]}) + spark_df = spark_engine._spark_session.createDataFrame(df) + # Act spark_engine.save_stream_dataframe( feature_group=fg, - dataframe=None, + dataframe=spark_df, query_name=None, output_mode="test_mode", await_termination=None, @@ -1258,10 +1267,13 @@ def test_save_stream_dataframe_await_termination(self, mocker, backend_fixtures) mock_common_client_get_instance.return_value._project_name = "test_project_name" + df = pd.DataFrame(data={"col_0": [1, 2], "col_1": ["test_1", "test_2"]}) + spark_df = spark_engine._spark_session.createDataFrame(df) + # Act spark_engine.save_stream_dataframe( feature_group=fg, - dataframe=None, + dataframe=spark_df, query_name=None, output_mode="test_mode", await_termination=True, From 6e47085c2cd9f5ca8ee79476847d85518ac9df2c Mon 
Sep 17 00:00:00 2001 From: bubriks Date: Tue, 10 Dec 2024 16:27:57 +0200 Subject: [PATCH 05/25] working on tests --- python/tests/engine/test_spark.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index bc4292a9a..4e31132a8 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -34,7 +34,7 @@ ) from hsfs.client import exceptions from hsfs.constructor import hudi_feature_group_alias, query -from hsfs.core import training_dataset_engine +from hsfs.core import online_ingestion, training_dataset_engine from hsfs.core.constants import HAS_GREAT_EXPECTATIONS from hsfs.engine import spark from hsfs.hopsworks_udf import udf @@ -873,6 +873,10 @@ def test_save_stream_dataframe(self, mocker, backend_fixtures): mock_storage_connector_api = mocker.patch( "hsfs.core.storage_connector_api.StorageConnectorApi" ) + mocker.patch( + "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", + return_value= online_ingestion.OnlineIngestion(id=123), + ) json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = storage_connector.StorageConnector.from_response_json(json) mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc @@ -996,6 +1000,10 @@ def test_save_stream_dataframe_query_name(self, mocker, backend_fixtures): mock_storage_connector_api = mocker.patch( "hsfs.core.storage_connector_api.StorageConnectorApi" ) + mocker.patch( + "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", + return_value= online_ingestion.OnlineIngestion(id=123), + ) json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = storage_connector.StorageConnector.from_response_json(json) mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc @@ -1123,6 +1131,10 @@ def test_save_stream_dataframe_checkpoint_dir(self, mocker, backend_fixtures): mock_storage_connector_api = mocker.patch( "hsfs.core.storage_connector_api.StorageConnectorApi" ) + mocker.patch( + "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", + return_value= online_ingestion.OnlineIngestion(id=123), + ) json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = storage_connector.StorageConnector.from_response_json(json) mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc @@ -1246,6 +1258,10 @@ def test_save_stream_dataframe_await_termination(self, mocker, backend_fixtures) mock_storage_connector_api = mocker.patch( "hsfs.core.storage_connector_api.StorageConnectorApi" ) + mocker.patch( + "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", + return_value= online_ingestion.OnlineIngestion(id=123), + ) json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = storage_connector.StorageConnector.from_response_json(json) mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc @@ -1506,6 +1522,10 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): mock_storage_connector_api = mocker.patch( "hsfs.core.storage_connector_api.StorageConnectorApi" ) + mocker.patch( + "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", + return_value= online_ingestion.OnlineIngestion(id=123), + ) json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = 
storage_connector.StorageConnector.from_response_json(json) mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc @@ -1523,10 +1543,13 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): ) fg.feature_store = mocker.Mock() + df = pd.DataFrame(data={"col_0": [1, 2], "col_1": ["test_1", "test_2"]}) + spark_df = spark_engine._spark_session.createDataFrame(df) + # Act spark_engine._save_online_dataframe( feature_group=fg, - dataframe=None, + dataframe=spark_df, write_options={"test_name": "test_value"}, ) From 2bd22a92c9f18a53212f71fefefe6fddf23311fe Mon Sep 17 00:00:00 2001 From: bubriks Date: Fri, 13 Dec 2024 11:04:22 +0200 Subject: [PATCH 06/25] add unit test for get_headers --- python/tests/core/test_kafka_engine.py | 38 ++++++++++++++++++- python/tests/fixtures/backend_fixtures.py | 1 + .../fixtures/online_ingestion_fixtures.json | 22 +++++++++++ 3 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 python/tests/fixtures/online_ingestion_fixtures.json diff --git a/python/tests/core/test_kafka_engine.py b/python/tests/core/test_kafka_engine.py index 88085689e..51344e2cb 100644 --- a/python/tests/core/test_kafka_engine.py +++ b/python/tests/core/test_kafka_engine.py @@ -15,8 +15,8 @@ # import importlib -from hsfs import storage_connector -from hsfs.core import constants, kafka_engine +from hsfs import storage_connector, feature_group +from hsfs.core import constants, kafka_engine, online_ingestion if constants.HAS_CONFLUENT_KAFKA: @@ -523,3 +523,37 @@ def test_spark_get_kafka_config_internal_kafka(self, mocker, backend_fixtures): mock_storage_connector_api.return_value.get_kafka_connector.call_args[0][1] is False ) + + def test_get_headers(self, mocker, backend_fixtures): + # Arrange + mocker.patch("hopsworks_common.client.get_instance") + mock_online_ingestion_api = mocker.patch( + "hsfs.core.online_ingestion_api.OnlineIngestionApi" + ) + json = backend_fixtures["online_ingestion"]["get"]["response"] + oi = online_ingestion.OnlineIngestion.from_response_json(json) + mock_online_ingestion_api.return_value.create_online_ingestion.return_value = oi + + fg = feature_group.FeatureGroup( + id=111, + name="test", + version=1, + featurestore_id=99, + ) + fg.feature_store = mocker.Mock() + fg.feature_store.project_id = 234 + + fg._subject = {"id": 823} + + # Act + results = kafka_engine.get_headers( + fg, num_entries=10 + ) + + # Assert + assert results == { + 'featureGroupId': b'111', + 'onlineIngestionId': b'1', + 'projectId': b'234', + 'subjectId': b'823' + } diff --git a/python/tests/fixtures/backend_fixtures.py b/python/tests/fixtures/backend_fixtures.py index dd455b699..71a55548f 100644 --- a/python/tests/fixtures/backend_fixtures.py +++ b/python/tests/fixtures/backend_fixtures.py @@ -71,6 +71,7 @@ "transformer", "user", "validation_report", + "online_ingestion", ] backend_fixtures_json = {} diff --git a/python/tests/fixtures/online_ingestion_fixtures.json b/python/tests/fixtures/online_ingestion_fixtures.json new file mode 100644 index 000000000..23d804e60 --- /dev/null +++ b/python/tests/fixtures/online_ingestion_fixtures.json @@ -0,0 +1,22 @@ +{ + "get": { + "response": { + "count": 1, + "items": [ + { + "id": 1, + "num_entries": 10, + "processed_entries": 8, + "inserted_entries": 6, + "aborted_entries": 2 + } + ] + } + }, + "get_empty": { + "response": { + "count": 0, + "items": [] + } + } +} From f7b48007ccd403273061597ec95994984377d9d3 Mon Sep 17 00:00:00 2001 From: bubriks Date: Fri, 13 Dec 2024 11:07:52 +0200 Subject: [PATCH 07/25] 
ruff fix --- python/tests/core/test_kafka_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/core/test_kafka_engine.py b/python/tests/core/test_kafka_engine.py index 51344e2cb..682324e9d 100644 --- a/python/tests/core/test_kafka_engine.py +++ b/python/tests/core/test_kafka_engine.py @@ -15,7 +15,7 @@ # import importlib -from hsfs import storage_connector, feature_group +from hsfs import feature_group, storage_connector from hsfs.core import constants, kafka_engine, online_ingestion From f5b1206fb77a0deb4ae542e5660627d0bb507a95 Mon Sep 17 00:00:00 2001 From: bubriks Date: Fri, 13 Dec 2024 16:39:01 +0200 Subject: [PATCH 08/25] add wait_for_online_ingestion --- python/hsfs/engine/python.py | 5 +++++ python/hsfs/engine/spark.py | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index e3e32679c..16c7f4c4e 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1558,6 +1558,11 @@ def _write_dataframe_kafka( ), await_termination=offline_write_options.get("wait_for_job", False), ) + + # wait for online ingestion + if feature_group.online_enabled and offline_write_options.get("wait_for_online_ingestion", False): + feature_group.get_latest_online_ingestion().wait_for_completion() + return feature_group.materialization_job @staticmethod diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index f3af001dd..8d124d76e 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -518,6 +518,10 @@ def save_stream_dataframe( if await_termination: query.awaitTermination(timeout) + # wait for online ingestion + if feature_group.online_enabled and write_options.get("wait_for_online_ingestion", False): + feature_group.get_latest_online_ingestion().wait_for_completion() + return query def _save_offline_dataframe( @@ -573,6 +577,10 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options): .save() ) + # wait for online ingestion + if feature_group.online_enabled and write_options.get("wait_for_online_ingestion", False): + feature_group.get_latest_online_ingestion().wait_for_completion() + def _get_headers( self, feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup], From b80c0def1d85ad589ecc4ebd777aefb782a22af5 Mon Sep 17 00:00:00 2001 From: bubriks Date: Fri, 13 Dec 2024 16:55:02 +0200 Subject: [PATCH 09/25] small rename --- python/hsfs/core/online_ingestion.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py index c1eae20bd..192000359 100644 --- a/python/hsfs/core/online_ingestion.py +++ b/python/hsfs/core/online_ingestion.py @@ -125,7 +125,7 @@ def batch_results(self): def wait_for_completion(self): with tqdm(total=self.num_entries, bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt}", - desc="Data processing progress", + desc="Online data ingestion progress", mininterval=1) as progress_bar: while True: if self.aborted_entries: From 1f83a730611c56c206e76a8eae4ff2d802450470 Mon Sep 17 00:00:00 2001 From: bubriks Date: Tue, 17 Dec 2024 17:47:25 +0200 Subject: [PATCH 10/25] add timeout --- python/hsfs/core/online_ingestion.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py index 192000359..645638e9b 100644 --- a/python/hsfs/core/online_ingestion.py +++ b/python/hsfs/core/online_ingestion.py 
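[editor's note] The hunk below adds a polling deadline to wait_for_completion() via an options dict. A hedged usage sketch of the resulting API, assuming an online-enabled feature group `fg` already exists:

    ingestion = fg.get_latest_online_ingestion()

    # The values below are the defaults introduced in this patch: warn and
    # give up after 60 seconds, polling the backend once per second.
    ingestion.wait_for_completion(options={"timeout": 60, "period": 1})
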
@@ -17,6 +17,8 @@ import json import time +import warnings +from datetime import datetime, timedelta from typing import Any, Dict, Optional import humps @@ -122,7 +124,11 @@ def aborted_entries(self) -> int: def batch_results(self): return self._batch_results - def wait_for_completion(self): + def wait_for_completion(self, options={}): + # Set timeout time + timeout_delta = timedelta(seconds=options.get("timeout", 60)) + timeout_time = datetime.now() + timeout_delta + with tqdm(total=self.num_entries, bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt}", desc="Online data ingestion progress", @@ -137,6 +143,13 @@ def wait_for_completion(self): if self.processed_entries >= self.num_entries: break - time.sleep(1) + if datetime.now() >= timeout_time: + warnings.warn( + f"Timeout of {timeout_delta} was exceeded while waiting for online ingestion completion.", + stacklevel=1, + ) + break + + time.sleep(options.get("period", 1)) self.refresh() From 0446791479f7422c368269546a78584fb4c39a2b Mon Sep 17 00:00:00 2001 From: bubriks Date: Tue, 17 Dec 2024 17:53:33 +0200 Subject: [PATCH 11/25] fix B006 --- python/hsfs/core/online_ingestion.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py index 645638e9b..d194a7c34 100644 --- a/python/hsfs/core/online_ingestion.py +++ b/python/hsfs/core/online_ingestion.py @@ -124,7 +124,10 @@ def aborted_entries(self) -> int: def batch_results(self): return self._batch_results - def wait_for_completion(self, options={}): + def wait_for_completion(self, options: Dict[str, Any] = None): + if options is None: + options = {} + # Set timeout time timeout_delta = timedelta(seconds=options.get("timeout", 60)) timeout_time = datetime.now() + timeout_delta From 55c37eb125a0f9778eb2f4922a8764b0cdb38921 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Dec 2024 11:15:51 +0200 Subject: [PATCH 12/25] test fix --- python/tests/core/test_kafka_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/core/test_kafka_engine.py b/python/tests/core/test_kafka_engine.py index ef5fbc548..11b2d7ca4 100644 --- a/python/tests/core/test_kafka_engine.py +++ b/python/tests/core/test_kafka_engine.py @@ -17,7 +17,7 @@ from hopsworks_common.core import constants from hsfs import feature_group, storage_connector -from hsfs.core import constants, kafka_engine, online_ingestion +from hsfs.core import kafka_engine, online_ingestion if constants.HAS_CONFLUENT_KAFKA: From 03847e394224ebec76c8c67accb6a9381ec8acba Mon Sep 17 00:00:00 2001 From: bubriks Date: Fri, 20 Dec 2024 17:13:57 +0200 Subject: [PATCH 13/25] feedback fix --- python/hsfs/core/kafka_engine.py | 12 ++- python/hsfs/core/online_ingestion.py | 91 ++++++++++++------- python/hsfs/core/online_ingestion_api.py | 28 +++--- .../core/online_ingestion_batch_result.py | 86 ++++++++++++++++++ python/hsfs/engine/python.py | 8 +- python/hsfs/engine/spark.py | 38 +++++--- python/hsfs/feature_group.py | 4 +- python/tests/core/test_kafka_engine.py | 12 +-- python/tests/engine/test_python.py | 8 +- python/tests/engine/test_python_writer.py | 2 +- python/tests/engine/test_spark.py | 10 +- 11 files changed, 214 insertions(+), 85 deletions(-) create mode 100644 python/hsfs/core/online_ingestion_batch_result.py diff --git a/python/hsfs/core/kafka_engine.py b/python/hsfs/core/kafka_engine.py index 8e3349f54..0a211ff45 100644 --- a/python/hsfs/core/kafka_engine.py +++ b/python/hsfs/core/kafka_engine.py @@ 
-115,18 +115,20 @@ def _init_kafka_resources( return producer, get_headers(feature_group, num_entries), feature_writers, writer + def get_headers( - feature_group: Union[FeatureGroup, ExternalFeatureGroup], - num_entries: int, + feature_group: Union[FeatureGroup, ExternalFeatureGroup], + num_entries: int, ) -> Dict[str, bytes]: # setup online ingestion online_ingestion_instance = online_ingestion.OnlineIngestion( num_entries=num_entries ) - online_ingestion_instance = online_ingestion_api.OnlineIngestionApi().create_online_ingestion( - feature_group, - online_ingestion_instance + online_ingestion_instance = ( + online_ingestion_api.OnlineIngestionApi().create_online_ingestion( + feature_group, online_ingestion_instance + ) ) # custom headers for hopsworks onlineFS diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py index d194a7c34..399d45f2a 100644 --- a/python/hsfs/core/online_ingestion.py +++ b/python/hsfs/core/online_ingestion.py @@ -19,10 +19,18 @@ import time import warnings from datetime import datetime, timedelta -from typing import Any, Dict, Optional +from typing import ( + Any, + Dict, + List, + Optional, + Union, +) import humps from hopsworks_common import util +from hsfs import feature_group as fg_mod +from hsfs.core import online_ingestion_batch_result from tqdm.auto import tqdm @@ -35,34 +43,53 @@ def __init__( self, id: Optional[int] = None, num_entries: int = None, - current_offsets: int = None, - processed_entries: int = None, - inserted_entries: int = None, - aborted_entries: int = None, - batch_results = None, + current_offsets: Optional[str] = None, + processed_entries: Optional[int] = None, + inserted_entries: Optional[int] = None, + aborted_entries: Optional[int] = None, + batch_results: Union[ + List[online_ingestion_batch_result.OnlineIngestionBatchResult], + List[Dict[str, Any]], + ] = [], + feature_group: fg_mod.FeatureGroup = None, **kwargs, ): self._id = id - self._num_entries = num_entries # specified when inserting + self._num_entries = num_entries # specified when inserting self._current_offsets = current_offsets self._processed_entries = processed_entries self._inserted_entries = inserted_entries self._aborted_entries = aborted_entries - self._batch_results = batch_results # batch inserts performed by onlinefs + self._batch_results = [ + ( + online_ingestion_batch_result.OnlineIngestionBatchResult.from_response_json( + batch_result + ) + if isinstance(batch_result, dict) + else batch_result + ) + for batch_result in batch_results + ] # batch inserts performed by onlinefs + self._feature_group = feature_group @classmethod - def from_response_json(cls, json_dict: Dict[str, Any]) -> "OnlineIngestion": + def from_response_json( + cls, json_dict: Dict[str, Any], feature_group: fg_mod.FeatureGroup = None + ) -> OnlineIngestion: if json_dict is None: return None json_decamelized: dict = humps.decamelize(json_dict) if "count" not in json_decamelized: - return cls(**json_decamelized) + return cls(**json_decamelized, feature_group=feature_group) elif json_decamelized["count"] == 1: - return cls(**json_decamelized["items"][0]) + return cls(**json_decamelized["items"][0], feature_group=feature_group) elif json_decamelized["count"] > 1: - return [cls(**item) for item in json_decamelized["items"]] + return [ + cls(**item, feature_group=feature_group) + for item in json_decamelized["items"] + ] else: return None @@ -70,22 +97,18 @@ def refresh(self): from hsfs.core.online_ingestion_api import OnlineIngestionApi online_ingestion = 
OnlineIngestionApi().get_online_ingestion( - self.feature_group, - query_params={"filter_by": f"ID:{self.id}"} + self.feature_group, query_params={"filter_by": f"ID:{self.id}"} ) self.__dict__.update(online_ingestion.__dict__) def to_dict(self): - return { - "id": self._id, - "numEntries": self._num_entries - } + return {"id": self._id, "numEntries": self._num_entries} def json(self): return json.dumps(self, cls=util.Encoder) @property - def id(self) -> int: + def id(self) -> Optional[int]: return self._id @property @@ -93,19 +116,11 @@ def num_entries(self) -> int: return self._num_entries @num_entries.setter - def num_entries(self, num_entries: str) -> None: + def num_entries(self, num_entries: int) -> None: self._num_entries = num_entries @property - def feature_group(self): - return self._feature_group - - @feature_group.setter - def feature_group(self, feature_group) -> None: - self._feature_group = feature_group - - @property - def current_offsets(self) -> str: + def current_offsets(self) -> Optional[str]: return self._current_offsets @property @@ -121,9 +136,15 @@ def aborted_entries(self) -> int: return 0 if self._aborted_entries is None else self._aborted_entries @property - def batch_results(self): + def batch_results( + self, + ) -> List[online_ingestion_batch_result.OnlineIngestionBatchResult]: return self._batch_results + @property + def feature_group(self) -> fg_mod.FeatureGroup: + return self._feature_group + def wait_for_completion(self, options: Dict[str, Any] = None): if options is None: options = {} @@ -132,10 +153,12 @@ def wait_for_completion(self, options: Dict[str, Any] = None): timeout_delta = timedelta(seconds=options.get("timeout", 60)) timeout_time = datetime.now() + timeout_delta - with tqdm(total=self.num_entries, - bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt}", - desc="Online data ingestion progress", - mininterval=1) as progress_bar: + with tqdm( + total=self.num_entries, + bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt}", + desc="Online data ingestion progress", + mininterval=1, + ) as progress_bar: while True: if self.aborted_entries: progress_bar.colour = "RED" diff --git a/python/hsfs/core/online_ingestion_api.py b/python/hsfs/core/online_ingestion_api.py index f8cd541f6..c17282a34 100644 --- a/python/hsfs/core/online_ingestion_api.py +++ b/python/hsfs/core/online_ingestion_api.py @@ -15,18 +15,19 @@ # from __future__ import annotations +from typing import Dict, Optional + from hopsworks_common import client from hsfs import feature_group as fg_mod from hsfs.core import online_ingestion class OnlineIngestionApi: - def create_online_ingestion( self, feature_group_instance: fg_mod.FeatureGroup, online_ingestion_instance: online_ingestion.OnlineIngestion, - ): + ) -> online_ingestion.OnlineIngestion: _client = client.get_instance() path_params = [ "project", @@ -39,17 +40,21 @@ def create_online_ingestion( ] headers = {"content-type": "application/json"} - online_ingestion_instance = online_ingestion.OnlineIngestion.from_response_json( - _client._send_request("POST", path_params, headers=headers, data=online_ingestion_instance.json()) + return online_ingestion.OnlineIngestion.from_response_json( + _client._send_request( + "POST", + path_params, + headers=headers, + data=online_ingestion_instance.json(), + ), + feature_group=feature_group_instance, ) - online_ingestion_instance.feature_group = feature_group_instance - return online_ingestion_instance def get_online_ingestion( self, feature_group_instance: 
fg_mod.FeatureGroup, - query_params: None, - ): + query_params: Optional[Dict[str, str]] = None, + ) -> online_ingestion.OnlineIngestion: _client = client.get_instance() path_params = [ "project", @@ -61,8 +66,7 @@ def get_online_ingestion( "online_ingestion", ] - online_ingestion_instance = online_ingestion.OnlineIngestion.from_response_json( - _client._send_request("GET", path_params, query_params) + return online_ingestion.OnlineIngestion.from_response_json( + _client._send_request("GET", path_params, query_params), + feature_group=feature_group_instance, ) - online_ingestion_instance.feature_group = feature_group_instance - return online_ingestion_instance diff --git a/python/hsfs/core/online_ingestion_batch_result.py b/python/hsfs/core/online_ingestion_batch_result.py new file mode 100644 index 000000000..079a35363 --- /dev/null +++ b/python/hsfs/core/online_ingestion_batch_result.py @@ -0,0 +1,86 @@ +# +# Copyright 2024 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +import json +from typing import Any, Dict, Optional + +import humps +from hopsworks_common import util + + +class OnlineIngestionBatchResult: + """ + Metadata object used to provide Online Ingestion Batch Result information for a feature group. 
+ """ + + def __init__( + self, + id: Optional[int] = None, + batch_size: int = None, + status: str = None, + info: Optional[str] = None, + **kwargs, + ): + self._id = id + self._batch_size = batch_size + self._status = status + self._info = info + + @classmethod + def from_response_json( + cls, json_dict: Dict[str, Any] + ) -> OnlineIngestionBatchResult: + if json_dict is None: + return None + + json_decamelized: dict = humps.decamelize(json_dict) + + if "count" not in json_decamelized: + return cls(**json_decamelized) + elif json_decamelized["count"] == 1: + return cls(**json_decamelized["items"][0]) + elif json_decamelized["count"] > 1: + return [cls(**item) for item in json_decamelized["items"]] + else: + return None + + def to_dict(self): + return { + "id": self._id, + "batchSize": self._batch_size, + "status": self._status, + "info": self._info, + } + + def json(self): + return json.dumps(self, cls=util.Encoder) + + @property + def id(self) -> Optional[int]: + return self._id + + @property + def batch_size(self) -> int: + return self._batch_size + + @property + def status(self) -> str: + return self._status + + @property + def info(self) -> Optional[str]: + return self._info diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 16c7f4c4e..073ac929b 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1560,8 +1560,12 @@ def _write_dataframe_kafka( ) # wait for online ingestion - if feature_group.online_enabled and offline_write_options.get("wait_for_online_ingestion", False): - feature_group.get_latest_online_ingestion().wait_for_completion() + if feature_group.online_enabled and offline_write_options.get( + "wait_for_online_ingestion", False + ): + feature_group.get_latest_online_ingestion().wait_for_completion( + options=offline_write_options.get("online_ingestion_options", {}) + ) return feature_group.materialization_job diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 8d124d76e..f20cc88b5 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -494,10 +494,10 @@ def save_stream_dataframe( ) query = ( - serialized_df - .withColumn("headers", self._get_headers(feature_group, dataframe)) - .writeStream - .outputMode(output_mode) + serialized_df.withColumn( + "headers", self._get_headers(feature_group, dataframe) + ) + .writeStream.outputMode(output_mode) .format(self.KAFKA_FORMAT) .option( "checkpointLocation", @@ -519,8 +519,12 @@ def save_stream_dataframe( query.awaitTermination(timeout) # wait for online ingestion - if feature_group.online_enabled and write_options.get("wait_for_online_ingestion", False): - feature_group.get_latest_online_ingestion().wait_for_completion() + if feature_group.online_enabled and write_options.get( + "wait_for_online_ingestion", False + ): + feature_group.get_latest_online_ingestion().wait_for_completion( + options=offline_write_options.get("online_ingestion_options", {}) + ) return query @@ -568,28 +572,34 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options): serialized_df = self._serialize_to_avro(feature_group, dataframe) ( - serialized_df - .withColumn("headers", self._get_headers(feature_group, dataframe)) - .write - .format(self.KAFKA_FORMAT) + serialized_df.withColumn( + "headers", self._get_headers(feature_group, dataframe) + ) + .write.format(self.KAFKA_FORMAT) .options(**write_options) .option("topic", feature_group._online_topic_name) .save() ) # wait for online ingestion - if feature_group.online_enabled and 
write_options.get("wait_for_online_ingestion", False): - feature_group.get_latest_online_ingestion().wait_for_completion() + if feature_group.online_enabled and write_options.get( + "wait_for_online_ingestion", False + ): + feature_group.get_latest_online_ingestion().wait_for_completion( + options=offline_write_options.get("online_ingestion_options", {}) + ) def _get_headers( self, feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup], dataframe: Union[RDD, DataFrame], - ): + ) -> array: return array( *[ struct(lit(key).alias("key"), lit(value).alias("value")) - for key, value in kafka_engine.get_headers(feature_group, dataframe.count()).items() + for key, value in kafka_engine.get_headers( + feature_group, dataframe.count() + ).items() ] ) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 8613a4a88..7541686ea 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -1805,7 +1805,9 @@ def statistics_config( ) def get_latest_online_ingestion(self) -> online_ingestion.OnlineIngestion: - return online_ingestion_api.OnlineIngestionApi().get_online_ingestion(self, query_params={"filter_by": "LATEST"}) + return online_ingestion_api.OnlineIngestionApi().get_online_ingestion( + self, query_params={"filter_by": "LATEST"} + ) @property def feature_store_id(self) -> Optional[int]: diff --git a/python/tests/core/test_kafka_engine.py b/python/tests/core/test_kafka_engine.py index 11b2d7ca4..70275efe4 100644 --- a/python/tests/core/test_kafka_engine.py +++ b/python/tests/core/test_kafka_engine.py @@ -547,14 +547,12 @@ def test_get_headers(self, mocker, backend_fixtures): fg._subject = {"id": 823} # Act - results = kafka_engine.get_headers( - fg, num_entries=10 - ) + results = kafka_engine.get_headers(fg, num_entries=10) # Assert assert results == { - 'featureGroupId': b'111', - 'onlineIngestionId': b'1', - 'projectId': b'234', - 'subjectId': b'823' + "featureGroupId": b"111", + "onlineIngestionId": b"1", + "projectId": b"234", + "subjectId": b"823", } diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index 07ed7db3b..388dcd721 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -3522,7 +3522,7 @@ def test_materialization_kafka(self, mocker): mocker.patch("hopsworks_common.client.get_instance") mocker.patch( "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", - return_value= online_ingestion.OnlineIngestion(id=123), + return_value=online_ingestion.OnlineIngestion(id=123), ) python_engine = python.Engine() @@ -3586,7 +3586,7 @@ def test_materialization_kafka_first_job_execution(self, mocker): mocker.patch("hopsworks_common.client.get_instance") mocker.patch( "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", - return_value= online_ingestion.OnlineIngestion(id=123), + return_value=online_ingestion.OnlineIngestion(id=123), ) python_engine = python.Engine() @@ -3646,7 +3646,7 @@ def test_materialization_kafka_skip_offsets(self, mocker): mocker.patch("hopsworks_common.client.get_instance") mocker.patch( "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", - return_value= online_ingestion.OnlineIngestion(id=123), + return_value=online_ingestion.OnlineIngestion(id=123), ) python_engine = python.Engine() @@ -3709,7 +3709,7 @@ def test_materialization_kafka_topic_doesnt_exist(self, mocker): mocker.patch("hopsworks_common.client.get_instance") mocker.patch( 
"hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", - return_value= online_ingestion.OnlineIngestion(id=123), + return_value=online_ingestion.OnlineIngestion(id=123), ) python_engine = python.Engine() diff --git a/python/tests/engine/test_python_writer.py b/python/tests/engine/test_python_writer.py index 24a2b3b92..8e3107585 100644 --- a/python/tests/engine/test_python_writer.py +++ b/python/tests/engine/test_python_writer.py @@ -36,7 +36,7 @@ def test_write_dataframe_kafka(self, mocker, dataframe_fixture_times): ) mocker.patch( "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", - return_value= online_ingestion.OnlineIngestion(id=123), + return_value=online_ingestion.OnlineIngestion(id=123), ) avro_schema = ( '{"type":"record","name":"test_fg","namespace":"test_featurestore.db","fields":' diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 4e31132a8..d3b46a91e 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -875,7 +875,7 @@ def test_save_stream_dataframe(self, mocker, backend_fixtures): ) mocker.patch( "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", - return_value= online_ingestion.OnlineIngestion(id=123), + return_value=online_ingestion.OnlineIngestion(id=123), ) json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = storage_connector.StorageConnector.from_response_json(json) @@ -1002,7 +1002,7 @@ def test_save_stream_dataframe_query_name(self, mocker, backend_fixtures): ) mocker.patch( "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", - return_value= online_ingestion.OnlineIngestion(id=123), + return_value=online_ingestion.OnlineIngestion(id=123), ) json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = storage_connector.StorageConnector.from_response_json(json) @@ -1133,7 +1133,7 @@ def test_save_stream_dataframe_checkpoint_dir(self, mocker, backend_fixtures): ) mocker.patch( "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", - return_value= online_ingestion.OnlineIngestion(id=123), + return_value=online_ingestion.OnlineIngestion(id=123), ) json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = storage_connector.StorageConnector.from_response_json(json) @@ -1260,7 +1260,7 @@ def test_save_stream_dataframe_await_termination(self, mocker, backend_fixtures) ) mocker.patch( "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", - return_value= online_ingestion.OnlineIngestion(id=123), + return_value=online_ingestion.OnlineIngestion(id=123), ) json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = storage_connector.StorageConnector.from_response_json(json) @@ -1524,7 +1524,7 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): ) mocker.patch( "hsfs.core.online_ingestion_api.OnlineIngestionApi.create_online_ingestion", - return_value= online_ingestion.OnlineIngestion(id=123), + return_value=online_ingestion.OnlineIngestion(id=123), ) json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = storage_connector.StorageConnector.from_response_json(json) From 6a96b0693f6aae9d892d60c1b3a88fecfc0dfd89 Mon Sep 17 00:00:00 2001 From: bubriks Date: Fri, 20 Dec 2024 17:16:10 +0200 Subject: [PATCH 14/25] fix --- python/hsfs/engine/spark.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
From 6a96b0693f6aae9d892d60c1b3a88fecfc0dfd89 Mon Sep 17 00:00:00 2001
From: bubriks
Date: Fri, 20 Dec 2024 17:16:10 +0200
Subject: [PATCH 14/25] fix

---
 python/hsfs/engine/spark.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index f20cc88b5..b3f64e87d 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -523,7 +523,7 @@ def save_stream_dataframe(
                 "wait_for_online_ingestion", False
             ):
                 feature_group.get_latest_online_ingestion().wait_for_completion(
-                    options=offline_write_options.get("online_ingestion_options", {})
+                    options=write_options.get("online_ingestion_options", {})
                 )
 
         return query
@@ -586,7 +586,7 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options):
                 "wait_for_online_ingestion", False
             ):
                 feature_group.get_latest_online_ingestion().wait_for_completion(
-                    options=offline_write_options.get("online_ingestion_options", {})
+                    options=write_options.get("online_ingestion_options", {})
                 )
 
     def _get_headers(
From a87a4bb58decbe41bb54741e3672457e5e407d0e Mon Sep 17 00:00:00 2001
From: bubriks
Date: Mon, 23 Dec 2024 10:36:00 +0200
Subject: [PATCH 15/25] fix lint

---
 python/hsfs/core/online_ingestion.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py
index 399d45f2a..f35f00359 100644
--- a/python/hsfs/core/online_ingestion.py
+++ b/python/hsfs/core/online_ingestion.py
@@ -50,7 +50,7 @@ def __init__(
         batch_results: Union[
             List[online_ingestion_batch_result.OnlineIngestionBatchResult],
             List[Dict[str, Any]],
-        ] = [],
+        ] = None,
         feature_group: fg_mod.FeatureGroup = None,
         **kwargs,
     ):
@@ -69,7 +69,7 @@ def __init__(
                 else batch_result
             )
             for batch_result in batch_results
-        ]  # batch inserts performed by onlinefs
+        ] if batch_results else []  # batch inserts performed by onlinefs
         self._feature_group = feature_group
 
     @classmethod
From cf4208f918f33259596997783d94a072b54bb344 Mon Sep 17 00:00:00 2001
From: bubriks
Date: Mon, 30 Dec 2024 15:32:56 +0200
Subject: [PATCH 16/25] remove info

---
 python/hsfs/core/online_ingestion.py          | 55 +++++++++++++++----
 .../core/online_ingestion_batch_result.py     |  7 ---
 2 files changed, 45 insertions(+), 17 deletions(-)

diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py
index f35f00359..e5ef39dd8 100644
--- a/python/hsfs/core/online_ingestion.py
+++ b/python/hsfs/core/online_ingestion.py
@@ -28,9 +28,10 @@
 )
 
 import humps
-from hopsworks_common import util
+from hopsworks_common import client, util
 from hsfs import feature_group as fg_mod
 from hsfs.core import online_ingestion_batch_result
+from hsfs.core.opensearch import OpenSearchClientSingleton
 from tqdm.auto import tqdm
 
 
@@ -60,16 +61,20 @@ def __init__(
         self._processed_entries = processed_entries
         self._inserted_entries = inserted_entries
         self._aborted_entries = aborted_entries
-        self._batch_results = [
-            (
-                online_ingestion_batch_result.OnlineIngestionBatchResult.from_response_json(
-                    batch_result
+        self._batch_results = (
+            [
+                (
+                    online_ingestion_batch_result.OnlineIngestionBatchResult.from_response_json(
+                        batch_result
+                    )
+                    if isinstance(batch_result, dict)
+                    else batch_result
                 )
-                if isinstance(batch_result, dict)
-                else batch_result
-            )
-            for batch_result in batch_results
-        ] if batch_results else []  # batch inserts performed by onlinefs
+                for batch_result in batch_results
+            ]
+            if batch_results
+            else []
+        )  # batch inserts performed by onlinefs
         self._feature_group = feature_group
 
     @classmethod
@@ -179,3 +184,33 @@ def wait_for_completion(self, options: Dict[str, Any] = None):
             time.sleep(options.get("period", 1))
 
             self.refresh()
+
+    def print_logs(self, priority: str = "error", size: int = 20):
+        open_search_client = OpenSearchClientSingleton()
+
+        response = open_search_client.search(
+            body={
+                "query": {
+                    "bool": {
+                        "must": [
+                            {
+                                "match": {
+                                    "log_arguments.feature_group_id": f"{self.feature_group.id}"
+                                }
+                            },
+                            {
+                                "match": {
+                                    "log_arguments.online_ingestion_id": f"{self.id}"
+                                }
+                            },
+                            {"match": {"priority": priority}},
+                        ]
+                    }
+                },
+                "size": size,
+            },
+            index=f"onlinefs_{client.get_instance()._project_id}-*",
+        )
+
+        for hit in response["hits"]["hits"]:
+            print(hit["_source"]["error"]["data"])
diff --git a/python/hsfs/core/online_ingestion_batch_result.py b/python/hsfs/core/online_ingestion_batch_result.py
index 079a35363..175c696f9 100644
--- a/python/hsfs/core/online_ingestion_batch_result.py
+++ b/python/hsfs/core/online_ingestion_batch_result.py
@@ -32,13 +32,11 @@ def __init__(
         id: Optional[int] = None,
         batch_size: int = None,
         status: str = None,
-        info: Optional[str] = None,
         **kwargs,
     ):
         self._id = id
         self._batch_size = batch_size
         self._status = status
-        self._info = info
 
     @classmethod
     def from_response_json(
@@ -63,7 +61,6 @@ def to_dict(self):
             "id": self._id,
             "batchSize": self._batch_size,
             "status": self._status,
-            "info": self._info,
         }
 
     def json(self):
@@ -80,7 +77,3 @@ def batch_size(self) -> int:
     @property
     def status(self) -> str:
         return self._status
-
-    @property
-    def info(self) -> Optional[str]:
-        return self._info
From 3253104942592dc69fcd8e8a774c538c3189c9f5 Mon Sep 17 00:00:00 2001
From: bubriks
Date: Fri, 3 Jan 2025 14:30:12 +0200
Subject: [PATCH 17/25] id change

---
 python/hsfs/core/online_ingestion_batch_result.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/hsfs/core/online_ingestion_batch_result.py b/python/hsfs/core/online_ingestion_batch_result.py
index 175c696f9..844a6c117 100644
--- a/python/hsfs/core/online_ingestion_batch_result.py
+++ b/python/hsfs/core/online_ingestion_batch_result.py
@@ -16,7 +16,7 @@
 from __future__ import annotations
 
 import json
-from typing import Any, Dict, Optional
+from typing import Any, Dict
 
 import humps
 from hopsworks_common import util
@@ -29,7 +29,7 @@ class OnlineIngestionBatchResult:
 
     def __init__(
         self,
-        id: Optional[int] = None,
+        id: str = None,
         batch_size: int = None,
         status: str = None,
         **kwargs,
@@ -67,7 +67,7 @@ def json(self):
         return json.dumps(self, cls=util.Encoder)
 
     @property
-    def id(self) -> Optional[int]:
+    def id(self) -> str:
         return self._id
 
     @property
From 779fb28dfa674800cbc8bd239dc4c87842067584 Mon Sep 17 00:00:00 2001
From: bubriks
Date: Mon, 6 Jan 2025 11:54:22 +0200
Subject: [PATCH 18/25] allow for nullable OnlineIngestion num_entries

---
 python/hsfs/core/kafka_engine.py     | 6 +++---
 python/hsfs/core/online_ingestion.py | 8 ++++----
 python/hsfs/engine/spark.py          | 4 ++--
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/python/hsfs/core/kafka_engine.py b/python/hsfs/core/kafka_engine.py
index 0a211ff45..31fb45705 100644
--- a/python/hsfs/core/kafka_engine.py
+++ b/python/hsfs/core/kafka_engine.py
@@ -71,7 +71,7 @@ def init_kafka_consumer(
 def init_kafka_resources(
     feature_group: Union[FeatureGroup, ExternalFeatureGroup],
     offline_write_options: Dict[str, Any],
-    num_entries: int,
+    num_entries: Optional[int] = None,
 ) -> Tuple[
     Producer, Dict[str, bytes], Dict[str, Callable[..., bytes]], Callable[..., bytes]
 ]:
@@ -97,7 +97,7 @@ def init_kafka_resources(
 def _init_kafka_resources(
     feature_group: Union[FeatureGroup, ExternalFeatureGroup],
     offline_write_options: Dict[str, Any],
-    num_entries: int,
+    num_entries: Optional[int] = None,
 ) -> Tuple[
     Producer, Dict[str, bytes], Dict[str, Callable[..., bytes]], Callable[..., bytes]
 ]:
@@ -118,7 +118,7 @@ def _init_kafka_resources(
 
 def get_headers(
     feature_group: Union[FeatureGroup, ExternalFeatureGroup],
-    num_entries: int,
+    num_entries: Optional[int] = None,
 ) -> Dict[str, bytes]:
     # setup online ingestion
     online_ingestion_instance = online_ingestion.OnlineIngestion(
diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py
index e5ef39dd8..a8d22ccf5 100644
--- a/python/hsfs/core/online_ingestion.py
+++ b/python/hsfs/core/online_ingestion.py
@@ -43,7 +43,7 @@ class OnlineIngestion:
     def __init__(
         self,
         id: Optional[int] = None,
-        num_entries: int = None,
+        num_entries: Optional[int] = None,
         current_offsets: Optional[str] = None,
         processed_entries: Optional[int] = None,
         inserted_entries: Optional[int] = None,
@@ -56,7 +56,7 @@ def __init__(
         **kwargs,
     ):
         self._id = id
-        self._num_entries = num_entries  # specified when inserting
+        self._num_entries = num_entries  # specified when inserting (optional since might not be specified when using streaming)
         self._current_offsets = current_offsets
         self._processed_entries = processed_entries
         self._inserted_entries = inserted_entries
@@ -117,7 +117,7 @@ def id(self) -> Optional[int]:
         return self._id
 
     @property
-    def num_entries(self) -> int:
+    def num_entries(self) -> Optional[int]:
         return self._num_entries
 
     @num_entries.setter
@@ -171,7 +171,7 @@ def wait_for_completion(self, options: Dict[str, Any] = None):
                 progress_bar.n = self.processed_entries
                 progress_bar.refresh()
 
-                if self.processed_entries >= self.num_entries:
+                if self.num_entries and self.processed_entries >= self.num_entries:
                     break
 
                 if datetime.now() >= timeout_time:
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index b3f64e87d..dde91c657 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -592,13 +592,13 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options):
     def _get_headers(
         self,
         feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup],
-        dataframe: Union[RDD, DataFrame],
+        dataframe: Optional[Union[RDD, DataFrame]] = None,
     ) -> array:
         return array(
             *[
                 struct(lit(key).alias("key"), lit(value).alias("value"))
                 for key, value in kafka_engine.get_headers(
-                    feature_group, dataframe.count()
+                    feature_group, dataframe.count() if dataframe else None
                 ).items()
             ]
         )
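PATCH 18 makes the row count optional end to end, which is what lets streaming writers ask for the Kafka headers before any count is known. A minimal sketch under that assumption (`fg` is a hypothetical online-enabled feature group handle):

    from hsfs.core import kafka_engine

    # Batch path: the known row count becomes the progress-bar total.
    headers = kafka_engine.get_headers(fg, num_entries=1000)

    # Streaming path: no count is available, so none is registered.
    headers = kafka_engine.get_headers(fg, num_entries=None)
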
From 0c4a82e120fcee5b9e46e0e817571b8eeb72757a Mon Sep 17 00:00:00 2001
From: bubriks
Date: Mon, 6 Jan 2025 14:54:16 +0200
Subject: [PATCH 19/25] support for java client (not tested/finished)

---
 .../hsfs/beam/engine/BeamProducer.java        | 13 +--
 .../flink/engine/KafkaRecordSerializer.java   | 12 +--
 .../logicalclocks/hsfs/FeatureGroupBase.java  |  6 ++
 .../logicalclocks/hsfs/OnlineIngestion.java   | 71 +++++++++++++++++
 .../hsfs/OnlineIngestionBatchResult.java      | 41 ++++++++++
 .../hsfs/engine/FeatureGroupUtils.java        | 20 +++++
 .../hsfs/metadata/OnlineIngestionApi.java     | 79 +++++++++++++++++++
 .../hsfs/spark/engine/SparkEngine.java        | 53 ++++---------
 python/hsfs/core/online_ingestion.py          |  2 +-
 python/hsfs/core/online_ingestion_api.py      |  2 +-
 .../core/online_ingestion_batch_result.py     |  2 +-
 python/hsfs/engine/spark.py                   |  8 +-
 12 files changed, 247 insertions(+), 62 deletions(-)
 create mode 100644 java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
 create mode 100644 java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestionBatchResult.java
 create mode 100644 java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java

diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java
index e2b13e074..c9ebb2b1e 100644
--- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java
+++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java
@@ -19,6 +19,8 @@
 
 import com.logicalclocks.hsfs.FeatureStoreException;
 import com.logicalclocks.hsfs.beam.StreamFeatureGroup;
+import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
+
 import lombok.NonNull;
 
 import org.apache.avro.Schema;
@@ -46,13 +48,11 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -64,7 +64,7 @@ public class BeamProducer extends PTransform<@NonNull PCollection, @NonNull
   private transient Schema encodedSchema;
   private Map deserializedComplexFeatureSchemas;
   private List primaryKeys;
-  private final Map headerMap = new HashMap<>();
+  private final Map headerMap;
 
   public BeamProducer(String topic, Map properties, Schema schema,
       Schema encodedSchema, Map deserializedComplexFeatureSchemas, List primaryKeys,
@@ -75,12 +75,7 @@ public BeamProducer(String topic, Map properties, Schema schema,
     this.properties = properties;
     this.deserializedComplexFeatureSchemas = deserializedComplexFeatureSchemas;
     this.primaryKeys = primaryKeys;
-
-    headerMap.put("projectId",
-        String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8));
-    headerMap.put("featureGroupId", String.valueOf(streamFeatureGroup.getId()).getBytes(StandardCharsets.UTF_8));
-    headerMap.put("subjectId",
-        String.valueOf(streamFeatureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8));
+    this.headerMap = FeatureGroupUtils.getHeaders(streamFeatureGroup, null);
   }
 
   @Override
diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java
index b1729f75d..7409ed94e 100644
--- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java
+++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java
@@ -18,7 +18,9 @@
 package com.logicalclocks.hsfs.flink.engine;
 
 import com.logicalclocks.hsfs.FeatureStoreException;
+import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
 import com.logicalclocks.hsfs.flink.StreamFeatureGroup;
+
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
@@ -32,7 +34,6 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -40,17 +41,12 @@ public class KafkaRecordSerializer implements KafkaRecordSerializationSchema
 
   private final String topic;
   private final List primaryKeys;
-  private final Map headerMap = new HashMap<>();
+  private final Map headerMap;
 
   KafkaRecordSerializer(StreamFeatureGroup streamFeatureGroup) throws FeatureStoreException, IOException {
     this.topic = streamFeatureGroup.getOnlineTopicName();
     this.primaryKeys = streamFeatureGroup.getPrimaryKeys();
-
-    headerMap.put("projectId",
-        String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8));
-    headerMap.put("featureGroupId", String.valueOf(streamFeatureGroup.getId()).getBytes(StandardCharsets.UTF_8));
-    headerMap.put("subjectId",
-        String.valueOf(streamFeatureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8));
+    this.headerMap = FeatureGroupUtils.getHeaders(streamFeatureGroup, null);
   }
 
   @Override
diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java
index 61ca2f5a9..ae38c9bd6 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java
@@ -22,6 +22,7 @@
 import com.logicalclocks.hsfs.constructor.QueryBase;
 import com.logicalclocks.hsfs.engine.FeatureGroupEngineBase;
 import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
+import com.logicalclocks.hsfs.metadata.OnlineIngestionApi;
 import com.logicalclocks.hsfs.metadata.Statistics;
 import com.logicalclocks.hsfs.metadata.Subject;
 import com.logicalclocks.hsfs.metadata.User;
@@ -153,6 +154,7 @@ public abstract class FeatureGroupBase {
 
   protected FeatureGroupEngineBase featureGroupEngineBase = new FeatureGroupEngineBase();
   protected FeatureGroupUtils utils = new FeatureGroupUtils();
+  protected OnlineIngestionApi onlineIngestionApi = new OnlineIngestionApi();
 
   protected static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupBase.class);
 
@@ -543,5 +545,9 @@ public Schema getDeserializedAvroSchema() throws FeatureStoreException, IOExcept
     return utils.getDeserializedAvroSchema(getAvroSchema());
   }
 
+  @JsonIgnore
+  public OnlineIngestion getLatestOnlineIngestion() throws FeatureStoreException, IOException {
+    return onlineIngestionApi.getOnlineIngestion(this, "filter_by=LATEST");
+  }
 }
diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
new file mode 100644
index 000000000..cb8e81b89
--- /dev/null
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2025. Hopsworks AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and limitations under the License.
+ *
+ */
+
+package com.logicalclocks.hsfs;
+
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@NoArgsConstructor
+@AllArgsConstructor
+public class OnlineIngestion {
+
+  @Getter
+  @Setter
+  private Integer id;
+
+  @Getter
+  @Setter
+  private long numEntries;
+
+  @Getter
+  @Setter
+  private String currentOffsets;
+
+  @Getter
+  @Setter
+  private Integer processedEntries;
+
+  @Getter
+  @Setter
+  private Integer insertedEntries;
+
+  @Getter
+  @Setter
+  private Integer abortedEntries;
+
+  @Getter
+  @Setter
+  private List batchResults;
+
+  @Getter
+  @Setter
+  private FeatureGroupBase featureGroup;
+
+  public OnlineIngestion(long numEntries) {
+    this.numEntries = numEntries;
+  }
+
+  public void waitForCompletion() {
+
+  }
+
+}
\ No newline at end of file
diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestionBatchResult.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestionBatchResult.java
new file mode 100644
index 000000000..90a15d750
--- /dev/null
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestionBatchResult.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2025. Hopsworks AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and limitations under the License.
+ *
+ */
+
+package com.logicalclocks.hsfs;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@NoArgsConstructor
+@AllArgsConstructor
+public class OnlineIngestionBatchResult {
+
+  @Getter
+  @Setter
+  private String id;
+
+  @Getter
+  @Setter
+  private Integer batchSize;
+
+  @Getter
+  @Setter
+  private String status;  // "SUCCEEDED", "FAILED"
+
+}
\ No newline at end of file
diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java
index a937cca1a..9ce23ec98 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java
@@ -20,11 +20,13 @@
 import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
 import com.logicalclocks.hsfs.metadata.HopsworksClient;
 import com.logicalclocks.hsfs.metadata.KafkaApi;
+import com.logicalclocks.hsfs.metadata.OnlineIngestionApi;
 import com.logicalclocks.hsfs.metadata.Subject;
 import com.logicalclocks.hsfs.Feature;
 import com.logicalclocks.hsfs.FeatureGroupBase;
 import com.logicalclocks.hsfs.FeatureGroupCommit;
 import com.logicalclocks.hsfs.FeatureStoreException;
+import com.logicalclocks.hsfs.OnlineIngestion;
 
 import lombok.SneakyThrows;
 import org.apache.avro.Schema;
@@ -34,6 +36,7 @@
 import scala.collection.Seq;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -247,4 +250,21 @@ public String getDatasetType(String path) {
     }
     return "DATASET";
   }
+
+  public static Map getHeaders(FeatureGroupBase featureGroup, Long numEntries)
+      throws FeatureStoreException, IOException {
+    Map headerMap = new HashMap<>();
+    OnlineIngestion onlineIngestion = new OnlineIngestionApi()
+        .createOnlineIngestion(featureGroup, new OnlineIngestion(numEntries));
+
+    headerMap.put("projectId",
+        String.valueOf(featureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8));
+    headerMap.put("featureGroupId", String.valueOf(featureGroup.getId()).getBytes(StandardCharsets.UTF_8));
+    headerMap.put("subjectId",
+        String.valueOf(featureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8));
+    headerMap.put("onlineIngestionId",
+        String.valueOf(onlineIngestion.getId()).getBytes(StandardCharsets.UTF_8));
+
+    return headerMap;
+  }
 }
diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java
new file mode 100644
index 000000000..ed361cae1
--- /dev/null
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2025. Hopsworks AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and limitations under the License.
+ *
+ */
+
+package com.logicalclocks.hsfs.metadata;
+
+import com.damnhandy.uri.template.UriTemplate;
+import com.logicalclocks.hsfs.FeatureGroupBase;
+import com.logicalclocks.hsfs.FeatureStoreException;
+import com.logicalclocks.hsfs.OnlineIngestion;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class OnlineIngestionApi {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OnlineIngestionApi.class);
+
+  public static final String ONLINE_INGESTION_PATH = "/featuregroups/{fgId}/online_ingestion{?queryParameters}";
+
+  public OnlineIngestion createOnlineIngestion(FeatureGroupBase featureGroup, OnlineIngestion onlineIngestion)
+      throws FeatureStoreException, IOException {
+    HopsworksClient hopsworksClient = HopsworksClient.getInstance();
+    String pathTemplate = HopsworksClient.PROJECT_PATH
+        + FeatureStoreApi.FEATURE_STORE_PATH
+        + ONLINE_INGESTION_PATH;
+
+    String uri = UriTemplate.fromTemplate(pathTemplate)
+        .set("projectId", hopsworksClient.getProject().getProjectId())
+        .set("fsId", featureGroup.getFeatureStore().getId())
+        .set("fgId", featureGroup.getId())
+        .expand();
+
+    HttpPost postRequest = new HttpPost(uri);
+    postRequest.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+    postRequest.setEntity(hopsworksClient.buildStringEntity(onlineIngestion));
+
+    LOGGER.info("Sending metadata request: " + uri);
+
+    return hopsworksClient.handleRequest(postRequest, OnlineIngestion.class);
+  }
+
+  public OnlineIngestion getOnlineIngestion(FeatureGroupBase featureGroup, String queryParameters)
+      throws FeatureStoreException, IOException {
+    HopsworksClient hopsworksClient = HopsworksClient.getInstance();
+    String pathTemplate = HopsworksClient.PROJECT_PATH
+        + FeatureStoreApi.FEATURE_STORE_PATH
+        + ONLINE_INGESTION_PATH;
+
+    String uri = UriTemplate.fromTemplate(pathTemplate)
+        .set("projectId", hopsworksClient.getProject().getProjectId())
+        .set("fsId", featureGroup.getFeatureStore().getId())
+        .set("fgId", featureGroup.getId())
+        .set("queryParameters", queryParameters)
+        .expand();
+
+    LOGGER.info("Sending metadata request: " + uri);
+
+    return hopsworksClient.handleRequest(new HttpGet(uri), OnlineIngestion.class);
+  }
+}
\ No newline at end of file
diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java
index 7c784d8d8..600ee00c6 100644
--- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java
+++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java
@@ -41,6 +41,7 @@
 import com.logicalclocks.hsfs.FeatureGroupBase;
 import com.logicalclocks.hsfs.metadata.HopsworksClient;
 import com.logicalclocks.hsfs.metadata.OnDemandOptions;
+import com.logicalclocks.hsfs.metadata.OnlineIngestionApi;
 import com.logicalclocks.hsfs.metadata.Option;
 import com.logicalclocks.hsfs.util.Constants;
 import com.logicalclocks.hsfs.spark.ExternalFeatureGroup;
@@ -124,6 +125,7 @@ public class SparkEngine extends EngineBase {
 
   private final StorageConnectorUtils storageConnectorUtils = new StorageConnectorUtils();
   private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();
+  private OnlineIngestionApi onlineIngestionApi = new OnlineIngestionApi();
 
   private static SparkEngine INSTANCE = null;
@@ -532,26 +534,8 @@ public Dataset read(StorageConnector storageConnector, String dataFormat,
   public void writeOnlineDataframe(FeatureGroupBase featureGroupBase, Dataset dataset, String onlineTopicName,
                                    Map writeOptions)
       throws FeatureStoreException, IOException {
-    byte[] projectId = String.valueOf(featureGroupBase.getFeatureStore().getProjectId())
-        .getBytes(StandardCharsets.UTF_8);
-    byte[] featureGroupId = String.valueOf(featureGroupBase.getId()).getBytes(StandardCharsets.UTF_8);
-    byte[] subjectId = String.valueOf(featureGroupBase.getSubject().getId()).getBytes(StandardCharsets.UTF_8);
-
     onlineFeatureGroupToAvro(featureGroupBase, encodeComplexFeatures(featureGroupBase, dataset))
-        .withColumn("headers", array(
-            struct(
-                lit("projectId").as("key"),
-                lit(projectId).as("value")
-            ),
-            struct(
-                lit("featureGroupId").as("key"),
-                lit(featureGroupId).as("value")
-            ),
-            struct(
-                lit("subjectId").as("key"),
-                lit(subjectId).as("value")
-            )
-        ))
+        .withColumn("headers", getHeader(featureGroupBase, dataset.count()))
         .write()
         .format(Constants.KAFKA_FORMAT)
        .options(writeOptions)
@@ -564,28 +548,10 @@ public StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase
                                              Long timeout, String checkpointLocation, Map writeOptions)
       throws FeatureStoreException, IOException, StreamingQueryException, TimeoutException {
-    byte[] projectId = String.valueOf(featureGroupBase.getFeatureStore().getProjectId())
-        .getBytes(StandardCharsets.UTF_8);
-    byte[] featureGroupId = String.valueOf(featureGroupBase.getId()).getBytes(StandardCharsets.UTF_8);
-    byte[] subjectId = String.valueOf(featureGroupBase.getSubject().getId()).getBytes(StandardCharsets.UTF_8);
-
     queryName = makeQueryName(queryName, featureGroupBase);
     DataStreamWriter writer =
         onlineFeatureGroupToAvro(featureGroupBase, encodeComplexFeatures(featureGroupBase, dataset))
-        .withColumn("headers", array(
-            struct(
-                lit("projectId").as("key"),
-                lit(projectId).as("value")
-            ),
-            struct(
-                lit("featureGroupId").as("key"),
-                lit(featureGroupId).as("value")
-            ),
-            struct(
-                lit("subjectId").as("key"),
-                lit(subjectId).as("value")
-            )
-        ))
+        .withColumn("headers", getHeader(featureGroupBase, null))
         .writeStream()
         .format(Constants.KAFKA_FORMAT)
         .outputMode(outputMode)
@@ -604,6 +570,17 @@ public StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase
     return query;
   }
 
+  private Column getHeader(FeatureGroupBase featureGroup, Long numEntries) throws FeatureStoreException, IOException {
+    return array(
+        FeatureGroupUtils.getHeaders(featureGroup, numEntries).entrySet().stream()
+            .map(entry -> struct(
+                lit(entry.getKey()).as("key"),
+                lit(entry.getValue()).as("value")
+            ))
+            .toArray(Column[]::new)
+    );
+  }
+
   /**
    * Encodes all complex type features to binary using their avro type as schema.
    *
diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py
index a8d22ccf5..fa25cd4aa 100644
--- a/python/hsfs/core/online_ingestion.py
+++ b/python/hsfs/core/online_ingestion.py
@@ -1,5 +1,5 @@
 #
-# Copyright 2024 Hopsworks AB
+# Copyright 2025 Hopsworks AB
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/python/hsfs/core/online_ingestion_api.py b/python/hsfs/core/online_ingestion_api.py
index c17282a34..5608cbd1b 100644
--- a/python/hsfs/core/online_ingestion_api.py
+++ b/python/hsfs/core/online_ingestion_api.py
@@ -1,5 +1,5 @@
 #
-# Copyright 2024 Hopsworks AB
+# Copyright 2025 Hopsworks AB
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/python/hsfs/core/online_ingestion_batch_result.py b/python/hsfs/core/online_ingestion_batch_result.py
index 844a6c117..836a4de16 100644
--- a/python/hsfs/core/online_ingestion_batch_result.py
+++ b/python/hsfs/core/online_ingestion_batch_result.py
@@ -1,5 +1,5 @@
 #
-# Copyright 2024 Hopsworks AB
+# Copyright 2025 Hopsworks AB
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index dde91c657..04a0932ac 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -495,7 +495,7 @@ def save_stream_dataframe(
 
         query = (
             serialized_df.withColumn(
-                "headers", self._get_headers(feature_group, dataframe)
+                "headers", self._get_headers(feature_group)
             )
             .writeStream.outputMode(output_mode)
             .format(self.KAFKA_FORMAT)
@@ -573,7 +573,7 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options):
 
         (
             serialized_df.withColumn(
-                "headers", self._get_headers(feature_group, dataframe)
+                "headers", self._get_headers(feature_group, dataframe.count())
             )
             .write.format(self.KAFKA_FORMAT)
             .options(**write_options)
@@ -592,13 +592,13 @@ def _get_headers(
     def _get_headers(
         self,
         feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup],
-        dataframe: Optional[Union[RDD, DataFrame]] = None,
+        num_entries: Optional[int] = None,
     ) -> array:
         return array(
             *[
                 struct(lit(key).alias("key"), lit(value).alias("value"))
                 for key, value in kafka_engine.get_headers(
-                    feature_group, dataframe.count() if dataframe else None
+                    feature_group, num_entries
                 ).items()
             ]
         )
From 270b4ea714ab1d5930381976cead21e051ca8c44 Mon Sep 17 00:00:00 2001
From: bubriks
Date: Mon, 6 Jan 2025 15:17:18 +0200
Subject: [PATCH 20/25] long to Long

---
 .../src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
index cb8e81b89..5e0555d4d 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
@@ -34,7 +34,7 @@ public class OnlineIngestion {
 
   @Getter
   @Setter
-  private long numEntries;
+  private Long numEntries;
 
   @Getter
   @Setter
From 4261040dbffe318a2befd902d6a0d03509aa955c Mon Sep 17 00:00:00 2001
From: bubriks
Date: Mon, 6 Jan 2025 17:35:04 +0200
Subject: [PATCH 21/25] some java fixes

---
 .../logicalclocks/hsfs/FeatureGroupBase.java  |  2 +-
 .../logicalclocks/hsfs/OnlineIngestion.java   | 50 ++++++++++++++++++-
 .../hsfs/metadata/OnlineIngestionApi.java     | 13 +++--
 3 files changed, 59 insertions(+), 6 deletions(-)

diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java
index ae38c9bd6..794db5f9d 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java
@@ -547,7 +547,7 @@ public Schema getDeserializedAvroSchema() throws FeatureStoreException, IOExcept
 
   @JsonIgnore
   public OnlineIngestion getLatestOnlineIngestion() throws FeatureStoreException, IOException {
-    return onlineIngestionApi.getOnlineIngestion(this, "filter_by=LATEST");
+    return onlineIngestionApi.getOnlineIngestion(this, "filter_by=LATEST").get(0);
   }
 }
diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
index 5e0555d4d..d8cbc78ec 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
@@ -17,8 +17,15 @@
 
 package com.logicalclocks.hsfs;
 
+import java.io.IOException;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.logicalclocks.hsfs.metadata.OnlineIngestionApi;
+import com.logicalclocks.hsfs.metadata.RestDto;
+
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
@@ -26,7 +33,9 @@
 
 @NoArgsConstructor
 @AllArgsConstructor
-public class OnlineIngestion {
+public class OnlineIngestion extends RestDto {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(OnlineIngestion.class);
 
   @Getter
   @Setter
@@ -64,8 +73,45 @@ public OnlineIngestion(long numEntries) {
     this.numEntries = numEntries;
   }
 
-  public void waitForCompletion() {
+  public void refresh() throws FeatureStoreException, IOException {
+    OnlineIngestion onlineIngestion = new OnlineIngestionApi()
+        .getOnlineIngestion(featureGroup, "filter_by=ID:" + id).get(0);
+
+    // Method to copy data from another object
+    this.id = onlineIngestion.id;
+    this.numEntries = onlineIngestion.numEntries;
+    this.currentOffsets = onlineIngestion.currentOffsets;
+    this.processedEntries = onlineIngestion.processedEntries;
+    this.insertedEntries = onlineIngestion.insertedEntries;
+    this.abortedEntries = onlineIngestion.abortedEntries;
+    this.batchResults = onlineIngestion.batchResults;
+    this.featureGroup = onlineIngestion.featureGroup;
+  }
 
+  public void waitForCompletion(int timeout, int period)
+      throws InterruptedException, FeatureStoreException, IOException {
+    long startTime = System.currentTimeMillis();
+
+    // Convert to milliseconds
+    timeout = timeout * 1000;
+    period = period * 1000;
+
+    while (true) {
+      refresh();
+
+      if (numEntries != null && processedEntries >= numEntries) {
+        break;
+      }
+
+      // Check if the timeout has been reached
+      if (System.currentTimeMillis() - startTime > timeout) {
+        LOGGER.warn("Timeout of " + timeout
+            + " was exceeded while waiting for online ingestion completion.");
+        break;
+      }
+
+      // Sleep for the specified period in seconds
+      Thread.sleep(period);
+    }
   }
 
 }
\ No newline at end of file
diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java
index ed361cae1..8e2118d95 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java
@@ -29,6 +29,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.List;
 
 public class OnlineIngestionApi {
 
@@ -55,10 +56,12 @@ public OnlineIngestion createOnlineIngestion(FeatureGroupBase featureGroup, Onli
     LOGGER.info("Sending metadata request: " + uri);
 
-    return hopsworksClient.handleRequest(postRequest, OnlineIngestion.class);
+    onlineIngestion = hopsworksClient.handleRequest(postRequest, OnlineIngestion.class);
+    onlineIngestion.setFeatureGroup(featureGroup);
+    return onlineIngestion;
   }
 
-  public OnlineIngestion getOnlineIngestion(FeatureGroupBase featureGroup, String queryParameters)
+  public List getOnlineIngestion(FeatureGroupBase featureGroup, String queryParameters)
       throws FeatureStoreException, IOException {
     HopsworksClient hopsworksClient = HopsworksClient.getInstance();
     String pathTemplate = HopsworksClient.PROJECT_PATH
@@ -74,6 +77,10 @@ public OnlineIngestion getOnlineIngestion(FeatureGroupBase featureGroup, String
 
     LOGGER.info("Sending metadata request: " + uri);
 
-    return hopsworksClient.handleRequest(new HttpGet(uri), OnlineIngestion.class);
+    OnlineIngestion onlineIngestion = hopsworksClient.handleRequest(new HttpGet(uri), OnlineIngestion.class);
+    for (OnlineIngestion ingestion : onlineIngestion.getItems()) {
+      ingestion.setFeatureGroup(featureGroup);
+    }
+    return onlineIngestion.getItems();
   }
 }
From 7bf9ce72456bb36a0424369e05c04d53d1343efa Mon Sep 17 00:00:00 2001
From: bubriks
Date: Tue, 7 Jan 2025 10:53:02 +0200
Subject: [PATCH 22/25] if timeout is 0 we will wait indefinitely

---
 .../main/java/com/logicalclocks/hsfs/OnlineIngestion.java | 6 ++++--
 python/hsfs/core/online_ingestion.py                      | 5 ++++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
index d8cbc78ec..5845ba13b 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
@@ -98,12 +98,14 @@ public void waitForCompletion(int timeout, int period)
 
     while (true) {
       refresh();
+
+      // Check if the online ingestion is complete
       if (numEntries != null && processedEntries >= numEntries) {
         break;
       }
 
-      // Check if the timeout has been reached
-      if (System.currentTimeMillis() - startTime > timeout) {
+      // Check if the timeout has been reached (if timeout is 0 we will wait indefinitely)
+      if (timeout != 0 && System.currentTimeMillis() - startTime > timeout) {
         LOGGER.warn("Timeout of " + timeout
             + " was exceeded while waiting for online ingestion completion.");
         break;
diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py
index fa25cd4aa..df45c0001 100644
--- a/python/hsfs/core/online_ingestion.py
+++ b/python/hsfs/core/online_ingestion.py
@@ -171,16 +171,19 @@ def wait_for_completion(self, options: Dict[str, Any] = None):
                 progress_bar.n = self.processed_entries
                 progress_bar.refresh()
 
+                # Check if the online ingestion is complete
                 if self.num_entries and self.processed_entries >= self.num_entries:
                     break
 
-                if datetime.now() >= timeout_time:
+                # Check if the timeout has been reached (if timeout is 0 we will wait indefinitely)
+                if timeout_delta != timedelta(0) and datetime.now() >= timeout_time:
                     warnings.warn(
                         f"Timeout of {timeout_delta} was exceeded while waiting for online ingestion completion.",
                         stacklevel=1,
                     )
                     break
 
+                # Sleep for the specified period in seconds
                 time.sleep(options.get("period", 1))
 
             self.refresh()
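PATCH 22 gives both clients the same polling contract: a period between refreshes and a timeout, where a timeout of 0 waits indefinitely. A rough sketch of a bounded wait on the Python side (the "timeout" and "period" option keys are assumptions consistent with the defaults read above; `fg` is a hypothetical online-enabled feature group):

    ingestion = fg.get_latest_online_ingestion()
    # Poll once per second and give up (with a warning) after two minutes;
    # passing {"timeout": 0} would wait until the ingestion finishes.
    ingestion.wait_for_completion(options={"timeout": 120, "period": 1})
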
From e30f2d5f509017 Mon Sep 17 00:00:00 2001
From: bubriks
Date: Tue, 7 Jan 2025 16:02:15 +0200
Subject: [PATCH 23/25] get latest fix for java

---
 .../com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java
index 8e2118d95..2e456bfea 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java
@@ -35,7 +35,7 @@ public class OnlineIngestionApi {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(OnlineIngestionApi.class);
 
-  public static final String ONLINE_INGESTION_PATH = "/featuregroups/{fgId}/online_ingestion{?queryParameters}";
+  public static final String ONLINE_INGESTION_PATH = "/featuregroups/{fgId}/online_ingestion";
 
   public OnlineIngestion createOnlineIngestion(FeatureGroupBase featureGroup, OnlineIngestion onlineIngestion)
       throws FeatureStoreException, IOException {
@@ -72,10 +72,10 @@ public List getOnlineIngestion(FeatureGroupBase featureGroup, String queryParam
         .set("projectId", hopsworksClient.getProject().getProjectId())
         .set("fsId", featureGroup.getFeatureStore().getId())
         .set("fgId", featureGroup.getId())
-        .set("queryParameters", queryParameters)
-        .expand();
+        .expand() + "?" + queryParameters;
 
     LOGGER.info("Sending metadata request: " + uri);
+    System.out.println(uri);
 
     OnlineIngestion onlineIngestion = hopsworksClient.handleRequest(new HttpGet(uri), OnlineIngestion.class);
     for (OnlineIngestion ingestion : onlineIngestion.getItems()) {
From 21442926389e5156875d8482fd264216dad291df Mon Sep 17 00:00:00 2001
From: bubriks
Date: Thu, 9 Jan 2025 13:49:57 +0200
Subject: [PATCH 24/25] rows ignored

---
 .../logicalclocks/hsfs/OnlineIngestion.java   | 19 +++++-----
 python/hsfs/core/online_ingestion.py          | 35 ++++++++++---------
 .../fixtures/online_ingestion_fixtures.json   |  6 ++--
 3 files changed, 33 insertions(+), 27 deletions(-)

diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
index 5845ba13b..6bdb4ee0c 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
@@ -51,15 +51,15 @@ public class OnlineIngestion extends RestDto {
 
   @Getter
   @Setter
-  private Integer processedEntries;
+  private Integer rowsUpserted;
 
   @Getter
   @Setter
-  private Integer insertedEntries;
+  private Integer rowsFailed;
 
   @Getter
   @Setter
-  private Integer abortedEntries;
+  private Integer rowsIgnored;
 
   @Getter
   @Setter
@@ -81,9 +81,9 @@ public void refresh() throws FeatureStoreException, IOException {
     this.id = onlineIngestion.id;
     this.numEntries = onlineIngestion.numEntries;
     this.currentOffsets = onlineIngestion.currentOffsets;
-    this.processedEntries = onlineIngestion.processedEntries;
-    this.insertedEntries = onlineIngestion.insertedEntries;
-    this.abortedEntries = onlineIngestion.abortedEntries;
+    this.rowsUpserted = onlineIngestion.rowsUpserted;
+    this.rowsFailed = onlineIngestion.rowsFailed;
+    this.rowsIgnored = onlineIngestion.rowsIgnored;
     this.batchResults = onlineIngestion.batchResults;
     this.featureGroup = onlineIngestion.featureGroup;
   }
@@ -97,10 +97,11 @@ public void waitForCompletion(int timeout, int period)
     period = period * 1000;
 
     while (true) {
-      refresh();
+      // Get total number of rows processed
+      long rowsProcessed = rowsUpserted + rowsFailed + rowsIgnored;
 
       // Check if the online ingestion is complete
-      if (numEntries != null && processedEntries >= numEntries) {
+      if (numEntries != null && rowsProcessed >= numEntries) {
         break;
       }
 
@@ -113,6 +114,8 @@ public void waitForCompletion(int timeout, int period)
 
       // Sleep for the specified period in seconds
       Thread.sleep(period);
+
+      refresh();
     }
   }
 
diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py
index df45c0001..fb4694a25 100644
--- a/python/hsfs/core/online_ingestion.py
+++ b/python/hsfs/core/online_ingestion.py
@@ -45,9 +45,9 @@ def __init__(
         id: Optional[int] = None,
         num_entries: Optional[int] = None,
         current_offsets: Optional[str] = None,
-        processed_entries: Optional[int] = None,
-        inserted_entries: Optional[int] = None,
-        aborted_entries: Optional[int] = None,
+        rows_upserted: Optional[int] = None,
+        rows_failed: Optional[int] = None,
+        rows_ignored: Optional[int] = None,
         batch_results: Union[
             List[online_ingestion_batch_result.OnlineIngestionBatchResult],
             List[Dict[str, Any]],
         ] = None,
@@ -58,9 +58,9 @@ def __init__(
         self._id = id
         self._num_entries = num_entries  # specified when inserting (optional since might not be specified when using streaming)
         self._current_offsets = current_offsets
-        self._processed_entries = processed_entries
-        self._inserted_entries = inserted_entries
-        self._aborted_entries = aborted_entries
+        self._rows_upserted = rows_upserted
+        self._rows_failed = rows_failed
+        self._rows_ignored = rows_ignored
         self._batch_results = (
             [
                 (
@@ -129,16 +129,16 @@ def current_offsets(self) -> Optional[str]:
         return self._current_offsets
 
     @property
-    def processed_entries(self) -> int:
-        return 0 if self._processed_entries is None else self._processed_entries
+    def rows_upserted(self) -> int:
+        return 0 if self._rows_upserted is None else self._rows_upserted
 
     @property
-    def inserted_entries(self) -> int:
-        return 0 if self._inserted_entries is None else self._inserted_entries
+    def rows_failed(self) -> int:
+        return 0 if self._rows_failed is None else self._rows_failed
 
     @property
-    def aborted_entries(self) -> int:
-        return 0 if self._aborted_entries is None else self._aborted_entries
+    def rows_ignored(self) -> int:
+        return 0 if self._rows_ignored is None else self._rows_ignored
 
     @property
     def batch_results(
@@ -165,14 +165,17 @@ def wait_for_completion(self, options: Dict[str, Any] = None):
             mininterval=1,
         ) as progress_bar:
             while True:
-                if self.aborted_entries:
-                    progress_bar.colour = "RED"
+                # Get total number of rows processed
+                rows_processed = self.rows_upserted + self.rows_failed + self.rows_ignored
 
-                progress_bar.n = self.processed_entries
+                # Update progress bar
+                if self.rows_failed or self.rows_ignored:
+                    progress_bar.colour = "RED"
+                progress_bar.n = rows_processed
                 progress_bar.refresh()
 
                 # Check if the online ingestion is complete
-                if self.num_entries and self.processed_entries >= self.num_entries:
+                if self.num_entries and rows_processed >= self.num_entries:
                     break
 
diff --git a/python/tests/fixtures/online_ingestion_fixtures.json b/python/tests/fixtures/online_ingestion_fixtures.json
index 23d804e60..4adf11e2e 100644
--- a/python/tests/fixtures/online_ingestion_fixtures.json
+++ b/python/tests/fixtures/online_ingestion_fixtures.json
@@ -6,9 +6,9 @@
       {
         "id": 1,
         "num_entries": 10,
-        "processed_entries": 8,
-        "inserted_entries": 6,
-        "aborted_entries": 2
+        "rows_upserted": 8,
+        "rows_failed": 6,
+        "rows_ignored": 2
       }
     ]
   }
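After PATCH 24 the ingestion counters distinguish upserted, failed and ignored rows, and completion is judged on their sum. A short sketch of inspecting them from a client (assuming an ingestion handle from the API above; not part of the patches):

    ingestion = fg.get_latest_online_ingestion()
    rows_processed = (
        ingestion.rows_upserted + ingestion.rows_failed + ingestion.rows_ignored
    )
    # num_entries can be None for streaming inserts, hence the guard.
    if ingestion.num_entries:
        print(f"{rows_processed}/{ingestion.num_entries} rows processed")
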
From 6bee2533921bb46d42fe5fcc48c9851035502e35 Mon Sep 17 00:00:00 2001
From: bubriks
Date: Mon, 13 Jan 2025 17:35:21 +0200
Subject: [PATCH 25/25] don't create online ingestion for offline fg

---
 .../hsfs/engine/FeatureGroupUtils.java        | 12 ++++++---
 python/hsfs/core/kafka_engine.py              | 27 ++++++++++---------
 python/tests/core/test_kafka_engine.py        | 26 ++++++++++++++++++
 3 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java
index 9ce23ec98..a6da9a258 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java
@@ -254,16 +254,20 @@ public String getDatasetType(String path) {
   public static Map getHeaders(FeatureGroupBase featureGroup, Long numEntries)
       throws FeatureStoreException, IOException {
     Map headerMap = new HashMap<>();
-    OnlineIngestion onlineIngestion = new OnlineIngestionApi()
-        .createOnlineIngestion(featureGroup, new OnlineIngestion(numEntries));
 
     headerMap.put("projectId",
         String.valueOf(featureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8));
     headerMap.put("featureGroupId", String.valueOf(featureGroup.getId()).getBytes(StandardCharsets.UTF_8));
     headerMap.put("subjectId",
         String.valueOf(featureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8));
-    headerMap.put("onlineIngestionId",
-        String.valueOf(onlineIngestion.getId()).getBytes(StandardCharsets.UTF_8));
+
+    if (featureGroup.getOnlineEnabled()) {
+      OnlineIngestion onlineIngestion = new OnlineIngestionApi()
+          .createOnlineIngestion(featureGroup, new OnlineIngestion(numEntries));
+
+      headerMap.put("onlineIngestionId",
+          String.valueOf(onlineIngestion.getId()).getBytes(StandardCharsets.UTF_8));
+    }
 
     return headerMap;
   }
diff --git a/python/hsfs/core/kafka_engine.py b/python/hsfs/core/kafka_engine.py
index 31fb45705..22bb5c084 100644
--- a/python/hsfs/core/kafka_engine.py
+++ b/python/hsfs/core/kafka_engine.py
@@ -120,25 +120,26 @@ def get_headers(
     feature_group: Union[FeatureGroup, ExternalFeatureGroup],
     num_entries: Optional[int] = None,
 ) -> Dict[str, bytes]:
-    # setup online ingestion
-    online_ingestion_instance = online_ingestion.OnlineIngestion(
-        num_entries=num_entries
-    )
-
-    online_ingestion_instance = (
-        online_ingestion_api.OnlineIngestionApi().create_online_ingestion(
-            feature_group, online_ingestion_instance
-        )
-    )
-
     # custom headers for hopsworks onlineFS
-    return {
+    headers = {
         "projectId": str(feature_group.feature_store.project_id).encode("utf8"),
         "featureGroupId": str(feature_group._id).encode("utf8"),
         "subjectId": str(feature_group.subject["id"]).encode("utf8"),
-        "onlineIngestionId": str(online_ingestion_instance.id).encode("utf8"),
     }
 
+    if feature_group.online_enabled:
+        # setup online ingestion id
+        online_ingestion_instance = (
+            online_ingestion_api.OnlineIngestionApi().create_online_ingestion(
+                feature_group, online_ingestion.OnlineIngestion(
+                    num_entries=num_entries
+                )
+            )
+        )
+        headers["onlineIngestionId"] = str(online_ingestion_instance.id).encode("utf8")
+
+    return headers
+
 
 @uses_confluent_kafka
 def init_kafka_producer(
diff --git a/python/tests/core/test_kafka_engine.py b/python/tests/core/test_kafka_engine.py
index 70275efe4..8ccbe46c1 100644
--- a/python/tests/core/test_kafka_engine.py
+++ b/python/tests/core/test_kafka_engine.py
@@ -528,6 +528,31 @@ def test_spark_get_kafka_config_internal_kafka(self, mocker, backend_fixtures):
     def test_get_headers(self, mocker, backend_fixtures):
         # Arrange
         mocker.patch("hopsworks_common.client.get_instance")
+
+        fg = feature_group.FeatureGroup(
+            id=111,
+            name="test",
+            version=1,
+            featurestore_id=99,
+        )
+        fg.feature_store = mocker.Mock()
+        fg.feature_store.project_id = 234
+
+        fg._subject = {"id": 823}
+
+        # Act
+        results = kafka_engine.get_headers(fg, num_entries=10)
+
+        # Assert
+        assert results == {
+            "featureGroupId": b"111",
+            "projectId": b"234",
+            "subjectId": b"823",
+        }
+
+    def test_get_headers_online_ingestion(self, mocker, backend_fixtures):
+        # Arrange
+        mocker.patch("hopsworks_common.client.get_instance")
         mock_online_ingestion_api = mocker.patch(
             "hsfs.core.online_ingestion_api.OnlineIngestionApi"
         )
@@ -540,6 +565,7 @@ def test_get_headers(self, mocker, backend_fixtures):
             name="test",
             version=1,
             featurestore_id=99,
+            online_enabled=True,
         )
         fg.feature_store = mocker.Mock()
         fg.feature_store.project_id = 234