diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java index ff4f9b88d25..15be3004c27 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java @@ -67,8 +67,9 @@ public Invoker buildInvokerChain(final Invoker originalInvoker, String for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker next = last; - last = new FilterChainNode<>(originalInvoker, next, filter); + last = new CopyOfFilterChainNode<>(originalInvoker, next, filter); } + return new CallbackRegistrationInvoker<>(last, filters); } return last; @@ -103,8 +104,9 @@ public ClusterInvoker buildClusterInvokerChain(final ClusterInvoker or for (int i = filters.size() - 1; i >= 0; i--) { final ClusterFilter filter = filters.get(i); final Invoker next = last; - last = new ClusterFilterChainNode<>(originalInvoker, next, filter); + last = new CopyOfClusterFilterChainNode<>(originalInvoker, next, filter); } + return new ClusterCallbackRegistrationInvoker<>(originalInvoker, last, filters); } return last; diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java index 7edab0899a1..a329959e3d7 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java @@ -16,8 +16,11 @@ */ package org.apache.dubbo.rpc.cluster.filter; +import org.apache.dubbo.common.Experimental; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.SPI; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.rpc.BaseFilter; import org.apache.dubbo.rpc.Filter; import org.apache.dubbo.rpc.Invocation; @@ -28,6 +31,9 @@ import org.apache.dubbo.rpc.cluster.ClusterInvoker; import org.apache.dubbo.rpc.cluster.Directory; +import java.util.List; +import java.util.stream.Collectors; + import static org.apache.dubbo.common.extension.ExtensionScope.APPLICATION; @SPI(value = "default", scope = APPLICATION) @@ -144,12 +150,208 @@ public String toString() { * @param */ class ClusterFilterChainNode, FILTER extends BaseFilter> - extends FilterChainNode implements ClusterInvoker { + extends FilterChainNode implements ClusterInvoker { public ClusterFilterChainNode(TYPE originalInvoker, Invoker nextNode, FILTER filter) { super(originalInvoker, nextNode, filter); } + @Override + public URL getRegistryUrl() { + return getOriginalInvoker().getRegistryUrl(); + } + + @Override + public Directory getDirectory() { + return getOriginalInvoker().getDirectory(); + } + + @Override + public boolean isDestroyed() { + return getOriginalInvoker().isDestroyed(); + } + } + + class CallbackRegistrationInvoker implements Invoker { + static final Logger LOGGER = LoggerFactory.getLogger(CallbackRegistrationInvoker.class); + final Invoker filterInvoker; + final List filters; + + public CallbackRegistrationInvoker(Invoker filterInvoker, List filters) { + this.filterInvoker = filterInvoker; + this.filters = filters; + } + + @Override + public Result invoke(Invocation invocation) throws RpcException { + Result asyncResult = filterInvoker.invoke(invocation); + asyncResult.whenCompleteWithContext((r, t) -> { + for (int i = filters.size() - 1; i >= 0; i--) { + FILTER filter = filters.get(i); + try { + if (filter instanceof ListenableFilter) { + ListenableFilter listenableFilter = ((ListenableFilter) filter); + Filter.Listener listener = listenableFilter.listener(invocation); + try { + if (listener != null) { + if (t == null) { + listener.onResponse(r, filterInvoker, invocation); + } else { + listener.onError(t, filterInvoker, invocation); + } + } + } finally { + listenableFilter.removeListener(invocation); + } + } else if (filter instanceof FILTER.Listener) { + FILTER.Listener listener = (FILTER.Listener) filter; + if (t == null) { + listener.onResponse(r, filterInvoker, invocation); + } else { + listener.onError(t, filterInvoker, invocation); + } + } + } catch (Throwable filterThrowable) { + LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName())); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList()))); + } + throw filterThrowable; + } + } + }); + + return asyncResult; + } + + @Override + public Class getInterface() { + return filterInvoker.getInterface(); + } + + @Override + public URL getUrl() { + return filterInvoker.getUrl(); + } + + @Override + public boolean isAvailable() { + return filterInvoker.isAvailable(); + } + + @Override + public void destroy() { + filterInvoker.destroy(); + } + } + + class ClusterCallbackRegistrationInvoker extends CallbackRegistrationInvoker + implements ClusterInvoker { + private ClusterInvoker originalInvoker; + + public ClusterCallbackRegistrationInvoker(ClusterInvoker originalInvoker, Invoker filterInvoker, List filters) { + super(filterInvoker, filters); + this.originalInvoker = originalInvoker; + } + + public ClusterInvoker getOriginalInvoker() { + return originalInvoker; + } + + @Override + public URL getRegistryUrl() { + return getOriginalInvoker().getRegistryUrl(); + } + + @Override + public Directory getDirectory() { + return getOriginalInvoker().getDirectory(); + } + + @Override + public boolean isDestroyed() { + return getOriginalInvoker().isDestroyed(); + } + } + + + @Experimental("Works for the same purpose as FilterChainNode, replace FilterChainNode with this one when proved stable enough") + class CopyOfFilterChainNode, FILTER extends BaseFilter> implements Invoker { + TYPE originalInvoker; + Invoker nextNode; + FILTER filter; + + public CopyOfFilterChainNode(TYPE originalInvoker, Invoker nextNode, FILTER filter) { + this.originalInvoker = originalInvoker; + this.nextNode = nextNode; + this.filter = filter; + } + + public TYPE getOriginalInvoker() { + return originalInvoker; + } + + @Override + public Class getInterface() { + return originalInvoker.getInterface(); + } + + @Override + public URL getUrl() { + return originalInvoker.getUrl(); + } + + @Override + public boolean isAvailable() { + return originalInvoker.isAvailable(); + } + + @Override + public Result invoke(Invocation invocation) throws RpcException { + Result asyncResult; + try { + asyncResult = filter.invoke(nextNode, invocation); + } catch (Exception e) { + if (filter instanceof ListenableFilter) { + ListenableFilter listenableFilter = ((ListenableFilter) filter); + try { + Filter.Listener listener = listenableFilter.listener(invocation); + if (listener != null) { + listener.onError(e, originalInvoker, invocation); + } + } finally { + listenableFilter.removeListener(invocation); + } + } else if (filter instanceof FILTER.Listener) { + FILTER.Listener listener = (FILTER.Listener) filter; + listener.onError(e, originalInvoker, invocation); + } + throw e; + } finally { + + } + return asyncResult; + } + + @Override + public void destroy() { + originalInvoker.destroy(); + } + + @Override + public String toString() { + return originalInvoker.toString(); + } + } + + @Experimental("Works for the same purpose as ClusterFilterChainNode, replace ClusterFilterChainNode with this one when proved stable enough") + class CopyOfClusterFilterChainNode, FILTER extends BaseFilter> + extends CopyOfFilterChainNode implements ClusterInvoker { + public CopyOfClusterFilterChainNode(TYPE originalInvoker, Invoker nextNode, FILTER filter) { + super(originalInvoker, nextNode, filter); + } + + @Override public URL getRegistryUrl() { return getOriginalInvoker().getRegistryUrl(); diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java index 8a579b8d9fa..7a5779aa2c0 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java @@ -19,16 +19,17 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.rpc.protocol.AbstractInvoker; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; public class DefaultFilterChainBuilderTest { @@ -62,8 +63,8 @@ protected Result doInvoke(Invocation invocation) throws Throwable { } }; invokerAfterBuild = defaultFilterChainBuilder.buildInvokerChain(invokerWithFilter, REFERENCE_FILTER_KEY, CONSUMER); - Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.FilterChainNode); - Assertions.assertTrue(((FilterChainBuilder.FilterChainNode) invokerAfterBuild).filter instanceof LogFilter); + Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.CallbackRegistrationInvoker); + Assertions.assertEquals(1, ((FilterChainBuilder.CallbackRegistrationInvoker) invokerAfterBuild).filters.size()); } @@ -97,8 +98,7 @@ protected Result doInvoke(Invocation invocation) throws Throwable { } }; invokerAfterBuild = defaultFilterChainBuilder.buildInvokerChain(invokerWithFilter, REFERENCE_FILTER_KEY, CONSUMER); - Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.FilterChainNode); - Assertions.assertTrue(((FilterChainBuilder.FilterChainNode) invokerAfterBuild).filter instanceof LogFilter); + Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.CallbackRegistrationInvoker); } } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java index 4b9fba5b9d7..1fd407508e6 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java @@ -295,7 +295,7 @@ public void testInvokerDestroyAndReList() { } invokers.clear(); MockInvoker invoker3 = new MockInvoker<>(Demo.class, url); - invoker3.setResult(AsyncRpcResult.newDefaultAsyncResult(null)); + invoker3.setResult(AsyncRpcResult.newDefaultAsyncResult(mock(RpcInvocation.class))); invokers.add(invoker3); dic.notify(invokers); return null; diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java index bb2ff1b8557..b95d0ef1a5e 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java @@ -23,6 +23,7 @@ import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.cluster.Directory; import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.rpc.model.ModuleModel; @@ -41,8 +42,6 @@ import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.rpc.Constants.MERGER_KEY; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -55,7 +54,7 @@ public class MergeableClusterInvokerTest { private Directory directory = mock(Directory.class); private Invoker firstInvoker = mock(Invoker.class); private Invoker secondInvoker = mock(Invoker.class); - private Invocation invocation = mock(Invocation.class); + private Invocation invocation = mock(RpcInvocation.class); private ModuleModel moduleModel = mock(ModuleModel.class); private MergeableClusterInvoker mergeableClusterInvoker; @@ -98,7 +97,7 @@ public void setUp() throws Exception { directory = mock(Directory.class); firstInvoker = mock(Invoker.class); secondInvoker = mock(Invoker.class); - invocation = mock(Invocation.class); + invocation = mock(RpcInvocation.class); } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java index 41eff47ed96..6245b71cf4a 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java @@ -28,8 +28,8 @@ import org.apache.dubbo.rpc.cluster.filter.DemoService; import org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder; import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; - import org.apache.dubbo.rpc.model.ApplicationModel; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -69,7 +69,7 @@ public void testBuildClusterInvokerChain() { Invoker invoker = demoCluster.join(directory, true); Assertions.assertTrue(invoker instanceof AbstractCluster.ClusterFilterInvoker); Assertions.assertTrue(((AbstractCluster.ClusterFilterInvoker) invoker).getFilterInvoker() - instanceof FilterChainBuilder.ClusterFilterChainNode); + instanceof FilterChainBuilder.ClusterCallbackRegistrationInvoker); } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java index 64f4cd3eff5..746bc1b7ad5 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java @@ -22,7 +22,7 @@ import java.lang.annotation.Target; /** - * Indicating unstable API, may get removed or changed in the next release. + * Indicating unstable API, may get removed or changed in future releases. */ @Retention(RetentionPolicy.CLASS) @Target({ diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java index f564ea29090..7b9de273efa 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java @@ -503,4 +503,9 @@ public interface CommonConstants { String ENABLE_CONNECTIVITY_VALIDATION = "dubbo.connectivity.validation"; String DUBBO_INTERNAL_APPLICATION = "DUBBO_INTERNAL_APPLICATION"; + + String WORKING_CLASSLOADER_KEY = "WORKING_CLASSLOADER"; + + String STAGED_CLASSLOADER_KEY = "STAGED_CLASSLOADER"; + } diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java index 69a3d5e6fd7..74e1e695924 100644 --- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java +++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java @@ -26,9 +26,9 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.HashSet; /** * The abstraction of {@link ExporterListener} is to record exported exporters, which should be extended by different sub-classes. @@ -56,12 +56,13 @@ public abstract class AbstractRegistryCenterExporterListener implements Exporter @Override public void exported(Exporter exporter) throws RpcException { ListenerExporterWrapper listenerExporterWrapper = (ListenerExporterWrapper) exporter; - FilterChainBuilder.FilterChainNode filterChainNode = (FilterChainBuilder.FilterChainNode) listenerExporterWrapper.getInvoker(); - if (filterChainNode == null || - filterChainNode.getInterface() != getInterface()) { + FilterChainBuilder.CallbackRegistrationInvoker callbackRegistrationInvoker = (FilterChainBuilder.CallbackRegistrationInvoker) listenerExporterWrapper.getInvoker(); + if (callbackRegistrationInvoker == null || + callbackRegistrationInvoker.getInterface() != getInterface()) { return; } exportedExporters.add(exporter); + FilterChainBuilder.CopyOfFilterChainNode filterChainNode = getFilterChainNode(callbackRegistrationInvoker); do { Filter filter = this.getFilter(filterChainNode); if (filter != null) { @@ -96,7 +97,24 @@ public Set getFilters() { /** * Use reflection to obtain {@link Filter} */ - private Filter getFilter(FilterChainBuilder.FilterChainNode filterChainNode) { + private FilterChainBuilder.CopyOfFilterChainNode getFilterChainNode(FilterChainBuilder.CallbackRegistrationInvoker callbackRegistrationInvoker) { + if (callbackRegistrationInvoker != null) { + Field field = null; + try { + field = callbackRegistrationInvoker.getClass().getDeclaredField("filterInvoker"); + field.setAccessible(true); + return (FilterChainBuilder.CopyOfFilterChainNode) field.get(callbackRegistrationInvoker); + } catch (NoSuchFieldException | IllegalAccessException e) { + // ignore + } + } + return null; + } + + /** + * Use reflection to obtain {@link Filter} + */ + private Filter getFilter(FilterChainBuilder.CopyOfFilterChainNode filterChainNode) { if (filterChainNode != null) { Field field = null; try { @@ -111,17 +129,17 @@ private Filter getFilter(FilterChainBuilder.FilterChainNode filterChainNode) { } /** - * Use reflection to obtain {@link FilterChainBuilder.FilterChainNode} + * Use reflection to obtain {@link FilterChainBuilder.CopyOfFilterChainNode} */ - private FilterChainBuilder.FilterChainNode getNextNode(FilterChainBuilder.FilterChainNode filterChainNode) { + private FilterChainBuilder.CopyOfFilterChainNode getNextNode(FilterChainBuilder.CopyOfFilterChainNode filterChainNode) { if (filterChainNode != null) { Field field = null; try { field = filterChainNode.getClass().getDeclaredField("nextNode"); field.setAccessible(true); Object object = field.get(filterChainNode); - if (object instanceof FilterChainBuilder.FilterChainNode) { - return (FilterChainBuilder.FilterChainNode) object; + if (object instanceof FilterChainBuilder.CopyOfFilterChainNode) { + return (FilterChainBuilder.CopyOfFilterChainNode) object; } } catch (NoSuchFieldException | IllegalAccessException e) { // ignore diff --git a/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java b/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java index aac62875b2a..6e12f6fe569 100644 --- a/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java +++ b/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java @@ -24,7 +24,9 @@ import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.model.ApplicationModel; + import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertNull; @@ -43,7 +45,7 @@ class ProviderAuthFilterTest { void testAuthDisabled() { URL url = mock(URL.class); Invoker invoker = mock(Invoker.class); - Invocation invocation = mock(Invocation.class); + Invocation invocation = mock(RpcInvocation.class); when(invoker.getUrl()).thenReturn(url); ProviderAuthFilter providerAuthFilter = new ProviderAuthFilter(ApplicationModel.defaultModel()); providerAuthFilter.invoke(invoker, invocation); @@ -58,7 +60,7 @@ void testAuthEnabled() { .addParameter(CommonConstants.APPLICATION_KEY, "test") .addParameter(Constants.SERVICE_AUTH, true); Invoker invoker = mock(Invoker.class); - Invocation invocation = mock(Invocation.class); + Invocation invocation = mock(RpcInvocation.class); when(invoker.getUrl()).thenReturn(url); ProviderAuthFilter providerAuthFilter = new ProviderAuthFilter(ApplicationModel.defaultModel()); providerAuthFilter.invoke(invoker, invocation); @@ -74,7 +76,7 @@ void testAuthFailed() { .addParameter(CommonConstants.APPLICATION_KEY, "test") .addParameter(Constants.SERVICE_AUTH, true); Invoker invoker = mock(Invoker.class); - Invocation invocation = mock(Invocation.class); + Invocation invocation = mock(RpcInvocation.class); when(invocation.getAttachment(Constants.REQUEST_SIGNATURE_KEY)).thenReturn(null); when(invoker.getUrl()).thenReturn(url); @@ -92,7 +94,7 @@ void testAuthFailedWhenNoSignature() { .addParameter(CommonConstants.APPLICATION_KEY, "test") .addParameter(Constants.SERVICE_AUTH, true); Invoker invoker = mock(Invoker.class); - Invocation invocation = mock(Invocation.class); + Invocation invocation = mock(RpcInvocation.class); when(invocation.getAttachment(Constants.REQUEST_SIGNATURE_KEY)).thenReturn(null); when(invoker.getUrl()).thenReturn(url); @@ -107,7 +109,7 @@ void testAuthFailedWhenNoAccessKeyPair() { .addParameter(CommonConstants.APPLICATION_KEY, "test-provider") .addParameter(Constants.SERVICE_AUTH, true); Invoker invoker = mock(Invoker.class); - Invocation invocation = mock(Invocation.class); + Invocation invocation = mock(RpcInvocation.class); when(invocation.getObjectAttachment(Constants.REQUEST_SIGNATURE_KEY)).thenReturn("dubbo"); when(invocation.getObjectAttachment(Constants.AK_KEY)).thenReturn("ak"); when(invocation.getObjectAttachment(CommonConstants.CONSUMER)).thenReturn("test-consumer"); @@ -135,7 +137,7 @@ void testAuthFailedWhenParameterError() { .addParameter(Constants.SERVICE_AUTH, true); Invoker invoker = mock(Invoker.class); - Invocation invocation = mock(Invocation.class); + Invocation invocation = mock(RpcInvocation.class); when(invocation.getObjectAttachment(Constants.AK_KEY)).thenReturn("ak"); when(invocation.getObjectAttachment(CommonConstants.CONSUMER)).thenReturn("test-consumer"); when(invocation.getObjectAttachment(Constants.REQUEST_TIMESTAMP_KEY)).thenReturn(currentTimeMillis); @@ -168,7 +170,7 @@ void testAuthSuccessfully() { .addParameter(CommonConstants.APPLICATION_KEY, "test-provider") .addParameter(Constants.SERVICE_AUTH, true); Invoker invoker = mock(Invoker.class); - Invocation invocation = mock(Invocation.class); + Invocation invocation = mock(RpcInvocation.class); when(invocation.getAttachment(Constants.AK_KEY)).thenReturn("ak"); when(invocation.getAttachment(CommonConstants.CONSUMER)).thenReturn("test-consumer"); when(invocation.getAttachment(Constants.REQUEST_TIMESTAMP_KEY)).thenReturn(String.valueOf(currentTimeMillis)); diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java index 1f51a54d4ff..0635d290c2e 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java @@ -74,4 +74,50 @@ public interface AsyncContext { * */ void signalContextSwitch(); + + /** + * Reset Context is not necessary. Only reset context after result was write back if it is necessary. + * + * + * public class AsyncServiceImpl implements AsyncService { + * public String sayHello(String name) { + * final AsyncContext asyncContext = RpcContext.startAsync(); + * new Thread(() -> { + *

+ * // the right place to use this method + * asyncContext.signalContextSwitch(); + *

+ * try { + * Thread.sleep(500); + * } catch (InterruptedException e) { + * e.printStackTrace(); + * } + * asyncContext.write("Hello " + name + ", response from provider."); + * // only reset after asyncContext.write() + * asyncContext.resetContext(); + * }).start(); + * return null; + * } + * } + * + * + * + * public class AsyncServiceImpl implements AsyncService { + * public CompletableFuture sayHello(String name) { + * CompletableFuture future = new CompletableFuture(); + * final AsyncContext asyncContext = RpcContext.startAsync(); + * new Thread(() -> { + * // the right place to use this method + * asyncContext.signalContextSwitch(); + * // some operations... + * future.complete(); + * // only reset after future.complete() + * asyncContext.resetContext(); + * }).start(); + * return future; + * } + * } + * + */ + void resetContext(); } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java index 1331b21e0d5..9f9a083f877 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java @@ -26,12 +26,13 @@ public class AsyncContextImpl implements AsyncContext { private CompletableFuture future; - private RpcContextAttachment storedContext; - private RpcContextAttachment storedServerContext; + private final RpcContext.RestoreContext restoreContext; + private final ClassLoader restoreClassLoader; + private ClassLoader stagedClassLoader; public AsyncContextImpl() { - this.storedContext = RpcContext.getClientAttachment(); - this.storedServerContext = RpcContext.getServerContext(); + restoreContext = RpcContext.storeContext(false); + restoreClassLoader = Thread.currentThread().getContextClassLoader(); } @Override @@ -67,9 +68,19 @@ public void start() { @Override public void signalContextSwitch() { - RpcContext.restoreContext(storedContext); - RpcContext.restoreServerContext(storedServerContext); - // Restore any other contexts in here if necessary. + RpcContext.restoreContext(restoreContext); + if (restoreClassLoader != null) { + stagedClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(restoreClassLoader); + } + } + + @Override + public void resetContext() { + RpcContext.removeContext(); + if (stagedClassLoader != null) { + Thread.currentThread().setContextClassLoader(restoreClassLoader); + } } public CompletableFuture getInternalFuture() { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java index 1accccd5bc2..80a46da8bdb 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.threadpool.ThreadlessExecutor; import org.apache.dubbo.rpc.model.ConsumerMethodModel; +import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -51,10 +52,10 @@ public class AsyncRpcResult implements Result { /** * RpcContext may already have been changed when callback happens, it happens when the same thread is used to execute another RPC call. - * So we should keep the reference of current RpcContext instance and restore it before callback being executed. + * So we should keep the copy of current RpcContext instance and restore it before callback being executed. */ - private RpcContextAttachment storedContext; - private RpcContextAttachment storedServerContext; + private RpcContext.RestoreContext storedContext; + private Executor executor; private Invocation invocation; @@ -64,8 +65,12 @@ public class AsyncRpcResult implements Result { public AsyncRpcResult(CompletableFuture future, Invocation invocation) { this.responseFuture = future; this.invocation = invocation; - this.storedContext = RpcContext.getClientAttachment(); - this.storedServerContext = RpcContext.getServerContext(); + RpcInvocation rpcInvocation = (RpcInvocation) invocation; + if (InvokeMode.SYNC != rpcInvocation.getInvokeMode() && !future.isDone()) { + this.storedContext = RpcContext.storeContext(false); + } else { + this.storedContext = RpcContext.storeContext(true); + } } /** @@ -193,10 +198,17 @@ public Object recreate() throws Throwable { public Result whenCompleteWithContext(BiConsumer fn) { this.responseFuture = this.responseFuture.whenComplete((v, t) -> { - beforeContext.accept(v, t); + RpcContext.RestoreContext tmpContext = RpcContext.storeContext(false); + RpcContext.restoreContext(storedContext); + fn.accept(v, t); - afterContext.accept(v, t); + + RpcContext.restoreContext(tmpContext); }); + + // Necessary! update future in context, see https://github.com/apache/dubbo/issues/9461 + RpcContext.getServiceContext().setFuture(new FutureAdapter<>(this.responseFuture)); + return this; } @@ -280,24 +292,6 @@ public void setExecutor(Executor executor) { this.executor = executor; } - /** - * tmp context to use when the thread switch to Dubbo thread. - */ - private RpcContextAttachment tmpContext; - - private RpcContextAttachment tmpServerContext; - private BiConsumer beforeContext = (appResponse, t) -> { - tmpContext = RpcContext.getClientAttachment(); - tmpServerContext = RpcContext.getServerContext(); - RpcContext.restoreContext(storedContext); - RpcContext.restoreServerContext(storedServerContext); - }; - - private BiConsumer afterContext = (appResponse, t) -> { - RpcContext.restoreContext(tmpContext); - RpcContext.restoreServerContext(tmpServerContext); - }; - /** * Some utility methods used to quickly generate default AsyncRpcResult instance. */ diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java index 594954f5fc5..02c643abf53 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java @@ -22,10 +22,33 @@ public interface BaseFilter { */ Result invoke(Invoker invoker, Invocation invocation) throws RpcException; + /** + * This callback listener applies to both synchronous and asynchronous calls, please put logics that need to be executed + * on return of rpc result in onResponse or onError respectively based on it is normal return or exception return. + *

+ * There's something that needs to pay attention on legacy synchronous style filer refactor, the thing is, try to move logics + * previously defined in the 'finally block' to both onResponse and onError. + */ interface Listener { + /** + * This method will only be called on successful remote rpc execution, that means, the service in on remote received + * the request and the result (normal or exceptional) returned successfully. + * + * @param appResponse, the rpc call result, it can represent both normal result and exceptional result + * @param invoker, context + * @param invocation, context + */ void onResponse(Result appResponse, Invoker invoker, Invocation invocation); + /** + * This method will be called on detection of framework exceptions, for example, TimeoutException, NetworkException + * Exception raised in Filters, etc. + * + * @param t, framework exception + * @param invoker, context + * @param invocation, context + */ void onError(Throwable t, Invoker invoker, Invocation invocation); } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java index 160ddcfe1b9..b590bfbb35c 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java @@ -23,9 +23,10 @@ * It's recommended to implement Filter.Listener directly for callback registration, check the default implementation, * see {@link org.apache.dubbo.rpc.filter.ExceptionFilter}, for example. *

- * If you do not want to share Listener instance between RPC calls. You can use ListenableFilter + * If you do not want to share Listener instance between RPC calls. ListenableFilter can be used * to keep a 'one Listener each RPC call' model. */ +@Deprecated public abstract class ListenableFilter implements Filter { protected Listener listener = null; diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java index 13f108bc19f..1db82f189bd 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java @@ -122,10 +122,6 @@ public static RpcContextAttachment getServerContext() { return SERVER_LOCAL.get(); } - public static void restoreServerContext(RpcContextAttachment oldServerContext) { - SERVER_LOCAL.set(oldServerContext); - } - /** * remove server side context. * @@ -198,10 +194,6 @@ public void clearAfterEachInvoke(boolean remove) { this.remove = remove; } - public static void restoreContext(RpcContextAttachment oldContext) { - CLIENT_ATTACHMENT.set(oldContext); - } - /** * remove context. * @@ -810,4 +802,70 @@ public void setConsumerUrl(URL consumerUrl) { public static void setRpcContext(URL url) { RpcServiceContext.setRpcContext(url); } + + protected static RestoreContext storeContext(boolean needCopy) { + return new RestoreContext(needCopy); + } + + protected static void restoreContext(RestoreContext restoreContext) { + if (restoreContext != null) { + restoreContext.restore(); + } + } + + protected static void restoreClientAttachment(RpcContextAttachment oldContext) { + CLIENT_ATTACHMENT.set(oldContext); + } + + protected static void restoreServerContext(RpcContextAttachment oldServerContext) { + SERVER_LOCAL.set(oldServerContext); + } + + protected static void restoreServerAttachment(RpcContextAttachment oldServerContext) { + SERVER_ATTACHMENT.set(oldServerContext); + } + + protected static void restoreServiceContext(RpcServiceContext oldServiceContext) { + SERVICE_CONTEXT.set(oldServiceContext); + } + + /** + * Used to temporarily store and restore all kinds of contexts of current thread. + */ + public static class RestoreContext { + private final RpcServiceContext serviceContext; + private final RpcContextAttachment clientAttachment; + private final RpcContextAttachment serverAttachment; + private final RpcContextAttachment serverLocal; + + public RestoreContext(boolean needCopy) { + serviceContext = getServiceContext().copyOf(needCopy); + clientAttachment = getClientAttachment().copyOf(needCopy); + serverAttachment = getServerAttachment().copyOf(needCopy); + serverLocal = getServerContext().copyOf(needCopy); + } + + public void restore() { + if (serviceContext != null) { + restoreServiceContext(serviceContext); + } else { + removeServiceContext(); + } + if (clientAttachment != null) { + restoreClientAttachment(clientAttachment); + } else { + removeClientAttachment(); + } + if (serverAttachment != null) { + restoreServerAttachment(serverAttachment); + } else { + removeServerAttachment(); + } + if (serverLocal != null) { + restoreServerContext(serverLocal); + } else { + removeServerContext(); + } + } + } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java index ddbd9813a7d..aec69433bef 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java @@ -201,4 +201,28 @@ public Object get(String key) { return getAttachment(key); } + /** + * Also see {@link RpcServiceContext#copyOf(boolean)} + * + * @return a copy of RpcContextAttachment with deep copied attachments + */ + public RpcContextAttachment copyOf(boolean needCopy) { + if (!isValid()) { + return null; + } + + if (needCopy) { + RpcContextAttachment copy = new RpcContextAttachment(); + if (CollectionUtils.isNotEmptyMap(attachments)) { + copy.attachments.putAll(this.attachments); + } + return copy; + } else { + return this; + } + } + + private boolean isValid() { + return CollectionUtils.isNotEmptyMap(attachments); + } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java index ebec31f7d14..e61632d0535 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java @@ -41,6 +41,9 @@ public class RpcServiceContext extends RpcContext { protected RpcServiceContext() { } + // RPC service context updated before each service call. + private URL consumerUrl; + private List urls; private URL url; @@ -584,9 +587,6 @@ public AsyncContext getAsyncContext() { return asyncContext; } - // RPC service context updated before each service call. - private URL consumerUrl; - @Override public String getGroup() { if (consumerUrl == null) { @@ -649,4 +649,39 @@ public static void setRpcContext(URL url) { RpcServiceContext rpcContext = RpcContext.getServiceContext(); rpcContext.setConsumerUrl(url); } + + /** + * Only part of the properties are copied, the others are either not used currently or can be got from invocation. + * Also see {@link RpcContextAttachment#copyOf(boolean)} + * + * @param needCopy + * @return a shallow copy of RpcServiceContext + */ + public RpcServiceContext copyOf(boolean needCopy) { + if (!isValid()) { + return this; + } + + if (needCopy) { + RpcServiceContext copy = new RpcServiceContext(); + copy.consumerUrl = this.consumerUrl; + copy.localAddress = this.localAddress; + copy.remoteAddress = this.remoteAddress; + copy.invocation = this.invocation; + copy.asyncContext = this.asyncContext; + return copy; + } else { + return this; + } + } + + + private boolean isValid() { + return this.consumerUrl != null + || this.localAddress != null + || this.remoteAddress != null + || this.invocation != null + || this.asyncContext != null; + } + } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderCallbackFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderCallbackFilter.java new file mode 100644 index 00000000000..a9b12694476 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderCallbackFilter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.filter; + +import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.rpc.BaseFilter; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; + +import static org.apache.dubbo.common.constants.CommonConstants.WORKING_CLASSLOADER_KEY; + +/** + * Switch thread context class loader on filter callback. + */ +@Activate(group = CommonConstants.PROVIDER, order = Integer.MAX_VALUE) +public class ClassLoaderCallbackFilter implements Filter, BaseFilter.Listener { + + @Override + public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { + return invoker.invoke(invocation); + } + + @Override + public void onResponse(Result appResponse, Invoker invoker, Invocation invocation) { + setClassLoader(invoker, invocation); + } + + @Override + public void onError(Throwable t, Invoker invoker, Invocation invocation) { + setClassLoader(invoker, invocation); + } + + private void setClassLoader(Invoker invoker, Invocation invocation) { + ClassLoader workingClassLoader = (ClassLoader) invocation.get(WORKING_CLASSLOADER_KEY); + if (workingClassLoader != null) { + Thread.currentThread().setContextClassLoader(workingClassLoader); + } + } +} diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java index c71e2d117c5..6a14fe1241d 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java @@ -18,27 +18,51 @@ import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.rpc.BaseFilter; import org.apache.dubbo.rpc.Filter; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; +import static org.apache.dubbo.common.constants.CommonConstants.STAGED_CLASSLOADER_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.WORKING_CLASSLOADER_KEY; + /** * Set the current execution thread class loader to service interface's class loader. */ @Activate(group = CommonConstants.PROVIDER, order = -30000) -public class ClassLoaderFilter implements Filter { +public class ClassLoaderFilter implements Filter, BaseFilter.Listener { @Override public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { - ClassLoader ocl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader()); + ClassLoader stagedClassLoader = Thread.currentThread().getContextClassLoader(); + ClassLoader effectiveClassLoader = invoker.getInterface().getClassLoader(); + invocation.put(STAGED_CLASSLOADER_KEY, stagedClassLoader); + invocation.put(WORKING_CLASSLOADER_KEY, effectiveClassLoader); + + Thread.currentThread().setContextClassLoader(effectiveClassLoader); try { return invoker.invoke(invocation); } finally { - Thread.currentThread().setContextClassLoader(ocl); + Thread.currentThread().setContextClassLoader(stagedClassLoader); } } + @Override + public void onResponse(Result appResponse, Invoker invoker, Invocation invocation) { + resetClassLoader(invoker, invocation); + } + + @Override + public void onError(Throwable t, Invoker invoker, Invocation invocation) { + resetClassLoader(invoker, invocation); + } + + private void resetClassLoader(Invoker invoker, Invocation invocation) { + ClassLoader stagedClassLoader = (ClassLoader) invocation.get(STAGED_CLASSLOADER_KEY); + if (stagedClassLoader != null) { + Thread.currentThread().setContextClassLoader(stagedClassLoader); + } + } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java index 08a1286ead4..3e38c677e2f 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java @@ -133,7 +133,7 @@ public Result invoke(Invoker invoker, Invocation invocation) throws RpcExcept context.clearAfterEachInvoke(true); RpcContext.removeServerAttachment(); RpcContext.removeServiceContext(); - // IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread. + // IMPORTANT! For async scenario, context must be removed from current thread, so a new RpcContext is always created for the next invoke for the same thread. RpcContext.getClientAttachment().removeAttachment(TIME_COUNTDOWN_KEY); RpcContext.removeServerContext(); } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java index 2230f507965..336671a2b70 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java @@ -108,10 +108,10 @@ public Result invoke(Invocation invocation) throws RpcException { } private CompletableFuture wrapWithFuture(Object value) { - if (RpcContext.getServiceContext().isAsyncStarted()) { - return ((AsyncContextImpl)(RpcContext.getServiceContext().getAsyncContext())).getInternalFuture(); - } else if (value instanceof CompletableFuture) { + if (value instanceof CompletableFuture) { return (CompletableFuture) value; + } else if (RpcContext.getServiceContext().isAsyncStarted()) { + return ((AsyncContextImpl) (RpcContext.getServiceContext().getAsyncContext())).getInternalFuture(); } return CompletableFuture.completedFuture(value); } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter index 7c526c25aec..34d56b4358e 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter +++ b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter @@ -4,10 +4,11 @@ genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter token=org.apache.dubbo.rpc.filter.TokenFilter accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter +classloader-callback=org.apache.dubbo.rpc.filter.ClassLoaderCallbackFilter context=org.apache.dubbo.rpc.filter.ContextFilter exception=org.apache.dubbo.rpc.filter.ExceptionFilter executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter compatible=org.apache.dubbo.rpc.filter.CompatibleFilter timeout=org.apache.dubbo.rpc.filter.TimeoutFilter -tps=org.apache.dubbo.rpc.filter.TpsLimitFilter \ No newline at end of file +tps=org.apache.dubbo.rpc.filter.TpsLimitFilter diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java index 4f1e6e94487..a7dbd8f2936 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java @@ -202,4 +202,9 @@ public void testObjectAttachment() { rpcContext.setObjectAttachments(map); Assertions.assertEquals(map, rpcContext.getObjectAttachments()); } + + @Test + public void testRestore() { + + } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java index 7750195c297..9a5152bc6ae 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java @@ -22,6 +22,7 @@ import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.support.DemoService; import org.apache.dubbo.rpc.support.Type; @@ -49,7 +50,7 @@ public void tearDown() { @Test public void testInvokerGeneric() { - invocation = mock(Invocation.class); + invocation = mock(RpcInvocation.class); given(invocation.getMethodName()).willReturn("$enumlength"); given(invocation.getParameterTypes()).willReturn(new Class[]{Enum.class}); given(invocation.getArguments()).willReturn(new Object[]{"hello"}); @@ -69,7 +70,7 @@ public void testInvokerGeneric() { @Test public void testResultHasException() { - invocation = mock(Invocation.class); + invocation = mock(RpcInvocation.class); given(invocation.getMethodName()).willReturn("enumlength"); given(invocation.getParameterTypes()).willReturn(new Class[]{Enum.class}); given(invocation.getArguments()).willReturn(new Object[]{"hello"}); @@ -90,7 +91,7 @@ public void testResultHasException() { @Test public void testInvokerJsonPojoSerialization() throws Exception { - invocation = mock(Invocation.class); + invocation = mock(RpcInvocation.class); given(invocation.getMethodName()).willReturn("enumlength"); given(invocation.getParameterTypes()).willReturn(new Class[]{Type[].class}); given(invocation.getArguments()).willReturn(new Object[]{"hello"}); @@ -100,7 +101,8 @@ public void testInvokerJsonPojoSerialization() throws Exception { given(invoker.getInterface()).willReturn(DemoService.class); AppResponse result = new AppResponse(); result.setValue("High"); - given(invoker.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult(result, invocation)); + AsyncRpcResult defaultAsyncResult = AsyncRpcResult.newDefaultAsyncResult(result, invocation); + given(invoker.invoke(invocation)).willReturn(defaultAsyncResult); URL url = URL.valueOf("test://test:11/test?group=dubbo&version=1.1&serialization=json"); given(invoker.getUrl()).willReturn(url); @@ -112,7 +114,7 @@ public void testInvokerJsonPojoSerialization() throws Exception { @Test public void testInvokerNonJsonEnumSerialization() throws Exception { - invocation = mock(Invocation.class); + invocation = mock(RpcInvocation.class); given(invocation.getMethodName()).willReturn("enumlength"); given(invocation.getParameterTypes()).willReturn(new Class[]{Type[].class}); given(invocation.getArguments()).willReturn(new Object[]{"hello"}); @@ -122,7 +124,8 @@ public void testInvokerNonJsonEnumSerialization() throws Exception { given(invoker.getInterface()).willReturn(DemoService.class); AppResponse result = new AppResponse(); result.setValue("High"); - given(invoker.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult(result, invocation)); + AsyncRpcResult defaultAsyncResult = AsyncRpcResult.newDefaultAsyncResult(result, invocation); + given(invoker.invoke(invocation)).willReturn(defaultAsyncResult); URL url = URL.valueOf("test://test:11/test?group=dubbo&version=1.1"); given(invoker.getUrl()).willReturn(url); @@ -134,7 +137,7 @@ public void testInvokerNonJsonEnumSerialization() throws Exception { @Test public void testInvokerNonJsonNonPojoSerialization() { - invocation = mock(Invocation.class); + invocation = mock(RpcInvocation.class); given(invocation.getMethodName()).willReturn("echo"); given(invocation.getParameterTypes()).willReturn(new Class[]{String.class}); given(invocation.getArguments()).willReturn(new Object[]{"hello"}); @@ -154,7 +157,7 @@ public void testInvokerNonJsonNonPojoSerialization() { @Test public void testInvokerNonJsonPojoSerialization() { - invocation = mock(Invocation.class); + invocation = mock(RpcInvocation.class); given(invocation.getMethodName()).willReturn("echo"); given(invocation.getParameterTypes()).willReturn(new Class[]{String.class}); given(invocation.getArguments()).willReturn(new Object[]{"hello"}); diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java index 76ee02480c7..b31d22595ab 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java @@ -22,6 +22,7 @@ import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.support.DemoService; import org.junit.jupiter.api.Test; @@ -37,7 +38,7 @@ public class EchoFilterTest { @SuppressWarnings("unchecked") @Test public void testEcho() { - Invocation invocation = mock(Invocation.class); + Invocation invocation = mock(RpcInvocation.class); given(invocation.getMethodName()).willReturn("$echo"); given(invocation.getParameterTypes()).willReturn(new Class[]{Enum.class}); given(invocation.getArguments()).willReturn(new Object[]{"hello"}); diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java index be853653335..256427377c5 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java @@ -21,6 +21,7 @@ import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.support.BlockMyInvoker; import org.junit.jupiter.api.Assertions; @@ -56,7 +57,7 @@ public void testInvokeWithTimeout() throws Exception { URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&timeout=" + timeout); Invoker invoker = new BlockMyInvoker(url, (timeout + 100)); - Invocation invocation = Mockito.mock(Invocation.class); + Invocation invocation = Mockito.mock(RpcInvocation.class); when(invocation.getMethodName()).thenReturn("testInvokeWithTimeout"); Result result = timeoutFilter.invoke(invoker, invocation); diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java index 2c8a618429d..ae8a69cc5d0 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java @@ -17,8 +17,8 @@ package org.apache.dubbo.rpc.support; import org.apache.dubbo.rpc.AttachmentsAdapter; -import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.model.ServiceModel; import java.util.HashMap; @@ -34,7 +34,7 @@ /** * MockInvocation.java */ -public class MockInvocation implements Invocation { +public class MockInvocation extends RpcInvocation { private Map attachments;