From 6c8bcf5fb4283b71acc583d70bbf171f35ba4fad Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 27 Feb 2024 19:25:13 -0800 Subject: [PATCH 1/5] few changes --- CHANGELOG.md | 11 ++++++ pom.xml | 2 +- .../kafka/connect/sink/CosmosDBSinkTask.java | 9 ++++- .../connect/source/CosmosDBSourceTask.java | 39 ++++++++++++++++++- 4 files changed, 56 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6014f611..1e351ccc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,15 @@ ## Release History + +### 1.14.0 (2024-02-28) +#### New Features +* Updated `azure-cosmos` version to 4.56.0. + +#### Key Bug Fixes +* Fixed `NullPointerException` in `CosmosDBSinkConnector` when `TRACE` level log is enabled and `SinkRecord` value being null + +#### Other Changes +* Added more DEBUG level logs in `CosmosDBSourceConnector` + ### 1.13.0 (2024-01-25) #### New Features * Updated `azure-cosmos` version to 4.54.0. diff --git a/pom.xml b/pom.xml index 2721e6e3..200b9029 100644 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,7 @@ com.azure azure-cosmos - 4.54.0 + 4.56.0 com.jayway.jsonpath diff --git a/src/main/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTask.java b/src/main/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTask.java index a933f6bd..9f6ec6e7 100644 --- a/src/main/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTask.java +++ b/src/main/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTask.java @@ -104,9 +104,14 @@ public void put(Collection records) { if (record.key() != null) { MDC.put(String.format("CosmosDbSink-%s", containerName), record.key().toString()); } - logger.trace("Writing record, value type: {}", record.value().getClass().getName()); + logger.trace("Key Schema: {}", record.keySchema()); - logger.trace("Value schema: {}", record.valueSchema()); + if (record.value() != null) { + logger.trace("Writing record, value type: {}", record.value().getClass().getName()); + logger.trace("Value schema: {}", record.valueSchema()); + 
} else { + logger.trace("Record value is null"); + } Object recordValue; if (record.value() instanceof Struct) { diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java index 17ccbd25..2d7c3ce3 100644 --- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java +++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java @@ -35,6 +35,7 @@ import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static java.lang.Thread.sleep; import static java.util.Collections.singletonMap; @@ -121,11 +122,27 @@ public List poll() throws InterruptedException { while (running.get()) { fillRecords(records, topic); if (records.isEmpty() || System.currentTimeMillis() > maxWaitTime) { - logger.info("Sending {} documents.", records.size()); break; } } + logger.info("Worker {}, Sending {} documents.", this.config.getWorkerName(), records.size()); + + if (logger.isDebugEnabled()) { + List recordDetails = + records + .stream() + .map(sourceRecord -> String.format("key %s, offset %s", sourceRecord.key(), sourceRecord.sourceOffset())) + .collect(Collectors.toList()); + + logger.debug( + "Worker {}, sending docs {}", + this.config.getWorkerName(), + recordDetails + ); + } + + logger.debug("Worker {}, shouldFillMoreRecords {}", this.config.getWorkerName(), true); this.shouldFillMoreRecords.set(true); return records; } @@ -211,7 +228,7 @@ private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) { .consistencyLevel(ConsistencyLevel.SESSION) .contentResponseOnWriteEnabled(true) .connectionSharingAcrossClientsEnabled(config.isConnectionSharingEnabled()) - .userAgentSuffix(CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX + version()); + .userAgentSuffix(getUserAgentSuffix()); if (config.isGatewayModeEnabled()) { 
cosmosClientBuilder.gatewayMode(); @@ -220,6 +237,10 @@ private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) { return cosmosClientBuilder.buildAsyncClient(); } + private String getUserAgentSuffix() { + return CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX + version() + "|" + this.config.getWorkerName(); + } + private ChangeFeedProcessor getChangeFeedProcessor( String hostName, CosmosAsyncContainer feedContainer, @@ -243,6 +264,19 @@ private ChangeFeedProcessor getChangeFeedProcessor( } protected void handleCosmosDbChanges(List docs) { + if (docs != null) { + List docIds = + docs + .stream() + .map(jsonNode -> jsonNode.get("id").asText()) + .collect(Collectors.toList()); + logger.debug( + "handleCosmosDbChanges - Worker {}, docIds {}, Details [{}].", + this.config.getWorkerName(), + docIds.size(), + docIds); + } + for (JsonNode document : docs) { // Blocks for each transfer till it is processed by the poll method. // If we fail before checkpointing then the new worker starts again. 
@@ -264,6 +298,7 @@ protected void handleCosmosDbChanges(List docs) { if (docs.size() > 0) { // it is important to flush the current batches to kafka as currently we are using lease container continuationToken for book marking // so we would only want to move ahead of the book marking when all the records have been returned to kafka + logger.debug("Worker {}, shouldFillMoreRecords {}", this.config.getWorkerName(), false); this.shouldFillMoreRecords.set(false); } } From cb89e690c2837fd98fabd2ff77f86f5af3874545 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 27 Feb 2024 21:47:15 -0800 Subject: [PATCH 2/5] change --- .../connect/source/CosmosDBSourceTask.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java index 2d7c3ce3..1b24a9c0 100644 --- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java +++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java @@ -63,11 +63,11 @@ public String version() { @Override public void start(Map map) { - logger.info("Starting CosmosDBSourceTask."); config = new CosmosDBSourceConfig(map); + logger.info("Worker {} Starting CosmosDBSourceTask.", this.config.getWorkerName()); this.queue = new LinkedTransferQueue<>(); - logger.info("Creating the client."); + logger.info("Worker {} Creating the client.", this.config.getWorkerName()); client = getCosmosClient(config); // Initialize the database, feed and lease containers @@ -102,7 +102,7 @@ public void start(Map map) { } } // Wait for ChangeFeedProcessor to start.
- logger.info("Started CosmosDB source task."); + logger.info("Worker {} Started CosmosDB source task.", this.config.getWorkerName()); } private String getItemLsn(JsonNode item) { @@ -172,9 +172,6 @@ private void fillRecords(List records, String topic) throws Interr // Get the latest lsn and record as offset Map sourceOffset = singletonMap(OFFSET_KEY, getItemLsn(node)); - if (logger.isDebugEnabled()) { - logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY)); - } // Convert JSON to Kafka Connect struct and JSON schema SchemaAndValue schemaAndValue = jsonToStruct.recordToSchemaAndValue(node); @@ -193,13 +190,16 @@ private void fillRecords(List records, String topic) throws Interr } else { // If the buffer Size exceeds then do not remove the node. if (logger.isDebugEnabled()) { - logger.debug("Adding record back to the queue since adding it exceeds the allowed buffer size {}", config.getTaskBufferSize()); + logger.debug( + "Worker {} Adding record back to the queue since adding it exceeds the allowed buffer size {}", + this.config.getWorkerName(), + config.getTaskBufferSize()); } this.queue.add(node); break; } } catch (Exception e) { - logger.error("Failed to fill Source Records for Topic {}", topic); + logger.error("Worker {} Failed to fill Source Records for Topic {}", this.config.getWorkerName(), topic); throw e; } } @@ -207,7 +207,7 @@ private void fillRecords(List records, String topic) throws Interr @Override public void stop() { - logger.info("Stopping CosmosDB source task."); + logger.info("Worker {} Stopping CosmosDB source task.", this.config.getWorkerName()); // NOTE: poll() method and stop() method are both called from the same thread, // so it is important not to include any changes which may block both places forever running.set(false); @@ -217,10 +217,14 @@ public void stop() { changeFeedProcessor.stop().block(); changeFeedProcessor = null; } + + if (this.client != null) { + this.client.close(); + } } private CosmosAsyncClient 
getCosmosClient(CosmosDBSourceConfig config) { - logger.info("Creating Cosmos Client."); + logger.info("Worker {} Creating Cosmos Client.", this.config.getWorkerName()); CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder() .endpoint(config.getConnEndpoint()) @@ -271,7 +275,7 @@ protected void handleCosmosDbChanges(List docs) { .map(jsonNode -> jsonNode.get("id").asText()) .collect(Collectors.toList()); logger.debug( - "handleCosmosDbChanges - Worker {}, docIds {}, Details [{}].", + "handleCosmosDbChanges - Worker {}, total docs {}, Details [{}].", this.config.getWorkerName(), docIds.size(), docIds); From 10a9eabb80a2cf3257918aa5432d4704aac7dea5 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 27 Feb 2024 21:52:24 -0800 Subject: [PATCH 3/5] update changelog --- CHANGELOG.md | 4 ++-- pom.xml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e351ccc..8f9f87bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,10 @@ * Updated `azure-cosmos` version to 4.56.0. #### Key Bug Fixes -* Fixed `NullPointerException` in `CosmosDBSinkConnector` when `TRACE` level log is enabled and `SinkRecord` value being null +* Fixed `NullPointerException` in `CosmosDBSinkConnector` when `TRACE` level log is enabled and `SinkRecord` value being null. [PR 549](https://github.com/microsoft/kafka-connect-cosmosdb/pull/549) #### Other Changes -* Added more DEBUG level logs in `CosmosDBSourceConnector` +* Added more DEBUG level logs in `CosmosDBSourceConnector`. 
[PR 549](https://github.com/microsoft/kafka-connect-cosmosdb/pull/549) ### 1.13.0 (2024-01-25) #### New Features diff --git a/pom.xml b/pom.xml index 200b9029..61f90e50 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.azure.cosmos.kafka kafka-connect-cosmos - 1.13.0 + 1.14.0 kafka-connect-cosmos https://github.com/microsoft/kafka-connect-cosmosdb From de1d36ab27a4c83e3c123b7be584d3cce93758fc Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 27 Feb 2024 21:55:13 -0800 Subject: [PATCH 4/5] minor changes --- .../azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java index 1b24a9c0..9259bf79 100644 --- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java +++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java @@ -132,11 +132,11 @@ public List poll() throws InterruptedException { List recordDetails = records .stream() - .map(sourceRecord -> String.format("key %s, offset %s", sourceRecord.key(), sourceRecord.sourceOffset())) + .map(sourceRecord -> String.format("[key %s - offset %s]", sourceRecord.key(), sourceRecord.sourceOffset())) .collect(Collectors.toList()); logger.debug( - "Worker {}, sending docs {}", + "Worker {}, sending {} documents", this.config.getWorkerName(), recordDetails ); From 1024555cf10385f0011e9611917c419a5a9aafc7 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Wed, 28 Feb 2024 13:31:37 +0100 Subject: [PATCH 5/5] Update CosmosDBSourceTask.java --- .../azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java index 9259bf79..0bc8d426 100644 
--- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java +++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java @@ -272,7 +272,7 @@ protected void handleCosmosDbChanges(List docs) { List docIds = docs .stream() - .map(jsonNode -> jsonNode.get("id").asText()) + .map(jsonNode -> jsonNode.get("id") != null ? jsonNode.get("id").asText() : "null") .collect(Collectors.toList()); logger.debug( "handleCosmosDbChanges - Worker {}, total docs {}, Details [{}].", @@ -285,7 +285,7 @@ protected void handleCosmosDbChanges(List docs) { // Blocks for each transfer till it is processed by the poll method. // If we fail before checkpointing then the new worker starts again. try { - logger.trace("Queuing document"); + logger.trace("Queuing document {}", document.get("id") != null ? document.get("id").asText() : "null"); // The item is being transferred to the queue, and the method will only return if the item has been polled from the queue. // The queue is being continuously polled and then put into a batch list, but the batch list is not being flushed right away