From 8e1de6651689e516d955f7ec950d0efc426c406d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8A=E6=85=A7=E5=88=A9?= Date: Mon, 12 Aug 2024 17:54:33 +0800 Subject: [PATCH] fix: refresh leader on disconnect (#1140) --- .../error/ConnectionFailureException.java | 45 +++++++++++++++++++ .../sofa/jraft/rpc/impl/BoltRpcClient.java | 9 ++++ .../sofa/jraft/util/internal/ThrowUtil.java | 10 ++++- .../rhea/client/DefaultRheaKVRpcService.java | 18 ++++---- .../pd/DefaultPlacementDriverRpcService.java | 18 ++++---- 5 files changed, 81 insertions(+), 19 deletions(-) create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/error/ConnectionFailureException.java diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/error/ConnectionFailureException.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/error/ConnectionFailureException.java new file mode 100644 index 000000000..ea6b28deb --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/error/ConnectionFailureException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.error; + +/** + * Rpc connection failure exception. + */ +public class ConnectionFailureException extends RemotingException { + private static final long serialVersionUID = -5958618149334588246L; + + public ConnectionFailureException() { + super(); + } + + public ConnectionFailureException(String message) { + super(message); + } + + public ConnectionFailureException(String message, Throwable cause) { + super(message, cause); + } + + public ConnectionFailureException(Throwable cause) { + super(cause); + } + + public ConnectionFailureException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java index 6a45ca65f..3f913cd26 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java @@ -16,6 +16,7 @@ */ package com.alipay.sofa.jraft.rpc.impl; +import java.net.ConnectException; import java.util.Map; import java.util.concurrent.Executor; @@ -23,6 +24,7 @@ import com.alipay.remoting.RejectedExecutionPolicy; import com.alipay.remoting.config.BoltClientOption; import com.alipay.sofa.jraft.ReplicatorGroup; +import com.alipay.sofa.jraft.error.ConnectionFailureException; import com.alipay.sofa.jraft.error.InvokeTimeoutException; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.option.RpcOptions; @@ -32,6 +34,7 @@ import com.alipay.sofa.jraft.rpc.impl.core.ClientServiceConnectionEventProcessor; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Requires; +import com.alipay.sofa.jraft.util.internal.ThrowUtil; /** * Bolt rpc client impl. @@ -100,6 +103,9 @@ public Object invokeSync(final Endpoint endpoint, final Object request, final In } catch (final com.alipay.remoting.rpc.exception.InvokeTimeoutException e) { throw new InvokeTimeoutException(e); } catch (final com.alipay.remoting.exception.RemotingException e) { + if (ThrowUtil.getRootCause(e) instanceof ConnectException) { + throw new ConnectionFailureException(e); + } throw new RemotingException(e); } } @@ -115,6 +121,9 @@ public void invokeAsync(final Endpoint endpoint, final Object request, final Inv } catch (final com.alipay.remoting.rpc.exception.InvokeTimeoutException e) { throw new InvokeTimeoutException(e); } catch (final com.alipay.remoting.exception.RemotingException e) { + if (ThrowUtil.getRootCause(e) instanceof ConnectException) { + throw new ConnectionFailureException(e); + } throw new RemotingException(e); } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/internal/ThrowUtil.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/internal/ThrowUtil.java index c179a89ad..1e7eaef9c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/internal/ThrowUtil.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/internal/ThrowUtil.java @@ -61,11 +61,19 @@ public static T cutCause(final T cause) { if (rootCause != cause) { cause.setStackTrace(rootCause.getStackTrace()); - causeUpdater.set(cause, cause); + causeUpdater.set(cause, rootCause); } return cause; } + public static Throwable getRootCause(final Throwable cause) { + Throwable rootCause = cause; + while (rootCause.getCause() != null) { + rootCause = rootCause.getCause(); + } + return rootCause; + } + private ThrowUtil() { } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVRpcService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVRpcService.java index 23b23cac3..b376730fd 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVRpcService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVRpcService.java @@ -24,8 +24,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alipay.remoting.exception.RemotingException; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.error.ConnectionFailureException; import com.alipay.sofa.jraft.rhea.client.failover.FailoverClosure; import com.alipay.sofa.jraft.rhea.client.pd.AbstractPlacementDriverClient; import com.alipay.sofa.jraft.rhea.client.pd.PlacementDriverClient; @@ -140,12 +140,7 @@ public void complete(final Object result, final Throwable err) { closure.run(new Status(-1, "RPC failed with address: %s, response: %s", endpoint, response)); } } else { - if (err instanceof RemotingException) { - closure.setError(Errors.RPC_CONNECTION_ERROR); - closure.run(new Status(-1, "RPC failed occur exception %s", err.getMessage())); - } else { - closure.failure(err); - } + closure.failure(err); } } @@ -157,8 +152,13 @@ public Executor executor() { try { this.rpcClient.invokeAsync(endpoint, request, invokeCtx, invokeCallback, this.rpcTimeoutMillis); - } catch (final Throwable t) { - closure.failure(t); + } catch (final Throwable err) { + if (err instanceof ConnectionFailureException) { + closure.setError(Errors.RPC_CONNECTION_ERROR); + closure.run(new Status(-1, "RPC failed occur exception %s", err.getMessage())); + } else { + closure.failure(err); + } } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/DefaultPlacementDriverRpcService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/DefaultPlacementDriverRpcService.java index 2bcd14c71..817114898 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/DefaultPlacementDriverRpcService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/DefaultPlacementDriverRpcService.java @@ -24,8 +24,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alipay.remoting.exception.RemotingException; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.error.ConnectionFailureException; import com.alipay.sofa.jraft.rhea.client.failover.FailoverClosure; import com.alipay.sofa.jraft.rhea.cmd.pd.BaseRequest; import com.alipay.sofa.jraft.rhea.cmd.pd.BaseResponse; @@ -111,12 +111,7 @@ public void complete(final Object result, final Throwable err) { closure.run(new Status(-1, "RPC failed with address: %s, response: %s", endpoint, response)); } } else { - if (err instanceof RemotingException) { - closure.setError(Errors.RPC_CONNECTION_ERROR); - closure.run(new Status(-1, "RPC failed occur exception %s", err.getMessage())); - } else { - closure.failure(err); - } + closure.failure(err); } } @@ -128,8 +123,13 @@ public Executor executor() { try { this.rpcClient.invokeAsync(endpoint, request, invokeCtx, invokeCallback, this.rpcTimeoutMillis); - } catch (final Throwable t) { - closure.failure(t); + } catch (final Throwable err) { + if (err instanceof ConnectionFailureException) { + closure.setError(Errors.RPC_CONNECTION_ERROR); + closure.run(new Status(-1, "RPC failed occur exception %s", err.getMessage())); + } else { + closure.failure(err); + } } }