-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-6723 Allow specifying partitions for EventHub #51
DBZ-6723 Allow specifying partitions for EventHub #51
Conversation
Adds supports to detect the number of partitions in the supplied EventHub producer, create batches for every partition, and to assign EventData to a specific partition/batch, in a new BatchManager class. When a batch hits its maximum size (1024KB), the batch is emitted and a new batch is created. When all incoming records have been processed all open/remaining batches are emitted if they have any EventData assigned. As creating an EventDataBatch initiates a connection over AMQP, that adds quite a bit of overhead. By putting the batch creation behind a proxy, we only create the real EventDataBatch when it is necessary, saving a bit of overhead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@slknijnenburg Thanks a lot of PR, it is good start but there might stil be issues to handle - mostly event committng.
debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java
Show resolved
Hide resolved
debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java
Outdated
Show resolved
Hide resolved
...ium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java
Outdated
Show resolved
Hide resolved
} | ||
} | ||
else { | ||
partitionId = record.partition(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if partition is empty and netiher partitionID nor partitionKey is set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the partiiton is not matching the partition count?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good points! I've added both a configuration and a runtime check to ensure that the partition ID is in the list of available partitions, throwing an exception if it's not, as that would indicate a configuration failure.
When no partitionID or partitionKey are present to decide where to send the batch to, I've added a default batch where CreateBatchOptions won't get either set. The EventHubsAsyncProducer will in that case round-robin over the available partitions.
...ium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java
Outdated
Show resolved
Hide resolved
...r-eventhubs/src/test/java/io/debezium/server/eventhubs/EventHubsWithPartitionKeyProfile.java
Outdated
Show resolved
Hide resolved
...venthubs/src/test/java/io/debezium/server/eventhubs/EventHubsWithPartitionRouterProfile.java
Outdated
Show resolved
Hide resolved
index -> { | ||
ChangeEvent<Object, Object> record = records.get(index); | ||
try { | ||
committer.markProcessed(record); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not going to work. The markProcessed
should be called sequentially for the records are they are made available from the input. There must not be gaps or change in order. IMHO this is not guaranteed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have any advice on how to address this? When stepping through the code I see that the risk of non-sequentially calling markProcessed is that the wrong offset is ultimately written to the offsetWriter, and those offsets are only committed when markBatchFinished()
is called.
The first thing that comes to mind would be to called committer.markProcessed(record)
as soon as it's scheduled in an EventDataBatch, as that is guaranteed to be in order. That seems to have the risk that that batch might fail to be emitted, while other batches to other EventHub partitions might succeed. That situation would result in an exception being thrown from handleBatch()
, meaning that markBatchFinished()
would not be called. Is that sufficient to make sure the right offsets are being stored? A retry in that case would probably be fine, as that would lead to at-least-once delivery, whereas in the current situation writing out the wrong offset could lead to data-loss in case of retries?
@jpechane Could you acknowledge that my train of thought here is correct? If so I'll make the change to markProcessed once records are added to a batch. If I'm wrong I'd hope you have an alternative suggestion on how to resolve this, without having to emit the batches to EventHub in serial order based on partition routing of incoming records.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@slknijnenburg Hi, there are generally two solutions
- Simple which I believe is gooed enough for starter, just commit all records in order before calling
markBatchFinished()
. It brings larger number of duplicates in case of connector crash but it is simple and easy to implement - The other option is to track the smallest index of message that was successfully written by each atch accross all batches. In that case you'll get optimal solution at the cost of increased complexity.
I personally would recommend to implement option 1) to conclude the PR and handle option 2) as a follow-up (if needed)
...ium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java
Outdated
Show resolved
Hide resolved
The boolean was used as a shorthand to check for either partitionID and partitionKey being not empty and was used in more places in previous iterations. As it was only used twice now, the isEmpty() checks could as well be inlined.
When a partitionKey is defined, we always store the batch data in batches[0]. This same index is used just as a regular index for other use cases, which can be confusing. For this reason a static final Integer was extracted to make that the partitionkey usecase explicit.
Adds both a startup- and a runtime check to verify the target partition of respectively the configuration or the event is really present on the target eventhub.
af51c48
to
2ac80d8
Compare
When a ChangeEvent has no PartitionId, it should still be emitted to EventHub. The EventHub producer will send events for which the EventDataBatch does not have any partition configuration round-robin across all partitions, according to https://github.com/Azure/azure-sdk-for-java/blob/6b04bc47b5cd04b7630d20a8b869f2d72595bd34/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java#L569-L591.
@slknijnenburg Applied, thanks a lot! Could you please prepare a docs update PR too? |
@jpechane Here you go: debezium/debezium#5070 |
Seems like debezium/debezium-server#51 introduced a potential regression, where if the sink is configured with a custom producer, it is not wrapped in a BatchManager and batchManager will be null when handleBatch is invoked.
Link: https://issues.redhat.com/browse/DBZ-6723
Background
With the existing Azure EventHubs sink implementation it is not possible to leverage multiple partitions on the EventHubs side. One can either hardcode the partitionID where all messages will be sent to, or a fixed partitionKey, which will make EventHubs pick a single partition derived from that key. Being only able to leverage a single partition makes it hard to parallelise any downstream processing. This is something we ran into when processing ~900GB of messages from a single large database server.
The EventHubs SDK only allows specifying the partition (either via an ID or a key) on a batch level, as the entire batch of messages is being emitted as a single message under the hood. This means that in order to send messages to N partitions, we need to manage N potential batches and we need to route events to the correct batch to ultimately make the message arrive in the correct EventHubs partition.
Changes
This PR adds supports to sends events to multiple EventHubs partitions based on input from the PartitionRouter SMT. In order to be able to read the partition number from the ChangeEvent, its interface had to be changed, for this there is an open PR in the
debezium
repository on which this PR depends.A new BatchManager class now create batches for every partition, and assigns EventData object to a specific batch based on the partition derived from the ChangeEvent. When a batch hits its maximum size (1024KB), the batch is emitted to EventHubs and a new batch is created.
When all incoming records have been processed all open/remaining batches are emitted if they contain any EventData.
As creating an EventDataBatch initiates a connection over AMQP, that adds quite a bit of overhead. By putting the batch creation behind a proxy class, we only create the real EventDataBatch when it is necessary, saving a bit of overhead.
Other notes