From 43cf6bd1fcaf835d2f3e51711af234f8782e27d3 Mon Sep 17 00:00:00 2001 From: Kamen Stoykov Date: Mon, 30 Sep 2024 10:44:14 +0000 Subject: [PATCH 1/3] improve code readability --- zk/stages/stage_interhashes.go | 3 +- zk/stages/stage_sequence_execute.go | 42 +++++----- .../stage_sequence_execute_data_stream.go | 8 +- zk/stages/stage_sequence_execute_state.go | 77 +++++++++++++++++ zk/stages/stage_sequence_execute_utils.go | 83 +------------------ .../stage_sequence_execute_utils_test.go | 8 +- 6 files changed, 109 insertions(+), 112 deletions(-) diff --git a/zk/stages/stage_interhashes.go b/zk/stages/stage_interhashes.go index 2b26ffa403a..e285ed05eb0 100644 --- a/zk/stages/stage_interhashes.go +++ b/zk/stages/stage_interhashes.go @@ -1,7 +1,6 @@ package stages import ( - "errors" "fmt" "github.com/gateway-fm/cdk-erigon-lib/common" @@ -522,7 +521,7 @@ func unwindZkSMT(ctx context.Context, logPrefix string, from, to uint64, db kv.R for i := from; i >= to+1; i-- { select { case <-ctx.Done(): - return trie.EmptyRoot, errors.New(fmt.Sprintf("[%s] Context done", logPrefix)) + return trie.EmptyRoot, fmt.Errorf("[%s] Context done", logPrefix) default: } diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index a200d888e2b..221c479ce73 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -41,42 +41,33 @@ func SpawnSequencingStage( return err } - highestBatchInDS, err := cfg.datastreamServer.GetHighestBatchNumber() + highestBatchInDs, err := cfg.datastreamServer.GetHighestBatchNumber() if err != nil { return err } - if !cfg.zk.SequencerResequence || lastBatch >= highestBatchInDS { - if cfg.zk.SequencerResequence { - log.Info(fmt.Sprintf("[%s] Resequencing completed. Please restart sequencer without resequence flag.", s.LogPrefix())) - time.Sleep(10 * time.Second) - return nil + if lastBatch < highestBatchInDs { + if !cfg.zk.SequencerResequence { + panic(fmt.Sprintf("[%s] The node need re-sequencing but this option is disabled.", s.LogPrefix())) } - err = sequencingStageStep(s, u, ctx, cfg, historyCfg, quiet, nil) - if err != nil { - return err - } - } else { - log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDS)) + log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDs)) - batches, err := cfg.datastreamServer.ReadBatches(lastBatch+1, highestBatchInDS) + batches, err := cfg.datastreamServer.ReadBatches(lastBatch+1, highestBatchInDs) if err != nil { return err } - err = cfg.datastreamServer.UnwindToBatchStart(lastBatch + 1) - if err != nil { + if err = cfg.datastreamServer.UnwindToBatchStart(lastBatch + 1); err != nil { return err } - log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDS)) - + log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDs)) for _, batch := range batches { batchJob := NewResequenceBatchJob(batch) subBatchCount := 0 for batchJob.HasMoreBlockToProcess() { - if err = sequencingStageStep(s, u, ctx, cfg, historyCfg, quiet, batchJob); err != nil { + if err = sequencingBatchStep(s, u, ctx, cfg, historyCfg, batchJob); err != nil { return err } @@ -88,18 +79,25 @@ func SpawnSequencingStage( return fmt.Errorf("strict mode enabled, but resequenced batch %d has %d sub-batches", batchJob.batchToProcess[0].BatchNumber, subBatchCount) } } + + return nil + } + + if cfg.zk.SequencerResequence { + log.Info(fmt.Sprintf("[%s] Resequencing completed. Please restart sequencer without resequence flag.", s.LogPrefix())) + time.Sleep(10 * time.Minute) + return nil } - return nil + return sequencingBatchStep(s, u, ctx, cfg, historyCfg, nil) } -func sequencingStageStep( +func sequencingBatchStep( s *stagedsync.StageState, u stagedsync.Unwinder, ctx context.Context, cfg SequenceBlockCfg, historyCfg stagedsync.HistoryCfg, - quiet bool, resequenceBatchJob *ResequenceBatchJob, ) (err error) { logPrefix := s.LogPrefix() @@ -170,7 +168,7 @@ func sequencingStageStep( // if we identify any. During normal operation this function will simply check and move on without performing // any action. if !batchState.isAnyRecovery() { - isUnwinding, err := alignExecutionToDatastream(batchContext, batchState, executionAt, u) + isUnwinding, err := alignExecutionToDatastream(batchContext, executionAt, u) if err != nil { // do not set shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart=false because of the error return err diff --git a/zk/stages/stage_sequence_execute_data_stream.go b/zk/stages/stage_sequence_execute_data_stream.go index a1e3606cb09..9631b317d98 100644 --- a/zk/stages/stage_sequence_execute_data_stream.go +++ b/zk/stages/stage_sequence_execute_data_stream.go @@ -109,7 +109,7 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, nil } -func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchState, lastExecutedBlock uint64, u stagedsync.Unwinder) (bool, error) { +func alignExecutionToDatastream(batchContext *BatchContext, lastExecutedBlock uint64, u stagedsync.Unwinder) (bool, error) { lastStartedDatastreamBatch, err := batchContext.cfg.datastreamServer.GetHighestBatchNumber() if err != nil { return false, err @@ -131,7 +131,7 @@ func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchSta } } - if lastExecutedBlock != lastDatastreamBlock { + if lastExecutedBlock > lastDatastreamBlock { block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastDatastreamBlock) if err != nil { return false, err @@ -142,6 +142,10 @@ func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchSta return true, nil } + if lastExecutedBlock < lastDatastreamBlock { + panic(fmt.Errorf("[%s] Datastream is ahead of sequencer. Re-sequencing should have handled this case before even comming to this point", batchContext.s.LogPrefix())) + } + return false, nil } diff --git a/zk/stages/stage_sequence_execute_state.go b/zk/stages/stage_sequence_execute_state.go index d756bf4c99f..291bd78e462 100644 --- a/zk/stages/stage_sequence_execute_state.go +++ b/zk/stages/stage_sequence_execute_state.go @@ -10,9 +10,11 @@ import ( "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync" + dsTypes "github.com/ledgerwatch/erigon/zk/datastream/types" "github.com/ledgerwatch/erigon/zk/l1_data" zktx "github.com/ledgerwatch/erigon/zk/tx" "github.com/ledgerwatch/erigon/zk/txpool" + "github.com/ledgerwatch/log/v3" ) const maximumOverflowTransactionAttempts = 5 @@ -306,3 +308,78 @@ func (bbe *BuiltBlockElements) onFinishAddingTransaction(transaction types.Trans bbe.executionResults = append(bbe.executionResults, execResult) bbe.effectiveGases = append(bbe.effectiveGases, effectiveGas) } + +type resequenceTxMatadata struct { + blockNum int + txIndex int +} + +type ResequenceBatchJob struct { + batchToProcess []*dsTypes.FullL2Block + StartBlockIndex int + StartTxIndex int + txIndexMap map[common.Hash]resequenceTxMatadata +} + +func NewResequenceBatchJob(batch []*dsTypes.FullL2Block) *ResequenceBatchJob { + return &ResequenceBatchJob{ + batchToProcess: batch, + StartBlockIndex: 0, + StartTxIndex: 0, + txIndexMap: make(map[common.Hash]resequenceTxMatadata), + } +} + +func (r *ResequenceBatchJob) HasMoreBlockToProcess() bool { + return r.StartBlockIndex < len(r.batchToProcess) +} + +func (r *ResequenceBatchJob) AtNewBlockBoundary() bool { + return r.StartTxIndex == 0 +} + +func (r *ResequenceBatchJob) CurrentBlock() *dsTypes.FullL2Block { + if r.HasMoreBlockToProcess() { + return r.batchToProcess[r.StartBlockIndex] + } + return nil +} + +func (r *ResequenceBatchJob) YieldNextBlockTransactions(decoder zktx.TxDecoder) ([]types.Transaction, error) { + blockTransactions := make([]types.Transaction, 0) + if r.HasMoreBlockToProcess() { + block := r.CurrentBlock() + r.txIndexMap[block.L2Blockhash] = resequenceTxMatadata{r.StartBlockIndex, 0} + + for i := r.StartTxIndex; i < len(block.L2Txs); i++ { + transaction := block.L2Txs[i] + tx, _, err := decoder(transaction.Encoded, transaction.EffectiveGasPricePercentage, block.ForkId) + if err != nil { + return nil, fmt.Errorf("decode tx error: %v", err) + } + r.txIndexMap[tx.Hash()] = resequenceTxMatadata{r.StartBlockIndex, i} + blockTransactions = append(blockTransactions, tx) + } + } + + return blockTransactions, nil +} + +func (r *ResequenceBatchJob) UpdateLastProcessedTx(h common.Hash) { + if idx, ok := r.txIndexMap[h]; ok { + block := r.batchToProcess[idx.blockNum] + + if idx.txIndex >= len(block.L2Txs)-1 { + // we've processed all the transactions in this block + // move to the next block + r.StartBlockIndex = idx.blockNum + 1 + r.StartTxIndex = 0 + } else { + // move to the next transaction in the block + r.StartBlockIndex = idx.blockNum + r.StartTxIndex = idx.txIndex + 1 + } + } else { + log.Warn("tx hash not found in tx index map", "hash", h) + } +} diff --git a/zk/stages/stage_sequence_execute_utils.go b/zk/stages/stage_sequence_execute_utils.go index 7f7e89d0a01..1a0cee26a81 100644 --- a/zk/stages/stage_sequence_execute_utils.go +++ b/zk/stages/stage_sequence_execute_utils.go @@ -29,7 +29,6 @@ import ( "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" "github.com/ledgerwatch/erigon/zk/datastream/server" - dsTypes "github.com/ledgerwatch/erigon/zk/datastream/types" "github.com/ledgerwatch/erigon/zk/hermez_db" verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" "github.com/ledgerwatch/erigon/zk/tx" @@ -241,12 +240,7 @@ func prepareHeader(tx kv.RwTx, previousBlockNumber, deltaTimestamp, forcedTimest }, parentBlock, nil } -func prepareL1AndInfoTreeRelatedStuff( - sdb *stageDb, - batchState *BatchState, - proposedTimestamp uint64, - reuseL1InfoIndex bool, -) ( +func prepareL1AndInfoTreeRelatedStuff(sdb *stageDb, batchState *BatchState, proposedTimestamp uint64, reuseL1InfoIndex bool) ( infoTreeIndexProgress uint64, l1TreeUpdate *zktypes.L1InfoTreeUpdate, l1TreeUpdateIndex uint64, @@ -504,78 +498,3 @@ func (bdc *BlockDataChecker) AddTransactionData(txL2Data []byte) bool { return false } - -type txMatadata struct { - blockNum int - txIndex int -} - -type ResequenceBatchJob struct { - batchToProcess []*dsTypes.FullL2Block - StartBlockIndex int - StartTxIndex int - txIndexMap map[common.Hash]txMatadata -} - -func NewResequenceBatchJob(batch []*dsTypes.FullL2Block) *ResequenceBatchJob { - return &ResequenceBatchJob{ - batchToProcess: batch, - StartBlockIndex: 0, - StartTxIndex: 0, - txIndexMap: make(map[common.Hash]txMatadata), - } -} - -func (r *ResequenceBatchJob) HasMoreBlockToProcess() bool { - return r.StartBlockIndex < len(r.batchToProcess) -} - -func (r *ResequenceBatchJob) AtNewBlockBoundary() bool { - return r.StartTxIndex == 0 -} - -func (r *ResequenceBatchJob) CurrentBlock() *dsTypes.FullL2Block { - if r.HasMoreBlockToProcess() { - return r.batchToProcess[r.StartBlockIndex] - } - return nil -} - -func (r *ResequenceBatchJob) YieldNextBlockTransactions(decoder zktx.TxDecoder) ([]types.Transaction, error) { - blockTransactions := make([]types.Transaction, 0) - if r.HasMoreBlockToProcess() { - block := r.CurrentBlock() - r.txIndexMap[block.L2Blockhash] = txMatadata{r.StartBlockIndex, 0} - - for i := r.StartTxIndex; i < len(block.L2Txs); i++ { - transaction := block.L2Txs[i] - tx, _, err := decoder(transaction.Encoded, transaction.EffectiveGasPricePercentage, block.ForkId) - if err != nil { - return nil, fmt.Errorf("decode tx error: %v", err) - } - r.txIndexMap[tx.Hash()] = txMatadata{r.StartBlockIndex, i} - blockTransactions = append(blockTransactions, tx) - } - } - - return blockTransactions, nil -} - -func (r *ResequenceBatchJob) UpdateLastProcessedTx(h common.Hash) { - if idx, ok := r.txIndexMap[h]; ok { - block := r.batchToProcess[idx.blockNum] - - if idx.txIndex >= len(block.L2Txs)-1 { - // we've processed all the transactions in this block - // move to the next block - r.StartBlockIndex = idx.blockNum + 1 - r.StartTxIndex = 0 - } else { - // move to the next transaction in the block - r.StartBlockIndex = idx.blockNum - r.StartTxIndex = idx.txIndex + 1 - } - } else { - log.Warn("tx hash not found in tx index map", "hash", h) - } -} diff --git a/zk/stages/stage_sequence_execute_utils_test.go b/zk/stages/stage_sequence_execute_utils_test.go index f5fe9d0eb50..39fafa5b8bb 100644 --- a/zk/stages/stage_sequence_execute_utils_test.go +++ b/zk/stages/stage_sequence_execute_utils_test.go @@ -321,7 +321,7 @@ func TestResequenceBatchJob_YieldNextBlockTransactions(t *testing.T) { }, StartBlockIndex: 0, StartTxIndex: 0, - txIndexMap: make(map[common.Hash]txMatadata), + txIndexMap: make(map[common.Hash]resequenceTxMatadata), }, expectedTxCount: 2, expectedError: false, @@ -332,7 +332,7 @@ func TestResequenceBatchJob_YieldNextBlockTransactions(t *testing.T) { batchToProcess: []*dsTypes.FullL2Block{{}}, StartBlockIndex: 1, StartTxIndex: 0, - txIndexMap: make(map[common.Hash]txMatadata), + txIndexMap: make(map[common.Hash]resequenceTxMatadata), }, expectedTxCount: 0, expectedError: false, @@ -366,7 +366,7 @@ func TestResequenceBatchJob_YieldAndUpdate(t *testing.T) { batchToProcess: batch, StartBlockIndex: 0, StartTxIndex: 1, // Start at block 0, index 1 - txIndexMap: make(map[common.Hash]txMatadata), + txIndexMap: make(map[common.Hash]resequenceTxMatadata), } processTransactions := func(txs []types.Transaction) { @@ -443,7 +443,7 @@ func TestResequenceBatchJob_YieldAndUpdate(t *testing.T) { } // Verify txIndexMap - expectedTxIndexMap := map[common.Hash]txMatadata{ + expectedTxIndexMap := map[common.Hash]resequenceTxMatadata{ common.HexToHash("0"): {0, 0}, common.HexToHash("1"): {1, 0}, common.HexToHash("2"): {2, 0}, From cdb6ec8bf5739c625113df4748b9e650b23b3146 Mon Sep 17 00:00:00 2001 From: Kamen Stoykov Date: Mon, 30 Sep 2024 10:45:31 +0000 Subject: [PATCH 2/3] fix(sequencer):remove double import --- zk/stages/stage_sequence_execute_utils.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zk/stages/stage_sequence_execute_utils.go b/zk/stages/stage_sequence_execute_utils.go index 1a0cee26a81..8289c55533e 100644 --- a/zk/stages/stage_sequence_execute_utils.go +++ b/zk/stages/stage_sequence_execute_utils.go @@ -31,7 +31,6 @@ import ( "github.com/ledgerwatch/erigon/zk/datastream/server" "github.com/ledgerwatch/erigon/zk/hermez_db" verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" - "github.com/ledgerwatch/erigon/zk/tx" zktx "github.com/ledgerwatch/erigon/zk/tx" "github.com/ledgerwatch/erigon/zk/txpool" zktypes "github.com/ledgerwatch/erigon/zk/types" @@ -477,7 +476,7 @@ func newBlockDataChecker() *BlockDataChecker { // adds bytes amounting to the block data and checks if the limit is reached // if the limit is reached, the data is not added, so this can be reused again for next check func (bdc *BlockDataChecker) AddBlockStartData() bool { - blockStartBytesAmount := tx.START_BLOCK_BATCH_L2_DATA_SIZE // tx.GenerateStartBlockBatchL2Data(deltaTimestamp, l1InfoTreeIndex) returns 65 long byte array + blockStartBytesAmount := zktx.START_BLOCK_BATCH_L2_DATA_SIZE // tx.GenerateStartBlockBatchL2Data(deltaTimestamp, l1InfoTreeIndex) returns 65 long byte array // add in the changeL2Block transaction if bdc.counter+blockStartBytesAmount > bdc.limit { return true From 5978d857a0a8465fd35d60aeb8d04069a10b374c Mon Sep 17 00:00:00 2001 From: Kamen Stoykov Date: Mon, 30 Sep 2024 11:00:36 +0000 Subject: [PATCH 3/3] chore(sequencer): typo --- zk/stages/stage_sequence_execute_state.go | 10 +++++----- zk/stages/stage_sequence_execute_utils_test.go | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/zk/stages/stage_sequence_execute_state.go b/zk/stages/stage_sequence_execute_state.go index 291bd78e462..03b37444581 100644 --- a/zk/stages/stage_sequence_execute_state.go +++ b/zk/stages/stage_sequence_execute_state.go @@ -309,7 +309,7 @@ func (bbe *BuiltBlockElements) onFinishAddingTransaction(transaction types.Trans bbe.effectiveGases = append(bbe.effectiveGases, effectiveGas) } -type resequenceTxMatadata struct { +type resequenceTxMetadata struct { blockNum int txIndex int } @@ -318,7 +318,7 @@ type ResequenceBatchJob struct { batchToProcess []*dsTypes.FullL2Block StartBlockIndex int StartTxIndex int - txIndexMap map[common.Hash]resequenceTxMatadata + txIndexMap map[common.Hash]resequenceTxMetadata } func NewResequenceBatchJob(batch []*dsTypes.FullL2Block) *ResequenceBatchJob { @@ -326,7 +326,7 @@ func NewResequenceBatchJob(batch []*dsTypes.FullL2Block) *ResequenceBatchJob { batchToProcess: batch, StartBlockIndex: 0, StartTxIndex: 0, - txIndexMap: make(map[common.Hash]resequenceTxMatadata), + txIndexMap: make(map[common.Hash]resequenceTxMetadata), } } @@ -349,7 +349,7 @@ func (r *ResequenceBatchJob) YieldNextBlockTransactions(decoder zktx.TxDecoder) blockTransactions := make([]types.Transaction, 0) if r.HasMoreBlockToProcess() { block := r.CurrentBlock() - r.txIndexMap[block.L2Blockhash] = resequenceTxMatadata{r.StartBlockIndex, 0} + r.txIndexMap[block.L2Blockhash] = resequenceTxMetadata{r.StartBlockIndex, 0} for i := r.StartTxIndex; i < len(block.L2Txs); i++ { transaction := block.L2Txs[i] @@ -357,7 +357,7 @@ func (r *ResequenceBatchJob) YieldNextBlockTransactions(decoder zktx.TxDecoder) if err != nil { return nil, fmt.Errorf("decode tx error: %v", err) } - r.txIndexMap[tx.Hash()] = resequenceTxMatadata{r.StartBlockIndex, i} + r.txIndexMap[tx.Hash()] = resequenceTxMetadata{r.StartBlockIndex, i} blockTransactions = append(blockTransactions, tx) } } diff --git a/zk/stages/stage_sequence_execute_utils_test.go b/zk/stages/stage_sequence_execute_utils_test.go index 39fafa5b8bb..5193ba688df 100644 --- a/zk/stages/stage_sequence_execute_utils_test.go +++ b/zk/stages/stage_sequence_execute_utils_test.go @@ -321,7 +321,7 @@ func TestResequenceBatchJob_YieldNextBlockTransactions(t *testing.T) { }, StartBlockIndex: 0, StartTxIndex: 0, - txIndexMap: make(map[common.Hash]resequenceTxMatadata), + txIndexMap: make(map[common.Hash]resequenceTxMetadata), }, expectedTxCount: 2, expectedError: false, @@ -332,7 +332,7 @@ func TestResequenceBatchJob_YieldNextBlockTransactions(t *testing.T) { batchToProcess: []*dsTypes.FullL2Block{{}}, StartBlockIndex: 1, StartTxIndex: 0, - txIndexMap: make(map[common.Hash]resequenceTxMatadata), + txIndexMap: make(map[common.Hash]resequenceTxMetadata), }, expectedTxCount: 0, expectedError: false, @@ -366,7 +366,7 @@ func TestResequenceBatchJob_YieldAndUpdate(t *testing.T) { batchToProcess: batch, StartBlockIndex: 0, StartTxIndex: 1, // Start at block 0, index 1 - txIndexMap: make(map[common.Hash]resequenceTxMatadata), + txIndexMap: make(map[common.Hash]resequenceTxMetadata), } processTransactions := func(txs []types.Transaction) { @@ -443,7 +443,7 @@ func TestResequenceBatchJob_YieldAndUpdate(t *testing.T) { } // Verify txIndexMap - expectedTxIndexMap := map[common.Hash]resequenceTxMatadata{ + expectedTxIndexMap := map[common.Hash]resequenceTxMetadata{ common.HexToHash("0"): {0, 0}, common.HexToHash("1"): {1, 0}, common.HexToHash("2"): {2, 0},