diff --git a/hildr-node/src/main/java/io/optimism/driver/Driver.java b/hildr-node/src/main/java/io/optimism/driver/Driver.java index af4559f1..90949f53 100644 --- a/hildr-node/src/main/java/io/optimism/driver/Driver.java +++ b/hildr-node/src/main/java/io/optimism/driver/Driver.java @@ -61,6 +61,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.StructuredTaskScope; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -115,7 +116,7 @@ public class Driver extends AbstractExecutionThreadService { private final OpStackNetwork opStackNetwork; - private volatile boolean isP2PNetworkStarted; + private final AtomicBoolean isP2PNetworkStarted; /** * Instantiates a new Driver. @@ -158,6 +159,7 @@ public Driver( rpcHandler.put(RpcMethod.OP_SYNC_STATUS.getRpcMethodName(), unused -> this.getSyncStatus()); rpcHandler.put(RpcMethod.OP_ROLLUP_CONFIG.getRpcMethodName(), unused -> this.getRollupConfig()); this.rpcServer.register(rpcHandler); + this.isP2PNetworkStarted = new AtomicBoolean(false); } /** @@ -372,7 +374,7 @@ protected void shutDown() { LOGGER.info("engineDriver shut down."); this.rpcServer.stop(); LOGGER.info("driver stopped."); - if (this.opStackNetwork != null && this.isP2PNetworkStarted) { + if (this.opStackNetwork != null && this.isP2PNetworkStarted.compareAndExchange(true, false)) { this.opStackNetwork.stop(); LOGGER.info("opStackNetwork stopped."); } @@ -565,7 +567,7 @@ private boolean synced() { } private void tryStartNetwork() { - if (this.synced() && this.opStackNetwork != null && !this.isP2PNetworkStarted) { + if (this.synced() && this.opStackNetwork != null && !this.isP2PNetworkStarted.compareAndExchange(false, true)) { this.opStackNetwork.start(); } } diff --git a/hildr-node/src/main/java/io/optimism/l1/ChainWatcher.java b/hildr-node/src/main/java/io/optimism/l1/ChainWatcher.java index f31f8c87..3c90a34d 100644 --- a/hildr-node/src/main/java/io/optimism/l1/ChainWatcher.java +++ b/hildr-node/src/main/java/io/optimism/l1/ChainWatcher.java @@ -19,7 +19,6 @@ import io.optimism.common.BlockInfo; import io.optimism.config.Config; import java.math.BigInteger; -import java.util.concurrent.Executors; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscGrowableArrayQueue; @@ -55,12 +54,7 @@ public MessagePassingQueue getBlockUpdateQueue() { public ChainWatcher(BigInteger l1StartBlock, BigInteger l2StartBlock, Config config) { this.config = config; this.blockUpdateQueue = new MpscGrowableArrayQueue<>(1024 * 4, 1024 * 64); - this.innerWatcher = new InnerWatcher( - this.config, - this.blockUpdateQueue, - l1StartBlock, - l2StartBlock, - Executors.newVirtualThreadPerTaskExecutor()); + this.innerWatcher = new InnerWatcher(this.config, this.blockUpdateQueue, l1StartBlock, l2StartBlock); } /** start ChainWatcher. */ @@ -82,12 +76,7 @@ public void stop() { public void restart(BigInteger l1StartBlock, BigInteger l2StartBlock) { this.stop(); this.blockUpdateQueue = new MpscGrowableArrayQueue<>(1024 * 4, 1024 * 64); - this.innerWatcher = new InnerWatcher( - this.config, - this.blockUpdateQueue, - l1StartBlock, - l2StartBlock, - Executors.newVirtualThreadPerTaskExecutor()); + this.innerWatcher = new InnerWatcher(this.config, this.blockUpdateQueue, l1StartBlock, l2StartBlock); this.start(); } diff --git a/hildr-node/src/main/java/io/optimism/l1/InnerWatcher.java b/hildr-node/src/main/java/io/optimism/l1/InnerWatcher.java index 4cc3c480..fb3c237b 100644 --- a/hildr-node/src/main/java/io/optimism/l1/InnerWatcher.java +++ b/hildr-node/src/main/java/io/optimism/l1/InnerWatcher.java @@ -30,6 +30,7 @@ import io.optimism.l1.BlockUpdate.FinalityUpdate; import io.optimism.utilities.rpc.Web3jProvider; import io.optimism.utilities.telemetry.Logging; +import io.optimism.utilities.telemetry.TracerTaskWrapper; import io.reactivex.disposables.Disposable; import java.math.BigInteger; import java.time.Duration; @@ -38,8 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.StructuredTaskScope; import java.util.stream.Collectors; import org.java_websocket.exceptions.WebsocketNotConnectedException; import org.jctools.queues.MessagePassingQueue; @@ -88,8 +88,6 @@ public class InnerWatcher extends AbstractExecutionThreadService { new TypeReference() {}, new TypeReference() {}))); - private final ExecutorService executor; - /** Global Config. */ private final Config config; @@ -153,15 +151,9 @@ public class InnerWatcher extends AbstractExecutionThreadService { * @param queue the Queue to send block updates * @param l1StartBlock the start block number of l1 * @param l2StartBlock the start block number of l2 - * @param executor the executor for async request */ public InnerWatcher( - Config config, - MessagePassingQueue queue, - BigInteger l1StartBlock, - BigInteger l2StartBlock, - ExecutorService executor) { - this.executor = executor; + Config config, MessagePassingQueue queue, BigInteger l1StartBlock, BigInteger l2StartBlock) { this.config = config; this.provider = Web3jProvider.createClient(config.l1RpcUrl()); this.wsProvider = Web3jProvider.createClient(config.l1WsRpcUrl()); @@ -411,18 +403,20 @@ private EthBlock.Block pollBlock( final Web3j client, final DefaultBlockParameter parameter, final boolean fullTxObjectFlag) throws ExecutionException, InterruptedException { LOGGER.debug("will poll block: {}", parameter.getValue()); - EthBlock.Block block = this.executor - .submit(() -> - client.ethGetBlockByNumber(parameter, fullTxObjectFlag).send()) - .get() - .getBlock(); - if (block == null) { - throw new BlockNotIncludedException(); - } - if (block.getNumber() == null) { - throw new BlockNotIncludedException(); + try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { + var fork = scope.fork(TracerTaskWrapper.wrap(() -> + client.ethGetBlockByNumber(parameter, fullTxObjectFlag).send())); + scope.join(); + scope.throwIfFailed(); + var block = fork.get().getBlock(); + if (block == null) { + throw new BlockNotIncludedException(); + } + if (block.getNumber() == null) { + throw new BlockNotIncludedException(); + } + return block; } - return block; } private EthLog getLog(BigInteger fromBlock, BigInteger toBlock, String contract, String topic) @@ -430,10 +424,13 @@ private EthLog getLog(BigInteger fromBlock, BigInteger toBlock, String contract, final EthFilter ethFilter = new EthFilter( DefaultBlockParameter.valueOf(fromBlock), DefaultBlockParameter.valueOf(toBlock), contract) .addSingleTopic(topic); - - return this.executor - .submit(() -> this.provider.ethGetLogs(ethFilter).send()) - .get(); + try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { + var fork = scope.fork(TracerTaskWrapper.wrap( + () -> this.provider.ethGetLogs(ethFilter).send())); + scope.join(); + scope.throwIfFailed(); + return fork.get(); + } } private List getDeposits(BigInteger blockNum) @@ -505,14 +502,8 @@ protected void run() { } } - @Override - protected Executor executor() { - return this.executor; - } - @Override protected void shutDown() { - this.executor.shutdown(); this.provider.shutdown(); if (!this.l1HeadListener.isDisposed()) { this.l1HeadListener.dispose(); diff --git a/hildr-node/src/test/java/io/optimism/l1/InnerWatcherTest.java b/hildr-node/src/test/java/io/optimism/l1/InnerWatcherTest.java index 20ffeaa4..95b3f516 100644 --- a/hildr-node/src/test/java/io/optimism/l1/InnerWatcherTest.java +++ b/hildr-node/src/test/java/io/optimism/l1/InnerWatcherTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.Executors; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscGrowableArrayQueue; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -39,27 +38,18 @@ public class InnerWatcherTest { private static Config config; - private static ExecutorService executor; - @BeforeAll static void setUp() { config = TestConstants.createConfig(); - executor = Executors.newSingleThreadExecutor(); - } - - @AfterAll - static void tearDown() { - executor.shutdownNow(); } - InnerWatcher createWatcher( - BigInteger l2StartBlock, MessagePassingQueue queue, ExecutorService executor) { + InnerWatcher createWatcher(BigInteger l2StartBlock, MessagePassingQueue queue) { var watcherl2StartBlock = l2StartBlock; if (l2StartBlock == null) { watcherl2StartBlock = config.chainConfig().l2Genesis().number(); } return new InnerWatcher( - config, queue, config.chainConfig().l1StartEpoch().number(), watcherl2StartBlock, executor); + config, queue, config.chainConfig().l1StartEpoch().number(), watcherl2StartBlock); } @Test @@ -68,8 +58,8 @@ void testCreateInnerWatcher() { return; } var queue = new MpscGrowableArrayQueue(1024 * 4, 1024 * 64); - var unused = this.createWatcher(null, queue, executor); - unused = this.createWatcher(config.chainConfig().l2Genesis().number().add(BigInteger.TEN), queue, executor); + var unused = this.createWatcher(null, queue); + unused = this.createWatcher(config.chainConfig().l2Genesis().number().add(BigInteger.TEN), queue); } @Test @@ -79,7 +69,7 @@ void testTryIngestBlock() throws Exception { } ExecutorService executor = Executors.newSingleThreadExecutor(); var queue = new MpscGrowableArrayQueue(1024 * 4, 1024 * 64); - var watcher = this.createWatcher(null, queue, executor); + var watcher = this.createWatcher(null, queue); watcher.startUp(); watcher.tryIngestBlock(); assertEquals(2, queue.size());