
DBZ-6723 Allow specifying partitions for EventHub #51

4 changes: 2 additions & 2 deletions README.md
@@ -91,10 +91,10 @@ az eventhubs namespace create --name debezium-test --resource-group eventhubstes

### Create an Event Hub

Create an Event Hub (equivalent to a topic) with `one` partition. Check the documentation for options on how do this using the [Azure Portal](https://docs.microsoft.com/azure/event-hubs/event-hubs-create#create-an-event-hub), [Azure CLI](https://docs.microsoft.com/azure/event-hubs/event-hubs-quickstart-cli#create-an-event-hub) etc. , e.g. on the CLI:
Create an Event Hub (equivalent to a topic) with 5 partitions. Check the documentation for options on how to do this using the [Azure Portal](https://docs.microsoft.com/azure/event-hubs/event-hubs-create#create-an-event-hub), [Azure CLI](https://docs.microsoft.com/azure/event-hubs/event-hubs-quickstart-cli#create-an-event-hub) etc., e.g. on the CLI:

```shell
az eventhubs eventhub create --name debezium-test-hub --resource-group eventhubstest --namespace-name debezium-test
az eventhubs eventhub create --name debezium-test-hub --resource-group eventhubstest --namespace-name debezium-test --partition-count 5
```

### Build the module
5 changes: 3 additions & 2 deletions debezium-server-eventhubs/pom.xml
@@ -38,10 +38,11 @@
<artifactId>debezium-testing-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope></dependency>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
New file: io/debezium/server/eventhubs/BatchManager.java
@@ -0,0 +1,159 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.eventhubs;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

public class BatchManager {
private static final Logger LOGGER = LoggerFactory.getLogger(BatchManager.class);
private final EventHubProducerClient producer;
private final boolean forceSinglePartitionMode;
private final String partitionID;
private final String partitionKey;
private final Integer maxBatchSize;

// Prepare CreateBatchOptions for N partitions
private final HashMap<Integer, CreateBatchOptions> batchOptions = new HashMap<>();
private final HashMap<Integer, EventDataBatchProxy> batches = new HashMap<>();
private final HashMap<Integer, ArrayList<Integer>> processedRecordIndices = new HashMap<>();
private List<ChangeEvent<Object, Object>> records;
private DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer;

public BatchManager(EventHubProducerClient producer, boolean forceSinglePartitionMode,
String partitionID, String partitionKey, Integer maxBatchSize) {
this.producer = producer;
this.forceSinglePartitionMode = forceSinglePartitionMode;
this.partitionID = partitionID;
this.partitionKey = partitionKey;
this.maxBatchSize = maxBatchSize;
}

public void initializeBatch(List<ChangeEvent<Object, Object>> records,
DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) {
this.records = records;
this.committer = committer;

if (forceSinglePartitionMode) {
CreateBatchOptions op = new CreateBatchOptions();

if (!partitionID.isEmpty()) {
op.setPartitionId(partitionID);

batchOptions.put(Integer.parseInt(partitionID), op);
batches.put(Integer.parseInt(partitionID), new EventDataBatchProxy(producer, op));
processedRecordIndices.put(Integer.parseInt(partitionID), new ArrayList<>());
}

if (!partitionKey.isEmpty()) {
op.setPartitionKey(partitionKey);

batchOptions.put(0, op);
batches.put(0, new EventDataBatchProxy(producer, op));
processedRecordIndices.put(0, new ArrayList<>());
}

if (maxBatchSize != 0) {
op.setMaximumSizeInBytes(maxBatchSize);
}

return;
}

producer.getPartitionIds().stream().forEach(partitionId -> {
CreateBatchOptions op = new CreateBatchOptions().setPartitionId(partitionId);
if (maxBatchSize != 0) {
op.setMaximumSizeInBytes(maxBatchSize);
}
batchOptions.put(Integer.parseInt(partitionId), op);
});
// Prepare batches
batchOptions.forEach((partitionId, batchOption) -> {
EventDataBatchProxy batch = new EventDataBatchProxy(producer, batchOption);
batches.put(partitionId, batch);
processedRecordIndices.put(partitionId, new ArrayList<>());
});

}

public void closeAndEmitBatches() {
// All records have been processed, emit the final (non-full) batches.
batches.forEach((partitionId, batch) -> {
if (batch.getCount() > 0) {
LOGGER.trace("Dispatching {} events.", batch.getCount());
emitBatchToEventHub(records, committer, processedRecordIndices.get(partitionId), batch);
}
});
}

public void sendEventToPartitionId(EventData eventData, Integer recordIndex, Integer partitionId) {
EventDataBatchProxy batch = batches.get(partitionId);

if (!batch.tryAdd(eventData)) {
if (batch.getCount() == 0) {
// If we fail to add at least the very first event to the batch that is because
// the event's size exceeds the maxBatchSize in which case we cannot safely
// recover and dispatch the event, only option is to throw an exception.
throw new DebeziumException("Event data is too large to fit into batch");
}
// reached the maximum allowed size for the batch
LOGGER.trace("Maximum batch reached, dispatching {} events.", batch.getCount());

// Max size reached, dispatch the batch to EventHub
emitBatchToEventHub(records, committer, processedRecordIndices.get(partitionId), batch);
// Renew the batch proxy so we can continue.
batch = new EventDataBatchProxy(producer, batchOptions.get(partitionId));
batches.put(partitionId, batch);
processedRecordIndices.put(partitionId, new ArrayList<>());
}

// Record the index of the record that was added to the batch.
processedRecordIndices.get(partitionId).add(recordIndex);
}

private void emitBatchToEventHub(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer,
ArrayList<Integer> processedIndices, EventDataBatchProxy batch) {
final int batchEventSize = batch.getCount();
if (batchEventSize > 0) {
try {
LOGGER.trace("Sending batch of {} events to Event Hubs", batchEventSize);
batch.emit();
LOGGER.trace("Sent record batch to Event Hubs");
}
catch (Exception e) {
throw new DebeziumException(e);
}

// this loop commits each record submitted in the event hubs batch
List<String> processedIndexesStrings = processedIndices.stream().map(Object::toString).collect(Collectors.toList());
LOGGER.trace("Marking records as processed: {}", String.join("; ", processedIndexesStrings));
processedIndices.forEach(
index -> {
ChangeEvent<Object, Object> record = records.get(index);
try {
committer.markProcessed(record);

Contributor:

I think this is not going to work. markProcessed should be called sequentially for the records as they are made available from the input. There must not be gaps or changes in order, and IMHO that is not guaranteed here.

Contributor Author:

Do you have any advice on how to address this? When stepping through the code I see that the risk of calling markProcessed non-sequentially is that the wrong offset is ultimately written to the offsetWriter, and those offsets are only committed when markBatchFinished() is called.

The first thing that comes to mind would be to call committer.markProcessed(record) as soon as the record is scheduled in an EventDataBatch, as that is guaranteed to happen in order. The risk there is that the batch might fail to be emitted while batches for other Event Hubs partitions succeed. That situation would result in an exception being thrown from handleBatch(), meaning that markBatchFinished() would not be called. Is that sufficient to make sure the right offsets are stored? A retry in that case would probably be fine, as it leads to at-least-once delivery, whereas in the current situation writing out the wrong offset could lead to data loss on retries.

@jpechane Could you confirm that my train of thought here is correct? If so, I'll make the change to call markProcessed once records are added to a batch. If I'm wrong, I'd hope you have an alternative suggestion on how to resolve this without having to emit the batches to Event Hubs serially, following the partition routing of incoming records.

Contributor:

@slknijnenburg Hi, there are generally two solutions:

  1. A simple one, which I believe is good enough for a start: just commit all records in order before calling markBatchFinished(). It produces a larger number of duplicates in case of a connector crash, but it is simple and easy to implement.
  2. The other option is to track, across all batches, the smallest index of a message that was successfully written by each batch. That gives the optimal solution at the cost of increased complexity.

I personally would recommend implementing option 1) to conclude the PR and handling option 2) as a follow-up (if needed).
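
For illustration only (not part of this diff), option 1 could look roughly like the sketch below, placed at the end of the sink's handleBatch() after every Event Hubs batch has been emitted successfully; the surrounding wiring and the `records`/`committer` arguments are assumed from the consumer:

```java
void commitAllInOrder(List<ChangeEvent<Object, Object>> records,
                      DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
        throws InterruptedException {
    // Every Event Hubs batch has already been emitted at this point; if any emit
    // failed, an exception was thrown earlier, offsets were not advanced, and the
    // engine redelivers the records (at-least-once semantics).
    for (ChangeEvent<Object, Object> record : records) {
        committer.markProcessed(record); // strictly in original order, no gaps
    }
    committer.markBatchFinished();
}
```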

LOGGER.trace("Record marked processed");
}
catch (Exception e) {
throw new DebeziumException(e);
}
});
}
}
}
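
As a usage sketch (again, not part of the diff), a change consumer could drive BatchManager once per engine batch as shown below; toBytes() and choosePartition() are hypothetical helpers standing in for the real serialization and routing logic:

```java
void dispatch(EventHubProducerClient producer,
              List<ChangeEvent<Object, Object>> records,
              DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) {
    // Multi-partition mode: no forced single partition, empty partition id/key, default batch size.
    BatchManager batchManager = new BatchManager(producer, false, "", "", 0);
    batchManager.initializeBatch(records, committer);

    for (int i = 0; i < records.size(); i++) {
        ChangeEvent<Object, Object> record = records.get(i);
        EventData eventData = new EventData(toBytes(record.value()));               // toBytes(): illustrative serializer
        batchManager.sendEventToPartitionId(eventData, i, choosePartition(record)); // choosePartition(): illustrative router
    }

    // Flush whatever remains in each per-partition batch.
    batchManager.closeAndEmitBatches();
}
```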
New file: io/debezium/server/eventhubs/EventDataBatchProxy.java
@@ -0,0 +1,50 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.eventhubs;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;

/**
* Proxy class/wrapper for EventDataBatch. Will create an inner EventDataBatch when data is being emitted to a specific
* partition.
*/
public class EventDataBatchProxy {
private EventDataBatch batch;
private final EventHubProducerClient producer;
private final CreateBatchOptions batchOptions;

public EventDataBatchProxy(EventHubProducerClient producer, CreateBatchOptions batchOptions) {
this.producer = producer;
this.batchOptions = batchOptions;
}

public boolean tryAdd(final EventData eventData) {
if (this.batch == null) {
this.batch = producer.createBatch(this.batchOptions);
}

return batch.tryAdd(eventData);
}

public int getCount() {
if (this.batch == null) {
return 0;
}

return batch.getCount();
}

public void emit() {
if (this.batch == null) {
return;
}

producer.send(this.batch);
}
}
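
The point of the proxy is that the inner EventDataBatch is only allocated when the first event for that partition arrives, so partitions that receive no events never trigger a createBatch() call against the producer. A minimal sketch of that behaviour (producer construction omitted, payload purely illustrative):

```java
static void example(EventHubProducerClient producer) {
    // The inner EventDataBatch is created lazily on the first tryAdd() call.
    EventDataBatchProxy proxy = new EventDataBatchProxy(producer,
            new CreateBatchOptions().setPartitionId("0"));

    if (!proxy.tryAdd(new EventData("payload"))) {
        // Either the batch is full or this single event exceeds the configured size limit.
    }

    proxy.emit(); // no-op if nothing was ever added
}
```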