Skip to content

Implementing max retries with batching enabled #731

Closed Answered by rkolesnev
CharlyRien asked this question in Q&A
Discussion options

You must be logged in to vote

Hi @CharlyRien,

I believe you are right in that batches can be reshuffled as they are taken out of processing into retry queue on error and work is redistributed for processing again once retry backoff is elapsed.
You can track number of previous retries per message in the user function and filter them out of the batch before submitting it for processing - for example:

        parallelConsumer.poll(recordContexts -> {
            //Filter out messages that already retried too many times
            List<PollContext<String, String>> recordsToProcessFiltered = recordContexts.stream()
                    .filter(recordContext -> recordContext.getNumberOfFailedAttempts() < maxRetries).collect…

Replies: 1 comment 1 reply

Comment options

You must be logged in to vote
1 reply
@CharlyRien
Comment options

Answer selected by rkolesnev
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Category
Q&A
Labels
None yet
2 participants