diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 021553ec9fa..7a9f2818e0c 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -493,9 +493,14 @@ var ( } SequencerBatchVerificationTimeout = cli.StringFlag{ Name: "zkevm.sequencer-batch-verification-timeout", - Usage: "This is a maximum time that a batch verification could take. Including retries. This could be interpreted as maximum that that the sequencer can run without executor. Setting it to 0s will mean infinite timeout. Defaults to 30min", + Usage: "This is a maximum time that a batch verification could take in terms of executors' errors. Including retries. This could be interpreted as `maximum that that the sequencer can run without executor`. Setting it to 0s will mean infinite timeout. Defaults to 30min", Value: "30m", } + SequencerBatchVerificationRetries = cli.StringFlag{ + Name: "zkevm.sequencer-batch-verification-retries", + Usage: "Number of attempts that a batch will re-run in case of an internal (not executors') error. This could be interpreted as `maximum attempts to send a batch for verification`. Setting it to -1 will mean unlimited attempts. Defaults to 3", + Value: "3", + } SequencerTimeoutOnEmptyTxPool = cli.StringFlag{ Name: "zkevm.sequencer-timeout-on-empty-tx-pool", Usage: "Timeout before requesting txs from the txpool if none were found before. Defaults to 250ms", diff --git a/eth/ethconfig/config_zkevm.go b/eth/ethconfig/config_zkevm.go index 32de04302b2..4b7b1e155dd 100644 --- a/eth/ethconfig/config_zkevm.go +++ b/eth/ethconfig/config_zkevm.go @@ -37,6 +37,7 @@ type Zk struct { SequencerBlockSealTime time.Duration SequencerBatchSealTime time.Duration SequencerBatchVerificationTimeout time.Duration + SequencerBatchVerificationRetries int SequencerTimeoutOnEmptyTxPool time.Duration SequencerHaltOnBatchNumber uint64 SequencerResequence bool diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go index 0a04b9db012..0ce2c05615c 100644 --- a/eth/stagedsync/stage.go +++ b/eth/stagedsync/stage.go @@ -75,6 +75,7 @@ func (s *StageState) IntermediateHashesAt(db kv.Getter) (uint64, error) { type Unwinder interface { // UnwindTo begins staged sync unwind to the specified block. UnwindTo(unwindPoint uint64, badBlock libcommon.Hash) + IsUnwindSet() bool } // UnwindState contains the information about unwind. diff --git a/eth/stagedsync/sync.go b/eth/stagedsync/sync.go index caaf1e907ca..4fb793a393b 100644 --- a/eth/stagedsync/sync.go +++ b/eth/stagedsync/sync.go @@ -113,6 +113,10 @@ func (s *Sync) UnwindTo(unwindPoint uint64, badBlock libcommon.Hash) { s.badBlock = badBlock } +func (s *Sync) IsUnwindSet() bool { + return s.unwindPoint != nil +} + func (s *Sync) IsDone() bool { return s.currentStage >= uint(len(s.stages)) && s.unwindPoint == nil } diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 6f8fabc694b..a517b260c30 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -200,6 +200,7 @@ var DefaultFlags = []cli.Flag{ &utils.SequencerBlockSealTime, &utils.SequencerBatchSealTime, &utils.SequencerBatchVerificationTimeout, + &utils.SequencerBatchVerificationRetries, &utils.SequencerTimeoutOnEmptyTxPool, &utils.SequencerHaltOnBatchNumber, &utils.SequencerResequence, diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index 4d979d04d7e..b9e750097b1 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -144,6 +144,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { SequencerBlockSealTime: sequencerBlockSealTime, SequencerBatchSealTime: sequencerBatchSealTime, SequencerBatchVerificationTimeout: sequencerBatchVerificationTimeout, + SequencerBatchVerificationRetries: ctx.Int(utils.SequencerBatchVerificationRetries.Name), SequencerTimeoutOnEmptyTxPool: sequencerTimeoutOnEmptyTxPool, SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name), SequencerResequence: ctx.Bool(utils.SequencerResequence.Name), diff --git a/zk/legacy_executor_verifier/legacy_executor_verifier.go b/zk/legacy_executor_verifier/legacy_executor_verifier.go index 9b081b0c01e..7084d33aefe 100644 --- a/zk/legacy_executor_verifier/legacy_executor_verifier.go +++ b/zk/legacy_executor_verifier/legacy_executor_verifier.go @@ -35,13 +35,14 @@ type VerifierRequest struct { Counters map[string]int creationTime time.Time timeout time.Duration + retries int } func NewVerifierRequest(forkId, batchNumber uint64, blockNumbers []uint64, stateRoot common.Hash, counters map[string]int) *VerifierRequest { - return NewVerifierRequestWithTimeout(forkId, batchNumber, blockNumbers, stateRoot, counters, 0) + return NewVerifierRequestWithLimits(forkId, batchNumber, blockNumbers, stateRoot, counters, 0, -1) } -func NewVerifierRequestWithTimeout(forkId, batchNumber uint64, blockNumbers []uint64, stateRoot common.Hash, counters map[string]int, timeout time.Duration) *VerifierRequest { +func NewVerifierRequestWithLimits(forkId, batchNumber uint64, blockNumbers []uint64, stateRoot common.Hash, counters map[string]int, timeout time.Duration, retries int) *VerifierRequest { return &VerifierRequest{ BatchNumber: batchNumber, BlockNumbers: blockNumbers, @@ -50,6 +51,7 @@ func NewVerifierRequestWithTimeout(forkId, batchNumber uint64, blockNumbers []ui Counters: counters, creationTime: time.Now(), timeout: timeout, + retries: retries, } } @@ -61,6 +63,17 @@ func (vr *VerifierRequest) IsOverdue() bool { return time.Since(vr.creationTime) > vr.timeout } +func (vr *VerifierRequest) IncrementAndValidateRetries() bool { + if vr.retries == -1 { + return true + } + + if vr.retries > 0 { + vr.retries-- + } + return vr.retries > 0 +} + func (vr *VerifierRequest) GetFirstBlockNumber() uint64 { return vr.BlockNumbers[0] } @@ -78,17 +91,27 @@ type VerifierResponse struct { } type VerifierBundle struct { - Request *VerifierRequest - Response *VerifierResponse + Request *VerifierRequest + Response *VerifierResponse + readyForSendingRequest bool } -func NewVerifierBundle(request *VerifierRequest, response *VerifierResponse) *VerifierBundle { +func NewVerifierBundle(request *VerifierRequest, response *VerifierResponse, readyForSendingRequest bool) *VerifierBundle { return &VerifierBundle{ - Request: request, - Response: response, + Request: request, + Response: response, + readyForSendingRequest: readyForSendingRequest, } } +func (vb *VerifierBundle) markAsreadyForSendingRequest() { + vb.readyForSendingRequest = true +} + +func (vb *VerifierBundle) isInternalError() bool { + return !vb.readyForSendingRequest +} + type WitnessGenerator interface { GetWitnessByBlockRange(tx kv.Tx, ctx context.Context, startBlock, endBlock uint64, debug, witnessFull bool) ([]byte, error) } @@ -138,10 +161,11 @@ func (v *LegacyExecutorVerifier) StartAsyncVerification( blockNumbers []uint64, useRemoteExecutor bool, requestTimeout time.Duration, + retries int, ) { var promise *Promise[*VerifierBundle] - request := NewVerifierRequestWithTimeout(forkId, batchNumber, blockNumbers, stateRoot, counters, requestTimeout) + request := NewVerifierRequestWithLimits(forkId, batchNumber, blockNumbers, stateRoot, counters, requestTimeout, retries) if useRemoteExecutor { promise = v.VerifyAsync(request) } else { @@ -200,7 +224,7 @@ func (v *LegacyExecutorVerifier) VerifyAsync(request *VerifierRequest) *Promise[ // eager promise will do the work as soon as called in a goroutine, then we can retrieve the result later // ProcessResultsSequentiallyUnsafe relies on the fact that this function returns ALWAYS non-verifierBundle and error. The only exception is the case when verifications has been canceled. Only then the verifierBundle can be nil return NewPromise[*VerifierBundle](func() (*VerifierBundle, error) { - verifierBundle := NewVerifierBundle(request, nil) + verifierBundle := NewVerifierBundle(request, nil, false) blockNumbers := verifierBundle.Request.BlockNumbers e := v.GetNextOnlineAvailableExecutor() @@ -274,6 +298,8 @@ func (v *LegacyExecutorVerifier) VerifyAsync(request *VerifierRequest) *Promise[ return verifierBundle, err } + verifierBundle.markAsreadyForSendingRequest() + ok, executorResponse, executorErr, generalErr := e.Verify(payload, request, previousBlock.Root()) if generalErr != nil { return verifierBundle, generalErr @@ -308,7 +334,7 @@ func (v *LegacyExecutorVerifier) VerifyWithoutExecutor(request *VerifierRequest) ExecutorResponse: nil, Error: nil, } - return NewVerifierBundle(request, response), nil + return NewVerifierBundle(request, response, true), nil }) promise.Wait() @@ -322,11 +348,12 @@ func (v *LegacyExecutorVerifier) HasPendingVerifications() bool { return len(v.promises) > 0 } -func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([]*VerifierBundle, error) { +func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([]*VerifierBundle, *VerifierBundle) { v.mtxPromises.Lock() defer v.mtxPromises.Unlock() var verifierResponse []*VerifierBundle + var verifierBundleForUnwind *VerifierBundle // not a stop signal, so we can start to process our promises now for idx, promise := range v.promises { @@ -346,15 +373,23 @@ func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([ log.Error("error on our end while preparing the verification request, re-queueing the task", "err", err) - if verifierBundle.Request.IsOverdue() { - // signal an error, the caller can check on this and stop the process if needs be - return nil, fmt.Errorf("error: batch %d couldn't be processed in 30 minutes", verifierBundle.Request.BatchNumber) + if verifierBundle.isInternalError() { + canRetry := verifierBundle.Request.IncrementAndValidateRetries() + if !canRetry { + verifierBundleForUnwind = verifierBundle + break + } + } else { + if verifierBundle.Request.IsOverdue() { + verifierBundleForUnwind = verifierBundle + break + } } // re-queue the task - it should be safe to replace the index of the slice here as we only add to it v.promises[idx] = promise.CloneAndRerun() - // break now as we know we can't proceed here until this promise is attempted again + // we have a problamtic bundle so we cannot processed next, because it should break the sequentiality break } @@ -365,7 +400,7 @@ func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([ // remove processed promises from the list v.promises = v.promises[len(verifierResponse):] - return verifierResponse, nil + return verifierResponse, verifierBundleForUnwind } func (v *LegacyExecutorVerifier) Wait() { diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index ad533588a0d..075d67c3ff0 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -19,8 +19,7 @@ import ( "github.com/ledgerwatch/erigon/zk/utils" ) -// we must perform execution and datastream alignment only during first run of this stage -var shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart = true +var shouldCheckForExecutionAndDataStreamAlighment = true func SpawnSequencingStage( s *stagedsync.StageState, @@ -132,7 +131,7 @@ func sequencingBatchStep( return sdb.tx.Commit() } - if shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart { + if shouldCheckForExecutionAndDataStreamAlighment { // handle cases where the last batch wasn't committed to the data stream. // this could occur because we're migrating from an RPC node to a sequencer // or because the sequencer was restarted and not all processes completed (like waiting from remote executor) @@ -142,20 +141,20 @@ func sequencingBatchStep( if !batchState.isAnyRecovery() { isUnwinding, err := alignExecutionToDatastream(batchContext, executionAt, u) if err != nil { - // do not set shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart=false because of the error + // do not set shouldCheckForExecutionAndDataStreamAlighment=false because of the error return err } if isUnwinding { err = sdb.tx.Commit() if err != nil { - // do not set shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart=false because of the error + // do not set shouldCheckForExecutionAndDataStreamAlighment=false because of the error return err } - shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart = false + shouldCheckForExecutionAndDataStreamAlighment = false return nil } } - shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart = false + shouldCheckForExecutionAndDataStreamAlighment = false } tryHaltSequencer(batchContext, batchState.batchNumber) @@ -508,7 +507,7 @@ func sequencingBatchStep( if err != nil { return err } - cfg.legacyVerifier.StartAsyncVerification(batchContext.s.LogPrefix(), batchState.forkId, batchState.batchNumber, block.Root(), counters.UsedAsMap(), batchState.builtBlocks, useExecutorForVerification, batchContext.cfg.zk.SequencerBatchVerificationTimeout) + cfg.legacyVerifier.StartAsyncVerification(batchContext.s.LogPrefix(), batchState.forkId, batchState.batchNumber, block.Root(), counters.UsedAsMap(), batchState.builtBlocks, useExecutorForVerification, batchContext.cfg.zk.SequencerBatchVerificationTimeout, batchContext.cfg.zk.SequencerBatchVerificationRetries) // check for new responses from the verifier needsUnwind, err := updateStreamAndCheckRollback(batchContext, batchState, streamWriter, u) diff --git a/zk/stages/stage_sequence_execute_batch.go b/zk/stages/stage_sequence_execute_batch.go index 80e2c7351b5..011fd186be4 100644 --- a/zk/stages/stage_sequence_execute_batch.go +++ b/zk/stages/stage_sequence_execute_batch.go @@ -9,6 +9,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/zk/l1_data" + verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" "github.com/ledgerwatch/log/v3" ) @@ -91,7 +92,7 @@ func updateStreamAndCheckRollback( streamWriter *SequencerBatchStreamWriter, u stagedsync.Unwinder, ) (bool, error) { - checkedVerifierBundles, err := streamWriter.CommitNewUpdates() + checkedVerifierBundles, verifierBundleForUnwind, err := streamWriter.CommitNewUpdates() if err != nil { return false, err } @@ -122,21 +123,35 @@ func updateStreamAndCheckRollback( return false, err } - unwindTo := verifierBundle.Request.GetLastBlockNumber() - 1 + err = markForUnwind(batchContext, streamWriter, u, verifierBundle) + return err == nil, err + } - // for unwind we supply the block number X-1 of the block we want to remove, but supply the hash of the block - // causing the unwind. - unwindHeader := rawdb.ReadHeaderByNumber(batchContext.sdb.tx, verifierBundle.Request.GetLastBlockNumber()) - if unwindHeader == nil { - return false, fmt.Errorf("could not find header for block %d", verifierBundle.Request.GetLastBlockNumber()) - } + if verifierBundleForUnwind != nil { + err = markForUnwind(batchContext, streamWriter, u, verifierBundleForUnwind) + return err == nil, err + } - log.Warn(fmt.Sprintf("[%s] Block is invalid - rolling back", batchContext.s.LogPrefix()), "badBlock", verifierBundle.Request.GetLastBlockNumber(), "unwindTo", unwindTo, "root", unwindHeader.Root) + return false, nil +} - u.UnwindTo(unwindTo, unwindHeader.Hash()) - streamWriter.legacyVerifier.CancelAllRequests() - return true, nil +func markForUnwind( + batchContext *BatchContext, + streamWriter *SequencerBatchStreamWriter, + u stagedsync.Unwinder, + verifierBundle *verifier.VerifierBundle, +) error { + unwindTo := verifierBundle.Request.GetLastBlockNumber() - 1 + + // for unwind we supply the block number X-1 of the block we want to remove, but supply the hash of the block + // causing the unwind. + unwindHeader := rawdb.ReadHeaderByNumber(batchContext.sdb.tx, verifierBundle.Request.GetLastBlockNumber()) + if unwindHeader == nil { + return fmt.Errorf("could not find header for block %d", verifierBundle.Request.GetLastBlockNumber()) } - return false, nil + log.Warn(fmt.Sprintf("[%s] Block is invalid - rolling back", batchContext.s.LogPrefix()), "badBlock", verifierBundle.Request.GetLastBlockNumber(), "unwindTo", unwindTo, "root", unwindHeader.Root) + + u.UnwindTo(unwindTo, unwindHeader.Hash()) + return nil } diff --git a/zk/stages/stage_sequence_execute_data_stream.go b/zk/stages/stage_sequence_execute_data_stream.go index 9631b317d98..41b52d1adf6 100644 --- a/zk/stages/stage_sequence_execute_data_stream.go +++ b/zk/stages/stage_sequence_execute_data_stream.go @@ -37,13 +37,10 @@ func newSequencerBatchStreamWriter(batchContext *BatchContext, batchState *Batch } } -func (sbc *SequencerBatchStreamWriter) CommitNewUpdates() ([]*verifier.VerifierBundle, error) { - verifierBundles, err := sbc.legacyVerifier.ProcessResultsSequentially(sbc.logPrefix) - if err != nil { - return nil, err - } - - return sbc.writeBlockDetailsToDatastream(verifierBundles) +func (sbc *SequencerBatchStreamWriter) CommitNewUpdates() ([]*verifier.VerifierBundle, *verifier.VerifierBundle, error) { + verifierBundles, verifierBundleForUnwind := sbc.legacyVerifier.ProcessResultsSequentially(sbc.logPrefix) + checkedVerifierBundles, err := sbc.writeBlockDetailsToDatastream(verifierBundles) + return checkedVerifierBundles, verifierBundleForUnwind, err } func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBundles []*verifier.VerifierBundle) ([]*verifier.VerifierBundle, error) { diff --git a/zk/stages/stages.go b/zk/stages/stages.go index ff24e6a01fd..4fc497f0f09 100644 --- a/zk/stages/stages.go +++ b/zk/stages/stages.go @@ -104,7 +104,13 @@ func SequencerZkStages( ID: stages2.Execution, Description: "Sequence transactions", Forward: func(firstCycle bool, badBlockUnwind bool, s *stages.StageState, u stages.Unwinder, tx kv.RwTx, quiet bool) error { - return SpawnSequencingStage(s, u, ctx, exec, history, quiet) + sequencerErr := SpawnSequencingStage(s, u, ctx, exec, history, quiet) + if sequencerErr != nil || u.IsUnwindSet() { + exec.legacyVerifier.CancelAllRequests() + // on the begining of next iteration the EXECUTION will be aligned to DS + shouldCheckForExecutionAndDataStreamAlighment = true + } + return sequencerErr }, Unwind: func(firstCycle bool, u *stages.UnwindState, s *stages.StageState, tx kv.RwTx) error { return UnwindSequenceExecutionStage(u, s, tx, ctx, exec, firstCycle)