diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index bb0fe352285..f9843cc0231 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -94,6 +94,8 @@ public class ClientConfig { private boolean sendLatencyEnable = Boolean.parseBoolean(System.getProperty(SEND_LATENCY_ENABLE, "false")); private boolean startDetectorEnable = Boolean.parseBoolean(System.getProperty(START_DETECTOR_ENABLE, "false")); + private boolean enableHeartbeatChannelEventListener = true; + public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); @@ -201,6 +203,7 @@ public void resetClientConfig(final ClientConfig cc) { this.useHeartbeatV2 = cc.useHeartbeatV2; this.startDetectorEnable = cc.startDetectorEnable; this.sendLatencyEnable = cc.sendLatencyEnable; + this.enableHeartbeatChannelEventListener = cc.enableHeartbeatChannelEventListener; this.detectInterval = cc.detectInterval; this.detectTimeout = cc.detectTimeout; } @@ -228,6 +231,7 @@ public ClientConfig cloneClientConfig() { cc.enableStreamRequestType = enableStreamRequestType; cc.useHeartbeatV2 = useHeartbeatV2; cc.startDetectorEnable = startDetectorEnable; + cc.enableHeartbeatChannelEventListener = enableHeartbeatChannelEventListener; cc.sendLatencyEnable = sendLatencyEnable; cc.detectInterval = detectInterval; cc.detectTimeout = detectTimeout; @@ -418,6 +422,14 @@ public void setStartDetectorEnable(boolean startDetectorEnable) { this.startDetectorEnable = startDetectorEnable; } + public boolean isEnableHeartbeatChannelEventListener() { + return enableHeartbeatChannelEventListener; + } + + public void setEnableHeartbeatChannelEventListener(boolean enableHeartbeatChannelEventListener) { + this.enableHeartbeatChannelEventListener = enableHeartbeatChannelEventListener; + } + public int getDetectTimeout() { return this.detectTimeout; } @@ -444,19 +456,34 @@ public void setUseHeartbeatV2(boolean useHeartbeatV2) { @Override public String toString() { - return "ClientConfig [namesrvAddr=" + namesrvAddr - + ", clientIP=" + clientIP + ", instanceName=" + instanceName - + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads - + ", pollNameServerInterval=" + pollNameServerInterval - + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval - + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval - + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException - + ", unitMode=" + unitMode + ", unitName=" + unitName - + ", vipChannelEnabled=" + vipChannelEnabled + ", useTLS=" + useTLS - + ", socksProxyConfig=" + socksProxyConfig + ", language=" + language.name() - + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout - + ", decodeReadBody=" + decodeReadBody + ", decodeDecompressBody=" + decodeDecompressBody - + ", sendLatencyEnable=" + sendLatencyEnable + ", startDetectorEnable=" + startDetectorEnable - + ", enableStreamRequestType=" + enableStreamRequestType + ", useHeartbeatV2=" + useHeartbeatV2 + "]"; + return "ClientConfig{" + + "namesrvAddr='" + namesrvAddr + '\'' + + ", clientIP='" + clientIP + '\'' + + ", instanceName='" + instanceName + '\'' + + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + + ", namespace='" + namespace + '\'' + + ", namespaceInitialized=" + namespaceInitialized + + ", accessChannel=" + accessChannel + + ", pollNameServerInterval=" + pollNameServerInterval + + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval + + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + + ", unitMode=" + unitMode + + ", unitName='" + unitName + '\'' + + ", decodeReadBody=" + decodeReadBody + + ", decodeDecompressBody=" + decodeDecompressBody + + ", vipChannelEnabled=" + vipChannelEnabled + + ", useHeartbeatV2=" + useHeartbeatV2 + + ", useTLS=" + useTLS + + ", socksProxyConfig='" + socksProxyConfig + '\'' + + ", mqClientApiTimeout=" + mqClientApiTimeout + + ", detectTimeout=" + detectTimeout + + ", detectInterval=" + detectInterval + + ", language=" + language + + ", enableStreamRequestType=" + enableStreamRequestType + + ", sendLatencyEnable=" + sendLatencyEnable + + ", startDetectorEnable=" + startDetectorEnable + + ", enableHeartbeatChannelEventListener=" + enableHeartbeatChannelEventListener + + '}'; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 2407e573736..e152be81193 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -79,6 +79,7 @@ import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -246,10 +247,16 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor, RPCHook rpcHook, final ClientConfig clientConfig) { + this(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, null); + } + + public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, + final ClientRemotingProcessor clientRemotingProcessor, + RPCHook rpcHook, final ClientConfig clientConfig, final ChannelEventListener channelEventListener) { this.clientConfig = clientConfig; topAddressing = new DefaultTopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName()); topAddressing.registerChangeCallBack(this); - this.remotingClient = new NettyRemotingClient(nettyClientConfig, null); + this.remotingClient = new NettyRemotingClient(nettyClientConfig, channelEventListener); this.clientRemotingProcessor = clientRemotingProcessor; // Inject stream rpc hook first to make reserve field signature diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 9484b26f8d2..09534a1768b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.impl.factory; +import io.netty.channel.Channel; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -65,6 +66,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueueAssignment; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.HeartbeatV2Result; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -151,7 +153,38 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS()); this.nettyClientConfig.setSocksProxyConfig(clientConfig.getSocksProxyConfig()); ClientRemotingProcessor clientRemotingProcessor = new ClientRemotingProcessor(this); - this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig); + ChannelEventListener channelEventListener; + if (clientConfig.isEnableHeartbeatChannelEventListener()) { + channelEventListener = new ChannelEventListener() { + private final ConcurrentMap> brokerAddrTable = MQClientInstance.this.brokerAddrTable; + @Override + public void onChannelConnect(String remoteAddr, Channel channel) { + for (Map.Entry> addressEntry : brokerAddrTable.entrySet()) { + for (String address : addressEntry.getValue().values()) { + if (address.equals(remoteAddr)) { + sendHeartbeatToAllBrokerWithLockV2(false); + break; + } + } + } + } + + @Override + public void onChannelClose(String remoteAddr, Channel channel) { + } + + @Override + public void onChannelException(String remoteAddr, Channel channel) { + } + + @Override + public void onChannelIdle(String remoteAddr, Channel channel) { + } + }; + } else { + channelEventListener = null; + } + this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, channelEventListener); if (this.clientConfig.getNamesrvAddr() != null) { this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr()); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index d784351a5f8..8631d0447d8 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -229,6 +229,8 @@ public void initChannel(SocketChannel ch) throws Exception { handler.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } + nettyEventExecutor.start(); + TimerTask timerTaskScanResponseTable = new TimerTask() { @Override public void run(Timeout timeout) {