From eb6fa033c2324686fa3bd5fd915acc127c168448 Mon Sep 17 00:00:00 2001
From: bubriks
Date: Wed, 4 Dec 2024 15:06:54 +0200
Subject: [PATCH 1/5] init

---
 utils/python/hsfs_utils.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index ac2f23de2..9e200b3d2 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -301,13 +301,15 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
         .option("includeHeaders", "true")
         .option("failOnDataLoss", "false")
         .load()
-        .limit(5000000)
     )
 
     # filter only the necassary entries
     df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
     df = df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
 
+    # limit the number of records ingested
+    df = df.limit(5000000)
+
     # deserialize dataframe so that it can be properly saved
     deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df)
 

From e4bc3576e67b0115a04d8f53d0a581212f8cb0b7 Mon Sep 17 00:00:00 2001
From: bubriks
Date: Wed, 4 Dec 2024 15:25:37 +0200
Subject: [PATCH 2/5] move update offsets

---
 utils/python/hsfs_utils.py | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index 9e200b3d2..b04a23b18 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -301,15 +301,19 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
         .option("includeHeaders", "true")
         .option("failOnDataLoss", "false")
         .load()
+        .limit(5000000)
     )
 
+    # update offsets
+    df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
+    offset_dict = json.loads(offset_string)
+    for offset_row in df_offsets:
+        offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1
+
     # filter only the necassary entries
     df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
     df = df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
 
-    # limit the number of records ingested
-    df = df.limit(5000000)
-
     # deserialize dataframe so that it can be properly saved
     deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df)
 
@@ -317,12 +321,6 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
     entity.stream = False  # to make sure we dont write to kafka
     entity.insert(deserialized_df, storage="offline")
 
-    # update offsets
-    df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
-    offset_dict = json.loads(offset_string)
-    for offset_row in df_offsets:
-        offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1
-
     # save offsets
     offset_df = spark.createDataFrame([offset_dict])
     offset_df.coalesce(1).write.mode("overwrite").json(offset_location)

From c2eac063a3f4f095cdfa117308452f745c4c51fd Mon Sep 17 00:00:00 2001
From: bubriks
Date: Wed, 4 Dec 2024 15:41:56 +0200
Subject: [PATCH 3/5] choose offsets based on limit

---
 utils/python/hsfs_utils.py | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index b04a23b18..b431aea49 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -301,26 +301,30 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
         .option("includeHeaders", "true")
         .option("failOnDataLoss", "false")
         .load()
-        .limit(5000000)
     )
 
-    # update offsets
-    df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
-    offset_dict = json.loads(offset_string)
-    for offset_row in df_offsets:
-        offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1
-
     # filter only the necassary entries
-    df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
-    df = df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
+    filtered_df = filtered_df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
+    filtered_df = filtered_df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
+
+    limit = 5000000
+
+    # limit the number of records ingested
+    filtered_df = filtered_df.limit(limit)
 
     # deserialize dataframe so that it can be properly saved
-    deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df)
+    deserialized_df = engine.get_instance()._deserialize_from_avro(entity, filtered_df)
 
     # insert data
     entity.stream = False  # to make sure we dont write to kafka
     entity.insert(deserialized_df, storage="offline")
 
+    # update offsets
+    df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy('partition').agg(max('offset').alias('offset')).collect()
+    offset_dict = json.loads(offset_string)
+    for offset_row in df_offsets:
+        offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1
+
     # save offsets
     offset_df = spark.createDataFrame([offset_dict])
     offset_df.coalesce(1).write.mode("overwrite").json(offset_location)

From ac19f2d70b1baa301a5c58919f6476b268556163 Mon Sep 17 00:00:00 2001
From: bubriks
Date: Wed, 4 Dec 2024 15:42:57 +0200
Subject: [PATCH 4/5] fix

---
 utils/python/hsfs_utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index b431aea49..4038c1b6e 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -304,7 +304,7 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
     )
 
     # filter only the necassary entries
-    filtered_df = filtered_df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
+    filtered_df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
     filtered_df = filtered_df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
 
     limit = 5000000

From 7db7c5469c688f25a80567717523cb0920a35e66 Mon Sep 17 00:00:00 2001
From: bubriks
Date: Thu, 5 Dec 2024 17:26:30 +0200
Subject: [PATCH 5/5] add job_limit

---
 utils/python/hsfs_utils.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index 4038c1b6e..fc226dca4 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -307,9 +307,8 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
     filtered_df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
     filtered_df = filtered_df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
 
-    limit = 5000000
-
     # limit the number of records ingested
+    limit = job_conf.get("write_options", {}).get("job_limit", 5000000)
     filtered_df = filtered_df.limit(limit)
 
     # deserialize dataframe so that it can be properly saved
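
Net effect of the series, as a rough sketch rather than the verbatim file: the hunks above only show fragments of offline_fg_materialization, so names such as spark, job_conf, entity, engine, offset_string and offset_location are assumed to come from the parts of the function that the hunks do not show, and the Kafka connection/subscribe options that precede .option("includeHeaders", ...) are elided. After PATCH 5/5 the ingestion block should read roughly like this:

import json

from pyspark.sql.functions import expr, max as spark_max

# Read the raw Kafka records (connection, subscribe and starting-offset
# options are part of the real function but are not shown in the hunks above).
df = (
    spark.read.format("kafka")
    .option("includeHeaders", "true")
    .option("failOnDataLoss", "false")
    .load()
)

# Keep only the records that belong to this feature group and schema subject.
filtered_df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
filtered_df = filtered_df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))

# Cap the number of records ingested in one run; PATCH 5/5 makes the cap
# configurable through the job's write options, defaulting to 5,000,000.
limit = job_conf.get("write_options", {}).get("job_limit", 5000000)
filtered_df = filtered_df.limit(limit)

# Deserialize the Avro payload and write it to the offline store only.
deserialized_df = engine.get_instance()._deserialize_from_avro(entity, filtered_df)
entity.stream = False  # make sure we do not write back to Kafka
entity.insert(deserialized_df, storage="offline")

# Advance the committed offsets: if fewer than `limit` rows survived the
# filter, the cap was not hit and the whole read can be acknowledged (records
# filtered out belong to other feature groups and are skipped on purpose);
# otherwise commit only up to the last row of the limited batch, so the
# remainder is picked up by the next run.
offset_source = df if limit > filtered_df.count() else filtered_df
df_offsets = offset_source.groupBy("partition").agg(spark_max("offset").alias("offset")).collect()
offset_dict = json.loads(offset_string)
for offset_row in df_offsets:
    offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1

# Persist the new offsets for the next materialization run.
offset_df = spark.createDataFrame([offset_dict])
offset_df.coalesce(1).write.mode("overwrite").json(offset_location)

With this in place a caller can bound a single materialization run by setting "job_limit" under write_options in the job configuration; when the key is absent, the previous hard-coded cap of 5,000,000 records still applies.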