Skip to content

Commit

Permalink
Add ChannelEventListener for MQClientAPIImpl
Browse files Browse the repository at this point in the history
* add heartbeat when channel connect
  • Loading branch information
drpmma committed Oct 8, 2023
1 parent 8415608 commit 7c11d15
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
Expand Down Expand Up @@ -246,10 +247,16 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, null);
}

public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig, final ChannelEventListener channelEventListener) {
this.clientConfig = clientConfig;
topAddressing = new DefaultTopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
topAddressing.registerChangeCallBack(this);
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.remotingClient = new NettyRemotingClient(nettyClientConfig, channelEventListener);
this.clientRemotingProcessor = clientRemotingProcessor;

// Inject stream rpc hook first to make reserve field signature
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.impl.factory;

import io.netty.channel.Channel;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -65,6 +66,7 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.HeartbeatV2Result;
import org.apache.rocketmq.remoting.exception.RemotingException;
Expand Down Expand Up @@ -151,7 +153,35 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.nettyClientConfig.setSocksProxyConfig(clientConfig.getSocksProxyConfig());
ClientRemotingProcessor clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig,
new ChannelEventListener() {
private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = MQClientInstance.this.brokerAddrTable;
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
log.info("onChannelConnect {}", remoteAddr);
for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) {
for (String address : addressEntry.getValue().values()) {
if (address.equals(remoteAddr)) {
log.info("onChannelConnect {} send heartbeat", remoteAddr);
sendHeartbeatToAllBrokerWithLockV2(false);
break;
}
}
}
}

@Override
public void onChannelClose(String remoteAddr, Channel channel) {
}

@Override
public void onChannelException(String remoteAddr, Channel channel) {
}

@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
}
});

if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ public void initChannel(SocketChannel ch) throws Exception {
handler.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}

nettyEventExecutor.start();

TimerTask timerTaskScanResponseTable = new TimerTask() {
@Override
public void run(Timeout timeout) {
Expand Down

0 comments on commit 7c11d15

Please sign in to comment.