Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor(sequencer) improve code readability #1247

Merged
merged 3 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions zk/stages/stage_interhashes.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package stages

import (
"errors"
"fmt"

"github.com/gateway-fm/cdk-erigon-lib/common"
Expand Down Expand Up @@ -522,7 +521,7 @@ func unwindZkSMT(ctx context.Context, logPrefix string, from, to uint64, db kv.R
for i := from; i >= to+1; i-- {
select {
case <-ctx.Done():
return trie.EmptyRoot, errors.New(fmt.Sprintf("[%s] Context done", logPrefix))
return trie.EmptyRoot, fmt.Errorf("[%s] Context done", logPrefix)
default:
}

Expand Down
42 changes: 20 additions & 22 deletions zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,42 +41,33 @@ func SpawnSequencingStage(
return err
}

highestBatchInDS, err := cfg.datastreamServer.GetHighestBatchNumber()
highestBatchInDs, err := cfg.datastreamServer.GetHighestBatchNumber()
if err != nil {
return err
}

if !cfg.zk.SequencerResequence || lastBatch >= highestBatchInDS {
if cfg.zk.SequencerResequence {
log.Info(fmt.Sprintf("[%s] Resequencing completed. Please restart sequencer without resequence flag.", s.LogPrefix()))
time.Sleep(10 * time.Second)
return nil
if lastBatch < highestBatchInDs {
if !cfg.zk.SequencerResequence {
panic(fmt.Sprintf("[%s] The node need re-sequencing but this option is disabled.", s.LogPrefix()))
}

err = sequencingStageStep(s, u, ctx, cfg, historyCfg, quiet, nil)
if err != nil {
return err
}
} else {
log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDS))
log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDs))

batches, err := cfg.datastreamServer.ReadBatches(lastBatch+1, highestBatchInDS)
batches, err := cfg.datastreamServer.ReadBatches(lastBatch+1, highestBatchInDs)
if err != nil {
return err
}

err = cfg.datastreamServer.UnwindToBatchStart(lastBatch + 1)
if err != nil {
if err = cfg.datastreamServer.UnwindToBatchStart(lastBatch + 1); err != nil {
return err
}

log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDS))

log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDs))
for _, batch := range batches {
batchJob := NewResequenceBatchJob(batch)
subBatchCount := 0
for batchJob.HasMoreBlockToProcess() {
if err = sequencingStageStep(s, u, ctx, cfg, historyCfg, quiet, batchJob); err != nil {
if err = sequencingBatchStep(s, u, ctx, cfg, historyCfg, batchJob); err != nil {
return err
}

Expand All @@ -88,18 +79,25 @@ func SpawnSequencingStage(
return fmt.Errorf("strict mode enabled, but resequenced batch %d has %d sub-batches", batchJob.batchToProcess[0].BatchNumber, subBatchCount)
}
}

return nil
}

if cfg.zk.SequencerResequence {
log.Info(fmt.Sprintf("[%s] Resequencing completed. Please restart sequencer without resequence flag.", s.LogPrefix()))
time.Sleep(10 * time.Minute)
return nil
}

return nil
return sequencingBatchStep(s, u, ctx, cfg, historyCfg, nil)
}

func sequencingStageStep(
func sequencingBatchStep(
s *stagedsync.StageState,
u stagedsync.Unwinder,
ctx context.Context,
cfg SequenceBlockCfg,
historyCfg stagedsync.HistoryCfg,
quiet bool,
resequenceBatchJob *ResequenceBatchJob,
) (err error) {
logPrefix := s.LogPrefix()
Expand Down Expand Up @@ -170,7 +168,7 @@ func sequencingStageStep(
// if we identify any. During normal operation this function will simply check and move on without performing
// any action.
if !batchState.isAnyRecovery() {
isUnwinding, err := alignExecutionToDatastream(batchContext, batchState, executionAt, u)
isUnwinding, err := alignExecutionToDatastream(batchContext, executionAt, u)
if err != nil {
// do not set shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart=false because of the error
return err
Expand Down
8 changes: 6 additions & 2 deletions zk/stages/stage_sequence_execute_data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun
return checkedVerifierBundles, nil
}

func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchState, lastExecutedBlock uint64, u stagedsync.Unwinder) (bool, error) {
func alignExecutionToDatastream(batchContext *BatchContext, lastExecutedBlock uint64, u stagedsync.Unwinder) (bool, error) {
lastStartedDatastreamBatch, err := batchContext.cfg.datastreamServer.GetHighestBatchNumber()
if err != nil {
return false, err
Expand All @@ -131,7 +131,7 @@ func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchSta
}
}

if lastExecutedBlock != lastDatastreamBlock {
if lastExecutedBlock > lastDatastreamBlock {
block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastDatastreamBlock)
if err != nil {
return false, err
Expand All @@ -142,6 +142,10 @@ func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchSta
return true, nil
}

if lastExecutedBlock < lastDatastreamBlock {
panic(fmt.Errorf("[%s] Datastream is ahead of sequencer. Re-sequencing should have handled this case before even comming to this point", batchContext.s.LogPrefix()))
}

return false, nil
}

Expand Down
77 changes: 77 additions & 0 deletions zk/stages/stage_sequence_execute_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync"
dsTypes "github.com/ledgerwatch/erigon/zk/datastream/types"
"github.com/ledgerwatch/erigon/zk/l1_data"
zktx "github.com/ledgerwatch/erigon/zk/tx"
"github.com/ledgerwatch/erigon/zk/txpool"
"github.com/ledgerwatch/log/v3"
)

const maximumOverflowTransactionAttempts = 5
Expand Down Expand Up @@ -306,3 +308,78 @@ func (bbe *BuiltBlockElements) onFinishAddingTransaction(transaction types.Trans
bbe.executionResults = append(bbe.executionResults, execResult)
bbe.effectiveGases = append(bbe.effectiveGases, effectiveGas)
}

type resequenceTxMetadata struct {
blockNum int
txIndex int
}

type ResequenceBatchJob struct {
batchToProcess []*dsTypes.FullL2Block
StartBlockIndex int
StartTxIndex int
txIndexMap map[common.Hash]resequenceTxMetadata
}

func NewResequenceBatchJob(batch []*dsTypes.FullL2Block) *ResequenceBatchJob {
return &ResequenceBatchJob{
batchToProcess: batch,
StartBlockIndex: 0,
StartTxIndex: 0,
txIndexMap: make(map[common.Hash]resequenceTxMetadata),
}
}

func (r *ResequenceBatchJob) HasMoreBlockToProcess() bool {
return r.StartBlockIndex < len(r.batchToProcess)
}

func (r *ResequenceBatchJob) AtNewBlockBoundary() bool {
return r.StartTxIndex == 0
}

func (r *ResequenceBatchJob) CurrentBlock() *dsTypes.FullL2Block {
if r.HasMoreBlockToProcess() {
return r.batchToProcess[r.StartBlockIndex]
}
return nil
}

func (r *ResequenceBatchJob) YieldNextBlockTransactions(decoder zktx.TxDecoder) ([]types.Transaction, error) {
blockTransactions := make([]types.Transaction, 0)
if r.HasMoreBlockToProcess() {
block := r.CurrentBlock()
r.txIndexMap[block.L2Blockhash] = resequenceTxMetadata{r.StartBlockIndex, 0}

for i := r.StartTxIndex; i < len(block.L2Txs); i++ {
transaction := block.L2Txs[i]
tx, _, err := decoder(transaction.Encoded, transaction.EffectiveGasPricePercentage, block.ForkId)
if err != nil {
return nil, fmt.Errorf("decode tx error: %v", err)
}
r.txIndexMap[tx.Hash()] = resequenceTxMetadata{r.StartBlockIndex, i}
blockTransactions = append(blockTransactions, tx)
}
}

return blockTransactions, nil
}

func (r *ResequenceBatchJob) UpdateLastProcessedTx(h common.Hash) {
if idx, ok := r.txIndexMap[h]; ok {
block := r.batchToProcess[idx.blockNum]

if idx.txIndex >= len(block.L2Txs)-1 {
// we've processed all the transactions in this block
// move to the next block
r.StartBlockIndex = idx.blockNum + 1
r.StartTxIndex = 0
} else {
// move to the next transaction in the block
r.StartBlockIndex = idx.blockNum
r.StartTxIndex = idx.txIndex + 1
}
} else {
log.Warn("tx hash not found in tx index map", "hash", h)
}
}
86 changes: 2 additions & 84 deletions zk/stages/stage_sequence_execute_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ import (
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/ledgerwatch/erigon/zk/datastream/server"
dsTypes "github.com/ledgerwatch/erigon/zk/datastream/types"
"github.com/ledgerwatch/erigon/zk/hermez_db"
verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
"github.com/ledgerwatch/erigon/zk/tx"
zktx "github.com/ledgerwatch/erigon/zk/tx"
"github.com/ledgerwatch/erigon/zk/txpool"
zktypes "github.com/ledgerwatch/erigon/zk/types"
Expand Down Expand Up @@ -241,12 +239,7 @@ func prepareHeader(tx kv.RwTx, previousBlockNumber, deltaTimestamp, forcedTimest
}, parentBlock, nil
}

func prepareL1AndInfoTreeRelatedStuff(
sdb *stageDb,
batchState *BatchState,
proposedTimestamp uint64,
reuseL1InfoIndex bool,
) (
func prepareL1AndInfoTreeRelatedStuff(sdb *stageDb, batchState *BatchState, proposedTimestamp uint64, reuseL1InfoIndex bool) (
infoTreeIndexProgress uint64,
l1TreeUpdate *zktypes.L1InfoTreeUpdate,
l1TreeUpdateIndex uint64,
Expand Down Expand Up @@ -483,7 +476,7 @@ func newBlockDataChecker() *BlockDataChecker {
// adds bytes amounting to the block data and checks if the limit is reached
// if the limit is reached, the data is not added, so this can be reused again for next check
func (bdc *BlockDataChecker) AddBlockStartData() bool {
blockStartBytesAmount := tx.START_BLOCK_BATCH_L2_DATA_SIZE // tx.GenerateStartBlockBatchL2Data(deltaTimestamp, l1InfoTreeIndex) returns 65 long byte array
blockStartBytesAmount := zktx.START_BLOCK_BATCH_L2_DATA_SIZE // tx.GenerateStartBlockBatchL2Data(deltaTimestamp, l1InfoTreeIndex) returns 65 long byte array
// add in the changeL2Block transaction
if bdc.counter+blockStartBytesAmount > bdc.limit {
return true
Expand All @@ -504,78 +497,3 @@ func (bdc *BlockDataChecker) AddTransactionData(txL2Data []byte) bool {

return false
}

type txMatadata struct {
blockNum int
txIndex int
}

type ResequenceBatchJob struct {
batchToProcess []*dsTypes.FullL2Block
StartBlockIndex int
StartTxIndex int
txIndexMap map[common.Hash]txMatadata
}

func NewResequenceBatchJob(batch []*dsTypes.FullL2Block) *ResequenceBatchJob {
return &ResequenceBatchJob{
batchToProcess: batch,
StartBlockIndex: 0,
StartTxIndex: 0,
txIndexMap: make(map[common.Hash]txMatadata),
}
}

func (r *ResequenceBatchJob) HasMoreBlockToProcess() bool {
return r.StartBlockIndex < len(r.batchToProcess)
}

func (r *ResequenceBatchJob) AtNewBlockBoundary() bool {
return r.StartTxIndex == 0
}

func (r *ResequenceBatchJob) CurrentBlock() *dsTypes.FullL2Block {
if r.HasMoreBlockToProcess() {
return r.batchToProcess[r.StartBlockIndex]
}
return nil
}

func (r *ResequenceBatchJob) YieldNextBlockTransactions(decoder zktx.TxDecoder) ([]types.Transaction, error) {
blockTransactions := make([]types.Transaction, 0)
if r.HasMoreBlockToProcess() {
block := r.CurrentBlock()
r.txIndexMap[block.L2Blockhash] = txMatadata{r.StartBlockIndex, 0}

for i := r.StartTxIndex; i < len(block.L2Txs); i++ {
transaction := block.L2Txs[i]
tx, _, err := decoder(transaction.Encoded, transaction.EffectiveGasPricePercentage, block.ForkId)
if err != nil {
return nil, fmt.Errorf("decode tx error: %v", err)
}
r.txIndexMap[tx.Hash()] = txMatadata{r.StartBlockIndex, i}
blockTransactions = append(blockTransactions, tx)
}
}

return blockTransactions, nil
}

func (r *ResequenceBatchJob) UpdateLastProcessedTx(h common.Hash) {
if idx, ok := r.txIndexMap[h]; ok {
block := r.batchToProcess[idx.blockNum]

if idx.txIndex >= len(block.L2Txs)-1 {
// we've processed all the transactions in this block
// move to the next block
r.StartBlockIndex = idx.blockNum + 1
r.StartTxIndex = 0
} else {
// move to the next transaction in the block
r.StartBlockIndex = idx.blockNum
r.StartTxIndex = idx.txIndex + 1
}
} else {
log.Warn("tx hash not found in tx index map", "hash", h)
}
}
8 changes: 4 additions & 4 deletions zk/stages/stage_sequence_execute_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func TestResequenceBatchJob_YieldNextBlockTransactions(t *testing.T) {
},
StartBlockIndex: 0,
StartTxIndex: 0,
txIndexMap: make(map[common.Hash]txMatadata),
txIndexMap: make(map[common.Hash]resequenceTxMetadata),
},
expectedTxCount: 2,
expectedError: false,
Expand All @@ -332,7 +332,7 @@ func TestResequenceBatchJob_YieldNextBlockTransactions(t *testing.T) {
batchToProcess: []*dsTypes.FullL2Block{{}},
StartBlockIndex: 1,
StartTxIndex: 0,
txIndexMap: make(map[common.Hash]txMatadata),
txIndexMap: make(map[common.Hash]resequenceTxMetadata),
},
expectedTxCount: 0,
expectedError: false,
Expand Down Expand Up @@ -366,7 +366,7 @@ func TestResequenceBatchJob_YieldAndUpdate(t *testing.T) {
batchToProcess: batch,
StartBlockIndex: 0,
StartTxIndex: 1, // Start at block 0, index 1
txIndexMap: make(map[common.Hash]txMatadata),
txIndexMap: make(map[common.Hash]resequenceTxMetadata),
}

processTransactions := func(txs []types.Transaction) {
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestResequenceBatchJob_YieldAndUpdate(t *testing.T) {
}

// Verify txIndexMap
expectedTxIndexMap := map[common.Hash]txMatadata{
expectedTxIndexMap := map[common.Hash]resequenceTxMetadata{
common.HexToHash("0"): {0, 0},
common.HexToHash("1"): {1, 0},
common.HexToHash("2"): {2, 0},
Expand Down
Loading