diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java index be61c1c8e..4dbca2251 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java @@ -156,7 +156,10 @@ public void start(final Map parsedConfig) public void stop() { LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:stop", this.id)); - // do nothing + if (this.sink != null) + { + this.sink.setIsStoppedToTrue(); // close cleaner thread + } } /** diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java index 3311a6154..7d5b6ed9b 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java @@ -66,6 +66,11 @@ public interface SnowflakeSinkService */ void close(Collection partitions); + /** + * close all cleaner thread but have no effect on sink service context + */ + void setIsStoppedToTrue(); + /** * retrieve sink service status * @return true is closed diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java index da181de91..f06991c99 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java @@ -174,6 +174,12 @@ public void closeAll() pipes.clear(); } + @Override + public void setIsStoppedToTrue() + { + this.isStopped = true; // release all cleaner and flusher threads + } + @Override public boolean isClosed() {