From 04ecc413fffc74a80c1f3da77e780a7a708aeb1c Mon Sep 17 00:00:00 2001 From: ren ran Date: Fri, 26 Jan 2024 20:55:40 +0800 Subject: [PATCH 1/3] Avoid NPE in MultiNodePipelineBase.java should get connection first and then create new pipeline queue, otherwise it would cause NPE when timeout for getting connection and call sync() method --- .../java/redis/clients/jedis/MultiNodePipelineBase.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index a01ff4d1bb..13f2730ab4 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -63,9 +63,6 @@ protected final Response appendCommand(CommandObject commandObject) { queue = pipelinedResponses.get(nodeKey); connection = connections.get(nodeKey); } else { - pipelinedResponses.putIfAbsent(nodeKey, new LinkedList<>()); - queue = pipelinedResponses.get(nodeKey); - Connection newOne = getConnection(nodeKey); connections.putIfAbsent(nodeKey, newOne); connection = connections.get(nodeKey); @@ -73,6 +70,9 @@ protected final Response appendCommand(CommandObject commandObject) { log.debug("Duplicate connection to {}, closing it.", nodeKey); IOUtils.closeQuietly(newOne); } + + pipelinedResponses.putIfAbsent(nodeKey, new LinkedList<>()); + queue = pipelinedResponses.get(nodeKey); } connection.sendCommand(commandObject.getArguments()); From 40f402ed4cbd74b53cdf2ad3b29e1ec7858045e6 Mon Sep 17 00:00:00 2001 From: ren ran Date: Sun, 28 Jan 2024 20:46:21 +0800 Subject: [PATCH 2/3] pipelinedResponses should be clear pipelinedResponses should be clear in MultiNodePipelineBase.java and avoid this pipeline object reporting ENP when executing close() and reuse --- src/main/java/redis/clients/jedis/MultiNodePipelineBase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 13f2730ab4..33c1418592 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -86,6 +86,7 @@ public void close() { try { sync(); } finally { + pipelinedResponses.clear(); connections.values().forEach(IOUtils::closeQuietly); } } From 4c3be6d6f9401fe914e6d9ef1f6da378b3ffac46 Mon Sep 17 00:00:00 2001 From: ren ran Date: Sat, 3 Aug 2024 20:31:53 +0800 Subject: [PATCH 3/3] refresh connection for pipeline when getting JedisMovedDataException --- src/main/java/redis/clients/jedis/ClusterPipeline.java | 5 +++++ .../java/redis/clients/jedis/MultiNodePipelineBase.java | 7 ++++++- src/main/java/redis/clients/jedis/ShardedPipeline.java | 5 +++++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/ClusterPipeline.java b/src/main/java/redis/clients/jedis/ClusterPipeline.java index a122b41688..0a51f5dec6 100644 --- a/src/main/java/redis/clients/jedis/ClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/ClusterPipeline.java @@ -72,6 +72,11 @@ protected Connection getConnection(HostAndPort nodeKey) { return provider.getConnection(nodeKey); } + @Override + protected void refreshConnection() { + provider.renewSlotCache(); + } + public Response spublish(String channel, String message) { return appendCommand(commandObjects.spublish(channel, message)); } diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 33c1418592..2c065fc6e1 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory; import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.jedis.graph.GraphCommandObjects; import redis.clients.jedis.providers.ConnectionProvider; import redis.clients.jedis.util.IOUtils; @@ -53,6 +54,8 @@ protected final void prepareGraphCommands(ConnectionProvider connectionProvider) protected abstract Connection getConnection(HostAndPort nodeKey); + protected abstract void refreshConnection(); + @Override protected final Response appendCommand(CommandObject commandObject) { HostAndPort nodeKey = getNodeKey(commandObject.getArguments()); @@ -86,7 +89,6 @@ public void close() { try { sync(); } finally { - pipelinedResponses.clear(); connections.values().forEach(IOUtils::closeQuietly); } } @@ -112,6 +114,9 @@ public final void sync() { try { List unformatted = connection.getMany(queue.size()); for (Object o : unformatted) { + if (o instanceof JedisMovedDataException) { + refreshConnection(); + } queue.poll().set(o); } } catch (JedisConnectionException jce) { diff --git a/src/main/java/redis/clients/jedis/ShardedPipeline.java b/src/main/java/redis/clients/jedis/ShardedPipeline.java index 8ad89cc59e..cedf17ee36 100644 --- a/src/main/java/redis/clients/jedis/ShardedPipeline.java +++ b/src/main/java/redis/clients/jedis/ShardedPipeline.java @@ -58,6 +58,11 @@ protected Connection getConnection(HostAndPort nodeKey) { return provider.getConnection(nodeKey); } + @Override + protected void refreshConnection() { + // do nothing + } + /** * This method must be called after constructor, if graph commands are going to be used. */