From 5076157011d79796d606dc270c5e32dbb77a9995 Mon Sep 17 00:00:00 2001 From: "Jeffrey Jonathan Jennings (J3)" Date: Sat, 30 Nov 2024 23:45:42 -0500 Subject: [PATCH] Resolved #515. --- .../kickstarter/AvroDataGeneratorApp.java | 20 +++++++++++++++---- .../kickstarter/JsonDataGeneratorApp.java | 20 +++++++++++++++---- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/java/app/src/main/java/kickstarter/AvroDataGeneratorApp.java b/java/app/src/main/java/kickstarter/AvroDataGeneratorApp.java index dfc69b2..c1e2640 100644 --- a/java/app/src/main/java/kickstarter/AvroDataGeneratorApp.java +++ b/java/app/src/main/java/kickstarter/AvroDataGeneratorApp.java @@ -71,18 +71,30 @@ public static void main(String[] args) throws Exception { // --- Create a blank Flink execution environment (a.k.a. the Flink job graph -- the DAG). StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); - // --- Enable checkpointing every 5000 milliseconds (5 seconds). + /* + * Enable checkpointing every 5000 milliseconds (5 seconds). Note, consider the + * resource cost of checkpointing frequency, as short intervals can lead to higher + * I/O and CPU overhead. Proper tuning of checkpoint intervals depends on the + * state size, latency requirements, and resource constraints. + */ env.enableCheckpointing(5000); /* - * Set timeout to 60 seconds - * The maximum amount of time a checkpoint attempt can take before being discarded. + * Set checkpoint timeout to 60 seconds, which is the maximum amount of time a + * checkpoint attempt can take before being discarded. Note, setting an appropriate + * checkpoint timeout helps maintain a balance between achieving exactly-once semantics + * and avoiding excessive delays that can impact real-time stream processing performance. */ env.getCheckpointConfig().setCheckpointTimeout(60000); /* * Set the maximum number of concurrent checkpoints to 1 (i.e., only one checkpoint - * is created at a time). + * is created at a time). Note, this is useful for limiting resource usage and + * ensuring checkpoints do not interfere with each other, but may impact throughput + * if checkpointing is slow. Adjust this setting based on the nature of your job, + * the size of the state, and available resources. If your environment has enough + * resources and you want to ensure faster recovery, you could increase the limit + * to allow multiple concurrent checkpoints. */ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); diff --git a/java/app/src/main/java/kickstarter/JsonDataGeneratorApp.java b/java/app/src/main/java/kickstarter/JsonDataGeneratorApp.java index c7c207e..a91e2d4 100644 --- a/java/app/src/main/java/kickstarter/JsonDataGeneratorApp.java +++ b/java/app/src/main/java/kickstarter/JsonDataGeneratorApp.java @@ -61,18 +61,30 @@ public static void main(String[] args) throws Exception { // --- Create a blank Flink execution environment (a.k.a. the Flink job graph -- the DAG). StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // --- Enable checkpointing every 5000 milliseconds (5 seconds). + /* + * Enable checkpointing every 5000 milliseconds (5 seconds). Note, consider the + * resource cost of checkpointing frequency, as short intervals can lead to higher + * I/O and CPU overhead. Proper tuning of checkpoint intervals depends on the + * state size, latency requirements, and resource constraints. + */ env.enableCheckpointing(5000); /* - * Set timeout to 60 seconds - * The maximum amount of time a checkpoint attempt can take before being discarded. + * Set checkpoint timeout to 60 seconds, which is the maximum amount of time a + * checkpoint attempt can take before being discarded. Note, setting an appropriate + * checkpoint timeout helps maintain a balance between achieving exactly-once semantics + * and avoiding excessive delays that can impact real-time stream processing performance. */ env.getCheckpointConfig().setCheckpointTimeout(60000); /* * Set the maximum number of concurrent checkpoints to 1 (i.e., only one checkpoint - * is created at a time). + * is created at a time). Note, this is useful for limiting resource usage and + * ensuring checkpoints do not interfere with each other, but may impact throughput + * if checkpointing is slow. Adjust this setting based on the nature of your job, + * the size of the state, and available resources. If your environment has enough + * resources and you want to ensure faster recovery, you could increase the limit + * to allow multiple concurrent checkpoints. */ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);