Skip to content

Commit

Permalink
fix async call issues, rpc context and response future callback race …
Browse files Browse the repository at this point in the history
…conditions (#9464)

fixes #9461 #8602
  • Loading branch information
chickenlj authored Dec 25, 2021
1 parent 8f5b905 commit 098787a
Show file tree
Hide file tree
Showing 28 changed files with 610 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new FilterChainNode<>(originalInvoker, next, filter);
last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
}
return new CallbackRegistrationInvoker<>(last, filters);
}

return last;
Expand Down Expand Up @@ -103,8 +104,9 @@ public <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> or
for (int i = filters.size() - 1; i >= 0; i--) {
final ClusterFilter filter = filters.get(i);
final Invoker<T> next = last;
last = new ClusterFilterChainNode<>(originalInvoker, next, filter);
last = new CopyOfClusterFilterChainNode<>(originalInvoker, next, filter);
}
return new ClusterCallbackRegistrationInvoker<>(originalInvoker, last, filters);
}

return last;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package org.apache.dubbo.rpc.cluster.filter;

import org.apache.dubbo.common.Experimental;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.BaseFilter;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
Expand All @@ -28,6 +31,9 @@
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.cluster.Directory;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.extension.ExtensionScope.APPLICATION;

@SPI(value = "default", scope = APPLICATION)
Expand Down Expand Up @@ -144,12 +150,208 @@ public String toString() {
* @param <TYPE>
*/
class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter>
extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
super(originalInvoker, nextNode, filter);
}


@Override
public URL getRegistryUrl() {
return getOriginalInvoker().getRegistryUrl();
}

@Override
public Directory<T> getDirectory() {
return getOriginalInvoker().getDirectory();
}

@Override
public boolean isDestroyed() {
return getOriginalInvoker().isDestroyed();
}
}

class CallbackRegistrationInvoker<T, FILTER extends BaseFilter> implements Invoker<T> {
static final Logger LOGGER = LoggerFactory.getLogger(CallbackRegistrationInvoker.class);
final Invoker<T> filterInvoker;
final List<FILTER> filters;

public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<FILTER> filters) {
this.filterInvoker = filterInvoker;
this.filters = filters;
}

@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = filterInvoker.invoke(invocation);
asyncResult.whenCompleteWithContext((r, t) -> {
for (int i = filters.size() - 1; i >= 0; i--) {
FILTER filter = filters.get(i);
try {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} catch (Throwable filterThrowable) {
LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));
}
throw filterThrowable;
}
}
});

return asyncResult;
}

@Override
public Class<T> getInterface() {
return filterInvoker.getInterface();
}

@Override
public URL getUrl() {
return filterInvoker.getUrl();
}

@Override
public boolean isAvailable() {
return filterInvoker.isAvailable();
}

@Override
public void destroy() {
filterInvoker.destroy();
}
}

class ClusterCallbackRegistrationInvoker<T, FILTER extends BaseFilter> extends CallbackRegistrationInvoker<T, FILTER>
implements ClusterInvoker<T> {
private ClusterInvoker<T> originalInvoker;

public ClusterCallbackRegistrationInvoker(ClusterInvoker<T> originalInvoker, Invoker<T> filterInvoker, List<FILTER> filters) {
super(filterInvoker, filters);
this.originalInvoker = originalInvoker;
}

public ClusterInvoker<T> getOriginalInvoker() {
return originalInvoker;
}

@Override
public URL getRegistryUrl() {
return getOriginalInvoker().getRegistryUrl();
}

@Override
public Directory<T> getDirectory() {
return getOriginalInvoker().getDirectory();
}

@Override
public boolean isDestroyed() {
return getOriginalInvoker().isDestroyed();
}
}


@Experimental("Works for the same purpose as FilterChainNode, replace FilterChainNode with this one when proved stable enough")
class CopyOfFilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T> {
TYPE originalInvoker;
Invoker<T> nextNode;
FILTER filter;

public CopyOfFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
this.originalInvoker = originalInvoker;
this.nextNode = nextNode;
this.filter = filter;
}

public TYPE getOriginalInvoker() {
return originalInvoker;
}

@Override
public Class<T> getInterface() {
return originalInvoker.getInterface();
}

@Override
public URL getUrl() {
return originalInvoker.getUrl();
}

@Override
public boolean isAvailable() {
return originalInvoker.isAvailable();
}

@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(nextNode, invocation);
} catch (Exception e) {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
try {
Filter.Listener listener = listenableFilter.listener(invocation);
if (listener != null) {
listener.onError(e, originalInvoker, invocation);
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
listener.onError(e, originalInvoker, invocation);
}
throw e;
} finally {

}
return asyncResult;
}

@Override
public void destroy() {
originalInvoker.destroy();
}

@Override
public String toString() {
return originalInvoker.toString();
}
}

@Experimental("Works for the same purpose as ClusterFilterChainNode, replace ClusterFilterChainNode with this one when proved stable enough")
class CopyOfClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter>
extends CopyOfFilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
public CopyOfClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
super(originalInvoker, nextNode, filter);
}


@Override
public URL getRegistryUrl() {
return getOriginalInvoker().getRegistryUrl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;

public class DefaultFilterChainBuilderTest {

Expand Down Expand Up @@ -62,8 +63,8 @@ protected Result doInvoke(Invocation invocation) throws Throwable {
}
};
invokerAfterBuild = defaultFilterChainBuilder.buildInvokerChain(invokerWithFilter, REFERENCE_FILTER_KEY, CONSUMER);
Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.FilterChainNode);
Assertions.assertTrue(((FilterChainBuilder.FilterChainNode<?, ?, ?>) invokerAfterBuild).filter instanceof LogFilter);
Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.CallbackRegistrationInvoker);
Assertions.assertEquals(1, ((FilterChainBuilder.CallbackRegistrationInvoker<?, ?>) invokerAfterBuild).filters.size());

}

Expand Down Expand Up @@ -97,8 +98,7 @@ protected Result doInvoke(Invocation invocation) throws Throwable {
}
};
invokerAfterBuild = defaultFilterChainBuilder.buildInvokerChain(invokerWithFilter, REFERENCE_FILTER_KEY, CONSUMER);
Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.FilterChainNode);
Assertions.assertTrue(((FilterChainBuilder.FilterChainNode<?, ?, ?>) invokerAfterBuild).filter instanceof LogFilter);
Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.CallbackRegistrationInvoker);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public void testInvokerDestroyAndReList() {
}
invokers.clear();
MockInvoker<Demo> invoker3 = new MockInvoker<>(Demo.class, url);
invoker3.setResult(AsyncRpcResult.newDefaultAsyncResult(null));
invoker3.setResult(AsyncRpcResult.newDefaultAsyncResult(mock(RpcInvocation.class)));
invokers.add(invoker3);
dic.notify(invokers);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ModuleModel;
Expand All @@ -41,8 +42,6 @@

import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.rpc.Constants.MERGER_KEY;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -55,7 +54,7 @@ public class MergeableClusterInvokerTest {
private Directory directory = mock(Directory.class);
private Invoker firstInvoker = mock(Invoker.class);
private Invoker secondInvoker = mock(Invoker.class);
private Invocation invocation = mock(Invocation.class);
private Invocation invocation = mock(RpcInvocation.class);
private ModuleModel moduleModel = mock(ModuleModel.class);

private MergeableClusterInvoker<MenuService> mergeableClusterInvoker;
Expand Down Expand Up @@ -98,7 +97,7 @@ public void setUp() throws Exception {
directory = mock(Directory.class);
firstInvoker = mock(Invoker.class);
secondInvoker = mock(Invoker.class);
invocation = mock(Invocation.class);
invocation = mock(RpcInvocation.class);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.dubbo.rpc.cluster.filter.DemoService;
import org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;

import org.apache.dubbo.rpc.model.ApplicationModel;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -69,7 +69,7 @@ public void testBuildClusterInvokerChain() {
Invoker<?> invoker = demoCluster.join(directory, true);
Assertions.assertTrue(invoker instanceof AbstractCluster.ClusterFilterInvoker);
Assertions.assertTrue(((AbstractCluster.ClusterFilterInvoker<?>) invoker).getFilterInvoker()
instanceof FilterChainBuilder.ClusterFilterChainNode);
instanceof FilterChainBuilder.ClusterCallbackRegistrationInvoker);


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.lang.annotation.Target;

/**
* Indicating unstable API, may get removed or changed in the next release.
* Indicating unstable API, may get removed or changed in future releases.
*/
@Retention(RetentionPolicy.CLASS)
@Target({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,4 +503,9 @@ public interface CommonConstants {
String ENABLE_CONNECTIVITY_VALIDATION = "dubbo.connectivity.validation";

String DUBBO_INTERNAL_APPLICATION = "DUBBO_INTERNAL_APPLICATION";

String WORKING_CLASSLOADER_KEY = "WORKING_CLASSLOADER";

String STAGED_CLASSLOADER_KEY = "STAGED_CLASSLOADER";

}
Loading

0 comments on commit 098787a

Please sign in to comment.