Skip to content

Commit

Permalink
Merge pull request #485 from xinlian12/addSyncBulkSupport
Browse files Browse the repository at this point in the history
Add sync bulk support
  • Loading branch information
xinlian12 authored Sep 28, 2022
2 parents b688df0 + 5467801 commit d5f0ebb
Show file tree
Hide file tree
Showing 38 changed files with 1,089 additions and 103 deletions.
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
## Release History

### 1.6.0-beta.1 (Unreleased)
#### New Features
* Added bulk support for `CosmosDBSinkConnector`. - [PR 485](https://github.com/microsoft/kafka-connect-cosmosdb/pull/485)
#### Breaking Changes

#### Key Bug Fixes

#### Other Changes

### 1.5.0 (2022-09-16)
#### New Features
* Updated `azure-cosmos` version to 4.36.0.

#### Key Bug Fixes
* Fixed parsing for empty arrays. [PR 466](https://github.com/microsoft/kafka-connect-cosmosdb/pull/466)
* Updated `CosmosDBSinkConnector` to keep retrying for throttled requests. - [PR 472](https://github.com/microsoft/kafka-connect-cosmosdb/pull/472)

### 1.4.0 (2022-05-26)
#### New Features
Expand Down
57 changes: 47 additions & 10 deletions src/main/java/com/azure/cosmos/kafka/connect/CosmosDBConfig.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect;

import static com.azure.cosmos.kafka.connect.CosmosDBConfig.CosmosClientBuilder.createClient;
Expand Down Expand Up @@ -50,7 +53,16 @@ public class CosmosDBConfig extends AbstractConfig {
private static final String COSMOS_CONTAINER_TOPIC_MAP_DISPLAY = "Topic-Container map";
private static final String COSMOS_CONTAINER_TOPIC_MAP_DOC =
"A comma delimited list of Kafka topics mapped to Cosmos containers.\n"
+ "For example: topic1#con1,topic2#con2.";
+ "For example: topic1#con1,topic2#con2.";

public static final String COSMOS_SINK_BULK_ENABLED_CONF = "connect.cosmos.sink.bulk.enabled";
private static final String COSMOS_SINK_BULK_ENABLED_DOC = "Flag to indicate whether Cosmos DB bulk mode is enabled for Sink connector. By default it is true.";
private static final boolean DEFAULT_COSMOS_SINK_BULK_ENABLED = true;

public static final String COSMOS_SINK_MAX_RETRY_COUNT = "connect.cosmos.sink.maxRetryCount";
private static final String COSMOS_SINK_MAX_RETRY_COUNT_DOC =
"Cosmos DB max retry attempts on write failures for Sink connector. By default, the connector will retry on transient write errors for up to 10 times.";
private static final int DEFAULT_COSMOS_SINK_MAX_RETRY_COUNT = 10;

public static final String COSMOS_PROVIDER_NAME_CONF = "connect.cosmos.provider.name";
private static final String COSMOS_PROVIDER_NAME_DEFAULT = null;
Expand All @@ -63,20 +75,24 @@ public class CosmosDBConfig extends AbstractConfig {
public static final int COSMOS_DATABASE_GROUP_ORDER = 2;
public static final String COSMOS_CLIENT_USER_AGENT_SUFFIX = "APN/1.0 Microsoft/1.0 KafkaConnect/";

private String connEndpoint;
private String connKey;
private String databaseName;
private String providerName;
private final String connEndpoint;
private final String connKey;
private final String databaseName;
private final String providerName;
private final boolean bulkModeEnabled;
private final int maxRetryCount;
private TopicContainerMap topicContainerMap = TopicContainerMap.empty();

/**
 * Builds the connector configuration, eagerly resolving every setting from the
 * parsed properties so the public getters are plain field reads.
 *
 * @param config the {@link ConfigDef} describing all known settings.
 * @param parsedConfig the raw connector properties.
 */
public CosmosDBConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);

connEndpoint = getString(COSMOS_CONN_ENDPOINT_CONF);
connKey = getPassword(COSMOS_CONN_KEY_CONF).value();
databaseName = getString(COSMOS_DATABASE_NAME_CONF);
providerName = getString(COSMOS_PROVIDER_NAME_CONF);
topicContainerMap = TopicContainerMap.deserialize(getString(COSMOS_CONTAINER_TOPIC_MAP_CONF));
bulkModeEnabled = getBoolean(COSMOS_SINK_BULK_ENABLED_CONF);
maxRetryCount = getInt(COSMOS_SINK_MAX_RETRY_COUNT);
}

public CosmosDBConfig(Map<String, String> parsedConfig) {
Expand Down Expand Up @@ -125,6 +141,19 @@ private static void defineConnectionConfigs(ConfigDef result) {
"none",
Importance.MEDIUM,
TOLERANCE_ON_ERROR_DOC
).define(
COSMOS_SINK_BULK_ENABLED_CONF,
Type.BOOLEAN,
DEFAULT_COSMOS_SINK_BULK_ENABLED,
Importance.LOW,
COSMOS_SINK_BULK_ENABLED_DOC
)
.define(
COSMOS_SINK_MAX_RETRY_COUNT,
Type.INT,
DEFAULT_COSMOS_SINK_MAX_RETRY_COUNT,
Importance.HIGH,
COSMOS_SINK_MAX_RETRY_COUNT_DOC
)
.defineInternal(
COSMOS_PROVIDER_NAME_CONF,
Expand Down Expand Up @@ -188,6 +217,14 @@ public String getProviderName() {
return this.providerName;
}

/**
 * @return whether the sink connector writes records via Cosmos DB bulk mode.
 */
public boolean isBulkModeEnabled() {
return bulkModeEnabled;
}

/**
 * @return the maximum number of retry attempts for transient sink write failures.
 */
public int getMaxRetryCount() {
return maxRetryCount;
}

public static void validateConnection(Map<String, String> connectorConfigs, Map<String, ConfigValue> configValues) {
String endpoint = connectorConfigs.get(CosmosDBSinkConfig.COSMOS_CONN_ENDPOINT_CONF);
String key = connectorConfigs.get(CosmosDBSinkConfig.COSMOS_CONN_KEY_CONF);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect;

import org.apache.commons.collections4.BidiMap;
Expand Down
150 changes: 150 additions & 0 deletions src/main/java/com/azure/cosmos/kafka/connect/sink/BulkWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.sink;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.models.CosmosBulkItemResponse;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
 * Sink writer that flushes {@link SinkRecord}s to Cosmos DB using bulk upsert operations.
 *
 * <p>Records are converted to bulk upsert item operations and executed in a single
 * {@code executeBulkOperations} call. Failed operations are classified as transient or
 * non-transient; non-transient failures are placed at the front of the failed-response
 * list so the caller can decide retry behavior by inspecting only the first exception.
 */
public class BulkWriter extends SinkWriterBase {
private static final Logger logger = LoggerFactory.getLogger(BulkWriter.class);

private final CosmosContainer cosmosContainer;
private final PartitionKeyDefinition partitionKeyDefinition;

/**
 * Creates a bulk writer for the given container.
 *
 * @param container the target Cosmos container. Can not be null.
 * @param maxRetryCount max retry attempts on transient write failures, handled by the base class.
 */
public BulkWriter(CosmosContainer container, int maxRetryCount) {
    super(maxRetryCount);

    checkNotNull(container, "Argument 'container' can not be null");
    this.cosmosContainer = container;
    // NOTE(review): container.read() performs a synchronous service call at construction
    // time to fetch the partition key definition — confirm this is acceptable on task start.
    this.partitionKeyDefinition = container.read().getProperties().getPartitionKeyDefinition();
}

/***
 * Bulk write the sink records.
 *
 * @param sinkRecords the sink records needs to be written.
 * @return the list of sink write failed operations.
 */
protected SinkWriteResponse writeCore(List<SinkRecord> sinkRecords) {

    SinkWriteResponse sinkWriteResponse = new SinkWriteResponse();
    if (sinkRecords == null || sinkRecords.isEmpty()) {
        return sinkWriteResponse;
    }

    // Translate every record into an upsert operation, carrying the originating
    // SinkRecord in the operation context so it can be recovered from the response.
    List<CosmosItemOperation> itemOperations = new ArrayList<>(sinkRecords.size());
    for (SinkRecord sinkRecord : sinkRecords) {
        CosmosItemOperation cosmosItemOperation = CosmosBulkOperations.getUpsertItemOperation(
            sinkRecord.value(),
            this.getPartitionKeyValue(sinkRecord.value()),
            new SinkOperationContext(sinkRecord));

        itemOperations.add(cosmosItemOperation);
    }

    Iterable<CosmosBulkOperationResponse<Object>> responseList = cosmosContainer.executeBulkOperations(itemOperations);

    // Non-transient exceptions will be put in the front of the list
    for (CosmosBulkOperationResponse<Object> bulkOperationResponse : responseList) {
        SinkOperationContext context = bulkOperationResponse.getOperation().getContext();
        checkNotNull(context, "sinkOperationContext should not be null");

        SinkRecord sinkRecord = context.getSinkRecord();

        if (bulkOperationResponse.getException() != null
            || bulkOperationResponse.getResponse() == null
            || !bulkOperationResponse.getResponse().isSuccessStatusCode()) {

            BulkOperationFailedException exception = handleErrorStatusCode(
                bulkOperationResponse.getResponse(),
                bulkOperationResponse.getException(),
                context);

            logger.debug(
                "Bulk operation failed for topic {}, partition {}, offset {}",
                context.getTopic(),
                context.getKafkaPartition(),
                context.getKafkaOffset(),
                exception);

            // Generally we would want to retry for the transient exceptions, and fail-fast for non-transient exceptions
            // Putting the non-transient exceptions at the front of the list
            // so later when deciding retry behavior, only examining the first exception will be enough
            if (ExceptionsHelper.isTransientFailure(exception)) {
                sinkWriteResponse.getFailedRecordResponses().add(new SinkOperationFailedResponse(sinkRecord, exception));
            } else {
                sinkWriteResponse.getFailedRecordResponses().add(0, new SinkOperationFailedResponse(sinkRecord, exception));
            }
        } else {
            sinkWriteResponse.getSucceededRecords().add(sinkRecord);
        }
    }

    return sinkWriteResponse;
}

/**
 * Extracts the partition key value for a record payload.
 *
 * @param recordValue the record value; must be a {@code Map} representation of the document.
 * @return the {@link PartitionKey} built from the value found at the container's partition key path.
 */
private PartitionKey getPartitionKeyValue(Object recordValue) {
    checkArgument(recordValue instanceof Map, "Argument 'recordValue' is not valid map format.");

    //TODO: examine the code here for sub-partition
    String partitionKeyPath = StringUtils.join(this.partitionKeyDefinition.getPaths(), "");
    @SuppressWarnings("unchecked") // guarded by the instanceof check above
    Map<String, Object> recordMap = (Map<String, Object>) recordValue;
    // Drop the leading '/' of the partition key path to use it as a map key.
    // NOTE(review): only supports a single top-level partition key path; nested paths
    // (e.g. "/a/b") and hierarchical partition keys are not handled — see TODO above.
    Object partitionKeyValue = recordMap.get(partitionKeyPath.substring(1));
    PartitionKeyInternal partitionKeyInternal = PartitionKeyInternal.fromObjectArray(Collections.singletonList(partitionKeyValue), false);

    return ImplementationBridgeHelpers
        .PartitionKeyHelper
        .getPartitionKeyAccessor()
        .toPartitionKey(partitionKeyInternal);
}

/**
 * Normalizes a failed bulk operation into a {@link BulkOperationFailedException}.
 *
 * <p>Status codes are taken from the item response when present, otherwise from the
 * {@link CosmosException} if the failure carried one; any other failure is mapped to
 * REQUEST_TIMEOUT so it is classified as transient and retried.
 *
 * @param itemResponse the per-item bulk response, possibly null.
 * @param exception the raw failure, possibly null.
 * @param sinkOperationContext context identifying the originating Kafka record.
 * @return the normalized failure, never null.
 */
BulkOperationFailedException handleErrorStatusCode(
    CosmosBulkItemResponse itemResponse,
    Exception exception,
    SinkOperationContext sinkOperationContext) {

    int effectiveStatusCode;
    int effectiveSubStatusCode;
    if (itemResponse != null) {
        effectiveStatusCode = itemResponse.getStatusCode();
        effectiveSubStatusCode = itemResponse.getSubStatusCode();
    } else if (exception instanceof CosmosException) { // instanceof is false for null
        CosmosException cosmosException = (CosmosException) exception;
        effectiveStatusCode = cosmosException.getStatusCode();
        effectiveSubStatusCode = cosmosException.getSubStatusCode();
    } else {
        effectiveStatusCode = HttpConstants.StatusCodes.REQUEST_TIMEOUT;
        effectiveSubStatusCode = 0;
    }

    String errorMessage =
        String.format(
            "Request failed with effectiveStatusCode: {%s}, effectiveSubStatusCode: {%s}, kafkaOffset: {%s}, kafkaPartition: {%s}, topic: {%s}",
            effectiveStatusCode,
            effectiveSubStatusCode,
            sinkOperationContext.getKafkaOffset(),
            sinkOperationContext.getKafkaPartition(),
            sinkOperationContext.getTopic());

    return new BulkOperationFailedException(effectiveStatusCode, effectiveSubStatusCode, errorMessage, exception);
}

/**
 * CosmosException subtype carrying the effective status/sub-status of a failed bulk operation.
 */
private static class BulkOperationFailedException extends CosmosException {
    protected BulkOperationFailedException(int statusCode, int subStatusCode, String message, Throwable cause) {
        super(statusCode, message, null, cause);
        BridgeInternal.setSubStatusCode(this, subStatusCode);
    }
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.sink;

import com.azure.cosmos.kafka.connect.sink.id.strategy.AbstractIdStrategyConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.sink;

import static com.azure.cosmos.kafka.connect.CosmosDBConfig.validateConnection;
Expand Down
Loading

0 comments on commit d5f0ebb

Please sign in to comment.