Add sync bulk support #485

Merged · 13 commits · Sep 28, 2022

11 changes: 10 additions & 1 deletion CHANGELOG.md
@@ -1,12 +1,21 @@
## Release History

### 1.6.0-beta.1 (Unreleased)
#### New Features
* Added bulk support for `CosmosDBSinkConnector`. - [PR 485](https://github.com/microsoft/kafka-connect-cosmosdb/pull/485)
#### Breaking Changes

#### Key Bug Fixes

#### Other Changes

### 1.5.0 (2022-09-16)
#### New Features
* Updated `azure-cosmos` version to 4.36.0.

#### Key Bug Fixes
* Fixed parsing for empty arrays. [PR 466](https://github.com/microsoft/kafka-connect-cosmosdb/pull/466)
* Updated `CosmosDBSinkConnector` to keep retrying for throttled requests. - [PR 472](https://github.com/microsoft/kafka-connect-cosmosdb/pull/472)

### 1.4.0 (2022-05-26)
#### New Features
57 changes: 47 additions & 10 deletions src/main/java/com/azure/cosmos/kafka/connect/CosmosDBConfig.java
@@ -1,3 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect;

import static com.azure.cosmos.kafka.connect.CosmosDBConfig.CosmosClientBuilder.createClient;
@@ -50,7 +53,16 @@ public class CosmosDBConfig extends AbstractConfig {
private static final String COSMOS_CONTAINER_TOPIC_MAP_DISPLAY = "Topic-Container map";
private static final String COSMOS_CONTAINER_TOPIC_MAP_DOC =
"A comma delimited list of Kafka topics mapped to Cosmos containers.\n"
+ "For example: topic1#con1,topic2#con2.";
+ "For example: topic1#con1,topic2#con2.";

public static final String COSMOS_SINK_BULK_ENABLED_CONF = "connect.cosmos.sink.bulk.enabled";
private static final String COSMOS_SINK_BULK_ENABLED_DOC = "Flag to indicate whether Cosmos DB bulk mode is enabled for the Sink connector. By default, it is true.";
private static final boolean DEFAULT_COSMOS_SINK_BULK_ENABLED = true;

public static final String COSMOS_SINK_MAX_RETRY_COUNT = "connect.cosmos.sink.maxRetryCount";
private static final String COSMOS_SINK_MAX_RETRY_COUNT_DOC =
"Cosmos DB max retry attempts on write failures for the Sink connector. By default, the connector retries transient write errors up to 10 times.";
private static final int DEFAULT_COSMOS_SINK_MAX_RETRY_COUNT = 10;

public static final String COSMOS_PROVIDER_NAME_CONF = "connect.cosmos.provider.name";
private static final String COSMOS_PROVIDER_NAME_DEFAULT = null;
@@ -63,20 +75,24 @@ public class CosmosDBConfig extends AbstractConfig {
public static final int COSMOS_DATABASE_GROUP_ORDER = 2;
public static final String COSMOS_CLIENT_USER_AGENT_SUFFIX = "APN/1.0 Microsoft/1.0 KafkaConnect/";

private final String connEndpoint;
private final String connKey;
private final String databaseName;
private final String providerName;
private final boolean bulkModeEnabled;
private final int maxRetryCount;
private TopicContainerMap topicContainerMap = TopicContainerMap.empty();

public CosmosDBConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);

this.connEndpoint = this.getString(COSMOS_CONN_ENDPOINT_CONF);
this.connKey = this.getPassword(COSMOS_CONN_KEY_CONF).value();
this.databaseName = this.getString(COSMOS_DATABASE_NAME_CONF);
this.topicContainerMap = TopicContainerMap.deserialize(this.getString(COSMOS_CONTAINER_TOPIC_MAP_CONF));
this.providerName = this.getString(COSMOS_PROVIDER_NAME_CONF);
this.bulkModeEnabled = this.getBoolean(COSMOS_SINK_BULK_ENABLED_CONF);
this.maxRetryCount = this.getInt(COSMOS_SINK_MAX_RETRY_COUNT);
}

public CosmosDBConfig(Map<String, String> parsedConfig) {
@@ -125,6 +141,19 @@ private static void defineConnectionConfigs(ConfigDef result) {
"none",
Importance.MEDIUM,
TOLERANCE_ON_ERROR_DOC
).define(
COSMOS_SINK_BULK_ENABLED_CONF,
Type.BOOLEAN,
DEFAULT_COSMOS_SINK_BULK_ENABLED,
Importance.LOW,
COSMOS_SINK_BULK_ENABLED_DOC
)
.define(
COSMOS_SINK_MAX_RETRY_COUNT,
Type.INT,
DEFAULT_COSMOS_SINK_MAX_RETRY_COUNT,
Importance.HIGH,
COSMOS_SINK_MAX_RETRY_COUNT_DOC
)
.defineInternal(
COSMOS_PROVIDER_NAME_CONF,
@@ -188,6 +217,14 @@ public String getProviderName() {
return this.providerName;
}

public boolean isBulkModeEnabled() {
return this.bulkModeEnabled;
}

public int getMaxRetryCount() {
return this.maxRetryCount;
}

public static void validateConnection(Map<String, String> connectorConfigs, Map<String, ConfigValue> configValues) {
String endpoint = connectorConfigs.get(CosmosDBSinkConfig.COSMOS_CONN_ENDPOINT_CONF);
String key = connectorConfigs.get(CosmosDBSinkConfig.COSMOS_CONN_KEY_CONF);
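For reference, a minimal sketch (not part of the PR diff) of how the two new sink settings flow through `CosmosDBConfig`. It assumes the existing connection-setting constants (`COSMOS_CONN_ENDPOINT_CONF`, `COSMOS_CONN_KEY_CONF`, `COSMOS_DATABASE_NAME_CONF`, `COSMOS_CONTAINER_TOPIC_MAP_CONF`) are public like the new ones, and the endpoint/key/database values are placeholders:

```java
import java.util.HashMap;
import java.util.Map;

import com.azure.cosmos.kafka.connect.CosmosDBConfig;

public class BulkConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Existing required connection settings (placeholder values; public
        // visibility of these constants is assumed here).
        props.put(CosmosDBConfig.COSMOS_CONN_ENDPOINT_CONF, "https://<account>.documents.azure.com:443/");
        props.put(CosmosDBConfig.COSMOS_CONN_KEY_CONF, "<master-key>");
        props.put(CosmosDBConfig.COSMOS_DATABASE_NAME_CONF, "<database>");
        props.put(CosmosDBConfig.COSMOS_CONTAINER_TOPIC_MAP_CONF, "topic1#con1");
        // New in this PR: bulk mode flag and max retry count for transient write failures.
        props.put(CosmosDBConfig.COSMOS_SINK_BULK_ENABLED_CONF, "true");
        props.put(CosmosDBConfig.COSMOS_SINK_MAX_RETRY_COUNT, "5");

        CosmosDBConfig config = new CosmosDBConfig(props);
        System.out.println("bulk enabled: " + config.isBulkModeEnabled()); // true
        System.out.println("max retries: " + config.getMaxRetryCount());   // 5
    }
}
```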
@@ -1,3 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect;

import org.apache.commons.collections4.BidiMap;
150 changes: 150 additions & 0 deletions src/main/java/com/azure/cosmos/kafka/connect/sink/BulkWriter.java
@@ -0,0 +1,150 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.sink;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.models.CosmosBulkItemResponse;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

public class BulkWriter extends SinkWriterBase {
private static final Logger logger = LoggerFactory.getLogger(BulkWriter.class);

private final CosmosContainer cosmosContainer;
private final PartitionKeyDefinition partitionKeyDefinition;

public BulkWriter(CosmosContainer container, int maxRetryCount) {
super(maxRetryCount);

checkNotNull(container, "Argument 'container' cannot be null");
this.cosmosContainer = container;
this.partitionKeyDefinition = container.read().getProperties().getPartitionKeyDefinition();
}

/**
* Bulk write the sink records.
*
* @param sinkRecords the sink records to be written.
* @return the sink write response, containing the succeeded records and the failed operations.
*/
protected SinkWriteResponse writeCore(List<SinkRecord> sinkRecords) {

SinkWriteResponse sinkWriteResponse = new SinkWriteResponse();
if (sinkRecords == null || sinkRecords.isEmpty()) {
return sinkWriteResponse;
}

List<CosmosItemOperation> itemOperations = new ArrayList<>();
for (SinkRecord sinkRecord : sinkRecords) {
CosmosItemOperation cosmosItemOperation = CosmosBulkOperations.getUpsertItemOperation(
sinkRecord.value(),
this.getPartitionKeyValue(sinkRecord.value()),
new SinkOperationContext(sinkRecord));

itemOperations.add(cosmosItemOperation);
}

Iterable<CosmosBulkOperationResponse<Object>> responseList = cosmosContainer.executeBulkOperations(itemOperations);

// Non-transient exceptions will be put at the front of the list
for (CosmosBulkOperationResponse<Object> bulkOperationResponse : responseList) {
SinkOperationContext context = bulkOperationResponse.getOperation().getContext();
checkNotNull(context, "sinkOperationContext should not be null");

SinkRecord sinkRecord = context.getSinkRecord();

if (bulkOperationResponse.getException() != null
|| bulkOperationResponse.getResponse() == null
|| !bulkOperationResponse.getResponse().isSuccessStatusCode()) {

BulkOperationFailedException exception = handleErrorStatusCode(
bulkOperationResponse.getResponse(),
bulkOperationResponse.getException(),
context);

// Generally we want to retry transient exceptions and fail fast on non-transient ones.
// Non-transient exceptions are placed at the front of the list,
// so when deciding retry behavior later, examining the first exception is enough.
if (ExceptionsHelper.isTransientFailure(exception)) {
sinkWriteResponse.getFailedRecordResponses().add(new SinkOperationFailedResponse(sinkRecord, exception));
} else {
sinkWriteResponse.getFailedRecordResponses().add(0, new SinkOperationFailedResponse(sinkRecord, exception));
}
} else {
sinkWriteResponse.getSucceededRecords().add(sinkRecord);
}
}

return sinkWriteResponse;
}

private PartitionKey getPartitionKeyValue(Object recordValue) {
checkArgument(recordValue instanceof Map, "Argument 'recordValue' is not a valid map.");

//TODO: examine the code here for sub-partition
String partitionKeyPath = StringUtils.join(this.partitionKeyDefinition.getPaths(), "");
Map<String, Object> recordMap = (Map<String, Object>) recordValue;
Object partitionKeyValue = recordMap.get(partitionKeyPath.substring(1));
PartitionKeyInternal partitionKeyInternal = PartitionKeyInternal.fromObjectArray(Collections.singletonList(partitionKeyValue), false);

return ImplementationBridgeHelpers
.PartitionKeyHelper
.getPartitionKeyAccessor()
.toPartitionKey(partitionKeyInternal);
}

BulkOperationFailedException handleErrorStatusCode(
CosmosBulkItemResponse itemResponse,
Exception exception,
SinkOperationContext sinkOperationContext) {

int effectiveStatusCode =
itemResponse != null
? itemResponse.getStatusCode()
: (exception instanceof CosmosException ? ((CosmosException) exception).getStatusCode() : HttpConstants.StatusCodes.REQUEST_TIMEOUT);
int effectiveSubStatusCode =
itemResponse != null
? itemResponse.getSubStatusCode()
: (exception instanceof CosmosException ? ((CosmosException) exception).getSubStatusCode() : 0);

String errorMessage =
String.format(
"Request failed with effectiveStatusCode: {%s}, effectiveSubStatusCode: {%s}, kafkaOffset: {%s}, kafkaPartition: {%s}, topic: {%s}",
effectiveStatusCode,
effectiveSubStatusCode,
sinkOperationContext.getKafkaOffset(),
sinkOperationContext.getKafkaPartition(),
sinkOperationContext.getTopic());

return new BulkOperationFailedException(effectiveStatusCode, effectiveSubStatusCode, errorMessage, exception);
}

private static class BulkOperationFailedException extends CosmosException {
protected BulkOperationFailedException(int statusCode, int subStatusCode, String message, Throwable cause) {
super(statusCode, message, null, cause);
BridgeInternal.setSubStatusCode(this, subStatusCode);
}
}
}
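And a minimal usage sketch (again not part of the PR) for the new `BulkWriter`. It assumes the caller lives in the same `com.azure.cosmos.kafka.connect.sink` package so the protected `writeCore(...)` is visible, and that the `CosmosContainer` and the record batch come from the surrounding sink task:

```java
package com.azure.cosmos.kafka.connect.sink;

import java.util.List;

import com.azure.cosmos.CosmosContainer;
import org.apache.kafka.connect.sink.SinkRecord;

final class BulkWriterSketch {
    // Hypothetical helper; maxRetryCount would come from CosmosDBConfig.getMaxRetryCount().
    static void writeBatch(CosmosContainer container, List<SinkRecord> records, int maxRetryCount) {
        BulkWriter writer = new BulkWriter(container, maxRetryCount);
        SinkWriteResponse response = writer.writeCore(records);

        // Succeeded records can be acknowledged; failed responses carry the record and its
        // classified exception, with non-transient failures ordered first in the list.
        System.out.printf("bulk write: %d succeeded, %d failed%n",
                response.getSucceededRecords().size(),
                response.getFailedRecordResponses().size());
    }
}
```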
@@ -1,3 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.sink;

import com.azure.cosmos.kafka.connect.sink.id.strategy.AbstractIdStrategyConfig;
@@ -1,3 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.sink;

import static com.azure.cosmos.kafka.connect.CosmosDBConfig.validateConnection;