diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f6e984d06..f326e0229 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -14,6 +14,7 @@ name: Eclipse Grizzly NIO CI on: pull_request: + push: jobs: build: @@ -22,13 +23,13 @@ jobs: strategy: matrix: - java_version: [ 11 ] + java_version: [ 21 ] steps: - name: Checkout for build uses: actions/checkout@v3 - name: Set up JDK - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: distribution: 'temurin' java-version: ${{ matrix.java_version }} @@ -47,13 +48,13 @@ jobs: strategy: matrix: - java_version: [ 11 ] + java_version: [ 21 ] steps: - name: Checkout for build uses: actions/checkout@v3 - name: Set up JDK - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: distribution: 'temurin' java-version: ${{ matrix.java_version }} diff --git a/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java b/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java new file mode 100644 index 000000000..f98b91703 --- /dev/null +++ b/modules/grizzly/src/main/java/org/glassfish/grizzly/threadpool/VirtualThreadExecutorService.java @@ -0,0 +1,125 @@ +package org.glassfish.grizzly.threadpool; + +import org.glassfish.grizzly.Grizzly; +import org.glassfish.grizzly.localization.LogMessages; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author mazhen + */ +public class VirtualThreadExecutorService extends AbstractExecutorService implements Thread.UncaughtExceptionHandler { + + private static final Logger logger = Grizzly.logger(VirtualThreadExecutorService.class); + + private final ExecutorService internalExecutorService; + private final Semaphore poolSemaphore; + private final Semaphore queueSemaphore; + + public static VirtualThreadExecutorService createInstance() { + return createInstance(ThreadPoolConfig.defaultConfig().setMaxPoolSize(-1).setPoolName("Grizzly-virt-")); + } + + public static VirtualThreadExecutorService createInstance(ThreadPoolConfig cfg) { + Objects.requireNonNull(cfg); + return new VirtualThreadExecutorService(cfg); + } + + protected VirtualThreadExecutorService(ThreadPoolConfig cfg) { + internalExecutorService = Executors.newThreadPerTaskExecutor(getThreadFactory(cfg)); + + int poolSizeLimit = cfg.getMaxPoolSize() > 0 ? cfg.getMaxPoolSize() : Integer.MAX_VALUE; + int queueLimit = cfg.getQueueLimit() >= 0 ? cfg.getQueueLimit() : Integer.MAX_VALUE; + // Check for integer overflow + long totalLimit = (long) poolSizeLimit + (long) queueLimit; + if (totalLimit > Integer.MAX_VALUE) { + // Handle the overflow case + queueSemaphore = new Semaphore(Integer.MAX_VALUE, true); + } else { + queueSemaphore = new Semaphore((int) totalLimit, true); + } + poolSemaphore = new Semaphore(poolSizeLimit, true); + } + + private ThreadFactory getThreadFactory(ThreadPoolConfig threadPoolConfig) { + + var prefix = threadPoolConfig.getPoolName() + "-"; + + // virtual threads factory + final ThreadFactory factory = Thread.ofVirtual() + .name(prefix, 0L) + .uncaughtExceptionHandler(this) + .factory(); + + return r -> { + Thread thread = factory.newThread(r); + final ClassLoader initial = threadPoolConfig.getInitialClassLoader(); + if (initial != null) { + thread.setContextClassLoader(initial); + } + return thread; + }; + } + + @Override + public void shutdown() { + internalExecutorService.shutdown(); + } + + @Override + public List shutdownNow() { + return internalExecutorService.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return internalExecutorService.isShutdown(); + } + + @Override + public boolean isTerminated() { + return internalExecutorService.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return internalExecutorService.awaitTermination(timeout, unit); + } + + @Override + public void execute(Runnable command) { + if (!queueSemaphore.tryAcquire()) { + throw new RejectedExecutionException("Too Many Concurrent Requests"); + } + + internalExecutorService.execute(() -> { + try { + poolSemaphore.acquire(); + try { + command.run(); + } finally { + poolSemaphore.release(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + queueSemaphore.release(); + } + }); + } + + @Override + public void uncaughtException(Thread thread, Throwable throwable) { + logger.log(Level.WARNING, LogMessages.WARNING_GRIZZLY_THREADPOOL_UNCAUGHT_EXCEPTION(thread), throwable); + } +} diff --git a/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java b/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java new file mode 100644 index 000000000..e45fa9c8c --- /dev/null +++ b/modules/grizzly/src/test/java/org/glassfish/grizzly/VirtualThreadExecutorServiceTest.java @@ -0,0 +1,87 @@ +package org.glassfish.grizzly; + +import org.glassfish.grizzly.threadpool.ThreadPoolConfig; +import org.glassfish.grizzly.threadpool.VirtualThreadExecutorService; +import org.junit.Assert; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class VirtualThreadExecutorServiceTest extends GrizzlyTestCase { + + public void testCreateInstance() throws Exception { + + VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(); + final int tasks = 2000000; + doTest(r, tasks); + } + + public void testAwaitTermination() throws Exception { + VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(); + final int tasks = 2000; + doTest(r, tasks); + r.shutdown(); + assertTrue(r.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(r.isTerminated()); + } + + public void testQueueLimit() throws Exception { + int maxPoolSize = 20; + int queueLimit = 10; + int queue = maxPoolSize + queueLimit; + ThreadPoolConfig config = ThreadPoolConfig.defaultConfig() + .setMaxPoolSize(maxPoolSize) + .setQueueLimit(queueLimit); + VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(config); + + CyclicBarrier start = new CyclicBarrier(maxPoolSize + 1); + CyclicBarrier hold = new CyclicBarrier(maxPoolSize + 1); + AtomicInteger result = new AtomicInteger(); + for (int i = 0; i < maxPoolSize; i++) { + int taskId = i; + r.execute(() -> { + try { + System.out.println("task " + taskId + " is running"); + start.await(); + hold.await(); + result.getAndIncrement(); + System.out.println("task " + taskId + " is completed"); + } catch (Exception e) { + } + }); + } + start.await(); + for (int i = maxPoolSize; i < queue; i++) { + int taskId = i; + r.execute(() -> { + try { + result.getAndIncrement(); + System.out.println("task " + taskId + " is completed"); + } catch (Exception e) { + } + }); + } + // Too Many Concurrent Requests + Assert.assertThrows(RejectedExecutionException.class, () -> r.execute(() -> System.out.println("cannot be executed"))); + hold.await(); + while (true) { + if (result.intValue() == queue) { + System.out.println("All tasks have been completed."); + break; + } + } + // The executor can accept new tasks + doTest(r, queue); + } + + private void doTest(VirtualThreadExecutorService r, int tasks) throws Exception { + final CountDownLatch cl = new CountDownLatch(tasks); + while (tasks-- > 0) { + r.execute(() -> cl.countDown()); + } + assertTrue("latch timed out", cl.await(30, TimeUnit.SECONDS)); + } +} diff --git a/pom.xml b/pom.xml index 979dfd721..aebef09df 100644 --- a/pom.xml +++ b/pom.xml @@ -118,6 +118,7 @@ + 21 UTF-8 6.0.0 1.0.0 @@ -196,8 +197,9 @@ maven-compiler-plugin 3.9.0 - 11 - 11 + ${java.version} + ${java.version} + ${java.version} -Xlint:unchecked,deprecation,fallthrough,finally,cast,dep-ann,empty,overrides @@ -224,7 +226,7 @@ - [11,) + [21,) 3.6.3