Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce VirtualThreadExecutorService for Virtual Threads Support #2185

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ name: Eclipse Grizzly NIO CI

on:
pull_request:
push:

jobs:
build:
Expand All @@ -22,13 +23,13 @@ jobs:

strategy:
matrix:
java_version: [ 11 ]
java_version: [ 21 ]

steps:
- name: Checkout for build
uses: actions/checkout@v3
- name: Set up JDK
uses: actions/setup-java@v2
uses: actions/setup-java@v3
with:
distribution: 'temurin'
java-version: ${{ matrix.java_version }}
Expand All @@ -47,13 +48,13 @@ jobs:

strategy:
matrix:
java_version: [ 11 ]
java_version: [ 21 ]

steps:
- name: Checkout for build
uses: actions/checkout@v3
- name: Set up JDK
uses: actions/setup-java@v2
uses: actions/setup-java@v3
with:
distribution: 'temurin'
java-version: ${{ matrix.java_version }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package org.glassfish.grizzly.threadpool;

import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.localization.LogMessages;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* @author mazhen
*/
public class VirtualThreadExecutorService extends AbstractExecutorService implements Thread.UncaughtExceptionHandler {

private static final Logger logger = Grizzly.logger(VirtualThreadExecutorService.class);

private final ExecutorService internalExecutorService;
private final Semaphore poolSemaphore;
private final Semaphore queueSemaphore;

public static VirtualThreadExecutorService createInstance() {
return createInstance(ThreadPoolConfig.defaultConfig().setMaxPoolSize(-1).setPoolName("Grizzly-virt-"));
}

public static VirtualThreadExecutorService createInstance(ThreadPoolConfig cfg) {
Objects.requireNonNull(cfg);
return new VirtualThreadExecutorService(cfg);
}

protected VirtualThreadExecutorService(ThreadPoolConfig cfg) {
internalExecutorService = Executors.newThreadPerTaskExecutor(getThreadFactory(cfg));

int poolSizeLimit = cfg.getMaxPoolSize() > 0 ? cfg.getMaxPoolSize() : Integer.MAX_VALUE;
int queueLimit = cfg.getQueueLimit() >= 0 ? cfg.getQueueLimit() : Integer.MAX_VALUE;
// Check for integer overflow
long totalLimit = (long) poolSizeLimit + (long) queueLimit;
if (totalLimit > Integer.MAX_VALUE) {
// Handle the overflow case
queueSemaphore = new Semaphore(Integer.MAX_VALUE, true);
} else {
queueSemaphore = new Semaphore((int) totalLimit, true);
}
poolSemaphore = new Semaphore(poolSizeLimit, true);
}

private ThreadFactory getThreadFactory(ThreadPoolConfig threadPoolConfig) {

var prefix = threadPoolConfig.getPoolName() + "-";

// virtual threads factory
final ThreadFactory factory = Thread.ofVirtual()
.name(prefix, 0L)
.uncaughtExceptionHandler(this)
.factory();

return r -> {
Thread thread = factory.newThread(r);
final ClassLoader initial = threadPoolConfig.getInitialClassLoader();
if (initial != null) {
thread.setContextClassLoader(initial);
}
return thread;
};
}

@Override
public void shutdown() {
internalExecutorService.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return internalExecutorService.shutdownNow();
}

@Override
public boolean isShutdown() {
return internalExecutorService.isShutdown();
}

@Override
public boolean isTerminated() {
return internalExecutorService.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return internalExecutorService.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
if (!queueSemaphore.tryAcquire()) {
throw new RejectedExecutionException("Too Many Concurrent Requests");
}

internalExecutorService.execute(() -> {
try {
poolSemaphore.acquire();
try {
command.run();
} finally {
poolSemaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
queueSemaphore.release();
}
});
}

@Override
public void uncaughtException(Thread thread, Throwable throwable) {
logger.log(Level.WARNING, LogMessages.WARNING_GRIZZLY_THREADPOOL_UNCAUGHT_EXCEPTION(thread), throwable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.glassfish.grizzly;

import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
import org.glassfish.grizzly.threadpool.VirtualThreadExecutorService;
import org.junit.Assert;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadExecutorServiceTest extends GrizzlyTestCase {

public void testCreateInstance() throws Exception {

VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance();
final int tasks = 2000000;
doTest(r, tasks);
}

public void testAwaitTermination() throws Exception {
VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance();
final int tasks = 2000;
doTest(r, tasks);
r.shutdown();
assertTrue(r.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(r.isTerminated());
}

public void testQueueLimit() throws Exception {
int maxPoolSize = 20;
int queueLimit = 10;
int queue = maxPoolSize + queueLimit;
ThreadPoolConfig config = ThreadPoolConfig.defaultConfig()
.setMaxPoolSize(maxPoolSize)
.setQueueLimit(queueLimit);
VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(config);

CyclicBarrier start = new CyclicBarrier(maxPoolSize + 1);
CyclicBarrier hold = new CyclicBarrier(maxPoolSize + 1);
AtomicInteger result = new AtomicInteger();
for (int i = 0; i < maxPoolSize; i++) {
int taskId = i;
r.execute(() -> {
try {
System.out.println("task " + taskId + " is running");
start.await();
hold.await();
result.getAndIncrement();
System.out.println("task " + taskId + " is completed");
} catch (Exception e) {
}
});
}
start.await();
for (int i = maxPoolSize; i < queue; i++) {
int taskId = i;
r.execute(() -> {
try {
result.getAndIncrement();
System.out.println("task " + taskId + " is completed");
} catch (Exception e) {
}
});
}
// Too Many Concurrent Requests
Assert.assertThrows(RejectedExecutionException.class, () -> r.execute(() -> System.out.println("cannot be executed")));
hold.await();
while (true) {
if (result.intValue() == queue) {
System.out.println("All tasks have been completed.");
break;
}
}
// The executor can accept new tasks
doTest(r, queue);
}

private void doTest(VirtualThreadExecutorService r, int tasks) throws Exception {
final CountDownLatch cl = new CountDownLatch(tasks);
while (tasks-- > 0) {
r.execute(() -> cl.countDown());
}
assertTrue("latch timed out", cl.await(30, TimeUnit.SECONDS));
}
}
8 changes: 5 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
</issueManagement>

<properties>
<java.version>21</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<servlet-version>6.0.0</servlet-version>
<maven-plugin.version>1.0.0</maven-plugin.version>
Expand Down Expand Up @@ -196,8 +197,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.9.0</version>
<configuration>
<source>11</source>
<target>11</target>
<source>${java.version}</source>
<target>${java.version}</target>
<release>${java.version}</release>
<compilerArgument>-Xlint:unchecked,deprecation,fallthrough,finally,cast,dep-ann,empty,overrides</compilerArgument>
</configuration>
</plugin>
Expand All @@ -224,7 +226,7 @@
<configuration>
<rules>
<requireJavaVersion>
<version>[11,)</version>
<version>[21,)</version>
</requireJavaVersion>
<requireMavenVersion>
<version>3.6.3</version>
Expand Down