diff --git a/pom.xml b/pom.xml index 29e5609..e6b821b 100644 --- a/pom.xml +++ b/pom.xml @@ -37,7 +37,7 @@ under the License. io.github.jeff-zou flink-connector-redis - 1.2.3 + 1.2.4 jar diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java index 94da829..446a6ef 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java @@ -4,6 +4,7 @@ import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; +import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -405,4 +406,9 @@ public RedisFuture> hgetAll(String key) { } return result; } + + @Override + public RedisClusterAsyncCommands getAsyncCommands() { + return clusterAsyncCommands; + } } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java index 9c96430..fd95331 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java @@ -1,6 +1,7 @@ package org.apache.flink.streaming.connectors.redis.common.container; import io.lettuce.core.RedisFuture; +import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; import java.io.IOException; import java.io.Serializable; @@ -212,4 +213,11 @@ public interface RedisCommandsContainer extends Serializable { * @param value */ void srem(String setName, String value); + + /** + * get redis async commands. + * + * @return + */ + RedisClusterAsyncCommands getAsyncCommands(); } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java index 72fff43..1d79d8e 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java @@ -4,6 +4,7 @@ import io.lettuce.core.RedisFuture; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -421,4 +422,9 @@ public void srem(String setName, String value) { throw e; } } + + @Override + public RedisClusterAsyncCommands getAsyncCommands() { + return asyncCommands; + } }