Await commits during revoke #830
Conversation
Question: are you trying to make it work while using external offsets (not committing to Kafka)? Or is it intended to work only in the case when we commit to Kafka?
First of all, the new mode will be optional and not enabled by default. It should be kept disabled for external commits. However, supporting external commits would be very nice (thanks for asking!). We should try that in a later PR. Would you like to create an issue for that? Bonus if you have suggestions for the API or on how to implement it.
But from what I understand it would be possible to use this mode when I commit both externally and internally, right? If the commits to Kafka only happen after the external commits, then it should work as intended. I will create the issue for the case with external-only commits, but I don't have any ideas yet on how to implement it.
Yes, that would work.
I now have a working draft. It requires a patched Kafka library (it must contain PR apache/kafka#13678) and a patched embedded-kafka.
This PR adds the setting `rebalanceSafeCommits`. When set to `true`, we hold up a rebalance until the offsets for revoked partitions are committed. This allows the client that gets the partition assigned next to start at exactly the right offset, preventing duplicate processing. When `rebalanceSafeCommits` is set to `false`, the previous behavior is active: the old stream finishes its work in the background while the new stream is already starting. The old behavior causes duplicate processing but can be a bit more performant since rebalances are not held up.

In order to handle commits while waiting for the streams to end, commits are added to a separate queue. To make sure the main poll loop knows about these commits, we also place a `CommitAvailable` value on the command queue.

The rebalance listener ZIO effects (and other callbacks) are executed on the calling thread. This is required because the underlying Java client requires single-threaded access; it rejects concurrent invocations unless they are from the same thread, that is, the thread that executes the poll. (Remember, the rebalance listener and other callbacks are invoked by the Java client during a call to poll or commit.) Note, though, that ZIO operations that require time are not possible, since these cause the workflow to shift to another thread.

Once we know that all commits have been sent to the broker, we need to wait until their callbacks are called. We do this by calling `commitSync` with an empty map of offsets: `commitSync` guarantees that all previously sent commits complete before it returns. (Note: this requires a patched Kafka client.)

The rebalance listener no longer manipulates the state. The rebalance event only collects changes; the main poll loop then uses those changes to construct the next state. When the rebalance listener was not invoked, we skip some tasks for a small performance improvement.

Added a WIP `RebalanceSafeCommitConsumerSpec` as a testbed for improved unit tests (and also to test the new commit-on-rebalance feature).

Also:
- End all streams when only onLost is invoked.
- Disable zio-intellij's inspection `SimplifyWhenInspection` in Runloop because its suggestion is not equivalent (performance-wise).
- Configure a low `max.poll.records` (100 instead of 1000) for tests. There was a concurrency bug that only failed a test with the lower setting. The benchmarks continue to use 1000 for `max.poll.records`.
- Prevent deadlock by doing unsubscribe with a single consumer access.
- Await stream end on unsubscribe and shutdown as well.
- Removed method `Runloop.awaitShutdown`; it is not used, not accessible, and its semantics are questionable.
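A sketch of that `commitSync` trick against the plain Java client (illustrative only; zio-kafka wraps this in its own consumer-access layer):

```scala
import java.util.Collections
import org.apache.kafka.clients.consumer.{ KafkaConsumer, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition

// Illustrative: `commitSync` with an empty offsets map returns only after all
// previously sent async commits have completed and their callbacks have run.
// As noted above, this guarantee needs a patched Kafka client.
def awaitPendingCommits(consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit =
  consumer.commitSync(Collections.emptyMap[TopicPartition, OffsetAndMetadata]())
```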
The commit throughput that is supported by Kafka brokers is much lower than the consume throughput. Therefore, to retain a high consume throughput it is important that not every record's offset is committed. In this PR we automatically merge all commits that were generated in the course of a single run of the runloop. This frees users from having to merge streams and do the commit merging themselves. Commits are placed in a separate queue. Although this is not really needed for this change, we do need this in the next PR, where we start handling commits from the rebalance listener (see #830).
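For context, this is the kind of manual merging users previously had to do themselves, sketched with zio-kafka's `offsetBatches` sink (the processing step is a stand-in):

```scala
import zio._
import zio.stream._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Sketch: manual commit merging. Instead of committing every record's offset,
// offsets are aggregated with the `offsetBatches` sink and committed once per
// accumulated batch.
val stream: ZStream[Consumer, Throwable, Unit] =
  Consumer
    .plainStream(Subscription.topics("example-topic"), Serde.string, Serde.string)
    .mapZIO(record => ZIO.debug(record.value).as(record.offset)) // stand-in for real processing
    .aggregateAsync(Consumer.offsetBatches)                      // merge all offsets seen so far
    .mapZIO(_.commit)                                            // one commit per merged batch
```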
Previously, streams would only be ended in the rebalance listener for revoked partitions. Now, they are ended there even when `restartStreamsOnRebalancing` is used. Lost partitions are no longer treated as being revoked. With this change, streams of lost partitions are interrupted. Interrupting them prevents these streams from processing and committing more data. A nice side effect is that zio-kafka is now faster when the rebalance listener was _not_ called; the 'fast track'. The main reason for this change is to prepare for awaiting commits from within the rebalance listener, which will prevent duplicate consumption of records (see #830). Also: fix test `restartStreamsOnRebalancing mode closes all partition streams` so that it detects rebalances properly on fast computers.
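A small sketch of the end-vs-interrupt distinction described above (illustrative types, not zio-kafka's actual internals):

```scala
import zio._

// Illustrative: a revoked partition's stream is ended gracefully (it may
// still finish processing and commit), while a lost partition's stream is
// interrupted so it stops immediately.
final case class PartitionStream(end: Promise[Nothing, Unit], fiber: Fiber.Runtime[Throwable, Unit]) {
  val endGracefully: UIO[Unit] = end.succeed(()).unit // revoked: signal the stream to finish
  val interrupt: UIO[Unit]     = fiber.interrupt.unit // lost: stop processing right away
}
```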
For #830 we need to keep track of which records were pulled from the stream so that we can wait for them to be committed. This change prepares for that. The idea is that the rebalance listener first ends the streams for revoked partitions, then gets the last consumed offset using the now-public `completedPromise`. With that info, it can await those commits to complete. The advantage of this approach is that it is very precise; we only await offset commits for records that we know were pulled from the stream. An alternative implementation would track the given offsets inside the Runloop, where we do not have exact knowledge of what was pulled.
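A simplified sketch of the idea (only the `completedPromise` name comes from the change itself; the surrounding types are illustrative):

```scala
import zio._

// Illustrative: when a partition stream ends, `completedPromise` yields the
// offset of the last record actually pulled from that stream (None when
// nothing was pulled).
final case class PartitionStreamControl(completedPromise: Promise[Nothing, Option[Long]])

// After ending the streams for revoked partitions, the rebalance listener can
// collect these offsets and await exactly those commits.
def lastPulledOffsets(revoked: Chunk[PartitionStreamControl]): UIO[Chunk[Long]] =
  ZIO.foreach(revoked)(_.completedPromise.await).map(_.collect { case Some(offset) => offset })
```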
As part of the same stream-ending change, also: prevent storing pending requests for streams that are no longer assigned.
By tracking these offsets we can skip awaiting already completed commits from the rebalance listener in #830. To prevent unbounded memory usage, after a rebalance we remove the committed offsets for partitions that are no longer assigned to this consumer. Note that a commit might complete just after a partition was revoked. This is not a big issue; the offset will still be removed in the next rebalance. When the `rebalanceSafeCommits` feature is available and enabled (see #830), commits will complete in the rebalance listener and this can no longer happen. The offsets map is wrapped in a case class for two reasons:
* It provides a very nice place to put the updating methods.
* Having updating methods makes the code that uses `CommitOffsets` very concise.
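A minimal sketch of such a wrapper (field and method names are illustrative, not necessarily those in the PR):

```scala
import org.apache.kafka.common.TopicPartition

// Illustrative `CommitOffsets`-style wrapper: keeps the highest committed
// offset per partition and exposes concise update methods.
final case class CommitOffsets(offsets: Map[TopicPartition, Long]) {

  // Record newly committed offsets, keeping the maximum per partition.
  def addCommits(commits: Map[TopicPartition, Long]): CommitOffsets =
    CommitOffsets(commits.foldLeft(offsets) { case (acc, (tp, offset)) =>
      acc.updated(tp, acc.get(tp).fold(offset)(_ max offset))
    })

  // Drop partitions that are no longer assigned, bounding memory usage.
  def keepPartitions(assigned: Set[TopicPartition]): CommitOffsets =
    CommitOffsets(offsets.filter { case (tp, _) => assigned.contains(tp) })
}
```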
Replaced by #1098.
Fixes #590 "Many records duplicately processed after rebalancing".

In this change we introduce a new experimental mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition have been committed.

### Motivation

Here is a common (single partition) scenario around rebalances:

1. A consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100).
2. Asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50); the rest is still being processed when...
3. A rebalance happens; the partition is revoked and assigned to another consumer on another instance.
4. The consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets.
5. _At the same time,_ another consumer on another instance starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100.

Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is done, the stored offsets can go up and down and are therefore unreliable.

After merging this change, the scenario will unfold as follows:

1. A consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset.
2. Asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50); the rest is still being processed when...
3. A rebalance happens; the partition is revoked and assigned to another consumer on another instance.
   * The consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets.
   * Inside the onRevoked callback, zio-kafka continues to process commit commands from the user.
   * Zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes.
   * The onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition.
4. Another consumer on another instance starts consuming from the last committed offset (which is now 100; problem solved!).

### Commit queue

Because both the main runloop and the rebalance listener need to process (and thus receive) commit commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced.

### Complications

1. The chosen solution is not suitable for all consumers.
   - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we could let the user tell zio-kafka that a given commit is the last one.
   - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores, though. As a workaround, these users can commit to both places.
2. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete.

### Same thread executor

The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the Java consumer happen from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread runtime which runs on the thread of the caller. However, some operations, such as `ZIO.timeout` and anything with `Schedule`s, will still shift work to another thread. We work around this by using blocking time.

### Experimental

Because holding up the rebalance may have unforeseen consequences, this feature is marked as experimental. This allows us to collect experiences before we recommend this mode to all users.

### Collateral

This change also:
- fixes the order of `private` and `final`
- removes some completely useless tests

### Related

The same issue is present in:
- fs2-kafka: fd4s/fs2-kafka#1200
- alpakka-kafka: akka/alpakka-kafka#1038

In fact, every program that polls and commits asynchronously is likely affected.

### Non-goals

This change does not try to solve the following goals. However, these can be addressed in future PRs.
- Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087).
- Support consumers that want to commit only a portion of the given messages.
- Support transactional consumer/producer.
- Support external commits.

This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073, #1086, #1089 and #1097.
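For reference, opting in to the new mode might look like this (a sketch; `withRebalanceSafeCommits` is assumed to be the builder for the setting named in these PRs, while the other calls are regular zio-kafka configuration):

```scala
import zio._
import zio.kafka.consumer._

// Sketch: enabling the experimental rebalance-safe-commits mode.
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId("example-group")
    .withRebalanceSafeCommits(true) // hold up rebalances until commits for revoked partitions complete

val consumerLayer: ZLayer[Any, Throwable, Consumer] =
  ZLayer.scoped(Consumer.make(settings))
```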
To solve #590, in this PR we introduce a new mode that holds up a rebalance until the offsets of messages provided to streams of revoked partitions have been committed.
Motivation
Here is a common (single partition) scenario around rebalances:

1. A consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100).
2. Asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50); the rest is still being processed when...
3. A rebalance happens; the partition is revoked and assigned to another consumer on another instance.
4. The consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets.
5. _At the same time,_ another consumer on another instance starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100.
Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable.
After merging this PR, the scenario will unfold as follows:

1. A consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset.
2. Asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50); the rest is still being processed when...
3. A rebalance happens; the partition is revoked and assigned to another consumer on another instance.
   * The consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets.
   * Inside the onRevoked callback, zio-kafka continues to process commit commands from the user.
   * Zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes.
   * The onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition.
4. Another consumer on another instance starts consuming from the last committed offset (which is now 100; problem solved!).
Complications

Some ZIO operations, such as `ZIO.timeout`, still make processing jump to another thread. Therefore, we cannot use `ZIO.timeout` to limit the time we spend in the onRevoke callback.
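A sketch of the kind of workaround this implies (illustrative names only; zio-kafka's actual implementation may differ): bound the wait with blocking wall-clock time on the calling thread instead of `ZIO.timeout`:

```scala
import zio._

// Illustrative: poll a condition against a wall-clock deadline without
// shifting to another thread. Returns whether the commits completed in time.
def awaitWithDeadline(commitsDone: UIO[Boolean], maxWait: Duration): UIO[Boolean] = {
  def loop(deadline: Long): UIO[Boolean] =
    commitsDone.flatMap {
      case true => ZIO.succeed(true)
      case false =>
        ZIO.succeed(java.lang.System.nanoTime()).flatMap { now =>
          if (now >= deadline) ZIO.succeed(false) // give up; let the rebalance proceed
          else ZIO.succeed(Thread.sleep(10)) *> loop(deadline) // blocking sleep stays on this thread
        }
    }
  ZIO.succeed(java.lang.System.nanoTime() + maxWait.toNanos).flatMap(loop)
}
```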
Related
The same issue is present in:
- fs2-kafka: fd4s/fs2-kafka#1200
- alpakka-kafka: akka/alpakka-kafka#1038
In fact, every program that does polls and commits asynchronously is likely affected.
Non-goals
This PR does not try to solve the following goals. However, these can be addressed in future PRs.
This branch is based on the work of the abandoned PR #788.