Skip to content

Commit

Permalink
Merge pull request #545 from xinlian12/fixSourceConnectorDeadlockIssue
Browse files Browse the repository at this point in the history
fixInfiniteLoopIssueWhenSourceTaskGotCancelled
  • Loading branch information
xinlian12 authored Jan 25, 2024
2 parents a2fb2b0 + c594c79 commit 64709c1
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
## Release History
### 1.13.0-beta.1 (Unreleased)
#### Other Changes
* Fixed an issue where source connector can be stuck in an infinite loop when task got cancelled. [PR 545](https://github.com/microsoft/kafka-connect-cosmosdb/pull/545)

### 1.12.0 (2023-12-18)
#### New Features
* Updated `azure-cosmos` version to 4.53.1.
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>com.azure.cosmos.kafka</groupId>
<artifactId>kafka-connect-cosmos</artifactId>
<version>1.12.0</version>
<version>1.13.0-beta.1</version>

<name> kafka-connect-cosmos</name>
<url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.implementations;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class CosmosKafkaSchedulers {
private static final String COSMOS_KAFKA_CFP_THREAD_NAME = "cosmos-kafka-cfp-bounded-elastic";
private static final int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS

// Custom bounded elastic scheduler for kafka connector
public static final Scheduler COSMOS_KAFKA_CFP_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
COSMOS_KAFKA_CFP_THREAD_NAME,
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
true
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.kafka.connect.CosmosDBConfig;
import com.azure.cosmos.kafka.connect.TopicContainerMap;
import com.azure.cosmos.kafka.connect.implementations.CosmosKafkaSchedulers;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
Expand All @@ -25,7 +26,6 @@
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -87,7 +87,7 @@ public void start(Map<String, String> map) {
// Initiate Cosmos change feed processor
changeFeedProcessor = getChangeFeedProcessor(config.getWorkerName(), feedContainer, leaseContainer, config.useLatestOffset());
changeFeedProcessor.start()
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(CosmosKafkaSchedulers.COSMOS_KAFKA_CFP_BOUNDED_ELASTIC)
.doOnSuccess(aVoid -> running.set(true))
.subscribe();

Expand Down Expand Up @@ -191,21 +191,13 @@ private void fillRecords(List<SourceRecord> records, String topic) throws Interr
@Override
public void stop() {
logger.info("Stopping CosmosDB source task.");
while (!this.queue.isEmpty()) {
// Wait till the items are drained by poll before stopping.
try {
sleep(500);
} catch (InterruptedException e) {
logger.error("Interrupted! Failed to stop the task", e);
// Restore interrupted state...
Thread.currentThread().interrupt();
}
}
// NOTE: poll() method and stop() method are both called from the same thread,
// so it is important not to include any changes which may block both places forever
running.set(false);

// Release all the resources.
if (changeFeedProcessor != null) {
changeFeedProcessor.stop();
changeFeedProcessor.stop().block();
changeFeedProcessor = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void setup() throws IllegalAccessException {
//Mock query results and iterator for getting lease container token
CosmosPagedFlux<JsonNode> mockLeaseQueryResults = (CosmosPagedFlux<JsonNode>) Mockito.mock(CosmosPagedFlux.class);
when(mockLeaseContainer.queryItems(anyString(), any(), eq(JsonNode.class))).thenReturn(mockLeaseQueryResults);

Iterable<JsonNode> mockLeaseQueryIterable = (Iterable<JsonNode>) Mockito.mock(Iterable.class);
when(mockLeaseQueryResults.toIterable()).thenReturn(mockLeaseQueryIterable);

Expand All @@ -107,9 +107,9 @@ public void testHandleChanges() throws JsonProcessingException, IllegalAccessExc
new Thread(() -> {
testTask.handleCosmosDbChanges(changes);
}).start();

int recordCount = 0;
while(recordCount == 0) {
while (recordCount == 0) {
JsonNode jsonNode = this.queue.poll();
if (jsonNode != null) {
recordCount++;
Expand All @@ -135,7 +135,7 @@ public void testPoll() throws InterruptedException, JsonProcessingException, Ill
testTask.handleCosmosDbChanges(changes);
}).start();

List<SourceRecord> result=testTask.poll();
List<SourceRecord> result = testTask.poll();
Assert.assertEquals(1, result.size());
AtomicBoolean shouldFillMoreRecords =
(AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask);
Expand All @@ -162,7 +162,7 @@ public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, J
(AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask);
shouldFillMoreRecords.set(false);

List<SourceRecord> result=testTask.poll();
List<SourceRecord> result = testTask.poll();
Assert.assertEquals(0, result.size());
Assert.assertTrue(shouldFillMoreRecords.get());
}
Expand All @@ -179,7 +179,7 @@ public void testPollWithMessageKey() throws InterruptedException, JsonProcessing
testTask.handleCosmosDbChanges(changes);
}).start();

List<SourceRecord> result=testTask.poll();
List<SourceRecord> result = testTask.poll();
Assert.assertEquals(1, result.size());
Assert.assertEquals("123", result.get(0).key());
}
Expand Down Expand Up @@ -217,7 +217,7 @@ public void testZeroBatchSize() throws InterruptedException, JsonProcessingExcep
testTask.handleCosmosDbChanges(changes);
}).start();

List<SourceRecord> result=testTask.poll();
List<SourceRecord> result = testTask.poll();
Assert.assertEquals(0, result.size());
}

Expand All @@ -236,12 +236,11 @@ public void testSmallBufferSize() throws InterruptedException, JsonProcessingExc
testTask.handleCosmosDbChanges(changes);
}).start();

List<SourceRecord> result=testTask.poll();
List<SourceRecord> result = testTask.poll();
Assert.assertEquals(1, result.size());
}


@Test(expected=IllegalStateException.class)
@Test(expected = IllegalStateException.class)
public void testEmptyAssignedContainerThrowsIllegalStateException() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
Expand All @@ -258,4 +257,32 @@ public void testEmptyAssignedContainerThrowsIllegalStateException() throws Inter

testTask.poll();
}

@Test
public void testStop() throws JsonProcessingException, IllegalAccessException, InterruptedException {
// validate when stop() being called, even though the queue is not empty, it can still return successfully
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
changes.add(actualObj);

new Thread(() -> {
testTask.handleCosmosDbChanges(changes);
}).start();

Thread.sleep(500); // give some time for the above task to run

testTask.stop();
AtomicBoolean isRunning =
(AtomicBoolean) FieldUtils.readField(
FieldUtils.getField(CosmosDBSourceTask.class, "running", true),
testTask
);
Assert.assertFalse(isRunning.get());

// revert back to running status to avoid interrupt other tests
FieldUtils.writeField(testTask, "running", new AtomicBoolean(true), true);
testTask.poll();
}
}

0 comments on commit 64709c1

Please sign in to comment.