diff --git a/CHANGELOG.md b/CHANGELOG.md
index 20d57581..dbb6364a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
## Release History
+### 1.13.0-beta.1 (Unreleased)
+#### Other Changes
+* Fixed an issue where source connector can be stuck in an infinite loop when task got cancelled. [PR 545](https://github.com/microsoft/kafka-connect-cosmosdb/pull/545)
+
### 1.12.0 (2023-12-18)
#### New Features
* Updated `azure-cosmos` version to 4.53.1.
diff --git a/pom.xml b/pom.xml
index e8da31ce..d7a799e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
com.azure.cosmos.kafka
kafka-connect-cosmos
- 1.12.0
+ 1.13.0-beta.1
kafka-connect-cosmos
https://github.com/microsoft/kafka-connect-cosmosdb
diff --git a/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosKafkaSchedulers.java b/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosKafkaSchedulers.java
new file mode 100644
index 00000000..de9ee8c4
--- /dev/null
+++ b/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosKafkaSchedulers.java
@@ -0,0 +1,21 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.kafka.connect.implementations;
+
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
+public class CosmosKafkaSchedulers {
+ private static final String COSMOS_KAFKA_CFP_THREAD_NAME = "cosmos-kafka-cfp-bounded-elastic";
+ private static final int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS
+
+ // Custom bounded elastic scheduler for kafka connector
+ public static final Scheduler COSMOS_KAFKA_CFP_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
+ Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
+ Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+ COSMOS_KAFKA_CFP_THREAD_NAME,
+ TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
+ true
+ );
+}
diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
index bb87e758..17ccbd25 100644
--- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
+++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
@@ -13,6 +13,7 @@
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.kafka.connect.CosmosDBConfig;
import com.azure.cosmos.kafka.connect.TopicContainerMap;
+import com.azure.cosmos.kafka.connect.implementations.CosmosKafkaSchedulers;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
@@ -25,7 +26,6 @@
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.ArrayList;
@@ -87,7 +87,7 @@ public void start(Map map) {
// Initiate Cosmos change feed processor
changeFeedProcessor = getChangeFeedProcessor(config.getWorkerName(), feedContainer, leaseContainer, config.useLatestOffset());
changeFeedProcessor.start()
- .subscribeOn(Schedulers.boundedElastic())
+ .subscribeOn(CosmosKafkaSchedulers.COSMOS_KAFKA_CFP_BOUNDED_ELASTIC)
.doOnSuccess(aVoid -> running.set(true))
.subscribe();
@@ -191,21 +191,13 @@ private void fillRecords(List records, String topic) throws Interr
@Override
public void stop() {
logger.info("Stopping CosmosDB source task.");
- while (!this.queue.isEmpty()) {
- // Wait till the items are drained by poll before stopping.
- try {
- sleep(500);
- } catch (InterruptedException e) {
- logger.error("Interrupted! Failed to stop the task", e);
- // Restore interrupted state...
- Thread.currentThread().interrupt();
- }
- }
+ // NOTE: poll() method and stop() method are both called from the same thread,
+ // so it is important not to include any changes which may block both places forever
running.set(false);
// Release all the resources.
if (changeFeedProcessor != null) {
- changeFeedProcessor.stop();
+ changeFeedProcessor.stop().block();
changeFeedProcessor = null;
}
}
diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
index fe6c05d6..dd5b0849 100644
--- a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
+++ b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
@@ -82,7 +82,7 @@ public void setup() throws IllegalAccessException {
//Mock query results and iterator for getting lease container token
CosmosPagedFlux mockLeaseQueryResults = (CosmosPagedFlux) Mockito.mock(CosmosPagedFlux.class);
when(mockLeaseContainer.queryItems(anyString(), any(), eq(JsonNode.class))).thenReturn(mockLeaseQueryResults);
-
+
Iterable mockLeaseQueryIterable = (Iterable) Mockito.mock(Iterable.class);
when(mockLeaseQueryResults.toIterable()).thenReturn(mockLeaseQueryIterable);
@@ -107,9 +107,9 @@ public void testHandleChanges() throws JsonProcessingException, IllegalAccessExc
new Thread(() -> {
testTask.handleCosmosDbChanges(changes);
}).start();
-
+
int recordCount = 0;
- while(recordCount == 0) {
+ while (recordCount == 0) {
JsonNode jsonNode = this.queue.poll();
if (jsonNode != null) {
recordCount++;
@@ -135,7 +135,7 @@ public void testPoll() throws InterruptedException, JsonProcessingException, Ill
testTask.handleCosmosDbChanges(changes);
}).start();
- List result=testTask.poll();
+ List result = testTask.poll();
Assert.assertEquals(1, result.size());
AtomicBoolean shouldFillMoreRecords =
(AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask);
@@ -162,7 +162,7 @@ public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, J
(AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask);
shouldFillMoreRecords.set(false);
- List result=testTask.poll();
+ List result = testTask.poll();
Assert.assertEquals(0, result.size());
Assert.assertTrue(shouldFillMoreRecords.get());
}
@@ -179,7 +179,7 @@ public void testPollWithMessageKey() throws InterruptedException, JsonProcessing
testTask.handleCosmosDbChanges(changes);
}).start();
- List result=testTask.poll();
+ List result = testTask.poll();
Assert.assertEquals(1, result.size());
Assert.assertEquals("123", result.get(0).key());
}
@@ -217,7 +217,7 @@ public void testZeroBatchSize() throws InterruptedException, JsonProcessingExcep
testTask.handleCosmosDbChanges(changes);
}).start();
- List result=testTask.poll();
+ List result = testTask.poll();
Assert.assertEquals(0, result.size());
}
@@ -236,12 +236,11 @@ public void testSmallBufferSize() throws InterruptedException, JsonProcessingExc
testTask.handleCosmosDbChanges(changes);
}).start();
- List result=testTask.poll();
+ List result = testTask.poll();
Assert.assertEquals(1, result.size());
}
-
- @Test(expected=IllegalStateException.class)
+ @Test(expected = IllegalStateException.class)
public void testEmptyAssignedContainerThrowsIllegalStateException() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
@@ -258,4 +257,32 @@ public void testEmptyAssignedContainerThrowsIllegalStateException() throws Inter
testTask.poll();
}
+
+ @Test
+ public void testStop() throws JsonProcessingException, IllegalAccessException, InterruptedException {
+ // validate when stop() being called, even though the queue is not empty, it can still return successfully
+ String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode actualObj = mapper.readTree(jsonString);
+ List changes = new ArrayList<>();
+ changes.add(actualObj);
+
+ new Thread(() -> {
+ testTask.handleCosmosDbChanges(changes);
+ }).start();
+
+ Thread.sleep(500); // give some time for the above task to run
+
+ testTask.stop();
+ AtomicBoolean isRunning =
+ (AtomicBoolean) FieldUtils.readField(
+ FieldUtils.getField(CosmosDBSourceTask.class, "running", true),
+ testTask
+ );
+ Assert.assertFalse(isRunning.get());
+
+ // revert back to running status to avoid interrupt other tests
+ FieldUtils.writeField(testTask, "running", new AtomicBoolean(true), true);
+ testTask.poll();
+ }
}