Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Nov 21, 2023
1 parent 80a3357 commit daaaee5
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 199 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<dependency>
<groupId>com.github.rockylomo</groupId>
<artifactId>rxlib</artifactId>
<version>2.19.9</version>
<version>2.19.11</version>
</dependency>
```
### FEATURE
Expand Down
166 changes: 1 addition & 165 deletions rxlib/src/main/java/org/rx/core/ForkJoinPoolWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@
import net.bytebuddy.matcher.ElementMatchers;
import org.rx.exception.InvalidException;

import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.function.Function;

@Slf4j
public class ForkJoinPoolWrapper extends ForkJoinPool {
public class ForkJoinPoolWrapper {
static class TaskAdvice {
@Advice.OnMethodEnter
public static void enter(@Advice.AllArguments(readOnly = false, typing = Assigner.Typing.DYNAMIC) Object[] arguments) throws Throwable {
Expand Down Expand Up @@ -132,166 +130,4 @@ static <T> ForkJoinTask<T> wrap(ForkJoinTask<T> task) {
}
});
}

final ForkJoinPool delegate;

//ForkJoinPool.externalPush
ForkJoinPoolWrapper() {
delegate = ForkJoinPool.commonPool();
}

@Override
public <T> T invoke(ForkJoinTask<T> task) {
return delegate.invoke(wrap(task));
}

@Override
public void execute(ForkJoinTask<?> task) {
delegate.execute(wrap(task));
}

@Override
public void execute(Runnable task) {
delegate.execute(wrap(task));
}

@Override
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
return delegate.submit(wrap(task));
}

@Override
public <T> ForkJoinTask<T> submit(Callable<T> task) {
return delegate.submit(wrap(task));
}

@Override
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
return delegate.submit(wrap(task), result);
}

@Override
public ForkJoinTask<?> submit(Runnable task) {
return delegate.submit(wrap(task));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
return delegate.invokeAll(Linq.from(tasks).select(p -> wrap(p)).toList());
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(Linq.from(tasks).select(p -> wrap(p)).toList());
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(Linq.from(tasks).select(p -> wrap(p)).toList(), timeout, unit);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return delegate.invokeAll(Linq.from(tasks).select(p -> wrap(p)).toList(), timeout, unit);
}

@Override
public ForkJoinWorkerThreadFactory getFactory() {
return delegate.getFactory();
}

@Override
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
return delegate.getUncaughtExceptionHandler();
}

@Override
public int getParallelism() {
return delegate.getParallelism();
}

@Override
public int getPoolSize() {
return delegate.getPoolSize();
}

@Override
public boolean getAsyncMode() {
return delegate.getAsyncMode();
}

@Override
public int getRunningThreadCount() {
return delegate.getRunningThreadCount();
}

@Override
public int getActiveThreadCount() {
return delegate.getActiveThreadCount();
}

@Override
public boolean isQuiescent() {
return delegate.isQuiescent();
}

@Override
public long getStealCount() {
return delegate.getStealCount();
}

@Override
public long getQueuedTaskCount() {
return delegate.getQueuedTaskCount();
}

@Override
public int getQueuedSubmissionCount() {
return delegate.getQueuedSubmissionCount();
}

@Override
public boolean hasQueuedSubmissions() {
return delegate.hasQueuedSubmissions();
}

@Override
public String toString() {
return delegate.toString();
}

@Override
public void shutdown() {
delegate.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}

@Override
public boolean isTerminated() {
return delegate.isTerminated();
}

@Override
public boolean isTerminating() {
return delegate.isTerminating();
}

@Override
public boolean isShutdown() {
return delegate.isShutdown();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}

@Override
public boolean awaitQuiescence(long timeout, TimeUnit unit) {
return delegate.awaitQuiescence(timeout, unit);
}
}
2 changes: 1 addition & 1 deletion rxlib/src/main/java/org/rx/core/Sys.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ static Object visitJson(Object cur, String path, AtomicInteger i, char c, String
Map<String, ?> obj = (Map<String, ?>) cur;
cur = obj.get(visitor);
} else if (cur instanceof Iterable) {
// System.out.println(cur);
//ignore
} else {
try {
cur = Reflects.readField(cur, visitor);
Expand Down
4 changes: 1 addition & 3 deletions rxlib/src/main/java/org/rx/core/Tasks.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public final class Tasks {
//Random load balance, if methodA wait methodA, methodA is executing wait and methodB is in ThreadPoolQueue, then there will be a false death.
static final List<ThreadPool> nodes = new CopyOnWriteArrayList<>();
static final ExecutorService executor;
// static final ForkJoinPoolWrapper forkJoinPool;
static final WheelTimer timer;
static final Queue<Action> shutdownActions = new ConcurrentLinkedQueue<>();
static int poolCount;
Expand Down Expand Up @@ -78,7 +77,6 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
return shutdown;
}
};
// forkJoinPool = new ForkJoinPoolWrapper();
timer = new WheelTimer(executor);

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand All @@ -94,7 +92,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE

try {
Reflects.writeStaticField(CompletableFuture.class, "asyncPool", executor); //jdk8
// Reflects.writeStaticField(ForkJoinPool.class, "common", forkJoinPool);
// ForkJoinPoolWrapper.transform();
} catch (Throwable e) {
try {
Reflects.writeStaticField(CompletableFuture.class, "ASYNC_POOL", executor); //jdk11
Expand Down
84 changes: 55 additions & 29 deletions rxlib/src/test/java/org/rx/core/TestCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void threadPool() {
}
List<Future<Integer>> futures = pool.runAll(tasks, 0);
for (Future<Integer> future : futures) {
System.out.println(future.get());
log.info("runAll get {}", future.get());
}

ThreadPool.MultiTaskFuture<Integer, Integer> anyMf = pool.runAnyAsync(tasks);
Expand Down Expand Up @@ -188,7 +188,8 @@ public void threadPoolAutosize() {
@Test
public void inheritThreadLocal() {
Class.forName(Tasks.class.getName());
//线程trace,支持异步trace包括Executor(ThreadPool), ScheduledExecutorService(WheelTimer), CompletableFuture.xxAsync()系列方法。
ForkJoinPoolWrapper.transform();
//线程trace,支持异步trace包括Executor(ThreadPool), ScheduledExecutorService(WheelTimer), CompletableFuture.xxAsync(), parallelStream()系列方法。
RxConfig.INSTANCE.getThreadPool().setTraceName("rx-traceId");
ThreadPool.traceIdGenerator = () -> UUID.randomUUID().toString().replace("-", "");
ThreadPool.onTraceIdChanged.combine((s, e) -> MDC.put("rx-traceId", e.getValue()));
Expand Down Expand Up @@ -230,42 +231,67 @@ public void inheritThreadLocal() {
// sleep(4000);

//CompletableFuture.xxAsync异步方法正确获取trace
ThreadPool.startTrace(null);
CompletableFuture<Void> cf1 = null, cf2 = null;
for (int i = 0; i < 2; i++) {
int finalI = i;
cf1 = pool.runAsync(() -> {
log.info("TRACE ASYNC-1 {}", finalI);
pool.runAsync(() -> {
log.info("TRACE ASYNC-1_1 {}", finalI);
sleep(oneSecond);
}).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-1_1 uni {}", r));
sleep(oneSecond);
}).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-1 uni {}", r));
log.info("TRACE ASYNC MAIN {}", finalI);
cf2 = pool.runAsync(() -> {
log.info("TRACE ASYNC-2 {}", finalI);
sleep(oneSecond);
}).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-2 uni {}", r));
}
//CompletableFuture.allOf
// pool.runAllAsync(org.rx.core.Arrays.toList(f1, f2)).getFuture().get(5, TimeUnit.SECONDS);
log.info("allOf start");
CompletableFuture.allOf(cf1, cf2).whenCompleteAsync((r, e) -> {
log.info("TRACE ALL-OF {}", r);
}).get(10, TimeUnit.SECONDS);
log.info("allOf end");
// ThreadPool.startTrace(null);
// for (int i = 0; i < 2; i++) {
// int finalI = i;
// CompletableFuture<Void> cf1 = pool.runAsync(() -> {
// log.info("TRACE ASYNC-1 {}", finalI);
// pool.runAsync(() -> {
// log.info("TRACE ASYNC-1_1 {}", finalI);
// sleep(oneSecond);
// }).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-1_1 uni {}", r));
// sleep(oneSecond);
// }).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-1 uni {}", r));
// log.info("TRACE ASYNC MAIN {}", finalI);
// CompletableFuture<Void> cf2 = pool.runAsync(() -> {
// log.info("TRACE ASYNC-2 {}", finalI);
// sleep(oneSecond);
// }).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-2 uni {}", r));
// }
// ThreadPool.endTrace();
sleep(5000);
//
// ThreadPool.startTrace(null);
// log.info("TRACE ALL_OF start");
// CompletableFuture.allOf(pool.runAsync(() -> {
// log.info("TRACE ALL_OF ASYNC-1");
// pool.runAsync(() -> {
// log.info("TRACE ALL_OF ASYNC-1_1");
// sleep(oneSecond);
// }).whenCompleteAsync((r, e) -> log.info("TRACE ALL_OF ASYNC-1_1 uni {}", r));
// sleep(oneSecond);
// }).whenCompleteAsync((r, e) -> log.info("TRACE ALL_OF ASYNC-1 uni {}", r)), pool.runAsync(() -> {
// log.info("TRACE ALL_OF ASYNC-2");
// sleep(oneSecond);
// }).whenCompleteAsync((r, e) -> log.info("TRACE ALL_OF ASYNC-2 uni {}", r))).whenCompleteAsync((r, e) -> {
// log.info("TRACE ALL-OF {}", r);
// }).get(10, TimeUnit.SECONDS);
// log.info("TRACE ALL_OF end");
// ThreadPool.endTrace();
// sleep(5000);

//parallelStream
// ThreadPool.startTrace(null);
ThreadPool.startTrace(null);
Arrays.toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).parallelStream().map(p -> {
//todo
Arrays.toList("a", "b", "c").parallelStream().map(x -> {
log.info("parallelStream {} -> {}", p, x);
return x.toString();
}).collect(Collectors.toList());
log.info("parallelStream {}", p);
return p.toString();
}).collect(Collectors.toList());
ThreadPool.endTrace();

//timer
ThreadPool.startTrace(null);
Tasks.timer().setTimeout(() -> {
log.info("TIMER 1");
pool.run(() -> {
log.info("TIMER 2");
});
}, d -> d > 5000 ? -1 : Math.max(d * 2, 1000), null, TimeoutFlag.PERIOD.flags());
ThreadPool.endTrace();
sleep(8000);
//
// //netty FastThreadLocal 支持继承
// FastThreadLocal<Integer> ftl = new FastThreadLocal<>();
Expand Down

0 comments on commit daaaee5

Please sign in to comment.