Release 1.14.0 #550

Merged: 6 commits, Feb 28, 2024
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,15 @@
## Release History

### 1.14.0 (2024-02-28)
#### New Features
* Updated `azure-cosmos` version to 4.56.0.

#### Key Bug Fixes
* Fixed `NullPointerException` in `CosmosDBSinkConnector` when `TRACE` level logging is enabled and the `SinkRecord` value is null. [PR 549](https://github.com/microsoft/kafka-connect-cosmosdb/pull/549)

#### Other Changes
* Added more DEBUG level logs in `CosmosDBSourceConnector`. [PR 549](https://github.com/microsoft/kafka-connect-cosmosdb/pull/549)

### 1.13.0 (2024-01-25)
#### New Features
* Updated `azure-cosmos` version to 4.54.0.
4 changes: 2 additions & 2 deletions pom.xml
@@ -7,7 +7,7 @@

<groupId>com.azure.cosmos.kafka</groupId>
<artifactId>kafka-connect-cosmos</artifactId>
-<version>1.13.0</version>
+<version>1.14.0</version>

<name> kafka-connect-cosmos</name>
<url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
@@ -48,7 +48,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
-<version>4.54.0</version>
+<version>4.56.0</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
CosmosDBSinkTask.java
@@ -104,9 +104,14 @@ public void put(Collection<SinkRecord> records) {
if (record.key() != null) {
MDC.put(String.format("CosmosDbSink-%s", containerName), record.key().toString());
}
logger.trace("Writing record, value type: {}", record.value().getClass().getName());

logger.trace("Key Schema: {}", record.keySchema());
logger.trace("Value schema: {}", record.valueSchema());
if (record.value() != null) {
logger.trace("Writing record, value type: {}", record.value().getClass().getName());
logger.trace("Value schema: {}", record.valueSchema());
} else {
logger.trace("Record value is null");
}

Object recordValue;
if (record.value() instanceof Struct) {
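For context on the fix above: `record.value()` is null for Kafka tombstone records, so the old unguarded `record.value().getClass()` call threw a `NullPointerException` as soon as TRACE logging was enabled. A minimal, self-contained sketch of the guarded pattern (hypothetical topic, key, and offset values, not the connector's actual test code):

```java
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NullValueTraceSketch {
    private static final Logger logger = LoggerFactory.getLogger(NullValueTraceSketch.class);

    static void traceRecord(SinkRecord record) {
        // Guard before dereferencing: tombstone records carry a null value.
        if (record.value() != null) {
            logger.trace("Writing record, value type: {}", record.value().getClass().getName());
            logger.trace("Value schema: {}", record.valueSchema());
        } else {
            logger.trace("Record value is null");
        }
    }

    public static void main(String[] args) {
        // A tombstone record: key present, value and value schema both null.
        SinkRecord tombstone = new SinkRecord("hotels", 0, null, "id-1", null, null, 42L);
        traceRecord(tombstone); // safe even with TRACE enabled
    }
}
```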
CosmosDBSourceTask.java
@@ -35,6 +35,7 @@
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;

import static java.lang.Thread.sleep;
import static java.util.Collections.singletonMap;
@@ -62,11 +63,11 @@ public String version() {

@Override
public void start(Map<String, String> map) {
logger.info("Starting CosmosDBSourceTask.");
logger.info("Worker {} Starting CosmosDBSourceTask.", this.config.getWorkerName());
config = new CosmosDBSourceConfig(map);
this.queue = new LinkedTransferQueue<>();

logger.info("Creating the client.");
logger.info("Worker {} Creating the client.", this.config.getWorkerName());
client = getCosmosClient(config);

// Initialize the database, feed and lease containers
@@ -101,7 +102,7 @@ public void start(Map<String, String> map) {
}
} // Wait for ChangeFeedProcessor to start.

logger.info("Started CosmosDB source task.");
logger.info("Worker {} Started CosmosDB source task.", this.config.getWorkerName());
}

private String getItemLsn(JsonNode item) {
@@ -121,11 +122,27 @@ public List<SourceRecord> poll() throws InterruptedException {
while (running.get()) {
fillRecords(records, topic);
if (records.isEmpty() || System.currentTimeMillis() > maxWaitTime) {
logger.info("Sending {} documents.", records.size());
break;
}
}

logger.info("Worker {}, Sending {} documents.", this.config.getWorkerName(), records.size());

+if (logger.isDebugEnabled()) {
+List<String> recordDetails =
+records
+.stream()
+.map(sourceRecord -> String.format("[key %s - offset %s]", sourceRecord.key(), sourceRecord.sourceOffset()))
+.collect(Collectors.toList());
+
+logger.debug(
+"Worker {}, sending {} documents",
+this.config.getWorkerName(),
+recordDetails
+);
+}

logger.debug("Worker {}, shouldFillMoreRecords {}", this.config.getWorkerName(), true);
this.shouldFillMoreRecords.set(true);
return records;
}
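The `isDebugEnabled()` guard added above is deliberate: SLF4J's `{}` placeholders defer only the final message formatting, while the stream that builds `recordDetails` would still run on every `poll()` call. A standalone sketch of the pattern (hypothetical class and data, not connector code):

```java
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebugGuardSketch {
    private static final Logger logger = LoggerFactory.getLogger(DebugGuardSketch.class);

    static void logBatch(List<String> keys) {
        // Without the guard, the map/collect pipeline below would execute even
        // when DEBUG is disabled; {} only postpones string formatting.
        if (logger.isDebugEnabled()) {
            List<String> details = keys.stream()
                    .map(key -> String.format("[key %s]", key))
                    .collect(Collectors.toList());
            logger.debug("Sending {} documents: {}", keys.size(), details);
        }
    }

    public static void main(String[] args) {
        logBatch(List.of("a", "b", "c"));
    }
}
```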
@@ -155,9 +172,6 @@ private void fillRecords(List<SourceRecord> records, String topic) throws InterruptedException
// Get the latest lsn and record as offset
Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getItemLsn(node));

-if (logger.isDebugEnabled()) {
-logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY));
-}
// Convert JSON to Kafka Connect struct and JSON schema
SchemaAndValue schemaAndValue = jsonToStruct.recordToSchemaAndValue(node);

@@ -176,21 +190,24 @@ private void fillRecords(List<SourceRecord> records, String topic) throws InterruptedException
} else {
// If the buffer Size exceeds then do not remove the node.
if (logger.isDebugEnabled()) {
logger.debug("Adding record back to the queue since adding it exceeds the allowed buffer size {}", config.getTaskBufferSize());
logger.debug(
"Worker {} Adding record back to the queue since adding it exceeds the allowed buffer size {}",
this.config.getWorkerName(),
config.getTaskBufferSize());
}
this.queue.add(node);
break;
}
} catch (Exception e) {
logger.error("Failed to fill Source Records for Topic {}", topic);
logger.error("Worker {} Failed to fill Source Records for Topic {}", this.config.getWorkerName(), topic);
throw e;
}
}
}

@Override
public void stop() {
logger.info("Stopping CosmosDB source task.");
logger.info("Worker {} Stopping CosmosDB source task.", this.config.getWorkerName());
// NOTE: poll() method and stop() method are both called from the same thread,
// so it is important not to include any changes which may block both places forever
running.set(false);
@@ -200,18 +217,22 @@ public void stop() {
changeFeedProcessor.stop().block();
changeFeedProcessor = null;
}

+if (this.client != null) {
+this.client.close();
+}
}

private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) {
logger.info("Creating Cosmos Client.");
logger.info("Worker {} Creating Cosmos Client.", this.config.getWorkerName());

CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
.endpoint(config.getConnEndpoint())
.key(config.getConnKey())
.consistencyLevel(ConsistencyLevel.SESSION)
.contentResponseOnWriteEnabled(true)
.connectionSharingAcrossClientsEnabled(config.isConnectionSharingEnabled())
-.userAgentSuffix(CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX + version());
+.userAgentSuffix(getUserAgentSuffix());

if (config.isGatewayModeEnabled()) {
cosmosClientBuilder.gatewayMode();
@@ -220,6 +241,10 @@ private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) {
return cosmosClientBuilder.buildAsyncClient();
}

+private String getUserAgentSuffix() {
+return CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX + version() + "|" + this.config.getWorkerName();
+}

private ChangeFeedProcessor getChangeFeedProcessor(
String hostName,
CosmosAsyncContainer feedContainer,
@@ -243,11 +268,24 @@ private ChangeFeedProcessor getChangeFeedProcessor(
}

protected void handleCosmosDbChanges(List<JsonNode> docs) {
+if (docs != null) {
+List<String> docIds =
+docs
+.stream()
+.map(jsonNode -> jsonNode.get("id") != null ? jsonNode.get("id").asText() : "null")
+.collect(Collectors.toList());
+logger.debug(
+"handleCosmosDbChanges - Worker {}, total docs {}, Details [{}].",
+this.config.getWorkerName(),
+docIds.size(),
+docIds);
+}

for (JsonNode document : docs) {
// Blocks for each transfer till it is processed by the poll method.
// If we fail before checkpointing then the new worker starts again.
try {
logger.trace("Queuing document");
logger.trace("Queuing document {}", document.get("id") != null ? document.get("id").asText() : "null");

// The item is being transferred to the queue, and the method will only return if the item has been polled from the queue.
// The queue is being continuously polled and then put into a batch list, but the batch list is not being flushed right away
@@ -264,6 +302,7 @@ protected void handleCosmosDbChanges(List<JsonNode> docs) {
if (docs.size() > 0) {
// it is important to flush the current batches to kafka as currently we are using lease container continuationToken for book marking
// so we would only want to move ahead of the book marking when all the records have been returned to kafka
logger.debug("Worker {}, shouldFillMoreRecords {}", this.config.getWorkerName(), false);
this.shouldFillMoreRecords.set(false);
}
}
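The comments above hinge on `LinkedTransferQueue.transfer()`: unlike `add()`, it blocks the change feed handler until `poll()` has actually taken the item, which is what keeps the lease container's continuation token from advancing past records that were never handed to Kafka. A standalone sketch of that hand-off (hypothetical thread timings, unrelated to the connector classes):

```java
import java.util.concurrent.LinkedTransferQueue;

public class TransferHandoffSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(500);                // simulate a slow poll() loop
                System.out.println("polled: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        long start = System.nanoTime();
        queue.transfer("doc-1");                  // blocks until take() receives the item
        System.out.printf("transfer() returned after ~%d ms%n",
                (System.nanoTime() - start) / 1_000_000);
        consumer.join();
    }
}
```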