Skip to content

Commit

Permalink
[ISSUE #7321] Refector NettyRemotingAbstract with unify future implem…
Browse files Browse the repository at this point in the history
…entation (#7322)

* Refector NettyRemotingAbstract

* Add invoke with future method

* Deprecate InvokeCallback#operationComplete

* Add operationSuccess and operationException for InvokeCallback

* fix unit test

* fix unit test

* Keep InvokeCallback#operationComplete

* Optimize invokeAsyncImpl operationComplete

* Add unit test for NettyRemotingClient

* fix checkstyle
  • Loading branch information
drpmma authored Oct 8, 2023
1 parent c36bb78 commit 8415608
Show file tree
Hide file tree
Showing 18 changed files with 1,029 additions and 583 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.BrokerSyncInfo;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
Expand Down Expand Up @@ -107,6 +108,8 @@
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
Expand All @@ -124,8 +127,6 @@
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
Expand All @@ -151,7 +152,6 @@ public class BrokerOuterAPI {
private final RpcClient rpcClient;
private String nameSrvAddr = null;


public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, new DynamicalExtFieldRPCHook(), new ClientMetadata());
}
Expand Down Expand Up @@ -459,7 +459,7 @@ public List<RegisterBrokerResult> registerBrokerAll(
* @param filterServerList
* @param oneway
* @param timeoutMills
* @param compressed default false
* @param compressed default false
* @return
*/
public List<RegisterBrokerResult> registerBrokerAll(
Expand Down Expand Up @@ -643,7 +643,6 @@ public void registerSingleTopicAll(
queueDatas.add(queueData);
final byte[] topicRouteBody = topicRouteData.encode();


List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
Expand Down Expand Up @@ -910,25 +909,33 @@ public void lockBatchMQAsync(
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);

request.setBody(requestBody.encode());
this.remotingClient.invokeAsync(addr, request, timeoutMillis, responseFuture -> {
if (callback == null) {
return;
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {

}

try {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
if (response.getCode() == ResponseCode.SUCCESS) {
LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(),
LockBatchResponseBody.class);
Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
callback.onSuccess(messageQueues);
} else {
callback.onException(new MQBrokerException(response.getCode(), response.getRemark()));
}
@Override
public void operationSucceed(RemotingCommand response) {
if (callback == null) {
return;
}
} catch (Throwable ignored) {
if (response.getCode() == ResponseCode.SUCCESS) {
LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(),
LockBatchResponseBody.class);
Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
callback.onSuccess(messageQueues);
} else {
callback.onException(new MQBrokerException(response.getCode(), response.getRemark()));
}
}

@Override
public void operationFail(Throwable throwable) {
if (callback == null) {
return;
}
callback.onException(throwable);
}
});
}
Expand All @@ -942,22 +949,30 @@ public void unlockBatchMQAsync(

request.setBody(requestBody.encode());

this.remotingClient.invokeAsync(addr, request, timeoutMillis, responseFuture -> {
if (callback == null) {
return;
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {

}

try {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
if (response.getCode() == ResponseCode.SUCCESS) {
callback.onSuccess();
} else {
callback.onException(new MQBrokerException(response.getCode(), response.getRemark()));
}
@Override
public void operationSucceed(RemotingCommand response) {
if (callback == null) {
return;
}
} catch (Throwable ignored) {
if (response.getCode() == ResponseCode.SUCCESS) {
callback.onSuccess();
} else {
callback.onException(new MQBrokerException(response.getCode(), response.getRemark()));
}
}

@Override
public void operationFail(Throwable throwable) {
if (callback == null) {
return;
}
callback.onException(throwable);
}
});
}
Expand All @@ -983,21 +998,27 @@ public CompletableFuture<SendResult> sendMessageToSpecificBrokerAsync(String bro
CompletableFuture<SendResult> cf = new CompletableFuture<>();
final String msgId = msg.getMsgId();
try {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> {
RemotingCommand response = responseFuture.getResponseCommand();
if (null != response) {
SendResult sendResult = null;
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {

}

@Override
public void operationSucceed(RemotingCommand response) {
try {
sendResult = this.processSendResponse(brokerName, msg, response);
SendResult sendResult = processSendResponse(brokerName, msg, response);
cf.complete(sendResult);
} catch (MQBrokerException | RemotingCommandException e) {
LOGGER.error("processSendResponse in sendMessageToSpecificBrokerAsync failed, msgId=" + msgId, e);
cf.completeExceptionally(e);
}
} else {
cf.complete(null);
}

@Override
public void operationFail(Throwable throwable) {
cf.completeExceptionally(throwable);
}
});
} catch (Throwable t) {
LOGGER.error("invokeAsync failed in sendMessageToSpecificBrokerAsync, msgId=" + msgId, t);
Expand Down Expand Up @@ -1057,7 +1078,7 @@ private SendResult processSendResponse(
}
if (sendStatus != null) {
SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);

//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
Expand All @@ -1073,8 +1094,8 @@ private SendResult processSendResponse(
uniqMsgId = sb.toString();
}
SendResult sendResult = new SendResult(sendStatus,
uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
Expand Down Expand Up @@ -1218,8 +1239,9 @@ public SyncStateSet alterSyncStateSet(
/**
* Broker try to elect itself as a master in broker set
*/
public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String controllerAddress, String clusterName, String brokerName,
Long brokerId) throws Exception {
public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String controllerAddress, String clusterName,
String brokerName,
Long brokerId) throws Exception {

final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, requestHeader);
Expand All @@ -1237,7 +1259,8 @@ public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String controllerA
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public GetNextBrokerIdResponseHeader getNextBrokerId(final String clusterName, final String brokerName, final String controllerAddress) throws Exception {
public GetNextBrokerIdResponseHeader getNextBrokerId(final String clusterName, final String brokerName,
final String controllerAddress) throws Exception {
final GetNextBrokerIdRequestHeader requestHeader = new GetNextBrokerIdRequestHeader(clusterName, brokerName);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_NEXT_BROKER_ID, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
Expand All @@ -1248,7 +1271,8 @@ public GetNextBrokerIdResponseHeader getNextBrokerId(final String clusterName, f
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final String brokerName, final Long brokerId, final String registerCheckCode, final String controllerAddress) throws Exception {
public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final String brokerName,
final Long brokerId, final String registerCheckCode, final String controllerAddress) throws Exception {
final ApplyBrokerIdRequestHeader requestHeader = new ApplyBrokerIdRequestHeader(clusterName, brokerName, brokerId, registerCheckCode);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_APPLY_BROKER_ID, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
Expand All @@ -1259,7 +1283,9 @@ public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception {
public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> registerBrokerToController(
final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress,
final String controllerAddress) throws Exception {
final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId, brokerAddress);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
Expand Down Expand Up @@ -1355,16 +1381,25 @@ public CompletableFuture<PullResult> pullMessageFromSpecificBrokerAsync(String b

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
CompletableFuture<PullResult> pullResultFuture = new CompletableFuture<>();
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> {
if (responseFuture.getCause() != null) {
pullResultFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
return;
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {

}
try {
PullResultExt pullResultExt = this.processPullResponse(responseFuture.getResponseCommand(), brokerAddr);
this.processPullResult(pullResultExt, brokerName, queueId);
pullResultFuture.complete(pullResultExt);
} catch (Exception e) {

@Override
public void operationSucceed(RemotingCommand response) {
try {
PullResultExt pullResultExt = processPullResponse(response, brokerAddr);
processPullResult(pullResultExt, brokerName, queueId);
pullResultFuture.complete(pullResultExt);
} catch (Exception e) {
pullResultFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
}
}

@Override
public void operationFail(Throwable throwable) {
pullResultFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
}
});
Expand Down
Loading

0 comments on commit 8415608

Please sign in to comment.