Skip to content

Commit

Permalink
fix: p2p network repeat starting
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkAfCod authored and GrapeBaBa committed Dec 19, 2023
1 parent 48ffdd4 commit 6740c00
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 63 deletions.
8 changes: 5 additions & 3 deletions hildr-node/src/main/java/io/optimism/driver/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -115,7 +116,7 @@ public class Driver<E extends Engine> extends AbstractExecutionThreadService {

private final OpStackNetwork opStackNetwork;

private volatile boolean isP2PNetworkStarted;
private final AtomicBoolean isP2PNetworkStarted;

/**
* Instantiates a new Driver.
Expand Down Expand Up @@ -158,6 +159,7 @@ public Driver(
rpcHandler.put(RpcMethod.OP_SYNC_STATUS.getRpcMethodName(), unused -> this.getSyncStatus());
rpcHandler.put(RpcMethod.OP_ROLLUP_CONFIG.getRpcMethodName(), unused -> this.getRollupConfig());
this.rpcServer.register(rpcHandler);
this.isP2PNetworkStarted = new AtomicBoolean(false);
}

/**
Expand Down Expand Up @@ -372,7 +374,7 @@ protected void shutDown() {
LOGGER.info("engineDriver shut down.");
this.rpcServer.stop();
LOGGER.info("driver stopped.");
if (this.opStackNetwork != null && this.isP2PNetworkStarted) {
if (this.opStackNetwork != null && this.isP2PNetworkStarted.compareAndExchange(true, false)) {
this.opStackNetwork.stop();
LOGGER.info("opStackNetwork stopped.");
}
Expand Down Expand Up @@ -565,7 +567,7 @@ private boolean synced() {
}

private void tryStartNetwork() {
if (this.synced() && this.opStackNetwork != null && !this.isP2PNetworkStarted) {
if (this.synced() && this.opStackNetwork != null && !this.isP2PNetworkStarted.compareAndExchange(false, true)) {
this.opStackNetwork.start();
}
}
Expand Down
15 changes: 2 additions & 13 deletions hildr-node/src/main/java/io/optimism/l1/ChainWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.optimism.common.BlockInfo;
import io.optimism.config.Config;
import java.math.BigInteger;
import java.util.concurrent.Executors;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscGrowableArrayQueue;

Expand Down Expand Up @@ -55,12 +54,7 @@ public MessagePassingQueue<BlockUpdate> getBlockUpdateQueue() {
public ChainWatcher(BigInteger l1StartBlock, BigInteger l2StartBlock, Config config) {
this.config = config;
this.blockUpdateQueue = new MpscGrowableArrayQueue<>(1024 * 4, 1024 * 64);
this.innerWatcher = new InnerWatcher(
this.config,
this.blockUpdateQueue,
l1StartBlock,
l2StartBlock,
Executors.newVirtualThreadPerTaskExecutor());
this.innerWatcher = new InnerWatcher(this.config, this.blockUpdateQueue, l1StartBlock, l2StartBlock);
}

/** start ChainWatcher. */
Expand All @@ -82,12 +76,7 @@ public void stop() {
public void restart(BigInteger l1StartBlock, BigInteger l2StartBlock) {
this.stop();
this.blockUpdateQueue = new MpscGrowableArrayQueue<>(1024 * 4, 1024 * 64);
this.innerWatcher = new InnerWatcher(
this.config,
this.blockUpdateQueue,
l1StartBlock,
l2StartBlock,
Executors.newVirtualThreadPerTaskExecutor());
this.innerWatcher = new InnerWatcher(this.config, this.blockUpdateQueue, l1StartBlock, l2StartBlock);
this.start();
}

Expand Down
55 changes: 23 additions & 32 deletions hildr-node/src/main/java/io/optimism/l1/InnerWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.optimism.l1.BlockUpdate.FinalityUpdate;
import io.optimism.utilities.rpc.Web3jProvider;
import io.optimism.utilities.telemetry.Logging;
import io.optimism.utilities.telemetry.TracerTaskWrapper;
import io.reactivex.disposables.Disposable;
import java.math.BigInteger;
import java.time.Duration;
Expand All @@ -38,8 +39,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.StructuredTaskScope;
import java.util.stream.Collectors;
import org.java_websocket.exceptions.WebsocketNotConnectedException;
import org.jctools.queues.MessagePassingQueue;
Expand Down Expand Up @@ -88,8 +88,6 @@ public class InnerWatcher extends AbstractExecutionThreadService {
new TypeReference<Uint256>() {},
new TypeReference<Bytes>() {})));

private final ExecutorService executor;

/** Global Config. */
private final Config config;

Expand Down Expand Up @@ -153,15 +151,9 @@ public class InnerWatcher extends AbstractExecutionThreadService {
* @param queue the Queue to send block updates
* @param l1StartBlock the start block number of l1
* @param l2StartBlock the start block number of l2
* @param executor the executor for async request
*/
public InnerWatcher(
Config config,
MessagePassingQueue<BlockUpdate> queue,
BigInteger l1StartBlock,
BigInteger l2StartBlock,
ExecutorService executor) {
this.executor = executor;
Config config, MessagePassingQueue<BlockUpdate> queue, BigInteger l1StartBlock, BigInteger l2StartBlock) {
this.config = config;
this.provider = Web3jProvider.createClient(config.l1RpcUrl());
this.wsProvider = Web3jProvider.createClient(config.l1WsRpcUrl());
Expand Down Expand Up @@ -411,29 +403,34 @@ private EthBlock.Block pollBlock(
final Web3j client, final DefaultBlockParameter parameter, final boolean fullTxObjectFlag)
throws ExecutionException, InterruptedException {
LOGGER.debug("will poll block: {}", parameter.getValue());
EthBlock.Block block = this.executor
.submit(() ->
client.ethGetBlockByNumber(parameter, fullTxObjectFlag).send())
.get()
.getBlock();
if (block == null) {
throw new BlockNotIncludedException();
}
if (block.getNumber() == null) {
throw new BlockNotIncludedException();
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var fork = scope.fork(TracerTaskWrapper.wrap(() ->
client.ethGetBlockByNumber(parameter, fullTxObjectFlag).send()));
scope.join();
scope.throwIfFailed();
var block = fork.get().getBlock();
if (block == null) {
throw new BlockNotIncludedException();
}
if (block.getNumber() == null) {
throw new BlockNotIncludedException();
}
return block;
}
return block;
}

private EthLog getLog(BigInteger fromBlock, BigInteger toBlock, String contract, String topic)
throws ExecutionException, InterruptedException {
final EthFilter ethFilter = new EthFilter(
DefaultBlockParameter.valueOf(fromBlock), DefaultBlockParameter.valueOf(toBlock), contract)
.addSingleTopic(topic);

return this.executor
.submit(() -> this.provider.ethGetLogs(ethFilter).send())
.get();
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var fork = scope.fork(TracerTaskWrapper.wrap(
() -> this.provider.ethGetLogs(ethFilter).send()));
scope.join();
scope.throwIfFailed();
return fork.get();
}
}

private List<Attributes.UserDeposited> getDeposits(BigInteger blockNum)
Expand Down Expand Up @@ -505,14 +502,8 @@ protected void run() {
}
}

@Override
protected Executor executor() {
return this.executor;
}

@Override
protected void shutDown() {
this.executor.shutdown();
this.provider.shutdown();
if (!this.l1HeadListener.isDisposed()) {
this.l1HeadListener.dispose();
Expand Down
20 changes: 5 additions & 15 deletions hildr-node/src/test/java/io/optimism/l1/InnerWatcherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.Executors;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscGrowableArrayQueue;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

Expand All @@ -39,27 +38,18 @@ public class InnerWatcherTest {

private static Config config;

private static ExecutorService executor;

@BeforeAll
static void setUp() {
config = TestConstants.createConfig();
executor = Executors.newSingleThreadExecutor();
}

@AfterAll
static void tearDown() {
executor.shutdownNow();
}

InnerWatcher createWatcher(
BigInteger l2StartBlock, MessagePassingQueue<BlockUpdate> queue, ExecutorService executor) {
InnerWatcher createWatcher(BigInteger l2StartBlock, MessagePassingQueue<BlockUpdate> queue) {
var watcherl2StartBlock = l2StartBlock;
if (l2StartBlock == null) {
watcherl2StartBlock = config.chainConfig().l2Genesis().number();
}
return new InnerWatcher(
config, queue, config.chainConfig().l1StartEpoch().number(), watcherl2StartBlock, executor);
config, queue, config.chainConfig().l1StartEpoch().number(), watcherl2StartBlock);
}

@Test
Expand All @@ -68,8 +58,8 @@ void testCreateInnerWatcher() {
return;
}
var queue = new MpscGrowableArrayQueue<BlockUpdate>(1024 * 4, 1024 * 64);
var unused = this.createWatcher(null, queue, executor);
unused = this.createWatcher(config.chainConfig().l2Genesis().number().add(BigInteger.TEN), queue, executor);
var unused = this.createWatcher(null, queue);
unused = this.createWatcher(config.chainConfig().l2Genesis().number().add(BigInteger.TEN), queue);
}

@Test
Expand All @@ -79,7 +69,7 @@ void testTryIngestBlock() throws Exception {
}
ExecutorService executor = Executors.newSingleThreadExecutor();
var queue = new MpscGrowableArrayQueue<BlockUpdate>(1024 * 4, 1024 * 64);
var watcher = this.createWatcher(null, queue, executor);
var watcher = this.createWatcher(null, queue);
watcher.startUp();
watcher.tryIngestBlock();
assertEquals(2, queue.size());
Expand Down

0 comments on commit 6740c00

Please sign in to comment.