Skip to content

Commit

Permalink
fix: miss tx
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkAfCod committed Feb 4, 2024
1 parent 1901986 commit 1646a1b
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 25 deletions.
31 changes: 29 additions & 2 deletions hildr-node/src/main/java/io/optimism/derive/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.response.EthBlock;
Expand All @@ -24,12 +27,16 @@
*/
public class State {

private static final Logger LOGGER = LoggerFactory.getLogger(State.class);

private final TreeMap<String, L1Info> l1Info;

private final TreeMap<BigInteger, String> l1Hashes;

private final TreeMap<BigInteger, Tuple2<BlockInfo, Epoch>> l2Refs;

private final Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher;

private BlockInfo safeHead;

private Epoch safeEpoch;
Expand All @@ -44,6 +51,7 @@ public class State {
* @param l1Info the L1 info
* @param l1Hashes the L1 hashes
* @param l2Refs the L2 block info references
* @param l2Fetcher the L2 block info fetcher
* @param safeHead the safe head
* @param safeEpoch the safe epoch
* @param currentEpochNum the current epoch num
Expand All @@ -53,13 +61,15 @@ public State(
TreeMap<String, L1Info> l1Info,
TreeMap<BigInteger, String> l1Hashes,
TreeMap<BigInteger, Tuple2<BlockInfo, Epoch>> l2Refs,
Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher,
BlockInfo safeHead,
Epoch safeEpoch,
BigInteger currentEpochNum,
Config config) {
this.l1Info = l1Info;
this.l1Hashes = l1Hashes;
this.l2Refs = l2Refs;
this.l2Fetcher = l2Fetcher;
this.safeHead = safeHead;
this.safeEpoch = safeEpoch;
this.currentEpochNum = currentEpochNum;
Expand All @@ -70,18 +80,27 @@ public State(
* Create state.
*
* @param l2Refs the L2 block info references
* @param l2Fetcher the L2 block info fetcher
* @param finalizedHead the finalized head
* @param finalizedEpoch the finalized epoch
* @param config the config
* @return the state
*/
public static State create(
TreeMap<BigInteger, Tuple2<BlockInfo, Epoch>> l2Refs,
Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher,
BlockInfo finalizedHead,
Epoch finalizedEpoch,
Config config) {
return new State(
new TreeMap<>(), new TreeMap<>(), l2Refs, finalizedHead, finalizedEpoch, BigInteger.ZERO, config);
new TreeMap<>(),
new TreeMap<>(),
l2Refs,
l2Fetcher,
finalizedHead,
finalizedEpoch,
BigInteger.ZERO,
config);
}

/**
Expand Down Expand Up @@ -119,7 +138,14 @@ public Tuple2<BlockInfo, Epoch> l2Info(BigInteger timestamp) {
.subtract(config.chainConfig().l2Genesis().timestamp())
.divide(config.chainConfig().blockTime())
.add(config.chainConfig().l2Genesis().number());
return this.l2Refs.get(blockNum);
var cache = l2Refs.get(blockNum);
if (cache != null) {
return cache;
}

var res = l2Fetcher.apply(blockNum);
this.l2Refs.put(res.component1().number(), res);
return res;
}

/**
Expand Down Expand Up @@ -173,6 +199,7 @@ public void updateL1Info(L1Info l1Info) {
* @param safeEpoch the safe epoch
*/
public void purge(BlockInfo safeHead, Epoch safeEpoch) {
LOGGER.info("purge state: safeHead.number={}, safeEpoch. ={}", safeHead.number(), safeEpoch.hash());
this.safeHead = safeHead;
this.safeEpoch = safeEpoch;
this.l1Info.clear();
Expand Down
59 changes: 39 additions & 20 deletions hildr-node/src/main/java/io/optimism/derive/stages/Batches.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public Batches(TreeMap<BigInteger, Batch> batches, I channelIterator, AtomicRefe
public void purge() {
this.channelIterator.purge();
this.batches.clear();
if (!this.nextSingularBatches.isEmpty()) {
LOGGER.warn("batches has element but will be discarded");
}
this.nextSingularBatches.clear();
}

Expand All @@ -86,8 +89,13 @@ public Batch next() {
}
Channel channel = this.channelIterator.next();
if (channel != null) {
decodeBatches(this.config.chainConfig(), channel)
.forEach(batch -> this.batches.put(batch.batch().getTimestamp(), batch));
decodeBatches(this.config.chainConfig(), channel).forEach(batch -> {
Batch prev = this.batches.put(batch.batch().getTimestamp(), batch);
if (prev != null) {
LOGGER.warn(
"batch was replaced: timestamp={}", batch.batch().getTimestamp());
}
});
}

Batch derivedBatch = null;
Expand Down Expand Up @@ -121,24 +129,28 @@ public Batch next() {
this.nextSingularBatches.addAll(singularBatches);
return this.nextSingularBatches.poll();
}
}

State state = this.state.get();

BigInteger currentL1Block = state.getCurrentEpochNum();
BlockInfo safeHead = state.getSafeHead();
Epoch epoch = state.getSafeEpoch();
Epoch nextEpoch = state.epoch(epoch.number().add(BigInteger.ONE));
BigInteger seqWindowSize = this.config.chainConfig().seqWindowSize();

if (nextEpoch != null) {
if (currentL1Block.compareTo(epoch.number().add(seqWindowSize)) > 0) {
BigInteger nextTimestamp =
safeHead.timestamp().add(this.config.chainConfig().blockTime());
Epoch epochRes = nextTimestamp.compareTo(nextEpoch.timestamp()) < 0 ? epoch : nextEpoch;
var singularBatch = new SingularBatch(
safeHead.parentHash(), epochRes.number(), epochRes.hash(), nextTimestamp, Lists.newArrayList());
batch = new Batch(singularBatch, currentL1Block);
} else {
State state = this.state.get();

BigInteger currentL1Block = state.getCurrentEpochNum();
BlockInfo safeHead = state.getSafeHead();
Epoch epoch = state.getSafeEpoch();
Epoch nextEpoch = state.epoch(epoch.number().add(BigInteger.ONE));
BigInteger seqWindowSize = this.config.chainConfig().seqWindowSize();

if (nextEpoch != null) {
if (currentL1Block.compareTo(epoch.number().add(seqWindowSize)) > 0) {
BigInteger nextTimestamp =
safeHead.timestamp().add(this.config.chainConfig().blockTime());
Epoch epochRes = nextTimestamp.compareTo(nextEpoch.timestamp()) < 0 ? epoch : nextEpoch;
var singularBatch = new SingularBatch(
safeHead.parentHash(),
epochRes.number(),
epochRes.hash(),
nextTimestamp,
Lists.newArrayList());
batch = new Batch(singularBatch, currentL1Block);
}
}
}
return batch;
Expand All @@ -147,6 +159,7 @@ public Batch next() {
/**
* Decode batches list.
*
* @param chainConfig the chain config
* @param channel the channel
* @return the list
*/
Expand Down Expand Up @@ -485,6 +498,12 @@ private List<SingularBatch> toSingularBatches(final SpanBatch batch, final State
List<SingularBatch> singularBatches = new ArrayList<>();
for (SpanBatchElement element : batch.getBatches()) {
if (element.timestamp().compareTo(state.getSafeHead().timestamp()) <= 0) {
if (!element.transactions().isEmpty()) {
LOGGER.warn(
"past span batch element: timestamp{{}} <= safeHead.timestamp={{}}",
element.timestamp(),
state.getSafeHead().timestamp());
}
continue;
}
SingularBatch singularBatch = new SingularBatch();
Expand Down
27 changes: 24 additions & 3 deletions hildr-node/src/main/java/io/optimism/driver/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.tuples.generated.Tuple2;
import org.web3j.utils.Numeric;

/**
Expand Down Expand Up @@ -166,7 +168,7 @@ public EngineDriver<E> getEngineDriver() {
*/
public static Driver<EngineApi> from(Config config, CountDownLatch latch)
throws InterruptedException, ExecutionException {
Web3j provider = Web3jProvider.createClient(config.l2RpcUrl());
final Web3j provider = Web3jProvider.createClient(config.l2RpcUrl());

EthBlock finalizedBlock;
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Expand Down Expand Up @@ -207,8 +209,27 @@ public static Driver<EngineApi> from(Config config, CountDownLatch latch)
config);

var l2Refs = io.optimism.derive.State.initL2Refs(finalizedHead.number(), config.chainConfig(), provider);
AtomicReference<io.optimism.derive.State> state =
new AtomicReference<>(io.optimism.derive.State.create(l2Refs, finalizedHead, finalizedEpoch, config));
var l2Fetcher = (Function<BigInteger, Tuple2<BlockInfo, Epoch>>) blockNum -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
StructuredTaskScope.Subtask<EthBlock> blockTask = scope.fork(TracerTaskWrapper.wrap(
() -> provider.ethGetBlockByNumber(DefaultBlockParameter.valueOf(blockNum), true)
.send()));
scope.join();
scope.throwIfFailed();

var block = blockTask.get();
if (block == null) {
return null;
}
final HeadInfo l2BlockInfo = HeadInfo.from(block.getBlock());
return new Tuple2<>(l2BlockInfo.l2BlockInfo(), l2BlockInfo.l1Epoch());
} catch (Exception e) {
LOGGER.error("failed to fetch L2 block", e);
return null;
}
};
AtomicReference<io.optimism.derive.State> state = new AtomicReference<>(
io.optimism.derive.State.create(l2Refs, l2Fetcher, finalizedHead, finalizedEpoch, config));

EngineDriver<EngineApi> engineDriver = new EngineDriver<>(finalizedHead, finalizedEpoch, provider, config);

Expand Down

0 comments on commit 1646a1b

Please sign in to comment.