Skip to content

Commit

Permalink
try moving finally block (#9496)
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenlj authored Dec 26, 2021
1 parent 098787a commit 2759f38
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,35 +93,40 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}

try {
// pass default timeout set by end user (ReferenceConfig)
Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
if (countDown != null) {
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
if (timeoutCountDown.isExpired()) {
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
+ invocation.getMethodName() + ", terminate directly."), invocation);
}
// pass default timeout set by end user (ReferenceConfig)
Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
if (countDown != null) {
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
if (timeoutCountDown.isExpired()) {
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
+ invocation.getMethodName() + ", terminate directly."), invocation);
}

RpcContext.removeServerContext();
return invoker.invoke(invocation);
} finally {
RpcContext.removeServiceContext();
RpcContext.removeClientAttachment();
}

RpcContext.removeServerContext();
return invoker.invoke(invocation);
}

@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// pass attachments to result
RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());

removeContext();
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
removeContext();
}

private void removeContext() {
RpcContext.removeServiceContext();
RpcContext.removeClientAttachment();
// server context must not be removed because user might use it on callback.
// So the clear of is delayed til the start of the next rpc call, see RpcContext.removeServerContext(); in invoke() above
// RpcContext.removeServerContext();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -508,4 +508,6 @@ public interface CommonConstants {

String STAGED_CLASSLOADER_KEY = "STAGED_CLASSLOADER";

String PROVIDER_ASYNC_KEY = "PROVIDER_ASYNC";

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class AsyncContextImpl implements AsyncContext {
private ClassLoader stagedClassLoader;

public AsyncContextImpl() {
restoreContext = RpcContext.storeContext(false);
restoreContext = RpcContext.storeContext();
restoreClassLoader = Thread.currentThread().getContextClassLoader();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_ASYNC_KEY;
import static org.apache.dubbo.common.utils.ReflectUtils.defaultReturn;

/**
Expand Down Expand Up @@ -59,17 +60,19 @@ public class AsyncRpcResult implements Result {
private Executor executor;

private Invocation invocation;
private final boolean async;

private CompletableFuture<AppResponse> responseFuture;

public AsyncRpcResult(CompletableFuture<AppResponse> future, Invocation invocation) {
this.responseFuture = future;
this.invocation = invocation;
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
if (InvokeMode.SYNC != rpcInvocation.getInvokeMode() && !future.isDone()) {
this.storedContext = RpcContext.storeContext(false);
if ((rpcInvocation.get(PROVIDER_ASYNC_KEY) != null || InvokeMode.SYNC != rpcInvocation.getInvokeMode()) && !future.isDone()) {
async = true;
this.storedContext = RpcContext.clearAndStoreContext();
} else {
this.storedContext = RpcContext.storeContext(true);
async = false;
}
}

Expand Down Expand Up @@ -198,12 +201,10 @@ public Object recreate() throws Throwable {

public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
RpcContext.RestoreContext tmpContext = RpcContext.storeContext(false);
RpcContext.restoreContext(storedContext);

if (async) {
RpcContext.restoreContext(storedContext);
}
fn.accept(v, t);

RpcContext.restoreContext(tmpContext);
});

// Necessary! update future in context, see https://github.com/apache/dubbo/issues/9461
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,8 +803,14 @@ public static void setRpcContext(URL url) {
RpcServiceContext.setRpcContext(url);
}

protected static RestoreContext storeContext(boolean needCopy) {
return new RestoreContext(needCopy);
protected static RestoreContext clearAndStoreContext() {
RestoreContext restoreContext = new RestoreContext();
RpcContext.removeContext();
return restoreContext;
}

protected static RestoreContext storeContext() {
return new RestoreContext();
}

protected static void restoreContext(RestoreContext restoreContext) {
Expand All @@ -813,22 +819,6 @@ protected static void restoreContext(RestoreContext restoreContext) {
}
}

protected static void restoreClientAttachment(RpcContextAttachment oldContext) {
CLIENT_ATTACHMENT.set(oldContext);
}

protected static void restoreServerContext(RpcContextAttachment oldServerContext) {
SERVER_LOCAL.set(oldServerContext);
}

protected static void restoreServerAttachment(RpcContextAttachment oldServerContext) {
SERVER_ATTACHMENT.set(oldServerContext);
}

protected static void restoreServiceContext(RpcServiceContext oldServiceContext) {
SERVICE_CONTEXT.set(oldServiceContext);
}

/**
* Used to temporarily store and restore all kinds of contexts of current thread.
*/
Expand All @@ -838,31 +828,31 @@ public static class RestoreContext {
private final RpcContextAttachment serverAttachment;
private final RpcContextAttachment serverLocal;

public RestoreContext(boolean needCopy) {
serviceContext = getServiceContext().copyOf(needCopy);
clientAttachment = getClientAttachment().copyOf(needCopy);
serverAttachment = getServerAttachment().copyOf(needCopy);
serverLocal = getServerContext().copyOf(needCopy);
public RestoreContext() {
serviceContext = getServiceContext().copyOf(false);
clientAttachment = getClientAttachment().copyOf(false);
serverAttachment = getServerAttachment().copyOf(false);
serverLocal = getServerContext().copyOf(false);
}

public void restore() {
if (serviceContext != null) {
restoreServiceContext(serviceContext);
SERVICE_CONTEXT.set(serviceContext);
} else {
removeServiceContext();
}
if (clientAttachment != null) {
restoreClientAttachment(clientAttachment);
CLIENT_ATTACHMENT.set(clientAttachment);
} else {
removeClientAttachment();
}
if (serverAttachment != null) {
restoreServerAttachment(serverAttachment);
SERVER_ATTACHMENT.set(serverAttachment);
} else {
removeServerAttachment();
}
if (serverLocal != null) {
restoreServerContext(serverLocal);
SERVER_LOCAL.set(serverLocal);
} else {
removeServerContext();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,10 +658,6 @@ public static void setRpcContext(URL url) {
* @return a shallow copy of RpcServiceContext
*/
public RpcServiceContext copyOf(boolean needCopy) {
if (!isValid()) {
return this;
}

if (needCopy) {
RpcServiceContext copy = new RpcServiceContext();
copy.consumerUrl = this.consumerUrl;
Expand All @@ -675,13 +671,4 @@ public RpcServiceContext copyOf(boolean needCopy) {
}
}


private boolean isValid() {
return this.consumerUrl != null
|| this.localAddress != null
|| this.remoteAddress != null
|| this.invocation != null
|| this.asyncContext != null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,27 +126,29 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
((RpcInvocation) invocation).setInvoker(invoker);
}

try {
context.clearAfterEachInvoke(false);
return invoker.invoke(invocation);
} finally {
context.clearAfterEachInvoke(true);
RpcContext.removeServerAttachment();
RpcContext.removeServiceContext();
// IMPORTANT! For async scenario, context must be removed from current thread, so a new RpcContext is always created for the next invoke for the same thread.
RpcContext.getClientAttachment().removeAttachment(TIME_COUNTDOWN_KEY);
RpcContext.removeServerContext();
}
context.clearAfterEachInvoke(false);

return invoker.invoke(invocation);
}

@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// pass attachments to result
appResponse.addObjectAttachments(RpcContext.getServerContext().getObjectAttachments());
removeContext();
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
removeContext();
}

private void removeContext() {
RpcContext.getServerAttachment().clearAfterEachInvoke(true); // TODO, not necessary anymore

RpcContext.removeServerAttachment();
RpcContext.removeClientAttachment();
RpcContext.removeServiceContext();
RpcContext.removeServerContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_ASYNC_KEY;

/**
* This Invoker works on provider side, delegates RPC to interface implementation.
*/
Expand Down Expand Up @@ -82,7 +84,7 @@ public void destroy() {
public Result invoke(Invocation invocation) throws RpcException {
try {
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value);
CompletableFuture<Object> future = wrapWithFuture(value, invocation);
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse(invocation);
if (t != null) {
Expand All @@ -107,10 +109,12 @@ public Result invoke(Invocation invocation) throws RpcException {
}
}

private CompletableFuture<Object> wrapWithFuture(Object value) {
private CompletableFuture<Object> wrapWithFuture(Object value, Invocation invocation) {
if (value instanceof CompletableFuture) {
invocation.put(PROVIDER_ASYNC_KEY, Boolean.TRUE);
return (CompletableFuture<Object>) value;
} else if (RpcContext.getServiceContext().isAsyncStarted()) {
invocation.put(PROVIDER_ASYNC_KEY, Boolean.TRUE);
return ((AsyncContextImpl) (RpcContext.getServiceContext().getAsyncContext())).getInternalFuture();
}
return CompletableFuture.completedFuture(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -62,7 +62,7 @@ public void testSetContext() {
given(invoker.getUrl()).willReturn(url);

contextFilter.invoke(invoker, invocation);
assertNull(RpcContext.getServiceContext().getInvoker());
assertNotNull(RpcContext.getServiceContext().getInvoker());
}

@Test
Expand All @@ -71,6 +71,6 @@ public void testWithAttachments() {
Invoker<DemoService> invoker = new MyInvoker<DemoService>(url);
Invocation invocation = new MockInvocation();
Result result = contextFilter.invoke(invoker, invocation);
assertNull(RpcContext.getServiceContext().getInvoker());
assertNotNull(RpcContext.getServiceContext().getInvoker());
}
}

0 comments on commit 2759f38

Please sign in to comment.