Commit

feat(sequencer): update handling of errors (#1277)
kstoykov authored and cffls committed Oct 15, 2024
1 parent 396cd19 commit 22b09d4
Showing 11 changed files with 111 additions and 46 deletions.
7 changes: 6 additions & 1 deletion cmd/utils/flags.go
@@ -493,9 +493,14 @@
}
SequencerBatchVerificationTimeout = cli.StringFlag{
Name: "zkevm.sequencer-batch-verification-timeout",
Usage: "This is a maximum time that a batch verification could take. Including retries. This could be interpreted as maximum that that the sequencer can run without executor. Setting it to 0s will mean infinite timeout. Defaults to 30min",
Usage: "This is a maximum time that a batch verification could take in terms of executors' errors. Including retries. This could be interpreted as `maximum that that the sequencer can run without executor`. Setting it to 0s will mean infinite timeout. Defaults to 30min",
Value: "30m",
}
SequencerBatchVerificationRetries = cli.StringFlag{
Name: "zkevm.sequencer-batch-verification-retries",
Usage: "Number of attempts that a batch will re-run in case of an internal (not executors') error. This could be interpreted as `maximum attempts to send a batch for verification`. Setting it to -1 will mean unlimited attempts. Defaults to 3",
Value: "3",
}
SequencerTimeoutOnEmptyTxPool = cli.StringFlag{
Name: "zkevm.sequencer-timeout-on-empty-tx-pool",
Usage: "Timeout before requesting txs from the txpool if none were found before. Defaults to 250ms",
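Taken together, the two flags above cover different failure modes: the timeout bounds how long executor-reported errors may keep a batch in verification, while the new retries flag caps how many times an internal (non-executor) error re-queues a batch. The following is a minimal, self-contained sketch of those semantics using Go's standard flag package; the shortened flag names are purely illustrative, the real ones are the zkevm.* flags defined above.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	// Hypothetical short names; the real flags are
	// zkevm.sequencer-batch-verification-timeout and
	// zkevm.sequencer-batch-verification-retries.
	timeout := flag.Duration("verification-timeout", 30*time.Minute, "0 means no timeout for executor errors")
	retries := flag.Int("verification-retries", 3, "-1 means unlimited attempts on internal errors")
	flag.Parse()

	if *timeout == 0 {
		fmt.Println("executor errors: batch verification is not time-bounded")
	} else {
		fmt.Printf("executor errors: batch verification gives up after %s\n", *timeout)
	}
	if *retries == -1 {
		fmt.Println("internal errors: the batch is re-queued indefinitely")
	} else {
		fmt.Printf("internal errors: re-queueing stops once the budget of %d is spent\n", *retries)
	}
}
```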
1 change: 1 addition & 0 deletions eth/ethconfig/config_zkevm.go
@@ -37,6 +37,7 @@ type Zk struct {
SequencerBlockSealTime time.Duration
SequencerBatchSealTime time.Duration
SequencerBatchVerificationTimeout time.Duration
SequencerBatchVerificationRetries int
SequencerTimeoutOnEmptyTxPool time.Duration
SequencerHaltOnBatchNumber uint64
SequencerResequence bool
1 change: 1 addition & 0 deletions eth/stagedsync/stage.go
@@ -75,6 +75,7 @@ func (s *StageState) IntermediateHashesAt(db kv.Getter) (uint64, error) {
type Unwinder interface {
// UnwindTo begins staged sync unwind to the specified block.
UnwindTo(unwindPoint uint64, badBlock libcommon.Hash)
IsUnwindSet() bool
}

// UnwindState contains the information about unwind.
4 changes: 4 additions & 0 deletions eth/stagedsync/sync.go
@@ -113,6 +113,10 @@ func (s *Sync) UnwindTo(unwindPoint uint64, badBlock libcommon.Hash) {
s.badBlock = badBlock
}

func (s *Sync) IsUnwindSet() bool {
return s.unwindPoint != nil
}

func (s *Sync) IsDone() bool {
return s.currentStage >= uint(len(s.stages)) && s.unwindPoint == nil
}
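IsUnwindSet is a thin query on top of state that UnwindTo already maintains: an unwind is considered set once an unwind point has been recorded. A self-contained sketch of the extended interface follows; Hash is a stand-in for libcommon.Hash and toySync is a hypothetical stand-in for Sync.

```go
package main

import "fmt"

// Hash is a stand-in for libcommon.Hash, only for this sketch.
type Hash [32]byte

// Unwinder restates the extended interface from the diff above.
type Unwinder interface {
	UnwindTo(unwindPoint uint64, badBlock Hash)
	IsUnwindSet() bool
}

// toySync mirrors the relevant part of Sync: an unwind is "set" once a point was recorded.
type toySync struct {
	unwindPoint *uint64
	badBlock    Hash
}

func (s *toySync) UnwindTo(unwindPoint uint64, badBlock Hash) {
	s.unwindPoint = &unwindPoint
	s.badBlock = badBlock
}

func (s *toySync) IsUnwindSet() bool { return s.unwindPoint != nil }

func main() {
	var u Unwinder = &toySync{}
	fmt.Println(u.IsUnwindSet()) // false
	u.UnwindTo(41, Hash{})
	fmt.Println(u.IsUnwindSet()) // true
}
```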
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
@@ -200,6 +200,7 @@ var DefaultFlags = []cli.Flag{
&utils.SequencerBlockSealTime,
&utils.SequencerBatchSealTime,
&utils.SequencerBatchVerificationTimeout,
&utils.SequencerBatchVerificationRetries,
&utils.SequencerTimeoutOnEmptyTxPool,
&utils.SequencerHaltOnBatchNumber,
&utils.SequencerResequence,
1 change: 1 addition & 0 deletions turbo/cli/flags_zkevm.go
@@ -144,6 +144,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
SequencerBlockSealTime: sequencerBlockSealTime,
SequencerBatchSealTime: sequencerBatchSealTime,
SequencerBatchVerificationTimeout: sequencerBatchVerificationTimeout,
SequencerBatchVerificationRetries: ctx.Int(utils.SequencerBatchVerificationRetries.Name),
SequencerTimeoutOnEmptyTxPool: sequencerTimeoutOnEmptyTxPool,
SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name),
SequencerResequence: ctx.Bool(utils.SequencerResequence.Name),
67 changes: 51 additions & 16 deletions zk/legacy_executor_verifier/legacy_executor_verifier.go
@@ -35,13 +35,14 @@ type VerifierRequest struct {
Counters map[string]int
creationTime time.Time
timeout time.Duration
retries int
}

func NewVerifierRequest(forkId, batchNumber uint64, blockNumbers []uint64, stateRoot common.Hash, counters map[string]int) *VerifierRequest {
return NewVerifierRequestWithTimeout(forkId, batchNumber, blockNumbers, stateRoot, counters, 0)
return NewVerifierRequestWithLimits(forkId, batchNumber, blockNumbers, stateRoot, counters, 0, -1)
}

func NewVerifierRequestWithTimeout(forkId, batchNumber uint64, blockNumbers []uint64, stateRoot common.Hash, counters map[string]int, timeout time.Duration) *VerifierRequest {
func NewVerifierRequestWithLimits(forkId, batchNumber uint64, blockNumbers []uint64, stateRoot common.Hash, counters map[string]int, timeout time.Duration, retries int) *VerifierRequest {
return &VerifierRequest{
BatchNumber: batchNumber,
BlockNumbers: blockNumbers,
@@ -50,6 +51,7 @@ func NewVerifierRequestWithTimeout(forkId, batchNumber uint64, blockNumbers []ui
Counters: counters,
creationTime: time.Now(),
timeout: timeout,
retries: retries,
}
}

@@ -61,6 +63,17 @@ func (vr *VerifierRequest) IsOverdue() bool {
return time.Since(vr.creationTime) > vr.timeout
}

func (vr *VerifierRequest) IncrementAndValidateRetries() bool {
if vr.retries == -1 {
return true
}

if vr.retries > 0 {
vr.retries--
}
return vr.retries > 0
}

func (vr *VerifierRequest) GetFirstBlockNumber() uint64 {
return vr.BlockNumbers[0]
}
@@ -78,17 +91,27 @@ type VerifierResponse struct {
}

type VerifierBundle struct {
Request *VerifierRequest
Response *VerifierResponse
Request *VerifierRequest
Response *VerifierResponse
readyForSendingRequest bool
}

func NewVerifierBundle(request *VerifierRequest, response *VerifierResponse) *VerifierBundle {
func NewVerifierBundle(request *VerifierRequest, response *VerifierResponse, readyForSendingRequest bool) *VerifierBundle {
return &VerifierBundle{
Request: request,
Response: response,
Request: request,
Response: response,
readyForSendingRequest: readyForSendingRequest,
}
}

func (vb *VerifierBundle) markAsreadyForSendingRequest() {
vb.readyForSendingRequest = true
}

func (vb *VerifierBundle) isInternalError() bool {
return !vb.readyForSendingRequest
}

type WitnessGenerator interface {
GetWitnessByBlockRange(tx kv.Tx, ctx context.Context, startBlock, endBlock uint64, debug, witnessFull bool) ([]byte, error)
}
@@ -138,10 +161,11 @@ func (v *LegacyExecutorVerifier) StartAsyncVerification(
blockNumbers []uint64,
useRemoteExecutor bool,
requestTimeout time.Duration,
retries int,
) {
var promise *Promise[*VerifierBundle]

request := NewVerifierRequestWithTimeout(forkId, batchNumber, blockNumbers, stateRoot, counters, requestTimeout)
request := NewVerifierRequestWithLimits(forkId, batchNumber, blockNumbers, stateRoot, counters, requestTimeout, retries)
if useRemoteExecutor {
promise = v.VerifyAsync(request)
} else {
@@ -200,7 +224,7 @@ func (v *LegacyExecutorVerifier) VerifyAsync(request *VerifierRequest) *Promise[
// eager promise will do the work as soon as called in a goroutine, then we can retrieve the result later
// ProcessResultsSequentiallyUnsafe relies on the fact that this function ALWAYS returns a non-nil verifierBundle alongside any error. The only exception is when verification has been canceled; only then can the verifierBundle be nil
return NewPromise[*VerifierBundle](func() (*VerifierBundle, error) {
verifierBundle := NewVerifierBundle(request, nil)
verifierBundle := NewVerifierBundle(request, nil, false)
blockNumbers := verifierBundle.Request.BlockNumbers

e := v.GetNextOnlineAvailableExecutor()
@@ -274,6 +298,8 @@ return verifierBundle, err
return verifierBundle, err
}

verifierBundle.markAsreadyForSendingRequest()

ok, executorResponse, executorErr, generalErr := e.Verify(payload, request, previousBlock.Root())
if generalErr != nil {
return verifierBundle, generalErr
@@ -308,7 +334,7 @@ func (v *LegacyExecutorVerifier) VerifyWithoutExecutor(request *VerifierRequest)
ExecutorResponse: nil,
Error: nil,
}
return NewVerifierBundle(request, response), nil
return NewVerifierBundle(request, response, true), nil
})
promise.Wait()

@@ -322,11 +348,12 @@ func (v *LegacyExecutorVerifier) HasPendingVerifications() bool {
return len(v.promises) > 0
}

func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([]*VerifierBundle, error) {
func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([]*VerifierBundle, *VerifierBundle) {
v.mtxPromises.Lock()
defer v.mtxPromises.Unlock()

var verifierResponse []*VerifierBundle
var verifierBundleForUnwind *VerifierBundle

// not a stop signal, so we can start to process our promises now
for idx, promise := range v.promises {
@@ -346,15 +373,23 @@ func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([

log.Error("error on our end while preparing the verification request, re-queueing the task", "err", err)

if verifierBundle.Request.IsOverdue() {
// signal an error, the caller can check on this and stop the process if needs be
return nil, fmt.Errorf("error: batch %d couldn't be processed in 30 minutes", verifierBundle.Request.BatchNumber)
if verifierBundle.isInternalError() {
canRetry := verifierBundle.Request.IncrementAndValidateRetries()
if !canRetry {
verifierBundleForUnwind = verifierBundle
break
}
} else {
if verifierBundle.Request.IsOverdue() {
verifierBundleForUnwind = verifierBundle
break
}
}

// re-queue the task - it should be safe to replace the index of the slice here as we only add to it
v.promises[idx] = promise.CloneAndRerun()

// break now as we know we can't proceed here until this promise is attempted again
// we have a problematic bundle, so we cannot process the next one because that would break the sequentiality
break
}

@@ -365,7 +400,7 @@ func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([
// remove processed promises from the list
v.promises = v.promises[len(verifierResponse):]

return verifierResponse, nil
return verifierResponse, verifierBundleForUnwind
}

func (v *LegacyExecutorVerifier) Wait() {
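The heart of the new behaviour above is the retry budget consumed by IncrementAndValidateRetries: -1 means unlimited, otherwise each internal failure decrements the counter and the request stops being re-queued once the budget runs out; executor-side failures remain governed by the time-based IsOverdue check. A standalone sketch of the counter, run with the default budget of 3:

```go
package main

import "fmt"

// request models only the retry budget of VerifierRequest from the diff above.
type request struct{ retries int }

// incrementAndValidateRetries mirrors the logic in the diff: -1 means
// unlimited; otherwise consume one unit of budget and report whether any
// budget is left for another re-queue.
func (r *request) incrementAndValidateRetries() bool {
	if r.retries == -1 {
		return true
	}
	if r.retries > 0 {
		r.retries--
	}
	return r.retries > 0
}

func main() {
	r := &request{retries: 3}
	for attempt := 1; attempt <= 4; attempt++ {
		fmt.Printf("attempt %d: can retry = %v\n", attempt, r.incrementAndValidateRetries())
	}
	// attempt 1: can retry = true
	// attempt 2: can retry = true
	// attempt 3: can retry = false
	// attempt 4: can retry = false
}
```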
15 changes: 7 additions & 8 deletions zk/stages/stage_sequence_execute.go
@@ -19,8 +19,7 @@ import (
"github.com/ledgerwatch/erigon/zk/utils"
)

// we must perform execution and datastream alignment only during first run of this stage
var shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart = true
var shouldCheckForExecutionAndDataStreamAlighment = true

func SpawnSequencingStage(
s *stagedsync.StageState,
@@ -132,7 +131,7 @@ func sequencingBatchStep(
return sdb.tx.Commit()
}

if shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart {
if shouldCheckForExecutionAndDataStreamAlighment {
// handle cases where the last batch wasn't committed to the data stream.
// this could occur because we're migrating from an RPC node to a sequencer
// or because the sequencer was restarted and not all processes completed (like waiting for the remote executor)
@@ -142,20 +141,20 @@ func sequencingBatchStep(
if !batchState.isAnyRecovery() {
isUnwinding, err := alignExecutionToDatastream(batchContext, executionAt, u)
if err != nil {
// do not set shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart=false because of the error
// do not set shouldCheckForExecutionAndDataStreamAlighment=false because of the error
return err
}
if isUnwinding {
err = sdb.tx.Commit()
if err != nil {
// do not set shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart=false because of the error
// do not set shouldCheckForExecutionAndDataStreamAlighment=false because of the error
return err
}
shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart = false
shouldCheckForExecutionAndDataStreamAlighment = false
return nil
}
}
shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart = false
shouldCheckForExecutionAndDataStreamAlighment = false
}

tryHaltSequencer(batchContext, batchState.batchNumber)
@@ -508,7 +507,7 @@ func sequencingBatchStep(
if err != nil {
return err
}
cfg.legacyVerifier.StartAsyncVerification(batchContext.s.LogPrefix(), batchState.forkId, batchState.batchNumber, block.Root(), counters.UsedAsMap(), batchState.builtBlocks, useExecutorForVerification, batchContext.cfg.zk.SequencerBatchVerificationTimeout)
cfg.legacyVerifier.StartAsyncVerification(batchContext.s.LogPrefix(), batchState.forkId, batchState.batchNumber, block.Root(), counters.UsedAsMap(), batchState.builtBlocks, useExecutorForVerification, batchContext.cfg.zk.SequencerBatchVerificationTimeout, batchContext.cfg.zk.SequencerBatchVerificationRetries)

// check for new responses from the verifier
needsUnwind, err := updateStreamAndCheckRollback(batchContext, batchState, streamWriter, u)
41 changes: 28 additions & 13 deletions zk/stages/stage_sequence_execute_batch.go
@@ -9,6 +9,7 @@ import (
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/zk/l1_data"
verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
"github.com/ledgerwatch/log/v3"
)

@@ -91,7 +92,7 @@ func updateStreamAndCheckRollback(
streamWriter *SequencerBatchStreamWriter,
u stagedsync.Unwinder,
) (bool, error) {
checkedVerifierBundles, err := streamWriter.CommitNewUpdates()
checkedVerifierBundles, verifierBundleForUnwind, err := streamWriter.CommitNewUpdates()
if err != nil {
return false, err
}
@@ -122,21 +123,35 @@ func updateStreamAndCheckRollback(
return false, err
}

unwindTo := verifierBundle.Request.GetLastBlockNumber() - 1
err = markForUnwind(batchContext, streamWriter, u, verifierBundle)
return err == nil, err
}

// for unwind we supply the block number X-1 of the block we want to remove, but supply the hash of the block
// causing the unwind.
unwindHeader := rawdb.ReadHeaderByNumber(batchContext.sdb.tx, verifierBundle.Request.GetLastBlockNumber())
if unwindHeader == nil {
return false, fmt.Errorf("could not find header for block %d", verifierBundle.Request.GetLastBlockNumber())
}
if verifierBundleForUnwind != nil {
err = markForUnwind(batchContext, streamWriter, u, verifierBundleForUnwind)
return err == nil, err
}

log.Warn(fmt.Sprintf("[%s] Block is invalid - rolling back", batchContext.s.LogPrefix()), "badBlock", verifierBundle.Request.GetLastBlockNumber(), "unwindTo", unwindTo, "root", unwindHeader.Root)
return false, nil
}

u.UnwindTo(unwindTo, unwindHeader.Hash())
streamWriter.legacyVerifier.CancelAllRequests()
return true, nil
func markForUnwind(
batchContext *BatchContext,
streamWriter *SequencerBatchStreamWriter,
u stagedsync.Unwinder,
verifierBundle *verifier.VerifierBundle,
) error {
unwindTo := verifierBundle.Request.GetLastBlockNumber() - 1

// for unwind we supply the block number X-1 of the block we want to remove, but supply the hash of the block
// causing the unwind.
unwindHeader := rawdb.ReadHeaderByNumber(batchContext.sdb.tx, verifierBundle.Request.GetLastBlockNumber())
if unwindHeader == nil {
return fmt.Errorf("could not find header for block %d", verifierBundle.Request.GetLastBlockNumber())
}

return false, nil
log.Warn(fmt.Sprintf("[%s] Block is invalid - rolling back", batchContext.s.LogPrefix()), "badBlock", verifierBundle.Request.GetLastBlockNumber(), "unwindTo", unwindTo, "root", unwindHeader.Root)

u.UnwindTo(unwindTo, unwindHeader.Hash())
return nil
}
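markForUnwind centralises the rollback and encodes the usual unwind convention: the target passed to UnwindTo is one block before the offending block, while the hash identifies the bad block itself. A toy illustration with made-up values (the real code reads the header via rawdb.ReadHeaderByNumber):

```go
package main

import "fmt"

// hash and recordingUnwinder are stand-ins for libcommon.Hash and stagedsync.Unwinder.
type hash string

type recordingUnwinder struct {
	point uint64
	bad   hash
}

func (u *recordingUnwinder) UnwindTo(point uint64, badBlock hash) {
	u.point, u.bad = point, badBlock
}

func main() {
	lastBlockOfBadBatch := uint64(1205) // hypothetical block number
	badBlockHash := hash("0xabc...")    // placeholder for the header hash read from the db

	u := &recordingUnwinder{}
	// Unwind to X-1 but report the hash of X, the block that caused the unwind.
	u.UnwindTo(lastBlockOfBadBatch-1, badBlockHash)

	fmt.Printf("unwind to block %d, bad block hash %s\n", u.point, u.bad)
}
```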
11 changes: 4 additions & 7 deletions zk/stages/stage_sequence_execute_data_stream.go
@@ -37,13 +37,10 @@ func newSequencerBatchStreamWriter(batchContext *BatchContext, batchState *Batch
}
}

func (sbc *SequencerBatchStreamWriter) CommitNewUpdates() ([]*verifier.VerifierBundle, error) {
verifierBundles, err := sbc.legacyVerifier.ProcessResultsSequentially(sbc.logPrefix)
if err != nil {
return nil, err
}

return sbc.writeBlockDetailsToDatastream(verifierBundles)
func (sbc *SequencerBatchStreamWriter) CommitNewUpdates() ([]*verifier.VerifierBundle, *verifier.VerifierBundle, error) {
verifierBundles, verifierBundleForUnwind := sbc.legacyVerifier.ProcessResultsSequentially(sbc.logPrefix)
checkedVerifierBundles, err := sbc.writeBlockDetailsToDatastream(verifierBundles)
return checkedVerifierBundles, verifierBundleForUnwind, err
}

func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBundles []*verifier.VerifierBundle) ([]*verifier.VerifierBundle, error) {
8 changes: 7 additions & 1 deletion zk/stages/stages.go
@@ -104,7 +104,13 @@ func SequencerZkStages(
ID: stages2.Execution,
Description: "Sequence transactions",
Forward: func(firstCycle bool, badBlockUnwind bool, s *stages.StageState, u stages.Unwinder, tx kv.RwTx, quiet bool) error {
return SpawnSequencingStage(s, u, ctx, exec, history, quiet)
sequencerErr := SpawnSequencingStage(s, u, ctx, exec, history, quiet)
if sequencerErr != nil || u.IsUnwindSet() {
exec.legacyVerifier.CancelAllRequests()
// at the beginning of the next iteration, the EXECUTION will be aligned to the DS
shouldCheckForExecutionAndDataStreamAlighment = true
}
return sequencerErr
},
Unwind: func(firstCycle bool, u *stages.UnwindState, s *stages.StageState, tx kv.RwTx) error {
return UnwindSequenceExecutionStage(u, s, tx, ctx, exec, firstCycle)
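The wrapper around SpawnSequencingStage ties the pieces together: if the stage returns an error or has scheduled an unwind, all in-flight verifier requests are cancelled and the alignment check is re-armed for the next iteration. A simplified, self-contained sketch of that control flow (verifierStub and unwinderStub are stand-ins for the real verifier and unwinder):

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the real verifier and unwinder.
type verifierStub struct{}

func (verifierStub) CancelAllRequests() { fmt.Println("cancelling pending verifications") }

type unwinderStub struct{ set bool }

func (u unwinderStub) IsUnwindSet() bool { return u.set }

var shouldCheckAlignment bool

// forward mirrors the Forward wrapper in the diff above: run the stage, and on
// error or pending unwind cancel verifier work and re-arm the alignment check.
func forward(run func() error, u unwinderStub, v verifierStub) error {
	err := run()
	if err != nil || u.IsUnwindSet() {
		v.CancelAllRequests()
		shouldCheckAlignment = true // next iteration re-aligns execution with the datastream
	}
	return err
}

func main() {
	err := forward(func() error { return errors.New("hypothetical sequencing failure") }, unwinderStub{}, verifierStub{})
	fmt.Println("stage returned:", err, "| realign next run:", shouldCheckAlignment)
}
```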
