Skip to content

Commit

Permalink
Add enableHeartbeatChannelEventListener for ClientConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma committed Oct 8, 2023
1 parent 68cd0e6 commit f1c382d
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 17 deletions.
55 changes: 41 additions & 14 deletions client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class ClientConfig {
private boolean sendLatencyEnable = Boolean.parseBoolean(System.getProperty(SEND_LATENCY_ENABLE, "false"));
private boolean startDetectorEnable = Boolean.parseBoolean(System.getProperty(START_DETECTOR_ENABLE, "false"));

private boolean enableHeartbeatChannelEventListener = true;

public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
Expand Down Expand Up @@ -201,6 +203,7 @@ public void resetClientConfig(final ClientConfig cc) {
this.useHeartbeatV2 = cc.useHeartbeatV2;
this.startDetectorEnable = cc.startDetectorEnable;
this.sendLatencyEnable = cc.sendLatencyEnable;
this.enableHeartbeatChannelEventListener = cc.enableHeartbeatChannelEventListener;
this.detectInterval = cc.detectInterval;
this.detectTimeout = cc.detectTimeout;
}
Expand Down Expand Up @@ -228,6 +231,7 @@ public ClientConfig cloneClientConfig() {
cc.enableStreamRequestType = enableStreamRequestType;
cc.useHeartbeatV2 = useHeartbeatV2;
cc.startDetectorEnable = startDetectorEnable;
cc.enableHeartbeatChannelEventListener = enableHeartbeatChannelEventListener;
cc.sendLatencyEnable = sendLatencyEnable;
cc.detectInterval = detectInterval;
cc.detectTimeout = detectTimeout;
Expand Down Expand Up @@ -418,6 +422,14 @@ public void setStartDetectorEnable(boolean startDetectorEnable) {
this.startDetectorEnable = startDetectorEnable;
}

public boolean isEnableHeartbeatChannelEventListener() {
return enableHeartbeatChannelEventListener;
}

public void setEnableHeartbeatChannelEventListener(boolean enableHeartbeatChannelEventListener) {
this.enableHeartbeatChannelEventListener = enableHeartbeatChannelEventListener;
}

public int getDetectTimeout() {
return this.detectTimeout;
}
Expand All @@ -444,19 +456,34 @@ public void setUseHeartbeatV2(boolean useHeartbeatV2) {

@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr
+ ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads
+ ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval
+ ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
+ ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException
+ ", unitMode=" + unitMode + ", unitName=" + unitName
+ ", vipChannelEnabled=" + vipChannelEnabled + ", useTLS=" + useTLS
+ ", socksProxyConfig=" + socksProxyConfig + ", language=" + language.name()
+ ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout
+ ", decodeReadBody=" + decodeReadBody + ", decodeDecompressBody=" + decodeDecompressBody
+ ", sendLatencyEnable=" + sendLatencyEnable + ", startDetectorEnable=" + startDetectorEnable
+ ", enableStreamRequestType=" + enableStreamRequestType + ", useHeartbeatV2=" + useHeartbeatV2 + "]";
return "ClientConfig{" +
"namesrvAddr='" + namesrvAddr + '\'' +
", clientIP='" + clientIP + '\'' +
", instanceName='" + instanceName + '\'' +
", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads +
", namespace='" + namespace + '\'' +
", namespaceInitialized=" + namespaceInitialized +
", accessChannel=" + accessChannel +
", pollNameServerInterval=" + pollNameServerInterval +
", heartbeatBrokerInterval=" + heartbeatBrokerInterval +
", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval +
", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException +
", unitMode=" + unitMode +
", unitName='" + unitName + '\'' +
", decodeReadBody=" + decodeReadBody +
", decodeDecompressBody=" + decodeDecompressBody +
", vipChannelEnabled=" + vipChannelEnabled +
", useHeartbeatV2=" + useHeartbeatV2 +
", useTLS=" + useTLS +
", socksProxyConfig='" + socksProxyConfig + '\'' +
", mqClientApiTimeout=" + mqClientApiTimeout +
", detectTimeout=" + detectTimeout +
", detectInterval=" + detectInterval +
", language=" + language +
", enableStreamRequestType=" + enableStreamRequestType +
", sendLatencyEnable=" + sendLatencyEnable +
", startDetectorEnable=" + startDetectorEnable +
", enableHeartbeatChannelEventListener=" + enableHeartbeatChannelEventListener +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.nettyClientConfig.setSocksProxyConfig(clientConfig.getSocksProxyConfig());
ClientRemotingProcessor clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig,
new ChannelEventListener() {
ChannelEventListener channelEventListener;
if (clientConfig.isEnableHeartbeatChannelEventListener()) {
channelEventListener = new ChannelEventListener() {
private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = MQClientInstance.this.brokerAddrTable;
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
Expand All @@ -179,7 +180,11 @@ public void onChannelException(String remoteAddr, Channel channel) {
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
}
});
};
} else {
channelEventListener = null;
}
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, channelEventListener);

if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
Expand Down

0 comments on commit f1c382d

Please sign in to comment.