> redisScriptRateLimiter) {
+ this.redisScriptRateLimiter = redisScriptRateLimiter;
+ }
@Override
public int getOrder() {
diff --git a/extension/extension-notify-email/pom.xml b/extension/extension-notify-email/pom.xml
index 37becc700..c9b2a0776 100644
--- a/extension/extension-notify-email/pom.xml
+++ b/extension/extension-notify-email/pom.xml
@@ -12,13 +12,18 @@
- org.springframework.boot
- spring-boot-starter-mail
+ org.springframework
+ spring-context-support
- org.springframework.boot
- spring-boot-starter-thymeleaf
+ org.eclipse.angus
+ jakarta.mail
+
+
+
+ org.thymeleaf
+ thymeleaf
diff --git a/extension/extension-notify-email/src/main/java/org/dromara/dynamictp/extension/notify/email/EmailNotifier.java b/extension/extension-notify-email/src/main/java/org/dromara/dynamictp/extension/notify/email/EmailNotifier.java
index 820c92bd8..1184f5fd5 100644
--- a/extension/extension-notify-email/src/main/java/org/dromara/dynamictp/extension/notify/email/EmailNotifier.java
+++ b/extension/extension-notify-email/src/main/java/org/dromara/dynamictp/extension/notify/email/EmailNotifier.java
@@ -17,6 +17,7 @@
package org.dromara.dynamictp.extension.notify.email;
+import jakarta.mail.internet.MimeMessage;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.em.NotifyPlatformEnum;
@@ -28,8 +29,6 @@
import org.thymeleaf.TemplateEngine;
import org.thymeleaf.context.Context;
-import javax.annotation.Resource;
-import javax.mail.internet.MimeMessage;
import java.util.Date;
/**
@@ -47,11 +46,14 @@ public class EmailNotifier extends AbstractNotifier {
@Value("${spring.mail.title:ThreadPool Notify}")
private String title;
- @Resource
- private JavaMailSender javaMailSender;
+ private final JavaMailSender javaMailSender;
- @Resource
- private TemplateEngine templateEngine;
+ private final TemplateEngine templateEngine;
+
+ public EmailNotifier(JavaMailSender javaMailSender, TemplateEngine templateEngine) {
+ this.javaMailSender = javaMailSender;
+ this.templateEngine = templateEngine;
+ }
@Override
public String platform() {
diff --git a/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/JVMTIUtil.java b/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/JVMTIUtil.java
index 04736a329..9726a03de 100644
--- a/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/JVMTIUtil.java
+++ b/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/JVMTIUtil.java
@@ -29,23 +29,23 @@ public class JVMTIUtil {
private static String libName;
static {
- if (OSUtils.isMac()) {
+ if (OSUtil.isMac()) {
libName = "libJniLibrary.dylib";
}
- if (OSUtils.isLinux()) {
- if (OSUtils.isArm32()) {
+ if (OSUtil.isLinux()) {
+ if (OSUtil.isArm32()) {
libName = "libJniLibrary-arm.so";
- } else if (OSUtils.isArm64()) {
+ } else if (OSUtil.isArm64()) {
libName = "libJniLibrary-aarch64.so";
- } else if (OSUtils.isX8664()) {
+ } else if (OSUtil.isX8664()) {
libName = "libJniLibrary-x64.so";
} else {
- libName = "libJniLibrary-" + OSUtils.arch() + ".so";
+ libName = "libJniLibrary-" + OSUtil.arch() + ".so";
}
}
- if (OSUtils.isWindows()) {
+ if (OSUtil.isWindows()) {
libName = "libJniLibrary-x64.dll";
- if (OSUtils.isX86()) {
+ if (OSUtil.isX86()) {
libName = "libJniLibrary-x86.dll";
}
}
diff --git a/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/OSUtils.java b/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/OSUtil.java
similarity index 99%
rename from jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/OSUtils.java
rename to jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/OSUtil.java
index 88d93ca02..e98100eda 100644
--- a/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/OSUtils.java
+++ b/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/OSUtil.java
@@ -26,7 +26,7 @@
* @author dragon-zhang
* @since 1.1.6
*/
-public class OSUtils {
+public class OSUtil {
private static final String OPERATING_SYSTEM_NAME = System.getProperty("os.name").toLowerCase(Locale.ENGLISH);
@@ -49,7 +49,7 @@ public class OSUtils {
arch = normalizeArch(OPERATING_SYSTEM_ARCH);
}
- private OSUtils() {
+ private OSUtil() {
}
public static boolean isWindows() {
diff --git a/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/PlatformEnum.java b/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/PlatformEnum.java
index b781c8838..332c46d0d 100644
--- a/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/PlatformEnum.java
+++ b/jvmti/jvmti-runtime/src/main/java/org/dromara/dynamictp/jvmti/PlatformEnum.java
@@ -19,9 +19,8 @@
/**
* Enum of supported operating systems.
- *
* This file is copied from here
- *
+ *
* @author dragon-zhang
* @since 1.1.6
*/
diff --git a/jvmti/jvmti-runtime/src/main/resources/libJniLibrary-x64.dll b/jvmti/jvmti-runtime/src/main/resources/libJniLibrary-x64.dll
index c6c1e844f..ceba69f86 100644
Binary files a/jvmti/jvmti-runtime/src/main/resources/libJniLibrary-x64.dll and b/jvmti/jvmti-runtime/src/main/resources/libJniLibrary-x64.dll differ
diff --git a/logging/src/main/java/org/dromara/dynamictp/logging/logback/DtpLogbackLogging.java b/logging/src/main/java/org/dromara/dynamictp/logging/logback/DtpLogbackLogging.java
index 47f982092..07984a8ad 100644
--- a/logging/src/main/java/org/dromara/dynamictp/logging/logback/DtpLogbackLogging.java
+++ b/logging/src/main/java/org/dromara/dynamictp/logging/logback/DtpLogbackLogging.java
@@ -18,10 +18,9 @@
package org.dromara.dynamictp.logging.logback;
import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.util.ContextInitializer;
+import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.logging.AbstractDtpLogging;
import org.dromara.dynamictp.logging.LogHelper;
-import lombok.extern.slf4j.Slf4j;
/**
* DtpLogbackLogging related
@@ -40,7 +39,7 @@ public class DtpLogbackLogging extends AbstractDtpLogging {
public void loadConfiguration() {
try {
loggerContext = new LoggerContext();
- new ContextInitializer(loggerContext).configureByResource(getResourceUrl(LOGBACK_LOCATION));
+// new ContextInitializer(loggerContext).configureByResource(getResourceUrl(LOGBACK_LOCATION));
} catch (Exception e) {
log.error("Cannot initialize dtp logback logging.");
}
diff --git a/pom.xml b/pom.xml
index 98f6f643f..6dbeded23 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,17 +9,19 @@
pom
DynamicTp Project
- 🔥🔥🔥轻量级动态线程池,内置监控告警功能,基于主流配置中心(已支持Nacos、Apollo、Zookeeper、Consul、Etcd、Polaris,可通过SPI自定义实现)
+
+ 🔥🔥🔥轻量级动态线程池,内置监控告警功能,基于主流配置中心(已支持Nacos、Apollo、Zookeeper、Consul、Etcd、Polaris,可通过SPI自定义实现)
+
https://github.com/yanhom1314/dynamic-tp
- 1.1.9.1
-
- 8
- 8
-
- 2.7.18
- 2021.0.8
+
+ 1.1.9.1-3.x
+ 17
+ 17
+
+ 3.1.4
+ 2022.0.3
31.1-jre
4.4
@@ -27,12 +29,13 @@
1.3.0
3.1.0
3.8.1
- 2.4
- 3.2.0
+ 3.2.0
+ 3.6.2
1.6
- 1.6.7
+ 1.6.13
3.7.1
3.1.0
+ 2.8.2
3.3.0
@@ -191,21 +194,43 @@
org.apache.maven.plugins
- maven-compiler-plugin
- ${maven-compiler-plugin.version}
+ maven-surefire-plugin
+ 3.0.0
- UTF-8
-
- ${maven.compiler.target}
+ 1
+ false
+
+ --add-opens=java.base/java.lang=ALL-UNNAMED
+ --add-opens=java.base/java.util=ALL-UNNAMED
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ ${maven-compiler-plugin.version}
+ true
+
+
+ main-compile
+ compile
+
+ compile
+ testCompile
+
+
+
+ ${maven.compiler.target}
+ UTF-8
+
+
+
+
+
org.apache.maven.plugins
maven-source-plugin
${maven-source-plugin.version}
-
- true
-
package
@@ -215,19 +240,21 @@
-
org.apache.maven.plugins
maven-javadoc-plugin
${maven-javadoc-plugin.version}
- package
+ package
jar
+
+ false
+
@@ -245,11 +272,10 @@
-
org.apache.maven.plugins
maven-gpg-plugin
- 1.6
+ ${maven-gpg-plugin.version}
verify
@@ -263,7 +289,7 @@
org.sonatype.plugins
nexus-staging-maven-plugin
- 1.6.7
+ ${nexus-staging-maven-plugin.version}
true
ossrh
@@ -271,7 +297,6 @@
false
-
org.apache.maven.plugins
diff --git a/spring/src/main/java/org/dromara/dynamictp/spring/DtpPostProcessor.java b/spring/src/main/java/org/dromara/dynamictp/spring/DtpPostProcessor.java
index d2c285137..80e0cbef7 100644
--- a/spring/src/main/java/org/dromara/dynamictp/spring/DtpPostProcessor.java
+++ b/spring/src/main/java/org/dromara/dynamictp/spring/DtpPostProcessor.java
@@ -21,6 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang3.StringUtils;
+import org.dromara.dynamictp.common.constant.DynamicTpConst;
import org.dromara.dynamictp.common.plugin.DtpInterceptorRegistry;
import org.dromara.dynamictp.common.util.ConstructorUtil;
import org.dromara.dynamictp.common.util.ReflectionUtil;
@@ -32,6 +33,7 @@
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.dromara.dynamictp.core.support.proxy.ScheduledThreadPoolExecutorProxy;
import org.dromara.dynamictp.core.support.proxy.ThreadPoolExecutorProxy;
+import org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrappers;
import org.springframework.beans.BeansException;
@@ -54,6 +56,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@@ -88,13 +91,15 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
@Override
public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException {
- if (!(bean instanceof ThreadPoolExecutor) && !(bean instanceof ThreadPoolTaskExecutor)) {
+ if (!(bean instanceof ThreadPoolExecutor) && !(bean instanceof ThreadPoolTaskExecutor) &&
+ !(bean.getClass().getName().equals(DynamicTpConst.THREAD_PER_TASK_EXECUTOR)) &&
+ !(bean instanceof VirtualThreadExecutorProxy)) {
return bean;
}
if (bean instanceof DtpExecutor) {
return registerAndReturnDtp(bean);
}
- // register juc ThreadPoolExecutor or ThreadPoolTaskExecutor
+ // register juc ThreadPoolExecutor or ThreadPoolTaskExecutor or VirtualThreadExecutor
return registerAndReturnCommon(bean, beanName);
}
@@ -121,6 +126,9 @@ private Object registerAndReturnCommon(Object bean, String beanName) {
} else {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
if (!(beanDefinition instanceof AnnotatedBeanDefinition)) {
+ if (beanDefinition.getBeanClassName().equals(VirtualThreadExecutorProxy.class.getName())) {
+ return doRegisterAndReturnCommon(bean, beanName);
+ }
return bean;
}
AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;
@@ -148,13 +156,18 @@ private Object doRegisterAndReturnCommon(Object bean, String poolName) {
try {
ReflectionUtil.setFieldValue("threadPoolExecutor", bean, proxy);
tryWrapTaskDecorator(poolName, poolTaskExecutor, proxy);
- } catch (IllegalAccessException ignored) { }
+ } catch (IllegalAccessException ignored) {
+ }
DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE);
return bean;
}
Executor proxy;
if (bean instanceof ScheduledThreadPoolExecutor) {
proxy = newScheduledTpProxy(poolName, (ScheduledThreadPoolExecutor) bean);
+ } else if (bean.getClass().getName().equals("java.util.concurrent.ThreadPerTaskExecutor")) {
+ proxy = newVirtualThreadProxy(poolName, (ExecutorService) bean);
+ } else if (bean instanceof VirtualThreadExecutorProxy) {
+ proxy = (Executor) bean;
} else {
proxy = newProxy(poolName, (ThreadPoolExecutor) bean);
}
@@ -184,6 +197,10 @@ private ScheduledThreadPoolExecutorProxy newScheduledTpProxy(String name, Schedu
return proxy;
}
+ private VirtualThreadExecutorProxy newVirtualThreadProxy(String name, ExecutorService originExecutor) {
+ return new VirtualThreadExecutorProxy(originExecutor);
+ }
+
private void tryWrapTaskDecorator(String poolName, ThreadPoolTaskExecutor poolTaskExecutor, ThreadPoolExecutorProxy proxy) throws IllegalAccessException {
Object taskDecorator = ReflectionUtil.getFieldValue("taskDecorator", poolTaskExecutor);
if (Objects.isNull(taskDecorator)) {
diff --git a/spring/src/main/java/org/dromara/dynamictp/spring/annotation/DtpBeanDefinitionRegistrar.java b/spring/src/main/java/org/dromara/dynamictp/spring/annotation/DtpBeanDefinitionRegistrar.java
index c90f62335..d7f43758a 100644
--- a/spring/src/main/java/org/dromara/dynamictp/spring/annotation/DtpBeanDefinitionRegistrar.java
+++ b/spring/src/main/java/org/dromara/dynamictp/spring/annotation/DtpBeanDefinitionRegistrar.java
@@ -21,6 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.collections4.CollectionUtils;
+import org.dromara.dynamictp.common.em.JreEnum;
import org.dromara.dynamictp.common.entity.DtpExecutorProps;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.core.executor.ExecutorType;
@@ -30,6 +31,7 @@
import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor;
import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
import org.dromara.dynamictp.core.support.binder.BinderHelper;
+import org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrappers;
import org.dromara.dynamictp.spring.util.BeanRegistrationUtil;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
@@ -40,25 +42,11 @@
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ThreadFactory;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.ALLOW_CORE_THREAD_TIMEOUT;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.AWAIT_TERMINATION_SECONDS;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.AWARE_NAMES;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.NOTIFY_ENABLED;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.NOTIFY_ITEMS;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLATFORM_IDS;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLUGIN_NAMES;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.PRE_START_ALL_CORE_THREADS;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.QUEUE_TIMEOUT;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.REJECT_ENHANCED;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.REJECT_HANDLER_TYPE;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.RUN_TIMEOUT;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.TASK_WRAPPERS;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.THREAD_POOL_ALIAS_NAME;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.THREAD_POOL_NAME;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRY_INTERRUPT_WHEN_TIMEOUT;
-import static org.dromara.dynamictp.common.constant.DynamicTpConst.WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN;
+import static org.dromara.dynamictp.common.constant.DynamicTpConst.*;
import static org.dromara.dynamictp.common.em.QueueTypeEnum.buildLbq;
import static org.dromara.dynamictp.common.entity.NotifyItem.mergeAllNotifyItems;
@@ -94,7 +82,13 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B
}
Class> executorTypeClass = ExecutorType.getClass(e.getExecutorType());
Map propertyValues = buildPropertyValues(e);
- Object[] args = buildConstructorArgs(executorTypeClass, e);
+ Object[] args;
+ try {
+ args = buildConstructorArgs(executorTypeClass, e);
+ } catch (UnsupportedOperationException exception) {
+ log.warn("DynamicTp virtual thread executor {} register warn: update your JDK version or don't use virtual thread executor!", e.getThreadPoolName());
+ return;
+ }
BeanRegistrationUtil.register(registry, e.getThreadPoolName(), executorTypeClass, propertyValues, args);
});
}
@@ -103,15 +97,19 @@ private Map buildPropertyValues(DtpExecutorProps props) {
Map propertyValues = Maps.newHashMap();
propertyValues.put(THREAD_POOL_NAME, props.getThreadPoolName());
propertyValues.put(THREAD_POOL_ALIAS_NAME, props.getThreadPoolAliasName());
- propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut());
- propertyValues.put(WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN, props.isWaitForTasksToCompleteOnShutdown());
- propertyValues.put(AWAIT_TERMINATION_SECONDS, props.getAwaitTerminationSeconds());
- propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads());
- propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType());
- propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced());
- propertyValues.put(RUN_TIMEOUT, props.getRunTimeout());
- propertyValues.put(TRY_INTERRUPT_WHEN_TIMEOUT, props.isTryInterrupt());
- propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());
+
+ if (!props.getExecutorType().equals(ExecutorType.VIRTUAL.getName())) {
+ propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut());
+ propertyValues.put(WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN, props.isWaitForTasksToCompleteOnShutdown());
+ propertyValues.put(AWAIT_TERMINATION_SECONDS, props.getAwaitTerminationSeconds());
+ propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads());
+ propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType());
+ propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced());
+ propertyValues.put(RUN_TIMEOUT, props.getRunTimeout());
+ propertyValues.put(TRY_INTERRUPT_WHEN_TIMEOUT, props.isTryInterrupt());
+ propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());
+ }
+
val notifyItems = mergeAllNotifyItems(props.getNotifyItems());
propertyValues.put(NOTIFY_ITEMS, notifyItems);
propertyValues.put(PLATFORM_IDS, props.getPlatformIds());
@@ -124,9 +122,17 @@ private Map buildPropertyValues(DtpExecutorProps props) {
return propertyValues;
}
- private Object[] buildConstructorArgs(Class> clazz, DtpExecutorProps props) {
+ private Object[] buildConstructorArgs(Class> clazz, DtpExecutorProps props) throws UnsupportedOperationException {
BlockingQueue taskQueue;
- if (clazz.equals(EagerDtpExecutor.class)) {
+ if (clazz.equals(VirtualThreadExecutorProxy.class)) {
+ if (JreEnum.lessThan(JreEnum.JAVA_21)) {
+ throw new UnsupportedOperationException();
+ }
+ ThreadFactory factory = Thread.ofVirtual().name(props.getThreadPoolName()).factory();
+ return new Object[]{
+ Executors.newThreadPerTaskExecutor(factory)
+ };
+ } else if (clazz.equals(EagerDtpExecutor.class)) {
taskQueue = new TaskQueue(props.getQueueCapacity());
} else if (clazz.equals(PriorityDtpExecutor.class)) {
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator());
diff --git a/starter/starter-adapter/starter-adapter-brpc/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-brpc/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..b34d62555
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-brpc/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.adapter.brpc.autoconfigure.BrpcTpAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-adapter/starter-adapter-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..cdfbe5368
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.adapter.common.autoconfigure.AdapterCommonAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-adapter/starter-adapter-dubbo/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-dubbo/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..031abc5f8
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-dubbo/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1,2 @@
+org.dromara.dynamictp.starter.adapter.dubbo.autoconfigure.ApacheDubboTpAutoConfiguration
+org.dromara.dynamictp.starter.adapter.dubbo.autoconfigure.AlibabaDubboTpAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-adapter/starter-adapter-grpc/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-grpc/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..ffb277cfe
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-grpc/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.adapter.grpc.autoconfigure.GrpcTpAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-adapter/starter-adapter-hystrix/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-hystrix/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..622ef2ece
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-hystrix/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.adapter.hystrix.autoconfigure.HystrixTpAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-adapter/starter-adapter-motan/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-motan/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..242dd55bd
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-motan/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.adapter.motan.autoconfigure.MotanTpAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-adapter/starter-adapter-okhttp3/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-okhttp3/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..bd80ba4ce
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-okhttp3/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.adapter.okhttp3.autoconfigure.Okhttp3TpAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-adapter/starter-adapter-rabbitmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-rabbitmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..6b9d66129
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-rabbitmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.adapter.rabbitmq.autoconfigure.RabbitMqTpAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-adapter/starter-adapter-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..461f00ec2
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1,2 @@
+org.dromara.dynamictp.starter.adapter.rocketmq.autoconfigure.RocketMqTpAutoConfiguration
+org.dromara.dynamictp.starter.adapter.rocketmq.autoconfigure.AliyunOnsRocketMqAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-adapter/starter-adapter-sofa/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-sofa/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..cf7691e23
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-sofa/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.adapter.sofa.autoconfigure.SofaTpAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-adapter/starter-adapter-tars/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-tars/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..1fe8fcf04
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-tars/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.adapter.tars.autoconfigure.TarsTpAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
index e95156d5c..3e3b73892 100644
--- a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
+++ b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
@@ -50,19 +50,33 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantLock;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
@@ -79,12 +93,15 @@
* necessary size. In addition, the optional {@linkplain #setGrowthResistance(float) growth resistance feature} can
* be used to further govern the thread pool size.
*
+ * Additionally, this thread pool implementation supports scheduling of tasks.
+ * The scheduled tasks will execute on the main pool.
+ *
* New instances of this thread pool are created by constructing and configuring a {@link Builder} instance, and calling
* its {@link Builder#build() build()} method.
*
* @author David M. Lloyd
*/
-public class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements ManageableThreadPoolExecutorService {
+public class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements ManageableThreadPoolExecutorService, ScheduledExecutorService {
private static final Thread[] NO_THREADS = new Thread[0];
static {
@@ -145,10 +162,6 @@ public class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements
*/
public static final boolean DISABLE_HINT = readBooleanPropertyPrefixed("disable", false);
- /**
- * Update the tail pointer opportunistically.
- */
- static final boolean UPDATE_TAIL = readBooleanPropertyPrefixed("update-tail", false);
/**
* Update the summary statistics.
*/
@@ -207,8 +220,21 @@ public class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements
private final Object handle;
/**
* The access control context of the creating thread.
+ * Will be set to null when the MBean is not registered.
+ */
+ private volatile AccessControlContext acc;
+ /**
+ * The context handler for the user-defined context.
+ */
+ private final ContextHandler> contextHandler;
+ /**
+ * The task for scheduled execution.
+ */
+ private final SchedulerTask schedulerTask = new SchedulerTask();
+ /**
+ * The scheduler thread.
*/
- private final AccessControlContext acc;
+ private final Thread schedulerThread;
// =======================================================
// Current state fields
@@ -296,9 +322,6 @@ public class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements
private static final long activeCountOffset;
private static final long peakQueueSizeOffset;
- private static final Object sequenceBase;
- private static final long sequenceOffset;
-
static {
try {
terminationWaitersOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("terminationWaiters"));
@@ -308,9 +331,6 @@ public class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements
peakThreadCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakThreadCount"));
activeCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("activeCount"));
peakQueueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakQueueSize"));
-
- sequenceBase = unsafe.staticFieldBase(EnhancedQueueExecutor.class.getDeclaredField("sequence"));
- sequenceOffset = unsafe.staticFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("sequence"));
} catch (NoSuchFieldException e) {
throw new NoSuchFieldError(e.getMessage());
}
@@ -360,20 +380,23 @@ public class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements
// Constructor
// =======================================================
- static volatile int sequence = 1;
+ static final AtomicInteger sequence = new AtomicInteger(1);
private final String mBeanName;
- protected EnhancedQueueExecutor(final Builder builder) {
+ public EnhancedQueueExecutor(final Builder builder) {
super();
- this.acc = getContext();
int maxSize = builder.getMaximumPoolSize();
int coreSize = min(builder.getCorePoolSize(), maxSize);
this.handoffExecutor = builder.getHandoffExecutor();
this.exceptionHandler = builder.getExceptionHandler();
this.threadFactory = builder.getThreadFactory();
+ this.schedulerThread = threadFactory.newThread(schedulerTask);
+ String schedulerName = this.schedulerThread.getName();
+ this.schedulerThread.setName(schedulerName + " (scheduler)");
this.terminationTask = builder.getTerminationTask();
this.growthResistance = builder.getGrowthResistance();
+ this.contextHandler = builder.getContextHandler();
final Duration keepAliveTime = builder.getKeepAliveTime();
// initial dead node
// thread stat
@@ -383,11 +406,13 @@ protected EnhancedQueueExecutor(final Builder builder) {
mxBean = new MXBeanImpl();
mBeanName = builder.getMBeanName();
if (! DISABLE_MBEAN && builder.isRegisterMBean()) {
+ this.acc = getContext();
final String configuredName = builder.getMBeanName();
- final String finalName = configuredName != null ? configuredName : "threadpool-" + unsafe.getAndAddInt(sequenceBase, sequenceOffset, 1);
+ final String finalName = configuredName != null ? configuredName : "threadpool-" + sequence.getAndIncrement();
handle = doPrivileged(new MBeanRegisterAction(finalName, mxBean), acc);
} else {
handle = null;
+ this.acc = null;
}
}
@@ -433,6 +458,7 @@ public static final class Builder {
private int maxQueueSize = Integer.MAX_VALUE;
private boolean registerMBean = REGISTER_MBEAN;
private String mBeanName;
+ private ContextHandler> contextHandler = ContextHandler.NONE;
/**
* Construct a new instance.
@@ -757,12 +783,49 @@ public Builder setMBeanName(final String mBeanName) {
this.mBeanName = mBeanName;
return this;
}
+
+ /**
+ * Get the context handler for the user-defined context.
+ *
+ * @return the context handler for the user-defined context (not {@code null})
+ */
+ public ContextHandler> getContextHandler() {
+ return contextHandler;
+ }
+
+ /**
+ * Set the context handler for the user-defined context.
+ *
+ * @param contextHandler the context handler for the user-defined context
+ * @return this builder
+ */
+ public Builder setContextHandler(final ContextHandler> contextHandler) {
+ Assert.checkNotNullParam("contextHandler", contextHandler);
+ this.contextHandler = contextHandler;
+ return this;
+ }
}
// =======================================================
// ExecutorService
// =======================================================
+ public ThreadFactory getThreadFactory() {
+ return threadFactory;
+ }
+
+ public MXBeanImpl getMxBean() {
+ return mxBean;
+ }
+
+ public Runnable getTerminationTask() {
+ return terminationTask;
+ }
+
+ public String getMBeanName() {
+ return mBeanName;
+ }
+
/**
* Execute a task.
*
@@ -770,7 +833,7 @@ public Builder setMBeanName(final String mBeanName) {
*/
public void execute(Runnable runnable) {
Assert.checkNotNullParam("runnable", runnable);
- final Runnable realRunnable = JBossExecutors.classLoaderPreservingTaskUnchecked(runnable);
+ final Task realRunnable = new Task(runnable, contextHandler.captureContext());
int result;
result = tryExecute(realRunnable);
boolean ok = false;
@@ -796,22 +859,6 @@ public void execute(Runnable runnable) {
}
}
- public ThreadFactory getThreadFactory() {
- return threadFactory;
- }
-
- public MXBeanImpl getMxBean() {
- return mxBean;
- }
-
- public Runnable getTerminationTask() {
- return terminationTask;
- }
-
- public String getMBeanName() {
- return mBeanName;
- }
-
/**
* Request that shutdown be initiated for this thread pool. This is equivalent to calling
* {@link #shutdown(boolean) shutdown(false)}; see that method for more information.
@@ -834,14 +881,19 @@ public List shutdownNow() {
QNode headNext;
for (;;) {
headNext = head.getNext();
+ if (headNext == head) {
+ // a racing consumer has already consumed it (and moved head)
+ head = this.head;
+ continue;
+ }
if (headNext instanceof TaskNode) {
TaskNode taskNode = (TaskNode) headNext;
if (compareAndSetHead(head, taskNode)) {
- if (! NO_QUEUE_LIMIT) {
- decreaseQueueSize();
- }
+ // save from GC nepotism
+ head.setNextOrdered(head);
+ if (! NO_QUEUE_LIMIT) decreaseQueueSize();
head = taskNode;
- list.add(taskNode.task);
+ list.add(taskNode.task.handoff());
}
// retry
} else {
@@ -884,7 +936,7 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws
Assert.checkNotNullParam("unit", unit);
if (timeout > 0) {
final Thread thread = Thread.currentThread();
- if (runningThreads.contains(thread)) {
+ if (runningThreads.contains(thread) || thread == schedulerThread) {
throw Messages.msg.cannotAwaitWithin();
}
Waiter waiters = this.terminationWaiters;
@@ -911,6 +963,39 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws
return isTerminated();
}
+ // =======================================================
+ // ScheduledExecutorService
+ // =======================================================
+
+ public ScheduledFuture> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+ startScheduleThread();
+ return schedulerTask.schedule(new RunnableScheduledFuture(command, delay, unit));
+ }
+
+ public ScheduledFuture schedule(final Callable callable, final long delay, final TimeUnit unit) {
+ startScheduleThread();
+ return schedulerTask.schedule(new CallableScheduledFuture(callable, delay, unit));
+ }
+
+ public ScheduledFuture> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
+ startScheduleThread();
+ return schedulerTask.schedule(new FixedRateRunnableScheduledFuture(command, initialDelay, period, unit));
+ }
+
+ public ScheduledFuture> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
+ startScheduleThread();
+ return schedulerTask.schedule(new FixedDelayRunnableScheduledFuture(command, initialDelay, delay, unit));
+ }
+
+ private void startScheduleThread() {
+ // this should be fairly quick...
+ if (schedulerThread.getState() == Thread.State.NEW) try {
+ schedulerThread.start();
+ } catch (IllegalThreadStateException ignored) {
+ // make sure it's race-proof
+ }
+ }
+
// =======================================================
// Management
// =======================================================
@@ -959,6 +1044,8 @@ public void shutdown(boolean interrupt) {
if (isShutdownRequested(newStatus) != isShutdownRequested(oldStatus)) {
assert ! isShutdownRequested(oldStatus); // because it can only ever be set, not cleared
// we initiated shutdown
+ // terminate the scheduler
+ schedulerTask.shutdown();
// clear out all consumers and append a dummy waiter node
TaskNode tail = this.tail;
QNode tailNext;
@@ -1442,9 +1529,9 @@ public Void run() {
// =======================================================
final class ThreadBody implements Runnable {
- private Runnable initialTask;
+ private Task initialTask;
- ThreadBody(final Runnable initialTask) {
+ ThreadBody(final Task initialTask) {
this.initialTask = initialTask;
}
@@ -1458,7 +1545,7 @@ public void run() {
runningThreads.add(currentThread);
// run the initial task
- doRunTask(getAndClearInitialTask());
+ nullToNop(getAndClearInitialTask()).run();
// Eagerly allocate a PoolThreadNode for the next time it's needed
PoolThreadNode nextPoolThreadNode = new PoolThreadNode(currentThread);
@@ -1468,7 +1555,7 @@ public void run() {
node = getOrAddNode(nextPoolThreadNode);
if (node instanceof TaskNode) {
// task node was removed
- doRunTask(((TaskNode) node).getAndClearTask());
+ ((TaskNode) node).getAndClearTask().run();
continue;
} else if (node == nextPoolThreadNode) {
// pool thread node was added
@@ -1480,11 +1567,11 @@ public void run() {
long elapsed = 0L;
waitingForTask: for (;;) {
Runnable task = newNode.getTask();
- assert task != ACCEPTED && task != GAVE_UP;
+ assert task != ACCEPTED && task != GAVE_UP && task != null;
if (task != WAITING && task != EXIT) {
if (newNode.compareAndSetTask(task, ACCEPTED)) {
// we have a task to run, so run it and then abandon the node
- doRunTask(task);
+ task.run();
// rerun outer
continue processingQueue;
}
@@ -1557,50 +1644,43 @@ private QNode getOrAddNode(PoolThreadNode nextPoolThreadNode) {
for (;;) {
head = EnhancedQueueExecutor.this.head;
headNext = head.getNext();
- if (headNext instanceof TaskNode) {
- TaskNode taskNode = (TaskNode) headNext;
- if (compareAndSetHead(head, taskNode)) {
- if (! NO_QUEUE_LIMIT) decreaseQueueSize();
- return taskNode;
- }
- } else if (headNext instanceof PoolThreadNode || headNext == null) {
- nextPoolThreadNode.setNextRelaxed(headNext);
- if (head.compareAndSetNext(headNext, nextPoolThreadNode)) {
- return nextPoolThreadNode;
+ // headNext == head can happen if another consumer has already consumed head:
+ // retry with a fresh head
+ if (headNext != head) {
+ if (headNext instanceof TaskNode) {
+ TaskNode taskNode = (TaskNode) headNext;
+ if (compareAndSetHead(head, taskNode)) {
+ // save from GC Nepotism: generational GCs don't like
+ // cross-generational references, so better to "clean-up" head::next
+ // to save dragging head::next into the old generation.
+ // Clean-up cannot just null out next
+ head.setNextOrdered(head);
+ if (!NO_QUEUE_LIMIT) decreaseQueueSize();
+ return taskNode;
+ }
+ } else if (headNext instanceof PoolThreadNode || headNext == null) {
+ nextPoolThreadNode.setNextRelaxed(headNext);
+ if (head.compareAndSetNext(headNext, nextPoolThreadNode)) {
+ return nextPoolThreadNode;
+ } else if (headNext != null) {
+ // GC Nepotism:
+ // save dragging headNext into old generation
+ // (although being a PoolThreadNode it won't make a big difference)
+ nextPoolThreadNode.setNextRelaxed(null);
+ }
+ } else {
+ assert headNext instanceof TerminateWaiterNode;
+ return headNext;
}
- } else {
- assert headNext instanceof TerminateWaiterNode;
- return headNext;
}
if (UPDATE_STATISTICS) spinMisses.increment();
- JDKSpecific.onSpinWait();
}
}
private Runnable getAndClearInitialTask() {
- try {
- return initialTask;
- } finally {
- this.initialTask = null;
- }
- }
-
- void doRunTask(final Runnable task) {
- if (task != null) {
- if (isShutdownInterrupt(threadStatus)) {
- Thread.currentThread().interrupt();
- } else {
- Thread.interrupted();
- }
- if (UPDATE_ACTIVE_COUNT) incrementActiveCount();
- safeRun(task);
- if (UPDATE_ACTIVE_COUNT) {
- decrementActiveCount();
- if (UPDATE_STATISTICS) {
- completedTaskCounter.increment();
- }
- }
- }
+ Runnable initial = initialTask;
+ this.initialTask = null;
+ return initial;
}
}
@@ -1703,7 +1783,7 @@ boolean tryDeallocateThread(long oldStat) {
* @return {@code true} if the thread was started, {@code false} otherwise
* @throws RejectedExecutionException if {@code runnable} is not {@code null} and the thread could not be created or started
*/
- boolean doStartThread(Runnable runnable) throws RejectedExecutionException {
+ boolean doStartThread(Task runnable) throws RejectedExecutionException {
Thread thread;
try {
thread = threadFactory.newThread(new ThreadBody(runnable));
@@ -1737,83 +1817,29 @@ boolean doStartThread(Runnable runnable) throws RejectedExecutionException {
// Task submission
// =======================================================
- private int tryExecute(final Runnable runnable) {
+ private int tryExecute(final Task runnable) {
QNode tailNext;
- if (TAIL_LOCK) lockTail();
TaskNode tail = this.tail;
TaskNode node = null;
for (;;) {
tailNext = tail.getNext();
- if (tailNext instanceof TaskNode) {
- TaskNode tailNextTaskNode;
- do {
- if (UPDATE_STATISTICS) spinMisses.increment();
- tailNextTaskNode = (TaskNode) tailNext;
- // retry
- tail = tailNextTaskNode;
- tailNext = tail.getNext();
- } while (tailNext instanceof TaskNode);
- // opportunistically update for the possible benefit of other threads
- if (UPDATE_TAIL) compareAndSetTail(tail, tailNextTaskNode);
- }
- // we've progressed to the first non-task node, as far as we can see
- assert ! (tailNext instanceof TaskNode);
- if (tailNext instanceof PoolThreadNode) {
- final QNode tailNextNext = tailNext.getNext();
- // state change ex1:
- // tail(snapshot).next ← tail(snapshot).next(snapshot).next(snapshot)
- // succeeds: -
- // cannot succeed: sh2
- // preconditions:
- // tail(snapshot) is a dead TaskNode
- // tail(snapshot).next is PoolThreadNode
- // tail(snapshot).next.next* is PoolThreadNode or null
- // additional success postconditions: -
- // failure postconditions: -
- // post-actions (succeed):
- // run state change ex2
- // post-actions (fail):
- // retry with new tail(snapshot)
- if (tail.compareAndSetNext(tailNext, tailNextNext)) {
- assert tail instanceof TaskNode && tail.task == null;
- PoolThreadNode consumerNode = (PoolThreadNode) tailNext;
- // state change ex2:
- // tail(snapshot).next(snapshot).task ← runnable
- // succeeds: ex1
- // preconditions:
- // tail(snapshot).next(snapshot).task = WAITING
- // post-actions (succeed):
- // unpark thread and return
- // post-actions (fail):
- // retry outer with new tail(snapshot)
- if (consumerNode.compareAndSetTask(WAITING, runnable)) {
- if (TAIL_LOCK) unlockTail();
- consumerNode.unpark();
- return EXE_OK;
- }
- // otherwise the consumer gave up or was exited already, so fall out and...
- }
- if (UPDATE_STATISTICS) spinMisses.increment();
- // retry with new tail(snapshot) as was foretold
- tail = this.tail;
+ if (tailNext == tail) {
+ // tail is already consumed, retry with new tail(snapshot)
} else if (tailNext == null) {
// no consumers available; maybe we can start one
int tr = tryAllocateThread(growthResistance);
if (tr == AT_YES) {
- if (TAIL_LOCK) unlockTail();
return EXE_CREATE_THREAD;
}
if (tr == AT_SHUTDOWN) {
- if (TAIL_LOCK) unlockTail();
return EXE_REJECT_SHUTDOWN;
}
assert tr == AT_NO;
// no; try to enqueue
- if (! NO_QUEUE_LIMIT && ! increaseQueueSize()) {
+ if (!NO_QUEUE_LIMIT && !increaseQueueSize()) {
// queue is full
// OK last effort to create a thread, disregarding growth limit
tr = tryAllocateThread(0.0f);
- if (TAIL_LOCK) unlockTail();
if (tr == AT_YES) {
return EXE_CREATE_THREAD;
}
@@ -1841,21 +1867,66 @@ private int tryExecute(final Runnable runnable) {
// try to update tail to the new node; if this CAS fails then tail already points at past the node
// this is because tail can only ever move forward, and the task list is always strongly connected
compareAndSetTail(tail, node);
- if (TAIL_LOCK) unlockTail();
return EXE_OK;
}
// we failed; we have to drop the queue size back down again to compensate before we can retry
- if (! NO_QUEUE_LIMIT) decreaseQueueSize();
- if (UPDATE_STATISTICS) spinMisses.increment();
- // retry with new tail(snapshot)
- tail = this.tail;
+ if (!NO_QUEUE_LIMIT) decreaseQueueSize();
+ } else if (tailNext instanceof PoolThreadNode) {
+ final QNode tailNextNext = tailNext.getNext();
+ // state change ex1:
+ // tail(snapshot).next ← tail(snapshot).next(snapshot).next(snapshot)
+ // succeeds: -
+ // cannot succeed: sh2
+ // preconditions:
+ // tail(snapshot) is a dead TaskNode
+ // tail(snapshot).next is PoolThreadNode
+ // tail(snapshot).next.next* is PoolThreadNode or null
+ // additional success postconditions: -
+ // failure postconditions: -
+ // post-actions (succeed):
+ // run state change ex2
+ // post-actions (fail):
+ // retry with new tail(snapshot)
+ if (tail.compareAndSetNext(tailNext, tailNextNext)) {
+ assert tail instanceof TaskNode;
+ PoolThreadNode consumerNode = (PoolThreadNode) tailNext;
+ // state change ex2:
+ // tail(snapshot).next(snapshot).task ← runnable
+ // succeeds: ex1
+ // preconditions:
+ // tail(snapshot).next(snapshot).task = WAITING
+ // post-actions (succeed):
+ // unpark thread and return
+ // post-actions (fail):
+ // retry outer with new tail(snapshot)
+ if (consumerNode.compareAndSetTask(WAITING, runnable)) {
+ // GC Nepotism:
+ // We can save consumerNode::next from being dragged into
+ // old generation, if possible
+ consumerNode.compareAndSetNext(tailNextNext, null);
+ consumerNode.unpark();
+ return EXE_OK;
+ }
+ // otherwise the consumer gave up or was exited already, so fall out and...
+ }
+ } else if (tailNext instanceof TaskNode) {
+ TaskNode tailNextTaskNode = (TaskNode) tailNext;
+ // Opportunistically update tail to the next node. If this operation has been handled by
+ // another thread we fall back to the loop and try again instead of duplicating effort.
+ if (compareAndSetTail(tail, tailNextTaskNode)) {
+ tail = tailNextTaskNode;
+ // bypass the on-spin-miss path because we've updated the next task node to tailNextTaskNode.
+ continue;
+ }
} else {
- if (TAIL_LOCK) unlockTail();
// no consumers are waiting and the tail(snapshot).next node is non-null and not a task node, therefore it must be a...
assert tailNext instanceof TerminateWaiterNode;
// shutting down
return EXE_REJECT_SHUTDOWN;
}
+ // retry with new tail(snapshot)
+ if (UPDATE_STATISTICS) spinMisses.increment();
+ tail = this.tail;
}
// not reached
}
@@ -1866,21 +1937,41 @@ private int tryExecute(final Runnable runnable) {
void completeTermination() {
// be kind and un-interrupt the thread for the termination task
- Thread.interrupted();
- final Runnable terminationTask = this.terminationTask;
- this.terminationTask = null;
- safeRun(terminationTask);
- // notify all waiters
- Waiter waiters = getAndSetTerminationWaiters(TERMINATE_COMPLETE_WAITER);
- while (waiters != null) {
- unpark(waiters.getThread());
- waiters = waiters.getNext();
- }
- tail.setNext(TERMINATE_COMPLETE);
- if (! DISABLE_MBEAN) {
- final Object handle = this.handle;
- if (handle != null) {
- doPrivileged(new MBeanUnregisterAction(handle), acc);
+ boolean intr = Thread.interrupted();
+ try {
+ final Runnable terminationTask = JBossExecutors.classLoaderPreservingTask(this.terminationTask);
+ this.terminationTask = null;
+ try {
+ terminationTask.run();
+ } catch (Throwable t) {
+ try {
+ exceptionHandler.uncaughtException(Thread.currentThread(), t);
+ } catch (Throwable ignored) {
+ // nothing else we can safely do here
+ }
+ }
+ // notify all waiters
+ Waiter waiters = getAndSetTerminationWaiters(TERMINATE_COMPLETE_WAITER);
+ while (waiters != null) {
+ unpark(waiters.getThread());
+ waiters = waiters.getNext();
+ }
+ tail.setNext(TERMINATE_COMPLETE);
+ if (!DISABLE_MBEAN) {
+ //The check for DISABLE_MBEAN is redundant as acc would be null,
+ //but GraalVM needs the hint so to not make JMX reachable.
+ if (this.acc != null) {
+ final Object handle = this.handle;
+ if (handle != null) {
+ intr = intr || Thread.interrupted();
+ doPrivileged(new MBeanUnregisterAction(handle), acc);
+ }
+ this.acc = null;
+ }
+ }
+ } finally {
+ if (intr) {
+ Thread.currentThread().interrupt();
}
}
}
@@ -2049,61 +2140,46 @@ static boolean isAllowCoreTimeout(final long oldVal) {
// Utilities
// =======================================================
- void safeRun(final Runnable task) {
- if (task == null) return;
- final Thread currentThread = Thread.currentThread();
- JBossExecutors.clearContextClassLoader(currentThread);
- try {
- task.run();
- } catch (Throwable t) {
- try {
- exceptionHandler.uncaughtException(Thread.currentThread(), t);
- } catch (Throwable ignored) {
- // nothing else we can safely do here
- }
- } finally {
- JBossExecutors.clearContextClassLoader(currentThread);
- // clear interrupt status
- Thread.interrupted();
- }
- }
-
- void rejectException(final Runnable task, final Throwable cause) {
+ void rejectException(final Task task, final Throwable cause) {
try {
- handoffExecutor.execute(task);
+ handoffExecutor.execute(task.handoff());
} catch (Throwable t) {
t.addSuppressed(cause);
throw t;
}
}
- void rejectNoThread(final Runnable task) {
+ void rejectNoThread(final Task task) {
try {
- handoffExecutor.execute(task);
+ handoffExecutor.execute(task.handoff());
} catch (Throwable t) {
t.addSuppressed(new RejectedExecutionException("No threads available"));
throw t;
}
}
- void rejectQueueFull(final Runnable task) {
+ void rejectQueueFull(final Task task) {
try {
- handoffExecutor.execute(task);
+ handoffExecutor.execute(task.handoff());
} catch (Throwable t) {
t.addSuppressed(new RejectedExecutionException("Queue is full"));
throw t;
}
}
- void rejectShutdown(final Runnable task) {
+ void rejectShutdown(final Task task) {
try {
- handoffExecutor.execute(task);
+ handoffExecutor.execute(task.handoff());
} catch (Throwable t) {
t.addSuppressed(new RejectedExecutionException("Executor is being shut down"));
throw t;
}
}
+ static Runnable nullToNop(final Runnable task) {
+ return task == null ? NullRunnable.getInstance() : task;
+ }
+
// =======================================================
// Node classes
// =======================================================
@@ -2137,6 +2213,10 @@ void setNext(final QNode node) {
void setNextRelaxed(final QNode node) {
unsafe.putObject(this, nextOffset, node);
}
+
+ void setNextOrdered(final QNode node) {
+ unsafe.putOrderedObject(this, nextOffset, node);
+ }
}
/** Padding between PoolThreadNode task and parked fields and QNode.next */
@@ -2290,19 +2370,17 @@ static final class TerminateWaiterNode extends QNode {
}
static final class TaskNode extends QNode {
- volatile Runnable task;
+ Task task;
- TaskNode(final Runnable task) {
+ TaskNode(final Task task) {
// we always start task nodes with a {@code null} next
this.task = task;
}
Runnable getAndClearTask() {
- try {
- return task;
- } finally {
- this.task = null;
- }
+ Runnable result = task;
+ task = null;
+ return result;
}
}
@@ -2438,4 +2516,908 @@ public long getSpinMissCount() {
return EnhancedQueueExecutor.this.spinMisses.longValue();
}
}
+
+ // =======================================================
+ // Basic task wrapper
+ // =======================================================
+
+ final class Task implements Runnable {
+
+ private final Runnable delegate;
+ private final ClassLoader contextClassLoader;
+ private final Object context;
+
+ Task(Runnable delegate, final Object context) {
+ Assert.checkNotNullParam("delegate", delegate);
+ this.delegate = delegate;
+ this.context = context;
+ this.contextClassLoader = JBossExecutors.getContextClassLoader(Thread.currentThread());
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public void run() {
+ if (isShutdownInterrupt(threadStatus)) {
+ Thread.currentThread().interrupt();
+ } else {
+ Thread.interrupted();
+ }
+ if (UPDATE_ACTIVE_COUNT) incrementActiveCount();
+ final Thread currentThread = Thread.currentThread();
+ final ClassLoader old = JBossExecutors.getAndSetContextClassLoader(currentThread, contextClassLoader);
+ try {
+ ((ContextHandler)contextHandler).runWith(delegate, context);
+ } catch (Throwable t) {
+ try {
+ exceptionHandler.uncaughtException(Thread.currentThread(), t);
+ } catch (Throwable ignored) {
+ // nothing else we can safely do here
+ }
+ } finally {
+ JBossExecutors.setContextClassLoader(currentThread, old);
+ }
+ Thread.interrupted();
+ if (UPDATE_ACTIVE_COUNT) {
+ decrementActiveCount();
+ if (UPDATE_STATISTICS) {
+ completedTaskCounter.increment();
+ }
+ }
+ }
+
+ /**
+ * Extracts the original runnable without EQE-specific state updating. This runnable does retain the original
+ * context classloader from the submitting thread.
+ */
+ Runnable handoff() {
+ return new ContextClassLoaderSavingRunnable(contextClassLoader, delegate);
+ }
+
+ @Override
+ public String toString() {
+ return "Task{delegate=" + delegate + ", contextClassLoader=" + contextClassLoader + '}';
+ }
+ }
+
+ // =======================================================
+ // Scheduled future tasks
+ // =======================================================
+
+ static final int ASF_ST_WAITING = 0;
+ static final int ASF_ST_CANCELLED = 1;
+ static final int ASF_ST_SUBMITTED = 2;
+ static final int ASF_ST_RUNNING = 3;
+ static final int ASF_ST_FINISHED = 4;
+ static final int ASF_ST_FAILED = 5;
+ static final int ASF_ST_REJECTED = 6;
+
+ static final AbstractScheduledFuture>[] NO_FUTURES = new AbstractScheduledFuture>[0];
+
+ static final AtomicLong SCHEDULED_TASK_SEQ = new AtomicLong();
+
+ /**
+ * An implementation of {@link ScheduledFuture} which is wrapped by {@link Task}.
+ *
+ * @param the result type
+ */
+ abstract class AbstractScheduledFuture implements ScheduledFuture, Runnable {
+ final long seq = SCHEDULED_TASK_SEQ.getAndIncrement();
+ /**
+ * The task which is wrapping this one.
+ */
+ final Task wrappingTask;
+ /**
+ * The scheduled time for this task, in nanoseconds since the scheduler thread was born.
+ * Can be mutated in subclasses if the task is recurring, but only under lock.
+ */
+ volatile long when;
+ /**
+ * The state of this task; one of {@code ASF_ST_*}.
+ */
+ volatile int state = ASF_ST_WAITING;
+ /**
+ * The actual result; only valid in {@code ASF_ST_FINISHED} (where it is of type {@code V}),
+ * or in {@code ASF_ST_FAILED} (where it is of type {@code Throwable}),
+ * or in {@code ASF_ST_REJECTED} (where it is of type {@code RejectedExecutionException}).
+ */
+ volatile Object result;
+ /**
+ * The thread which is currently live for this task.
+ */
+ Thread liveThread;
+
+ AbstractScheduledFuture(long delay, TimeUnit unit) {
+ when = Math.addExact(schedulerTask.age(), unit.toNanos(delay));
+ wrappingTask = new Task(this, contextHandler.captureContext());
+ }
+
+ public int compareTo(final Delayed o) {
+ return o instanceof AbstractScheduledFuture> ? compareTo((AbstractScheduledFuture>) o) : wrongType();
+ }
+
+ public int compareTo(final AbstractScheduledFuture> other) {
+ int cmp = Long.compare(when, other.when);
+ if (cmp == 0) cmp = Long.compare(seq, other.seq);
+ return cmp;
+ }
+
+ public long getDelay(final TimeUnit unit) {
+ return unit.convert(Math.max(0, when - schedulerTask.age()), TimeUnit.NANOSECONDS);
+ }
+
+ public boolean isCancelled() {
+ return state == ASF_ST_CANCELLED;
+ }
+
+ public boolean isDone() {
+ int state = this.state;
+ return state == ASF_ST_FINISHED || state == ASF_ST_FAILED || state == ASF_ST_CANCELLED || state == ASF_ST_REJECTED;
+ }
+
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ int state;
+ synchronized (this) {
+ state = this.state;
+ switch (state) {
+ case ASF_ST_WAITING:
+ case ASF_ST_SUBMITTED: {
+ this.state = ASF_ST_CANCELLED;
+ return true;
+ }
+ case ASF_ST_RUNNING: {
+ if (mayInterruptIfRunning) {
+ liveThread.interrupt();
+ }
+ return false;
+ }
+ case ASF_ST_CANCELLED: {
+ return true;
+ }
+ default: {
+ return false;
+ }
+ }
+ }
+ }
+
+ public V get() throws InterruptedException, ExecutionException {
+ int state;
+ synchronized (this) {
+ for (;;) {
+ state = this.state;
+ switch (state) {
+ case ASF_ST_WAITING:
+ case ASF_ST_SUBMITTED:
+ case ASF_ST_RUNNING: {
+ wait();
+ break;
+ }
+ case ASF_ST_CANCELLED: {
+ throw new CancellationException("Task was cancelled");
+ }
+ case ASF_ST_REJECTED: {
+ throw new ExecutionException("Task failed due to rejection", (RejectedExecutionException) result);
+ }
+ case ASF_ST_FAILED: {
+ throw new ExecutionException((Throwable) result);
+ }
+ case ASF_ST_FINISHED: {
+ //noinspection unchecked
+ return (V) result;
+ }
+ }
+ }
+ }
+ }
+
+ public V get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ long remaining = unit.toNanos(timeout);
+ long start = System.nanoTime();
+ int state;
+ synchronized (this) {
+ for (;;) {
+ state = this.state;
+ switch (state) {
+ case ASF_ST_WAITING:
+ case ASF_ST_SUBMITTED:
+ case ASF_ST_RUNNING: {
+ if (remaining <= 0) {
+ throw new TimeoutException();
+ }
+ wait(remaining / 1_000_000L, (int) (remaining % 1_000_000));
+ break;
+ }
+ case ASF_ST_CANCELLED: {
+ throw new CancellationException("Task was cancelled");
+ }
+ case ASF_ST_REJECTED: {
+ throw new ExecutionException("Task failed due to rejection", (RejectedExecutionException) result);
+ }
+ case ASF_ST_FAILED: {
+ throw new ExecutionException((Throwable) result);
+ }
+ case ASF_ST_FINISHED: {
+ //noinspection unchecked
+ return (V) result;
+ }
+ }
+ long newStart = System.nanoTime();
+ long elapsed = newStart - start;
+ remaining -= elapsed;
+ start = newStart;
+ }
+ }
+ }
+
+ public void run() {
+ stateTest: synchronized (this) {
+ switch (state) {
+ case ASF_ST_SUBMITTED: {
+ this.state = ASF_ST_RUNNING;
+ liveThread = currentThread();
+ break stateTest;
+ }
+ case ASF_ST_CANCELLED: {
+ // cancelled after submit but before it was run
+ return;
+ }
+ case ASF_ST_FAILED:
+ case ASF_ST_REJECTED: {
+ // a recurring task terminated abruptly, but was still found in the schedule
+ return;
+ }
+ default: {
+ // invalid state
+ fail(badState());
+ return;
+ }
+ }
+ }
+ try {
+ finish(performTask());
+ } catch (Throwable t) {
+ fail(t);
+ }
+ }
+
+ void submit() {
+ synchronized (this) {
+ stateTest: switch (state) {
+ case ASF_ST_WAITING: {
+ this.state = ASF_ST_SUBMITTED;
+ //noinspection UnnecessaryLabelOnBreakStatement
+ break stateTest;
+ }
+ case ASF_ST_CANCELLED: {
+ // do not actually submit
+ return;
+ }
+ case ASF_ST_FAILED:
+ case ASF_ST_REJECTED: {
+ // a recurring task terminated abruptly, but was still found in the schedule
+ return;
+ }
+ default: {
+ // invalid state
+ fail(badState());
+ return;
+ }
+ }
+ }
+ try {
+ /* copied from {@link #execute(Runnable)} */
+ int result = tryExecute(wrappingTask);
+ boolean ok = false;
+ if (result == EXE_OK) {
+ // last check to ensure that there is at least one existent thread to avoid rare thread timeout race condition
+ if (currentSizeOf(threadStatus) == 0 && tryAllocateThread(0.0f) == AT_YES && ! doStartThread(null)) {
+ deallocateThread();
+ }
+ if (UPDATE_STATISTICS) submittedTaskCounter.increment();
+ return;
+ } else if (result == EXE_CREATE_THREAD) try {
+ ok = doStartThread(wrappingTask);
+ } finally {
+ if (! ok) deallocateThread();
+ } else {
+ if (UPDATE_STATISTICS) rejectedTaskCounter.increment();
+ if (result == EXE_REJECT_SHUTDOWN) {
+ rejectShutdown(wrappingTask);
+ } else {
+ assert result == EXE_REJECT_QUEUE_FULL;
+ rejectQueueFull(wrappingTask);
+ }
+ }
+ } catch (RejectedExecutionException e) {
+ reject(e);
+ } catch (Throwable t) {
+ reject(new RejectedExecutionException("Task submission failed", t));
+ }
+ }
+
+ IllegalStateException badState() {
+ return new IllegalStateException("Task was not in expected state");
+ }
+
+ void reject(RejectedExecutionException e) {
+ synchronized (this) {
+ switch (state) {
+ case ASF_ST_SUBMITTED: {
+ result = e;
+ this.state = ASF_ST_REJECTED;
+ liveThread = null;
+ notifyAll();
+ return;
+ }
+ default: {
+ // invalid state
+ fail(badState());
+ return;
+ }
+ }
+ }
+ }
+
+ void fail(Throwable t) {
+ synchronized (this) {
+ switch (state) {
+ case ASF_ST_WAITING:
+ case ASF_ST_SUBMITTED:
+ case ASF_ST_RUNNING: {
+ result = t;
+ this.state = ASF_ST_FAILED;
+ liveThread = null;
+ notifyAll();
+ return;
+ }
+ case ASF_ST_CANCELLED:
+ case ASF_ST_FINISHED:
+ case ASF_ST_FAILED:
+ case ASF_ST_REJECTED: {
+ // ignore the failure, though we're likely in an invalid state
+ return;
+ }
+ }
+ }
+ }
+
+ void finish(V result) {
+ // overridden in subclasses where the task repeats
+ synchronized (this) {
+ switch (state) {
+ case ASF_ST_RUNNING: {
+ this.result = result;
+ this.state = ASF_ST_FINISHED;
+ liveThread = null;
+ notifyAll();
+ return;
+ }
+ default: {
+ // invalid state
+ fail(badState());
+ return;
+ }
+ }
+ }
+ }
+
+ abstract V performTask() throws Exception;
+
+ public String toString() {
+ return toString(new StringBuilder()).toString();
+ }
+
+ StringBuilder toString(StringBuilder b) {
+ return b.append("future result of ");
+ }
+ }
+
+ static int wrongType() throws ClassCastException {
+ throw new ClassCastException("Wrong task type for comparison");
+ }
+
+ final class RunnableScheduledFuture extends AbstractScheduledFuture {
+ final Runnable runnable;
+
+ RunnableScheduledFuture(final Runnable runnable, final long delay, final TimeUnit unit) {
+ super(delay, unit);
+ this.runnable = runnable;
+ }
+
+ Void performTask() {
+ runnable.run();
+ return null;
+ }
+
+ StringBuilder toString(final StringBuilder b) {
+ return super.toString(b).append(runnable);
+ }
+ }
+
+ final class CallableScheduledFuture extends AbstractScheduledFuture {
+ final Callable callable;
+
+ CallableScheduledFuture(final Callable callable, final long delay, final TimeUnit unit) {
+ super(delay, unit);
+ this.callable = callable;
+ }
+
+ V performTask() throws Exception {
+ return callable.call();
+ }
+
+ StringBuilder toString(final StringBuilder b) {
+ return super.toString(b).append(callable);
+ }
+ }
+
+ abstract class RepeatingScheduledFuture extends AbstractScheduledFuture {
+ final long period;
+
+ RepeatingScheduledFuture(final long delay, final long period, final TimeUnit unit) {
+ super(delay, unit);
+ this.period = unit.toNanos(period);
+ }
+
+ /**
+ * Adjust the time of this future for resubmission, after the task has run successfully.
+ */
+ abstract void adjustTime();
+
+ public void run() {
+ super.run();
+ // if an exception is thrown, we will have failed already anyway
+ adjustTime();
+ synchronized (this) {
+ switch (state) {
+ case ASF_ST_RUNNING: {
+ state = ASF_ST_WAITING;
+ schedulerTask.schedule(this);
+ return;
+ }
+ default: {
+ // in all other cases, we failed so the task should not be rescheduled
+ return;
+ }
+ }
+ }
+ }
+
+ void finish(final V result) {
+ // repeating tasks never actually finish
+ }
+
+ StringBuilder toString(final StringBuilder b) {
+ return super.toString(b.append("repeating "));
+ }
+ }
+
+ final class FixedRateRunnableScheduledFuture extends RepeatingScheduledFuture {
+ final Runnable runnable;
+
+ FixedRateRunnableScheduledFuture(final Runnable runnable, final long delay, final long period, final TimeUnit unit) {
+ super(delay, period, unit);
+ this.runnable = runnable;
+ }
+
+ void adjustTime() {
+ // if this results in a time in the past, the next run will happen immediately
+ this.when += period;
+ }
+
+ Void performTask() {
+ runnable.run();
+ return null;
+ }
+
+ StringBuilder toString(final StringBuilder b) {
+ return super.toString(b).append(runnable);
+ }
+ }
+
+ final class FixedDelayRunnableScheduledFuture extends RepeatingScheduledFuture {
+ final Runnable runnable;
+
+ FixedDelayRunnableScheduledFuture(final Runnable runnable, final long delay, final long period, final TimeUnit unit) {
+ super(delay, period, unit);
+ this.runnable = runnable;
+ }
+
+ void adjustTime() {
+ this.when = schedulerTask.age() + period;
+ }
+
+ Void performTask() {
+ runnable.run();
+ return null;
+ }
+
+ StringBuilder toString(final StringBuilder b) {
+ return super.toString(b).append(runnable);
+ }
+ }
+
+ // =======================================================
+ // Scheduler task thread worker
+ // =======================================================
+
+ final class SchedulerTask implements Runnable {
+ final long startMark = System.nanoTime();
+ final ReentrantLock ql = new ReentrantLock();
+ final Condition qc = ql.newCondition();
+ // todo: switch to array queue on a more optimistic day
+ // protected by {@link #ql}
+ ScheduledFutureQueue q = new TreeSetQueue();
+ boolean shutdownDetected;
+
+ void shutdown() {
+ ql.lock();
+ try {
+ shutdownDetected = true;
+ qc.signal();
+ } finally {
+ ql.unlock();
+ }
+ }
+
+ public void run() {
+ ScheduledFutureQueue q = this.q;
+ AbstractScheduledFuture>[] remainingFutures;
+ AbstractScheduledFuture> first;
+ long startMark = this.startMark;
+ outerLoop: for (;;) {
+ ql.lock();
+ try {
+ innerLoop: for (;;) {
+ long now = System.nanoTime();
+ if (shutdownDetected) {
+ // drop all tasks and return
+ remainingFutures = q.toArray();
+ q.clear();
+ break outerLoop;
+ } else if (q.isEmpty()) try {
+ qc.await();
+ } catch (InterruptedException ignored) {
+ // clear interrupt status
+ continue innerLoop;
+ } else {
+ first = q.first();
+ long firstWhen = first.when;
+ long currentWhen = max(0, now - startMark);
+ if (firstWhen <= currentWhen) {
+ // it's ready; run it outside of the lock
+ q.pollFirst();
+ //noinspection UnnecessaryLabelOnBreakStatement
+ break innerLoop;
+ } else {
+ long waitTime = firstWhen - currentWhen;
+ try {
+ qc.awaitNanos(waitTime);
+ } catch (InterruptedException e) {
+ // clear interrupt status
+ continue innerLoop;
+ }
+ }
+ }
+ }
+ } finally {
+ ql.unlock();
+ }
+ // outside of lock; `break innerLoop` goes ↓ here
+ first.submit();
+ // continue loop to find the next task
+ }
+ // ↓ `break outerLoop` goes here ↓
+ if (remainingFutures.length > 0) {
+ for (AbstractScheduledFuture> future : remainingFutures) {
+ future.cancel(true);
+ }
+ }
+ return;
+ }
+
+ > F schedule(final F item) {
+ Task wrappingTask = item.wrappingTask;
+ if (item.when <= age()) {
+ // just submit it now
+ item.submit();
+ return item;
+ }
+ ql.lock();
+ try {
+ if (shutdownDetected) {
+ rejectShutdown(wrappingTask);
+ return item;
+ }
+ // check to see if we need to wake up the scheduler
+ boolean first;
+ for (;;) try {
+ first = q.insertAndCheckForFirst(item);
+ break;
+ } catch (QueueFullException ignored) {
+ q = q.grow();
+ }
+ if (first) {
+ // the delay time has changed, so wake up the waiter
+ qc.signal();
+ }
+ return item;
+ } finally {
+ ql.unlock();
+ }
+ }
+
+ long age() {
+ return System.nanoTime() - startMark;
+ }
+ }
+
+ // =======================================================
+ // Schedule queue API & implementations
+ // =======================================================
+
+ interface ScheduledFutureQueue {
+ AbstractScheduledFuture>[] toArray();
+
+ void clear();
+
+ boolean isEmpty();
+
+ int size();
+
+ AbstractScheduledFuture> first();
+
+ @SuppressWarnings("UnusedReturnValue") // must match signature for TreeSet
+ AbstractScheduledFuture> pollFirst();
+
+ /**
+ * Insert the item in order, checking to see if it was added as the first item.
+ *
+ * @param item the item to insert (must not be {@code null})
+ * @return {@code true} if the item is first, {@code false} otherwise
+ * @throws QueueFullException if the queue is full; it must be recreated in this case
+ */
+ boolean insertAndCheckForFirst(AbstractScheduledFuture> item) throws QueueFullException;
+
+ /**
+ * Get a new queue with the same contents as this one, but with a larger capacity.
+ *
+ * @return the grown queue
+ */
+ ScheduledFutureQueue grow();
+ }
+
+ static final class ArrayQueue implements ScheduledFutureQueue {
+ final AbstractScheduledFuture>[] array;
+ // the removal point (lowest+least index)
+ int head;
+ // the number of elements
+ int size;
+
+ ArrayQueue(int capacity) {
+ // next power of two
+ capacity = Integer.highestOneBit(Math.max(capacity, 2) - 1) << 1;
+ array = new AbstractScheduledFuture>[capacity];
+ }
+
+ private ArrayQueue(final ArrayQueue original, final int newCapacity) {
+ assert Integer.bitCount(newCapacity) == 1;
+ array = original.toArray(newCapacity);
+ head = 0;
+ size = original.size;
+ }
+
+ public AbstractScheduledFuture>[] toArray() {
+ return toArray(size());
+ }
+
+ public AbstractScheduledFuture>[] toArray(int size) {
+ int head = this.head;
+ int end = head + size;
+ AbstractScheduledFuture>[] copy = Arrays.copyOfRange(array, head, end);
+ if (end > array.length) {
+ // copy the wrapped elements
+ System.arraycopy(array, 0, copy, size - (array.length - head), size - array.length);
+ }
+ return copy;
+ }
+
+ public void clear() {
+ Arrays.fill(array, null);
+ head = size = 0;
+ }
+
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public AbstractScheduledFuture> first() {
+ if (size == 0) {
+ throw new NoSuchElementException();
+ }
+ return array[head];
+ }
+
+ public AbstractScheduledFuture> pollFirst() {
+ if (size == 0) {
+ throw new NoSuchElementException();
+ }
+ int head = this.head;
+ AbstractScheduledFuture> item = array[head];
+ array[head] = null;
+ this.size --;
+ int mask = array.length - 1;
+ this.head = head + 1 & mask;
+ return item;
+ }
+
+ public boolean insertAndCheckForFirst(final AbstractScheduledFuture> item) {
+ // find the insertion point
+ int size = this.size;
+ AbstractScheduledFuture>[] array = this.array;
+ int arrayLen = array.length;
+ if (size == arrayLen) {
+ throw new QueueFullException();
+ }
+ int mask = arrayLen - 1;
+ int idx = 0;
+ int high = size - 1;
+ // at this point and onwards, there is definitely space in the array
+ int head = this.head;
+
+ while (idx <= high) {
+ int mid = (idx + high) >>> 1;
+ AbstractScheduledFuture> testVal = array[head + mid & mask];
+ int cmp = testVal.compareTo(item);
+ if (cmp < 0) {
+ idx = mid + 1;
+ } else if (cmp > 0) {
+ high = mid - 1;
+ } else {
+ // we found this task already present in the queue (should never happen)
+ return false;
+ }
+ }
+
+ return insertAt(idx, item);
+ }
+
+ /**
+ * Move all elements starting at the given index forward to make space at that position, wrapping if needed.
+ *
+ * @param idx the element index relative to {@code head} to open up
+ */
+ void moveForward(final int idx, final AbstractScheduledFuture> storeVal) {
+ AbstractScheduledFuture>[] array = this.array;
+ int size = this.size;
+ int moveCnt = size - idx;
+ int arrayLength = array.length;
+ int mask = arrayLength - 1;
+ int head = this.head;
+ int start = head + idx;
+ // TODO:
+ // - change this to three calls to System.arraycopy
+ // - one for the already-wrapped portion
+ // - one for the portion that is being newly wrapped
+ // - one for the leading (pre-wrap) portion
+ for (int i = moveCnt - 1; i >= 0; i --) {
+ int pos = start + i;
+ array[pos + 1 & mask] = array[pos & mask];
+ }
+ array[start & mask] = storeVal;
+ }
+
+ /**
+ * Move all elements starting before the given index backward to make space at that position, wrapping if needed.
+ *
+ * @param idx the element index relative to {@code head} to open up
+ */
+ void moveBackward(final int idx, final AbstractScheduledFuture> storeVal) {
+ AbstractScheduledFuture>[] array = this.array;
+ int size = this.size;
+ int moveCnt = size - idx + 1;
+ int arrayLength = array.length;
+ int mask = arrayLength - 1;
+ int head = this.head;
+ int start = head + idx - 1;
+ // TODO:
+ // - change this to three calls to System.arraycopy
+ // - one for the leading (pre-wrap) portion
+ // - one for the portion that is being newly de-wrapped
+ // - one for the already-wrapped portion
+ for (int i = moveCnt - 1; i >= 0; i --) {
+ int pos = start - i;
+ array[pos - 1 & mask] = array[pos & mask];
+ }
+ array[start & mask] = storeVal;
+ this.head = head - 1 & mask;
+ }
+
+ boolean insertAt(final int idx, final AbstractScheduledFuture> item) {
+ // this is a separate method for easier testing of the arraycopy algebraic mayhem
+ int size = this.size;
+ // no matter what, we're growing by one
+ this.size = size + 1;
+ int halfSize = size + 1 >> 1;
+ if (idx >= halfSize) {
+ moveForward(idx, item);
+ } else {
+ moveBackward(idx, item);
+ }
+ return idx == 0;
+ }
+
+ public ScheduledFutureQueue grow() {
+ // todo: calibrate this threshold
+ if (array.length >= 256) {
+ return new TreeSetQueue(this);
+ } else {
+ return new ArrayQueue(this, array.length << 1);
+ }
+ }
+
+ // test points for white-box unit tests
+
+ int testPoint_arrayLength() {
+ return array.length;
+ }
+
+ int testPoint_head() {
+ return head;
+ }
+
+ void testPoint_setHead(int newHead) {
+ head = newHead;
+ }
+
+ void testPoint_setSize(int newSize) {
+ size = newSize;
+ }
+
+ AbstractScheduledFuture> testPoint_getArrayItem(int index) {
+ return array[index & array.length - 1];
+ }
+
+ AbstractScheduledFuture> testPoint_setArrayItem(int index, AbstractScheduledFuture> item) {
+ try {
+ return array[index & array.length - 1];
+ } finally {
+ array[index & array.length - 1] = item;
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ static class TreeSetQueue extends TreeSet> implements ScheduledFutureQueue {
+ TreeSetQueue(final ScheduledFutureQueue original) {
+ Collections.addAll(this, original.toArray());
+ }
+
+ TreeSetQueue() {
+ }
+
+ public AbstractScheduledFuture>[] toArray() {
+ return super.toArray(NO_FUTURES);
+ }
+
+ public boolean insertAndCheckForFirst(final AbstractScheduledFuture> item) {
+ add(item);
+ return item == first();
+ }
+
+ public ScheduledFutureQueue grow() {
+ return this;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ static final class QueueFullException extends RuntimeException {
+ QueueFullException() {
+ super(null, null, false, false);
+ }
+ }
}
diff --git a/starter/starter-adapter/starter-adapter-webserver/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-adapter/starter-adapter-webserver/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..64e62adc1
--- /dev/null
+++ b/starter/starter-adapter/starter-adapter-webserver/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.adapter.webserver.autocconfigure.WebServerTpAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-common/pom.xml b/starter/starter-common/pom.xml
index 74d2d4545..33eb477e2 100644
--- a/starter/starter-common/pom.xml
+++ b/starter/starter-common/pom.xml
@@ -12,11 +12,6 @@
dynamic-tp-spring-boot-starter-common
-
- org.dromara.dynamictp
- dynamic-tp-core
-
-
org.springframework.boot
spring-boot-starter
diff --git a/starter/starter-common/src/main/java/org/dromara/dynamictp/starter/common/monitor/DtpEndpoint.java b/starter/starter-common/src/main/java/org/dromara/dynamictp/starter/common/monitor/DtpEndpoint.java
index b1b508d57..d66d5be63 100644
--- a/starter/starter-common/src/main/java/org/dromara/dynamictp/starter/common/monitor/DtpEndpoint.java
+++ b/starter/starter-common/src/main/java/org/dromara/dynamictp/starter/common/monitor/DtpEndpoint.java
@@ -23,11 +23,11 @@
import org.apache.commons.collections4.MapUtils;
import org.dromara.dynamictp.common.entity.JvmStats;
import org.dromara.dynamictp.common.entity.Metrics;
+import org.dromara.dynamictp.common.manager.ContextManagerHelper;
import org.dromara.dynamictp.core.DtpRegistry;
+import org.dromara.dynamictp.core.aware.MetricsAware;
import org.dromara.dynamictp.core.converter.ExecutorConverter;
-import org.dromara.dynamictp.common.manager.ContextManagerHelper;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
-import org.dromara.dynamictp.core.aware.MetricsAware;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
@@ -53,7 +53,7 @@ public List invoke() {
val handlerMap = ContextManagerHelper.getBeansOfType(MetricsAware.class);
if (MapUtils.isNotEmpty(handlerMap)) {
- handlerMap.forEach((k, v) -> metricsList.addAll(v.getMultiPoolStats()));
+ handlerMap.forEach((k, v) -> metricsList.addAll(v.getMultiExecutorStats()));
}
JvmStats jvmStats = new JvmStats();
Runtime runtime = Runtime.getRuntime();
diff --git a/starter/starter-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..82a0a770e
--- /dev/null
+++ b/starter/starter-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.common.DtpBootBeanConfiguration
\ No newline at end of file
diff --git a/starter/starter-configcenter/cloud-starter-consul/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-configcenter/cloud-starter-consul/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..a4b7c498a
--- /dev/null
+++ b/starter/starter-configcenter/cloud-starter-consul/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.cloud.consul.autoconfigure.DtpConsulAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-configcenter/cloud-starter-huawei/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-configcenter/cloud-starter-huawei/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..79948cf07
--- /dev/null
+++ b/starter/starter-configcenter/cloud-starter-huawei/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.cloud.huawei.autoconfigure.DtpHuaweiAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-configcenter/cloud-starter-nacos/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-configcenter/cloud-starter-nacos/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..022bfbf1d
--- /dev/null
+++ b/starter/starter-configcenter/cloud-starter-nacos/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.cloud.nacos.autoconfigure.DtpCloudNacosAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-configcenter/cloud-starter-polaris/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-configcenter/cloud-starter-polaris/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..ae81e8cd4
--- /dev/null
+++ b/starter/starter-configcenter/cloud-starter-polaris/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.cloud.polaris.autoconfigure.DtpPolarisAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-configcenter/cloud-starter-zookeeper/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-configcenter/cloud-starter-zookeeper/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..2c9093224
--- /dev/null
+++ b/starter/starter-configcenter/cloud-starter-zookeeper/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.cloud.zookeeper.autoconfigure.DtpCloudZkAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-configcenter/starter-apollo/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-configcenter/starter-apollo/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..6ee139d9f
--- /dev/null
+++ b/starter/starter-configcenter/starter-apollo/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.apollo.autoconfigure.DtpApolloAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-configcenter/starter-etcd/pom.xml b/starter/starter-configcenter/starter-etcd/pom.xml
index de05a1aaa..5f2120871 100644
--- a/starter/starter-configcenter/starter-etcd/pom.xml
+++ b/starter/starter-configcenter/starter-etcd/pom.xml
@@ -13,8 +13,8 @@
dynamic-tp-spring-boot-starter-etcd
- 8
- 8
+ 17
+ 17
@@ -22,7 +22,6 @@
io.etcd
jetcd-core
0.5.5
- provided
true
diff --git a/starter/starter-configcenter/starter-etcd/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-configcenter/starter-etcd/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..ea053abf7
--- /dev/null
+++ b/starter/starter-configcenter/starter-etcd/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.etcd.autoconfigure.DtpEtcdAutoConfiguration
diff --git a/starter/starter-configcenter/starter-nacos/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-configcenter/starter-nacos/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..62464668e
--- /dev/null
+++ b/starter/starter-configcenter/starter-nacos/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.nacos.autoconfigure.DtpNacosAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-configcenter/starter-zookeeper/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/starter/starter-configcenter/starter-zookeeper/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..1ed37e12a
--- /dev/null
+++ b/starter/starter-configcenter/starter-zookeeper/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.dynamictp.starter.zookeeper.autoconfigure.DtpZkAutoConfiguration
\ No newline at end of file
diff --git a/starter/starter-extension/starter-extension-limiter-redis/pom.xml b/starter/starter-extension/starter-extension-limiter-redis/pom.xml
index 1c26ef2a2..73ca8ce51 100644
--- a/starter/starter-extension/starter-extension-limiter-redis/pom.xml
+++ b/starter/starter-extension/starter-extension-limiter-redis/pom.xml
@@ -15,6 +15,11 @@
org.dromara.dynamictp
dynamic-tp-extension-limiter-redis