retriable-kafka-consumer is a library for retriable event processing using Kafka.
If processing of a Kafka record throws an unhandled exception, the record is re-sent to a retry-topic.
The record will then be retried until it is successfully processed without exceptions or the retryPeriod is exceeded.
This library is suitable for low to medium volume topics where reliability is important. It is a poor fit for use cases such as high-volume log analytics, click-stream aggregation or dashboards, where an occasional lost message is acceptable.
Since this library re-posts your messages to a new topic, the order of the messages is not preserved. So if you depend on in-order-processing, this library is not for you.
From Maven Central (Gradle):
implementation 'no.finn.retriable-kafka-consumer:retriable-kafka-consumer:1.55'
or (Maven):
<dependency>
    <groupId>no.finn.retriable-kafka-consumer</groupId>
    <artifactId>retriable-kafka-consumer</artifactId>
    <version>1.55</version>
</dependency>
- In your Kafka config, turn off auto-commit. The library will handle commits.
- You need to implement a KafkaClientFactory:

public interface KafkaClientFactory<K, V> {
    Consumer<K, V> consumer();
    Producer<K, V> producer();
    String groupId();
}

Please note that consumer() and producer() may be called multiple times. producer() must return the same producer on every call, but consumer() must return a new consumer for every call.
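As a rough illustration of the auto-commit requirement, the consumer properties handed to your Kafka consumer could be assembled as below. This is only a sketch: the class and method names (ConsumerConfigSketch, consumerProperties) are hypothetical and not part of this library, and the String deserializers are an assumption. The property keys themselves are standard Kafka consumer settings.

```java
import java.util.Properties;

// Hypothetical helper, not part of retriable-kafka-consumer.
final class ConsumerConfigSketch {

    // Builds consumer properties with auto-commit disabled,
    // because the library commits offsets itself.
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "false");
        // Assumption: String keys and values; swap in your own deserializers.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("localhost:9092", "my-app");
        System.out.println("enable.auto.commit = " + props.getProperty("enable.auto.commit"));
    }
}
```

Inside your KafkaClientFactory, consumer() would typically create a fresh consumer from these properties on each call, while producer() would hand back one shared instance.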
pool = new ReliablePoolBuilder<>(eventClientFactory)
    .topics(List.of(TOPIC))
    .processingFunction(record -> process(record, adStore, insightEnqueuer))
    .build();
pool.monitor.start();
// build a consumer-pool
ReliableKafkaConsumerPool<String,String> pool = new ReliablePoolBuilder<>(eventClientFactory)
    .topics(List.of(TOPIC, TOPIC2)) // topics to subscribe to; retry topics are generated for them
    .poolCount(5) // optional: number of threads consuming from kafka
    .processingFunction(record -> process(record)) // required: a function that processes records
    .logHandler(new DefaultLogHandler()) // optional: logging implementation (a default will be provided if omitted)
    .expiredHandler(t -> System.out.println("t = " + t)) // optional: what to do with expired messages
    .retryPeriodMillis(24 * 60 * 60 * 1000) // optional: how long a message should be retried before giving up
    .retryThrottleMillis(5_000) // optional: how fast a message should be retried
    .build();
pool.monitor.start(); // start the pool
// optional: check if all consumer-threads are alive, recommended to check these every minute or so
pool.monitor.monitor();
// build a consumer-pool
ReliableKafkaConsumerPool<String,String> pool = new ReliablePoolBuilder<>(eventClientFactory)
    .topicsRetryTopics(Collections.singletonMap(TOPIC, RETRY_TOPIC)) // explicitly set the retry topic
    .poolCount(5) // optional: number of threads consuming from kafka
    .processingFunction(record -> process(record)) // required: a function that processes records
    .retryPeriodMillis(24 * 60 * 60 * 1000) // optional: how long a message should be retried before giving up
    .retryThrottleMillis(5_000) // optional: how fast a message should be retried
    .build();
pool.monitor.start(); // start the pool
// optional: check if all consumer-threads are alive, recommended to check these every minute or so
pool.monitor.monitor();
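The recommendation to check the consumer threads every minute or so could be wired up with a standard scheduled executor. The sketch below is an assumption about how an application might do this, not something the library provides: MonitorSchedulerSketch and scheduleHealthCheck are hypothetical names, and the health check is passed in as a plain Runnable so the example does not depend on the return type of pool.monitor.monitor().

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of retriable-kafka-consumer.
final class MonitorSchedulerSketch {

    // Runs healthCheck repeatedly at a fixed rate. Exceptions are caught and logged,
    // because scheduleAtFixedRate cancels all future runs if the task throws.
    static ScheduledFuture<?> scheduleHealthCheck(ScheduledExecutorService scheduler,
                                                  Runnable healthCheck,
                                                  long period,
                                                  TimeUnit unit) {
        Runnable guarded = () -> {
            try {
                healthCheck.run();
            } catch (RuntimeException e) {
                System.err.println("health check failed: " + e);
            }
        };
        return scheduler.scheduleAtFixedRate(guarded, period, period, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Stand-in check; a real application would call the pool's monitor here.
        scheduleHealthCheck(scheduler, () -> System.out.println("checking consumers"),
                1, TimeUnit.SECONDS);
        Thread.sleep(3_500);
        scheduler.shutdownNow();
    }
}
```

With a pool built as above, the call might look like scheduleHealthCheck(scheduler, () -> pool.monitor.monitor(), 1, TimeUnit.MINUTES).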
This library is inspired by an article written by Uber Engineering, but takes a slightly different approach. Instead of using n retry-topics and a dead-letter-queue, we use one retry queue and timestamps/counters in the record header to check if a record is expired.
The retry-topic will be named "retry--", and the pool will consume messages from this topic in the same way as the original topic. Consumer-threads are made restartable, meaning if any of the threads die for some reason they can be restarted by the monitor.
Reprocessing is controlled by headers on the kafka-record to keep track of how many times a record has been processed and a timestamp for the initial retry-attempt.
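To make the header bookkeeping concrete, here is a minimal sketch of how a retry counter and a first-retry timestamp could decide whether a record is expired. It is not the library's implementation: the header names, the string encoding of the values and the class RetryHeaderSketch are all assumptions, and a plain Map stands in for Kafka's record headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration, not the library's actual header handling.
final class RetryHeaderSketch {
    // Assumed header names; the real keys used by the library may differ.
    static final String FIRST_RETRY_TIMESTAMP = "retry-first-timestamp";
    static final String RETRY_COUNT = "retry-count";

    // Returns a copy of the headers updated for one more retry attempt.
    static Map<String, byte[]> nextAttempt(Map<String, byte[]> headers, long nowMillis) {
        Map<String, byte[]> updated = new HashMap<>(headers);
        // The timestamp is written only on the first retry and kept afterwards.
        updated.putIfAbsent(FIRST_RETRY_TIMESTAMP, encode(nowMillis));
        long count = decode(updated.get(RETRY_COUNT), 0L);
        updated.put(RETRY_COUNT, encode(count + 1));
        return updated;
    }

    // A record is expired once more than retryPeriodMillis has passed since its first retry.
    static boolean isExpired(Map<String, byte[]> headers, long nowMillis, long retryPeriodMillis) {
        byte[] raw = headers.get(FIRST_RETRY_TIMESTAMP);
        if (raw == null) {
            return false; // never retried, so it cannot be expired
        }
        return nowMillis - decode(raw, nowMillis) > retryPeriodMillis;
    }

    static byte[] encode(long value) {
        return Long.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    static long decode(byte[] raw, long fallback) {
        return raw == null ? fallback : Long.parseLong(new String(raw, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = nextAttempt(new HashMap<>(), System.currentTimeMillis());
        System.out.println("retry count: " + decode(headers.get(RETRY_COUNT), 0L));
    }
}
```

Keeping this state in the record itself is what allows a single retry topic to serve all attempts, since no external store is needed to know how old a failing record is.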
Metric counters are exposed via prometheus.
- expired_events, number of events that have been reprocessed until retryPeriod is exceeded
- failed_events, number of events that have failed and will be reprocessed
- processed_successfully_events, number of events processed successfully
These are all in the namespace of the application, which is taken from the system property or environment variable "ARTIFACT_NAME". If "ARTIFACT_NAME" is not specified, the value "unknown app" will be used instead.
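The lookup described above might be sketched as follows. The class MetricNamespaceSketch is hypothetical, and the precedence shown (system property first, then environment variable) is an assumption, since the text does not say which source wins when both are set.

```java
// Hypothetical illustration, not the library's actual lookup code.
final class MetricNamespaceSketch {
    static final String DEFAULT_NAMESPACE = "unknown app";

    // Assumed precedence: system property, then environment variable, then the default.
    static String resolve(String systemProperty, String environmentVariable) {
        if (systemProperty != null && !systemProperty.isEmpty()) {
            return systemProperty;
        }
        if (environmentVariable != null && !environmentVariable.isEmpty()) {
            return environmentVariable;
        }
        return DEFAULT_NAMESPACE;
    }

    // Reads both sources from the running JVM.
    static String fromRuntime() {
        return resolve(System.getProperty("ARTIFACT_NAME"), System.getenv("ARTIFACT_NAME"));
    }

    public static void main(String[] args) {
        System.out.println("metric namespace: " + fromRuntime());
    }
}
```

In practice this means starting the application with -DARTIFACT_NAME=my-app, or exporting ARTIFACT_NAME in the environment, so that the counters are not reported under the default name.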
- Use Java 8 (e.g. 8.0.222-amzn)
- Update pom version to final version, commit and push
- Upload to Nexus:
macOS only:
$ export GPG_TTY=$(tty)
All platforms:
$ mvn clean deploy -Psign
$ mvn nexus-staging:release
- Tag current revision with "v"
- Push tags
- Increment current version to -SNAPSHOT