From daaaee5ea6377524a6c9b96d3d8aa188bb7bc7f1 Mon Sep 17 00:00:00 2001 From: wxm <115806199+youfanx@users.noreply.github.com> Date: Tue, 21 Nov 2023 17:25:07 +0800 Subject: [PATCH] up --- README.md | 2 +- .../java/org/rx/core/ForkJoinPoolWrapper.java | 166 +----------------- rxlib/src/main/java/org/rx/core/Sys.java | 2 +- rxlib/src/main/java/org/rx/core/Tasks.java | 4 +- rxlib/src/test/java/org/rx/core/TestCore.java | 84 ++++++--- 5 files changed, 59 insertions(+), 199 deletions(-) diff --git a/README.md b/README.md index fb4d162a..0a1fd19b 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ com.github.rockylomo rxlib - 2.19.9 + 2.19.11 ``` ### FEATURE diff --git a/rxlib/src/main/java/org/rx/core/ForkJoinPoolWrapper.java b/rxlib/src/main/java/org/rx/core/ForkJoinPoolWrapper.java index 93064130..cae0cf27 100644 --- a/rxlib/src/main/java/org/rx/core/ForkJoinPoolWrapper.java +++ b/rxlib/src/main/java/org/rx/core/ForkJoinPoolWrapper.java @@ -9,14 +9,12 @@ import net.bytebuddy.matcher.ElementMatchers; import org.rx.exception.InvalidException; -import java.util.Collection; -import java.util.List; import java.util.Properties; import java.util.concurrent.*; import java.util.function.Function; @Slf4j -public class ForkJoinPoolWrapper extends ForkJoinPool { +public class ForkJoinPoolWrapper { static class TaskAdvice { @Advice.OnMethodEnter public static void enter(@Advice.AllArguments(readOnly = false, typing = Assigner.Typing.DYNAMIC) Object[] arguments) throws Throwable { @@ -132,166 +130,4 @@ static ForkJoinTask wrap(ForkJoinTask task) { } }); } - - final ForkJoinPool delegate; - - //ForkJoinPool.externalPush - ForkJoinPoolWrapper() { - delegate = ForkJoinPool.commonPool(); - } - - @Override - public T invoke(ForkJoinTask task) { - return delegate.invoke(wrap(task)); - } - - @Override - public void execute(ForkJoinTask task) { - delegate.execute(wrap(task)); - } - - @Override - public void execute(Runnable task) { - delegate.execute(wrap(task)); - } - - @Override - public ForkJoinTask submit(ForkJoinTask task) { - return delegate.submit(wrap(task)); - } - - @Override - public ForkJoinTask submit(Callable task) { - return delegate.submit(wrap(task)); - } - - @Override - public ForkJoinTask submit(Runnable task, T result) { - return delegate.submit(wrap(task), result); - } - - @Override - public ForkJoinTask submit(Runnable task) { - return delegate.submit(wrap(task)); - } - - @Override - public List> invokeAll(Collection> tasks) { - return delegate.invokeAll(Linq.from(tasks).select(p -> wrap(p)).toList()); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return delegate.invokeAny(Linq.from(tasks).select(p -> wrap(p)).toList()); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return delegate.invokeAny(Linq.from(tasks).select(p -> wrap(p)).toList(), timeout, unit); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return delegate.invokeAll(Linq.from(tasks).select(p -> wrap(p)).toList(), timeout, unit); - } - - @Override - public ForkJoinWorkerThreadFactory getFactory() { - return delegate.getFactory(); - } - - @Override - public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { - return delegate.getUncaughtExceptionHandler(); - } - - @Override - public int getParallelism() { - return delegate.getParallelism(); - } - - @Override - public int getPoolSize() { - return delegate.getPoolSize(); - } - - @Override - public boolean getAsyncMode() { - return delegate.getAsyncMode(); - } - - @Override - public int getRunningThreadCount() { - return delegate.getRunningThreadCount(); - } - - @Override - public int getActiveThreadCount() { - return delegate.getActiveThreadCount(); - } - - @Override - public boolean isQuiescent() { - return delegate.isQuiescent(); - } - - @Override - public long getStealCount() { - return delegate.getStealCount(); - } - - @Override - public long getQueuedTaskCount() { - return delegate.getQueuedTaskCount(); - } - - @Override - public int getQueuedSubmissionCount() { - return delegate.getQueuedSubmissionCount(); - } - - @Override - public boolean hasQueuedSubmissions() { - return delegate.hasQueuedSubmissions(); - } - - @Override - public String toString() { - return delegate.toString(); - } - - @Override - public void shutdown() { - delegate.shutdown(); - } - - @Override - public List shutdownNow() { - return delegate.shutdownNow(); - } - - @Override - public boolean isTerminated() { - return delegate.isTerminated(); - } - - @Override - public boolean isTerminating() { - return delegate.isTerminating(); - } - - @Override - public boolean isShutdown() { - return delegate.isShutdown(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return delegate.awaitTermination(timeout, unit); - } - - @Override - public boolean awaitQuiescence(long timeout, TimeUnit unit) { - return delegate.awaitQuiescence(timeout, unit); - } } diff --git a/rxlib/src/main/java/org/rx/core/Sys.java b/rxlib/src/main/java/org/rx/core/Sys.java index ecf0714b..28cd5277 100644 --- a/rxlib/src/main/java/org/rx/core/Sys.java +++ b/rxlib/src/main/java/org/rx/core/Sys.java @@ -560,7 +560,7 @@ static Object visitJson(Object cur, String path, AtomicInteger i, char c, String Map obj = (Map) cur; cur = obj.get(visitor); } else if (cur instanceof Iterable) { -// System.out.println(cur); + //ignore } else { try { cur = Reflects.readField(cur, visitor); diff --git a/rxlib/src/main/java/org/rx/core/Tasks.java b/rxlib/src/main/java/org/rx/core/Tasks.java index 13d68ee7..7495dd64 100644 --- a/rxlib/src/main/java/org/rx/core/Tasks.java +++ b/rxlib/src/main/java/org/rx/core/Tasks.java @@ -26,7 +26,6 @@ public final class Tasks { //Random load balance, if methodA wait methodA, methodA is executing wait and methodB is in ThreadPoolQueue, then there will be a false death. static final List nodes = new CopyOnWriteArrayList<>(); static final ExecutorService executor; - // static final ForkJoinPoolWrapper forkJoinPool; static final WheelTimer timer; static final Queue shutdownActions = new ConcurrentLinkedQueue<>(); static int poolCount; @@ -78,7 +77,6 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE return shutdown; } }; -// forkJoinPool = new ForkJoinPoolWrapper(); timer = new WheelTimer(executor); Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -94,7 +92,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE try { Reflects.writeStaticField(CompletableFuture.class, "asyncPool", executor); //jdk8 -// Reflects.writeStaticField(ForkJoinPool.class, "common", forkJoinPool); +// ForkJoinPoolWrapper.transform(); } catch (Throwable e) { try { Reflects.writeStaticField(CompletableFuture.class, "ASYNC_POOL", executor); //jdk11 diff --git a/rxlib/src/test/java/org/rx/core/TestCore.java b/rxlib/src/test/java/org/rx/core/TestCore.java index 68acb5a9..aecc9bab 100644 --- a/rxlib/src/test/java/org/rx/core/TestCore.java +++ b/rxlib/src/test/java/org/rx/core/TestCore.java @@ -141,7 +141,7 @@ public void threadPool() { } List> futures = pool.runAll(tasks, 0); for (Future future : futures) { - System.out.println(future.get()); + log.info("runAll get {}", future.get()); } ThreadPool.MultiTaskFuture anyMf = pool.runAnyAsync(tasks); @@ -188,7 +188,8 @@ public void threadPoolAutosize() { @Test public void inheritThreadLocal() { Class.forName(Tasks.class.getName()); - //线程trace,支持异步trace包括Executor(ThreadPool), ScheduledExecutorService(WheelTimer), CompletableFuture.xxAsync()系列方法。 + ForkJoinPoolWrapper.transform(); + //线程trace,支持异步trace包括Executor(ThreadPool), ScheduledExecutorService(WheelTimer), CompletableFuture.xxAsync(), parallelStream()系列方法。 RxConfig.INSTANCE.getThreadPool().setTraceName("rx-traceId"); ThreadPool.traceIdGenerator = () -> UUID.randomUUID().toString().replace("-", ""); ThreadPool.onTraceIdChanged.combine((s, e) -> MDC.put("rx-traceId", e.getValue())); @@ -230,42 +231,67 @@ public void inheritThreadLocal() { // sleep(4000); //CompletableFuture.xxAsync异步方法正确获取trace - ThreadPool.startTrace(null); - CompletableFuture cf1 = null, cf2 = null; - for (int i = 0; i < 2; i++) { - int finalI = i; - cf1 = pool.runAsync(() -> { - log.info("TRACE ASYNC-1 {}", finalI); - pool.runAsync(() -> { - log.info("TRACE ASYNC-1_1 {}", finalI); - sleep(oneSecond); - }).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-1_1 uni {}", r)); - sleep(oneSecond); - }).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-1 uni {}", r)); - log.info("TRACE ASYNC MAIN {}", finalI); - cf2 = pool.runAsync(() -> { - log.info("TRACE ASYNC-2 {}", finalI); - sleep(oneSecond); - }).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-2 uni {}", r)); - } - //CompletableFuture.allOf -// pool.runAllAsync(org.rx.core.Arrays.toList(f1, f2)).getFuture().get(5, TimeUnit.SECONDS); - log.info("allOf start"); - CompletableFuture.allOf(cf1, cf2).whenCompleteAsync((r, e) -> { - log.info("TRACE ALL-OF {}", r); - }).get(10, TimeUnit.SECONDS); - log.info("allOf end"); +// ThreadPool.startTrace(null); +// for (int i = 0; i < 2; i++) { +// int finalI = i; +// CompletableFuture cf1 = pool.runAsync(() -> { +// log.info("TRACE ASYNC-1 {}", finalI); +// pool.runAsync(() -> { +// log.info("TRACE ASYNC-1_1 {}", finalI); +// sleep(oneSecond); +// }).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-1_1 uni {}", r)); +// sleep(oneSecond); +// }).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-1 uni {}", r)); +// log.info("TRACE ASYNC MAIN {}", finalI); +// CompletableFuture cf2 = pool.runAsync(() -> { +// log.info("TRACE ASYNC-2 {}", finalI); +// sleep(oneSecond); +// }).whenCompleteAsync((r, e) -> log.info("TRACE ASYNC-2 uni {}", r)); +// } // ThreadPool.endTrace(); - sleep(5000); +// +// ThreadPool.startTrace(null); +// log.info("TRACE ALL_OF start"); +// CompletableFuture.allOf(pool.runAsync(() -> { +// log.info("TRACE ALL_OF ASYNC-1"); +// pool.runAsync(() -> { +// log.info("TRACE ALL_OF ASYNC-1_1"); +// sleep(oneSecond); +// }).whenCompleteAsync((r, e) -> log.info("TRACE ALL_OF ASYNC-1_1 uni {}", r)); +// sleep(oneSecond); +// }).whenCompleteAsync((r, e) -> log.info("TRACE ALL_OF ASYNC-1 uni {}", r)), pool.runAsync(() -> { +// log.info("TRACE ALL_OF ASYNC-2"); +// sleep(oneSecond); +// }).whenCompleteAsync((r, e) -> log.info("TRACE ALL_OF ASYNC-2 uni {}", r))).whenCompleteAsync((r, e) -> { +// log.info("TRACE ALL-OF {}", r); +// }).get(10, TimeUnit.SECONDS); +// log.info("TRACE ALL_OF end"); +// ThreadPool.endTrace(); +// sleep(5000); //parallelStream -// ThreadPool.startTrace(null); + ThreadPool.startTrace(null); Arrays.toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).parallelStream().map(p -> { + //todo + Arrays.toList("a", "b", "c").parallelStream().map(x -> { + log.info("parallelStream {} -> {}", p, x); + return x.toString(); + }).collect(Collectors.toList()); log.info("parallelStream {}", p); return p.toString(); }).collect(Collectors.toList()); ThreadPool.endTrace(); + //timer + ThreadPool.startTrace(null); + Tasks.timer().setTimeout(() -> { + log.info("TIMER 1"); + pool.run(() -> { + log.info("TIMER 2"); + }); + }, d -> d > 5000 ? -1 : Math.max(d * 2, 1000), null, TimeoutFlag.PERIOD.flags()); + ThreadPool.endTrace(); + sleep(8000); // // //netty FastThreadLocal 支持继承 // FastThreadLocal ftl = new FastThreadLocal<>();