diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index bb0fe352285..f9843cc0231 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -94,6 +94,8 @@ public class ClientConfig { private boolean sendLatencyEnable = Boolean.parseBoolean(System.getProperty(SEND_LATENCY_ENABLE, "false")); private boolean startDetectorEnable = Boolean.parseBoolean(System.getProperty(START_DETECTOR_ENABLE, "false")); + private boolean enableHeartbeatChannelEventListener = true; + public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); @@ -201,6 +203,7 @@ public void resetClientConfig(final ClientConfig cc) { this.useHeartbeatV2 = cc.useHeartbeatV2; this.startDetectorEnable = cc.startDetectorEnable; this.sendLatencyEnable = cc.sendLatencyEnable; + this.enableHeartbeatChannelEventListener = cc.enableHeartbeatChannelEventListener; this.detectInterval = cc.detectInterval; this.detectTimeout = cc.detectTimeout; } @@ -228,6 +231,7 @@ public ClientConfig cloneClientConfig() { cc.enableStreamRequestType = enableStreamRequestType; cc.useHeartbeatV2 = useHeartbeatV2; cc.startDetectorEnable = startDetectorEnable; + cc.enableHeartbeatChannelEventListener = enableHeartbeatChannelEventListener; cc.sendLatencyEnable = sendLatencyEnable; cc.detectInterval = detectInterval; cc.detectTimeout = detectTimeout; @@ -418,6 +422,14 @@ public void setStartDetectorEnable(boolean startDetectorEnable) { this.startDetectorEnable = startDetectorEnable; } + public boolean isEnableHeartbeatChannelEventListener() { + return enableHeartbeatChannelEventListener; + } + + public void setEnableHeartbeatChannelEventListener(boolean enableHeartbeatChannelEventListener) { + this.enableHeartbeatChannelEventListener = enableHeartbeatChannelEventListener; + } + public int getDetectTimeout() { return this.detectTimeout; } @@ -444,19 +456,34 @@ public void setUseHeartbeatV2(boolean useHeartbeatV2) { @Override public String toString() { - return "ClientConfig [namesrvAddr=" + namesrvAddr - + ", clientIP=" + clientIP + ", instanceName=" + instanceName - + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads - + ", pollNameServerInterval=" + pollNameServerInterval - + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval - + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval - + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException - + ", unitMode=" + unitMode + ", unitName=" + unitName - + ", vipChannelEnabled=" + vipChannelEnabled + ", useTLS=" + useTLS - + ", socksProxyConfig=" + socksProxyConfig + ", language=" + language.name() - + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout - + ", decodeReadBody=" + decodeReadBody + ", decodeDecompressBody=" + decodeDecompressBody - + ", sendLatencyEnable=" + sendLatencyEnable + ", startDetectorEnable=" + startDetectorEnable - + ", enableStreamRequestType=" + enableStreamRequestType + ", useHeartbeatV2=" + useHeartbeatV2 + "]"; + return "ClientConfig{" + + "namesrvAddr='" + namesrvAddr + '\'' + + ", clientIP='" + clientIP + '\'' + + ", instanceName='" + instanceName + '\'' + + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + + ", namespace='" + namespace + '\'' + + ", namespaceInitialized=" + namespaceInitialized + + ", accessChannel=" + accessChannel + + ", pollNameServerInterval=" + pollNameServerInterval + + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval + + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + + ", unitMode=" + unitMode + + ", unitName='" + unitName + '\'' + + ", decodeReadBody=" + decodeReadBody + + ", decodeDecompressBody=" + decodeDecompressBody + + ", vipChannelEnabled=" + vipChannelEnabled + + ", useHeartbeatV2=" + useHeartbeatV2 + + ", useTLS=" + useTLS + + ", socksProxyConfig='" + socksProxyConfig + '\'' + + ", mqClientApiTimeout=" + mqClientApiTimeout + + ", detectTimeout=" + detectTimeout + + ", detectInterval=" + detectInterval + + ", language=" + language + + ", enableStreamRequestType=" + enableStreamRequestType + + ", sendLatencyEnable=" + sendLatencyEnable + + ", startDetectorEnable=" + startDetectorEnable + + ", enableHeartbeatChannelEventListener=" + enableHeartbeatChannelEventListener + + '}'; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index f9b7950e12d..09534a1768b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -153,8 +153,9 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS()); this.nettyClientConfig.setSocksProxyConfig(clientConfig.getSocksProxyConfig()); ClientRemotingProcessor clientRemotingProcessor = new ClientRemotingProcessor(this); - this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, - new ChannelEventListener() { + ChannelEventListener channelEventListener; + if (clientConfig.isEnableHeartbeatChannelEventListener()) { + channelEventListener = new ChannelEventListener() { private final ConcurrentMap> brokerAddrTable = MQClientInstance.this.brokerAddrTable; @Override public void onChannelConnect(String remoteAddr, Channel channel) { @@ -179,7 +180,11 @@ public void onChannelException(String remoteAddr, Channel channel) { @Override public void onChannelIdle(String remoteAddr, Channel channel) { } - }); + }; + } else { + channelEventListener = null; + } + this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, channelEventListener); if (this.clientConfig.getNamesrvAddr() != null) { this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());