From a63092c689ecf892e0f9e4cf0cc98dfcce816148 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Wed, 24 Jan 2024 08:37:35 -0800 Subject: [PATCH 1/5] change --- .../CosmosKafkaSchedulers.java | 21 +++++++++++++++++++ .../connect/source/CosmosDBSourceTask.java | 18 +++++----------- 2 files changed, 26 insertions(+), 13 deletions(-) create mode 100644 src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosKafkaSchedulers.java diff --git a/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosKafkaSchedulers.java b/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosKafkaSchedulers.java new file mode 100644 index 00000000..0d07c25e --- /dev/null +++ b/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosKafkaSchedulers.java @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.kafka.connect.implementations; + +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +public class CosmosKafkaSchedulers { + private final static String COSMOS_KAFKA_CFP_THREAD_NAME = "cosmos-kafka-cfp-bounded-elastic"; + private final static int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS + + // Custom bounded elastic scheduler for kafka connector + public final static Scheduler COSMOS_KAFKA_CFP_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( + Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + COSMOS_KAFKA_CFP_THREAD_NAME, + TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, + true + ); +} diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java index bb87e758..17ccbd25 100644 --- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java +++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java @@ -13,6 +13,7 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.kafka.connect.CosmosDBConfig; import com.azure.cosmos.kafka.connect.TopicContainerMap; +import com.azure.cosmos.kafka.connect.implementations.CosmosKafkaSchedulers; import com.azure.cosmos.models.ChangeFeedProcessorOptions; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; @@ -25,7 +26,6 @@ import org.apache.kafka.connect.source.SourceTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.scheduler.Schedulers; import java.time.Duration; import java.util.ArrayList; @@ -87,7 +87,7 @@ public void start(Map map) { // Initiate Cosmos change feed processor changeFeedProcessor = getChangeFeedProcessor(config.getWorkerName(), feedContainer, leaseContainer, config.useLatestOffset()); changeFeedProcessor.start() - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(CosmosKafkaSchedulers.COSMOS_KAFKA_CFP_BOUNDED_ELASTIC) .doOnSuccess(aVoid -> running.set(true)) .subscribe(); @@ -191,21 +191,13 @@ private void fillRecords(List records, String topic) throws Interr @Override public void stop() { logger.info("Stopping CosmosDB source task."); - while (!this.queue.isEmpty()) { - // Wait till the items are drained by poll before stopping. - try { - sleep(500); - } catch (InterruptedException e) { - logger.error("Interrupted! Failed to stop the task", e); - // Restore interrupted state... - Thread.currentThread().interrupt(); - } - } + // NOTE: poll() method and stop() method are both called from the same thread, + // so it is important not to include any changes which may block both places forever running.set(false); // Release all the resources. if (changeFeedProcessor != null) { - changeFeedProcessor.stop(); + changeFeedProcessor.stop().block(); changeFeedProcessor = null; } } From d5e0816c10ac202a755c0ad2ae7c605775cc083b Mon Sep 17 00:00:00 2001 From: annie-mac Date: Wed, 24 Jan 2024 08:42:43 -0800 Subject: [PATCH 2/5] update changelog --- CHANGELOG.md | 4 ++++ pom.xml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 20d57581..dbb6364a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ ## Release History +### 1.13.0-beta.1 (Unreleased) +#### Other Changes +* Fixed an issue where source connector can be stuck in an infinite loop when task got cancelled. [PR 545](https://github.com/microsoft/kafka-connect-cosmosdb/pull/545) + ### 1.12.0 (2023-12-18) #### New Features * Updated `azure-cosmos` version to 4.53.1. diff --git a/pom.xml b/pom.xml index e8da31ce..d7a799e9 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.azure.cosmos.kafka kafka-connect-cosmos - 1.12.0 + 1.13.0-beta.1 kafka-connect-cosmos https://github.com/microsoft/kafka-connect-cosmosdb From 7036767570f436152ff851c818ad99902a02bb0a Mon Sep 17 00:00:00 2001 From: annie-mac Date: Wed, 24 Jan 2024 09:13:14 -0800 Subject: [PATCH 3/5] fix tests --- .../source/CosmosDBSourceTaskTest.java | 45 ++++++++++++++----- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java index fe6c05d6..d827b54f 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java @@ -82,7 +82,7 @@ public void setup() throws IllegalAccessException { //Mock query results and iterator for getting lease container token CosmosPagedFlux mockLeaseQueryResults = (CosmosPagedFlux) Mockito.mock(CosmosPagedFlux.class); when(mockLeaseContainer.queryItems(anyString(), any(), eq(JsonNode.class))).thenReturn(mockLeaseQueryResults); - + Iterable mockLeaseQueryIterable = (Iterable) Mockito.mock(Iterable.class); when(mockLeaseQueryResults.toIterable()).thenReturn(mockLeaseQueryIterable); @@ -107,9 +107,9 @@ public void testHandleChanges() throws JsonProcessingException, IllegalAccessExc new Thread(() -> { testTask.handleCosmosDbChanges(changes); }).start(); - + int recordCount = 0; - while(recordCount == 0) { + while (recordCount == 0) { JsonNode jsonNode = this.queue.poll(); if (jsonNode != null) { recordCount++; @@ -135,7 +135,7 @@ public void testPoll() throws InterruptedException, JsonProcessingException, Ill testTask.handleCosmosDbChanges(changes); }).start(); - List result=testTask.poll(); + List result = testTask.poll(); Assert.assertEquals(1, result.size()); AtomicBoolean shouldFillMoreRecords = (AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask); @@ -162,7 +162,7 @@ public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, J (AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask); shouldFillMoreRecords.set(false); - List result=testTask.poll(); + List result = testTask.poll(); Assert.assertEquals(0, result.size()); Assert.assertTrue(shouldFillMoreRecords.get()); } @@ -179,7 +179,7 @@ public void testPollWithMessageKey() throws InterruptedException, JsonProcessing testTask.handleCosmosDbChanges(changes); }).start(); - List result=testTask.poll(); + List result = testTask.poll(); Assert.assertEquals(1, result.size()); Assert.assertEquals("123", result.get(0).key()); } @@ -217,7 +217,7 @@ public void testZeroBatchSize() throws InterruptedException, JsonProcessingExcep testTask.handleCosmosDbChanges(changes); }).start(); - List result=testTask.poll(); + List result = testTask.poll(); Assert.assertEquals(0, result.size()); } @@ -236,12 +236,11 @@ public void testSmallBufferSize() throws InterruptedException, JsonProcessingExc testTask.handleCosmosDbChanges(changes); }).start(); - List result=testTask.poll(); + List result = testTask.poll(); Assert.assertEquals(1, result.size()); } - - @Test(expected=IllegalStateException.class) + @Test(expected = IllegalStateException.class) public void testEmptyAssignedContainerThrowsIllegalStateException() throws InterruptedException, JsonProcessingException, IllegalAccessException { String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}"; ObjectMapper mapper = new ObjectMapper(); @@ -258,4 +257,30 @@ public void testEmptyAssignedContainerThrowsIllegalStateException() throws Inter testTask.poll(); } + + @Test + public void testStop() throws JsonProcessingException, IllegalAccessException, InterruptedException { + // validate when stop() being called, even though the queue is not empty, it can still return successfully + String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}"; + ObjectMapper mapper = new ObjectMapper(); + JsonNode actualObj = mapper.readTree(jsonString); + List changes = new ArrayList<>(); + changes.add(actualObj); + + new Thread(() -> { + testTask.handleCosmosDbChanges(changes); + }).start(); + + testTask.stop(); + AtomicBoolean isRunning = + (AtomicBoolean) FieldUtils.readField( + FieldUtils.getField(CosmosDBSourceTask.class, "running", true), + testTask + ); + Assert.assertFalse(isRunning.get()); + + // revert back to running status to avoid interrupt other tests + FieldUtils.writeField(testTask, "running", new AtomicBoolean(true), true); + testTask.poll(); + } } From 4d0a2c2b0a7281a2e6a40791490a55cc837d4314 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Wed, 24 Jan 2024 09:24:43 -0800 Subject: [PATCH 4/5] update tests --- .../cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java index d827b54f..dd5b0849 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java @@ -271,6 +271,8 @@ public void testStop() throws JsonProcessingException, IllegalAccessException, I testTask.handleCosmosDbChanges(changes); }).start(); + Thread.sleep(500); // give some time for the above task to run + testTask.stop(); AtomicBoolean isRunning = (AtomicBoolean) FieldUtils.readField( From c594c791889d379f3132a17129c74cfdcc8ed254 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Wed, 24 Jan 2024 09:35:28 -0800 Subject: [PATCH 5/5] fix compiling issues --- .../connect/implementations/CosmosKafkaSchedulers.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosKafkaSchedulers.java b/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosKafkaSchedulers.java index 0d07c25e..de9ee8c4 100644 --- a/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosKafkaSchedulers.java +++ b/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosKafkaSchedulers.java @@ -7,11 +7,11 @@ import reactor.core.scheduler.Schedulers; public class CosmosKafkaSchedulers { - private final static String COSMOS_KAFKA_CFP_THREAD_NAME = "cosmos-kafka-cfp-bounded-elastic"; - private final static int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS + private static final String COSMOS_KAFKA_CFP_THREAD_NAME = "cosmos-kafka-cfp-bounded-elastic"; + private static final int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS // Custom bounded elastic scheduler for kafka connector - public final static Scheduler COSMOS_KAFKA_CFP_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( + public static final Scheduler COSMOS_KAFKA_CFP_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, COSMOS_KAFKA_CFP_THREAD_NAME,