From 760271b7d9c31b95969179d27da48f9dda177082 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Tue, 15 Oct 2024 19:58:53 +0800 Subject: [PATCH] fix: retrieval and frame queue --- .../datasource/DataAvailabilityProvider.java | 4 +- .../derive/datasource/impl/L1Retrieval.java | 50 ++++++++-- .../derive/datasource/impl/L1Traversal.java | 15 +-- .../derive/exception/FrameParseException.java | 33 +++++++ .../v2/derive/stages/ChannelBankProvider.java | 2 +- .../optimism/v2/derive/stages/DataIter.java | 12 ++- .../v2/derive/stages/FrameQueueProvider.java | 2 +- .../v2/derive/stages/L1RetrievalProvider.java | 2 +- .../v2/derive/stages/impl/FrameQueue.java | 98 ++++++++++++++++++- .../io/optimism/v2/derive/types/Frame.java | 80 +++++++++++---- 10 files changed, 256 insertions(+), 42 deletions(-) create mode 100644 src/main/java/io/optimism/v2/derive/exception/FrameParseException.java diff --git a/src/main/java/io/optimism/v2/derive/datasource/DataAvailabilityProvider.java b/src/main/java/io/optimism/v2/derive/datasource/DataAvailabilityProvider.java index 98c406d..0d46b83 100644 --- a/src/main/java/io/optimism/v2/derive/datasource/DataAvailabilityProvider.java +++ b/src/main/java/io/optimism/v2/derive/datasource/DataAvailabilityProvider.java @@ -1,7 +1,7 @@ package io.optimism.v2.derive.datasource; -import io.optimism.types.BlockInfo; import io.optimism.v2.derive.stages.DataIter; +import io.optimism.v2.derive.types.BlockInfo; /** * the data availability provider interface. @@ -11,5 +11,5 @@ */ public interface DataAvailabilityProvider { - DataIter openData(BlockInfo l1Ref, String batcherAddr); + DataIter openData(BlockInfo l1Ref); } diff --git a/src/main/java/io/optimism/v2/derive/datasource/impl/L1Retrieval.java b/src/main/java/io/optimism/v2/derive/datasource/impl/L1Retrieval.java index 58e20f2..3dda442 100644 --- a/src/main/java/io/optimism/v2/derive/datasource/impl/L1Retrieval.java +++ b/src/main/java/io/optimism/v2/derive/datasource/impl/L1Retrieval.java @@ -1,11 +1,16 @@ package io.optimism.v2.derive.datasource.impl; +import io.optimism.v2.derive.datasource.DataAvailabilityProvider; +import io.optimism.v2.derive.exception.PipelineEofException; +import io.optimism.v2.derive.stages.DataIter; import io.optimism.v2.derive.stages.FrameQueueProvider; +import io.optimism.v2.derive.stages.L1RetrievalProvider; import io.optimism.v2.derive.stages.OriginAdvancer; import io.optimism.v2.derive.stages.OriginProvider; import io.optimism.v2.derive.stages.ResettableStage; import io.optimism.v2.derive.types.BlockInfo; import io.optimism.v2.derive.types.SystemConfig; +import java.util.Optional; /** * the l1 chain data retrieval. @@ -13,21 +18,52 @@ * @author thinkAfCod * @since 0.4.6 */ -public class L1Retrieval implements FrameQueueProvider, OriginProvider, OriginAdvancer, ResettableStage { +public class L1Retrieval implements FrameQueueProvider { + + private final L1RetrievalProvider prev; + + private final DataAvailabilityProvider provider; + + private Optional data; + + /** + * L1Retrieval constructor. + * + * @param prev the previous stage + * @param provider the data availability provider + */ + public L1Retrieval(L1RetrievalProvider prev, DataAvailabilityProvider provider) { + this.prev = prev; + this.provider = provider; + this.data = Optional.empty(); + } @Override - public void advanceOrigin() {} + public byte[] next() { + if (data.isEmpty()) { + var next = this.prev.nextL1Block(); + this.data = Optional.ofNullable(this.provider.openData(next)); + } + if (this.data.isEmpty()) { + throw new PipelineEofException(""); + } + + return this.data.get().next(); + } @Override - public BlockInfo origin() { - return null; + public void advanceOrigin() { + ((OriginAdvancer) this.prev).advanceOrigin(); } @Override - public void reset(BlockInfo base, SystemConfig config) {} + public BlockInfo origin() { + return ((OriginProvider) this.prev).origin(); + } @Override - public byte[] next() { - return new byte[0]; + public void reset(BlockInfo base, SystemConfig config) { + ((ResettableStage) this.prev).reset(base, config); + this.data = Optional.ofNullable(this.provider.openData(base)); } } diff --git a/src/main/java/io/optimism/v2/derive/datasource/impl/L1Traversal.java b/src/main/java/io/optimism/v2/derive/datasource/impl/L1Traversal.java index 1a19673..ad26b4c 100644 --- a/src/main/java/io/optimism/v2/derive/datasource/impl/L1Traversal.java +++ b/src/main/java/io/optimism/v2/derive/datasource/impl/L1Traversal.java @@ -6,9 +6,6 @@ import io.optimism.v2.derive.exception.PipelineEofException; import io.optimism.v2.derive.exception.PipelineProviderException; import io.optimism.v2.derive.stages.L1RetrievalProvider; -import io.optimism.v2.derive.stages.OriginAdvancer; -import io.optimism.v2.derive.stages.OriginProvider; -import io.optimism.v2.derive.stages.ResettableStage; import io.optimism.v2.derive.types.BlockInfo; import io.optimism.v2.derive.types.SystemConfig; import java.math.BigInteger; @@ -21,7 +18,7 @@ * @author thinkAfCod * @since 0.4.6 */ -public class L1Traversal implements L1RetrievalProvider, OriginProvider, OriginAdvancer, ResettableStage { +public class L1Traversal implements L1RetrievalProvider { private final Config.ChainConfig rollupConfig; @@ -40,7 +37,12 @@ public L1Traversal(Config.ChainConfig rollupConfig, ChainProvider provider) { @Override public BlockInfo nextL1Block() { - return this.block; + if (!this.done) { + this.done = true; + return this.block; + } else { + throw new PipelineEofException(); + } } @Override @@ -51,7 +53,7 @@ public String batcherAddr() { @Override public void advanceOrigin() { if (this.block == null) { - throw new PipelineEofException(); + throw new PipelineEofException("Missing current block, can't advance origin with no reference."); } var block = this.block; @@ -84,6 +86,5 @@ public BlockInfo origin() { public void reset(BlockInfo base, SystemConfig config) { this.block = base; this.curSysConfig = config; - // metrics record stage reset for l1 traversal } } diff --git a/src/main/java/io/optimism/v2/derive/exception/FrameParseException.java b/src/main/java/io/optimism/v2/derive/exception/FrameParseException.java new file mode 100644 index 0000000..711bf7b --- /dev/null +++ b/src/main/java/io/optimism/v2/derive/exception/FrameParseException.java @@ -0,0 +1,33 @@ +package io.optimism.v2.derive.exception; + +/** + * The frame parse exception. + * + * @author thinkAfCod + * @since 0.4.6 + */ +public class FrameParseException extends RuntimeException { + /** Constructs a PipelineEofException. */ + public FrameParseException() { + super("parses frame failed"); + } + + /** + * Constructs a PipelineEofException with a custom message. + * + * @param message the custom error message + */ + public FrameParseException(String message) { + super(message); + } + + /** + * Constructs a new PipelineEofException with the specified detail message and cause. + * + * @param message the detail message + * @param cause the cause + */ + public FrameParseException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/io/optimism/v2/derive/stages/ChannelBankProvider.java b/src/main/java/io/optimism/v2/derive/stages/ChannelBankProvider.java index f702c04..2c4718a 100644 --- a/src/main/java/io/optimism/v2/derive/stages/ChannelBankProvider.java +++ b/src/main/java/io/optimism/v2/derive/stages/ChannelBankProvider.java @@ -8,7 +8,7 @@ * @author thinkAfCod * @since 0.4.6 */ -public interface ChannelBankProvider { +public interface ChannelBankProvider extends OriginProvider, OriginAdvancer, ResettableStage { /** * gets the next frame in the current channel * diff --git a/src/main/java/io/optimism/v2/derive/stages/DataIter.java b/src/main/java/io/optimism/v2/derive/stages/DataIter.java index a2843e1..6dc42d2 100644 --- a/src/main/java/io/optimism/v2/derive/stages/DataIter.java +++ b/src/main/java/io/optimism/v2/derive/stages/DataIter.java @@ -1,6 +1,16 @@ package io.optimism.v2.derive.stages; +/** + * the data iterator interface. + * + * @author thinkAfCod + * @since 0.4.6 + */ public interface DataIter { - byte[] Next(); + /** + * get the next data. + * @return the bytes of next data. + */ + byte[] next(); } diff --git a/src/main/java/io/optimism/v2/derive/stages/FrameQueueProvider.java b/src/main/java/io/optimism/v2/derive/stages/FrameQueueProvider.java index e8e4210..66b0303 100644 --- a/src/main/java/io/optimism/v2/derive/stages/FrameQueueProvider.java +++ b/src/main/java/io/optimism/v2/derive/stages/FrameQueueProvider.java @@ -6,7 +6,7 @@ * @author thinkAfCod * @since 0.4.6 */ -public interface FrameQueueProvider { +public interface FrameQueueProvider extends OriginProvider, OriginAdvancer, ResettableStage { /** * gets the bytes of the next raw frame. * diff --git a/src/main/java/io/optimism/v2/derive/stages/L1RetrievalProvider.java b/src/main/java/io/optimism/v2/derive/stages/L1RetrievalProvider.java index b462a53..867dd8b 100644 --- a/src/main/java/io/optimism/v2/derive/stages/L1RetrievalProvider.java +++ b/src/main/java/io/optimism/v2/derive/stages/L1RetrievalProvider.java @@ -8,7 +8,7 @@ * @author thinkAfCod * @since 0.4.6 */ -public interface L1RetrievalProvider { +public interface L1RetrievalProvider extends OriginProvider, OriginAdvancer, ResettableStage { /** * get the next L1 block info. * diff --git a/src/main/java/io/optimism/v2/derive/stages/impl/FrameQueue.java b/src/main/java/io/optimism/v2/derive/stages/impl/FrameQueue.java index e6e35ab..030ecf3 100644 --- a/src/main/java/io/optimism/v2/derive/stages/impl/FrameQueue.java +++ b/src/main/java/io/optimism/v2/derive/stages/impl/FrameQueue.java @@ -1,28 +1,118 @@ package io.optimism.v2.derive.stages.impl; +import io.optimism.config.Config; +import io.optimism.v2.derive.exception.PipelineProviderException; import io.optimism.v2.derive.stages.ChannelBankProvider; +import io.optimism.v2.derive.stages.FrameQueueProvider; import io.optimism.v2.derive.stages.OriginAdvancer; import io.optimism.v2.derive.stages.OriginProvider; import io.optimism.v2.derive.stages.ResettableStage; import io.optimism.v2.derive.types.BlockInfo; import io.optimism.v2.derive.types.Frame; import io.optimism.v2.derive.types.SystemConfig; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; public class FrameQueue implements ChannelBankProvider, OriginProvider, OriginAdvancer, ResettableStage { + private static final int QUEUE_SIZE = 1024; + + private final FrameQueueProvider prev; + + private final Config.ChainConfig rollupConfig; + + private List queue; + + /** + * The frame queue constructor. + * + * @param prev The previous stage in the pipeline + * @param rollupConfig The rollup configuration + */ + public FrameQueue(FrameQueueProvider prev, Config.ChainConfig rollupConfig) { + this.prev = prev; + this.rollupConfig = rollupConfig; + this.queue = new ArrayList<>(QUEUE_SIZE); + } + + /** + * loads more frames into the queue + */ + public void loadFrames() { + if (!this.queue.isEmpty()) { + return; + } + var data = this.prev.next(); + List frames = Frame.parseFrames(data); + this.queue.addAll(frames); + var origin = this.origin(); + if (origin == null) { + throw new PipelineProviderException("Missing origin"); + } + this.prune(origin); + } + + /** + * prunes frames if Holocene is active + * + * @param origin the l1 origin block + */ + public void prune(BlockInfo origin) { + if (!rollupConfig.isHolocene(origin.timestamp())) { + return; + } + int i = 0; + while (i < this.queue.size()) { + final var prevFrame = this.queue.get(i); + var nextFrame = this.queue.get(i + 1); + var extendsChannel = prevFrame.channelId().equals(nextFrame.channelId()); + if (extendsChannel && prevFrame.frameNumber() + 1 != nextFrame.frameNumber()) { + this.queue.remove(i + 1); + continue; + } + if (extendsChannel && prevFrame.isLastFrame()) { + this.queue.remove(i + 1); + continue; + } + if (!extendsChannel && !nextFrame.frameNumber().equals(0)) { + this.queue.remove(i + 1); + continue; + } + if (!extendsChannel + && !prevFrame.isLastFrame() + && nextFrame.frameNumber().equals(0)) { + this.queue = this.queue.stream() + .filter(f -> f.channelId().equals(prevFrame.channelId())) + .collect(Collectors.toList()); + continue; + } + i += 1; + } + } + @Override public Frame nextFrame() { - return null; + this.loadFrames(); + if (this.queue.isEmpty()) { + throw new PipelineProviderException("Not enough data"); + } + return this.queue.removeFirst(); } @Override - public void advanceOrigin() {} + public void advanceOrigin() { + this.prev.advanceOrigin(); + } @Override public BlockInfo origin() { - return null; + return this.prev.origin(); } @Override - public void reset(BlockInfo base, SystemConfig config) {} + public void reset(BlockInfo base, SystemConfig config) { + this.prev.reset(base, config); + this.queue = new ArrayList<>(QUEUE_SIZE); + } } diff --git a/src/main/java/io/optimism/v2/derive/types/Frame.java b/src/main/java/io/optimism/v2/derive/types/Frame.java index f995029..53af9dd 100644 --- a/src/main/java/io/optimism/v2/derive/types/Frame.java +++ b/src/main/java/io/optimism/v2/derive/types/Frame.java @@ -4,8 +4,11 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Shorts; import io.optimism.exceptions.InvalidFrameSizeException; +import io.optimism.v2.derive.exception.FrameParseException; import java.io.ByteArrayOutputStream; import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; @@ -36,6 +39,8 @@ public record Frame( */ public static final int FRAME_V0_OVER_HEAD_SIZE = 23; + public static final int MAX_FRAME_LEN = 1000000; + /** * Get tx bytes. * @@ -87,33 +92,72 @@ public byte[] encode() { * From data immutable pair. * * @param data the data - * @param offset the offset - * @param l1InclusionBlock the L1 inclusion block - * @return the immutable pair + * @return the immutable pair, left is frame, right is current read bytes */ - public static ImmutablePair from(byte[] data, int offset, BigInteger l1InclusionBlock) { - final byte[] frameDataMessage = ArrayUtils.subarray(data, offset, data.length); - if (frameDataMessage.length < 23) { + public static ImmutablePair from(byte[] data) { + if (data.length < FRAME_V0_OVER_HEAD_SIZE) { throw new InvalidFrameSizeException("invalid frame size"); } - - final BigInteger channelId = Numeric.toBigInt(ArrayUtils.subarray(frameDataMessage, 0, 16)); + var offset = 0; + final BigInteger channelId = Numeric.toBigInt(ArrayUtils.subarray(data, offset, 16)); + offset += 16; final int frameNumber = - Numeric.toBigInt(ArrayUtils.subarray(frameDataMessage, 16, 18)).intValue(); + Numeric.toBigInt(ArrayUtils.subarray(data, offset, offset + 2)).intValue(); + offset += 2; final int frameDataLen = - Numeric.toBigInt(ArrayUtils.subarray(frameDataMessage, 18, 22)).intValue(); - final int frameDataEnd = 22 + frameDataLen; + Numeric.toBigInt(ArrayUtils.subarray(data, offset, offset + 4)).intValue(); + offset += 4; - if (frameDataMessage.length < frameDataEnd) { - throw new InvalidFrameSizeException("invalid frame size"); + final int frameDataEnd = offset + frameDataLen; + + if (frameDataEnd < 0 || frameDataEnd > MAX_FRAME_LEN || data.length < frameDataEnd) { + throw new FrameParseException("invalid frame size"); } - final byte[] frameData = ArrayUtils.subarray(frameDataMessage, 22, frameDataEnd); - final boolean isLastFrame = frameDataMessage[frameDataEnd] != 0; + final byte[] frameData = ArrayUtils.subarray(data, offset, frameDataEnd); + final boolean isLastFrame = data[frameDataEnd] != 0; final Frame frame = new Frame(channelId, frameNumber, frameDataLen, frameData, isLastFrame); - LOGGER.debug(String.format( - "saw batcher tx: block=%d, number=%d, is_last=%b", l1InclusionBlock, frameNumber, isLastFrame)); + LOGGER.debug(String.format("saw batcher tx: number=%d, is_last=%b", frameNumber, isLastFrame)); + + return new ImmutablePair<>(frame, FRAME_V0_OVER_HEAD_SIZE + frameDataEnd); + } + + /** + * Parse frames. + * + * @param encoded the encoded + * @return the list of frames + */ + public static List parseFrames(byte[] encoded) { + if (encoded.length == 0) { + throw new FrameParseException("No frames"); + } + + if (encoded[0] != DERIVATION_VERSION_0) { + throw new FrameParseException("Unsupported version"); + } + + byte[] data = new byte[encoded.length - 1]; + System.arraycopy(encoded, 1, data, 0, data.length); + + List frames = new ArrayList<>(); + int offset = 0; + + while (offset < data.length) { + byte[] parseBytes = offset == 0 ? data : ArrayUtils.subarray(data, offset, data.length); + ImmutablePair result = Frame.from(parseBytes); + frames.add(result.left); + offset += result.right; + } + + if (offset != data.length) { + throw new FrameParseException("Data length mismatch"); + } + + if (frames.isEmpty()) { + throw new FrameParseException("No frames decoded"); + } - return new ImmutablePair<>(frame, offset + frameDataMessage.length); + return frames; } }