Skip to content

Commit

Permalink
Merge pull request #516 from j3-signalroom/github_issue-515
Browse files Browse the repository at this point in the history
Resolved #515.
  • Loading branch information
j3-signalroom authored Dec 1, 2024
2 parents 078d7d9 + 5076157 commit ef2b641
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
20 changes: 16 additions & 4 deletions java/app/src/main/java/kickstarter/AvroDataGeneratorApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,30 @@ public static void main(String[] args) throws Exception {
// --- Create a blank Flink execution environment (a.k.a. the Flink job graph -- the DAG).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

// --- Enable checkpointing every 5000 milliseconds (5 seconds).
/*
* Enable checkpointing every 5000 milliseconds (5 seconds). Note, consider the
* resource cost of checkpointing frequency, as short intervals can lead to higher
* I/O and CPU overhead. Proper tuning of checkpoint intervals depends on the
* state size, latency requirements, and resource constraints.
*/
env.enableCheckpointing(5000);

/*
* Set timeout to 60 seconds
* The maximum amount of time a checkpoint attempt can take before being discarded.
* Set checkpoint timeout to 60 seconds, which is the maximum amount of time a
* checkpoint attempt can take before being discarded. Note, setting an appropriate
* checkpoint timeout helps maintain a balance between achieving exactly-once semantics
* and avoiding excessive delays that can impact real-time stream processing performance.
*/
env.getCheckpointConfig().setCheckpointTimeout(60000);

/*
* Set the maximum number of concurrent checkpoints to 1 (i.e., only one checkpoint
* is created at a time).
* is created at a time). Note, this is useful for limiting resource usage and
* ensuring checkpoints do not interfere with each other, but may impact throughput
* if checkpointing is slow. Adjust this setting based on the nature of your job,
* the size of the state, and available resources. If your environment has enough
* resources and you want to ensure faster recovery, you could increase the limit
* to allow multiple concurrent checkpoints.
*/
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

Expand Down
20 changes: 16 additions & 4 deletions java/app/src/main/java/kickstarter/JsonDataGeneratorApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,30 @@ public static void main(String[] args) throws Exception {
// --- Create a blank Flink execution environment (a.k.a. the Flink job graph -- the DAG).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// --- Enable checkpointing every 5000 milliseconds (5 seconds).
/*
* Enable checkpointing every 5000 milliseconds (5 seconds). Note, consider the
* resource cost of checkpointing frequency, as short intervals can lead to higher
* I/O and CPU overhead. Proper tuning of checkpoint intervals depends on the
* state size, latency requirements, and resource constraints.
*/
env.enableCheckpointing(5000);

/*
* Set timeout to 60 seconds
* The maximum amount of time a checkpoint attempt can take before being discarded.
* Set checkpoint timeout to 60 seconds, which is the maximum amount of time a
* checkpoint attempt can take before being discarded. Note, setting an appropriate
* checkpoint timeout helps maintain a balance between achieving exactly-once semantics
* and avoiding excessive delays that can impact real-time stream processing performance.
*/
env.getCheckpointConfig().setCheckpointTimeout(60000);

/*
* Set the maximum number of concurrent checkpoints to 1 (i.e., only one checkpoint
* is created at a time).
* is created at a time). Note, this is useful for limiting resource usage and
* ensuring checkpoints do not interfere with each other, but may impact throughput
* if checkpointing is slow. Adjust this setting based on the nature of your job,
* the size of the state, and available resources. If your environment has enough
* resources and you want to ensure faster recovery, you could increase the limit
* to allow multiple concurrent checkpoints.
*/
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

Expand Down

0 comments on commit ef2b641

Please sign in to comment.