Skip to content

Commit

Permalink
remove stage.Done() method (erigontech#2390)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Jul 18, 2021
1 parent 7666552 commit 58e22c5
Show file tree
Hide file tree
Showing 27 changed files with 76 additions and 202 deletions.
8 changes: 4 additions & 4 deletions cmd/integration/commands/snapshot_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,24 +209,24 @@ func snapshotCheck(ctx context.Context, db ethdb.RwKV, isNew bool, tmpDir string

if isNew {
stage3 := stage(sync, tx, stages.Senders)
err = stage3.DoneAndUpdate(tx, lastBlockHeaderNumber)
err = stage3.Update(tx, lastBlockHeaderNumber)
if err != nil {
return err
}

stage4 := stage(sync, tx, stages.Execution)
err = stage4.DoneAndUpdate(tx, snapshotBlock)
err = stage4.Update(tx, snapshotBlock)
if err != nil {
return err
}
stage5 := stage(sync, tx, stages.HashState)
err = stage5.DoneAndUpdate(tx, snapshotBlock)
err = stage5.Update(tx, snapshotBlock)
if err != nil {
return err
}

stage6 := stage(sync, tx, stages.IntermediateHashes)
err = stage6.DoneAndUpdate(tx, snapshotBlock)
err = stage6.Update(tx, snapshotBlock)
if err != nil {
return err
}
Expand Down
43 changes: 31 additions & 12 deletions eth/stagedsync/stage.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stagedsync

import (
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb"
)
Expand Down Expand Up @@ -50,22 +51,40 @@ func (s *StageState) Update(db ethdb.Putter, newBlockNum uint64) error {
return stages.SaveStageProgress(db, s.ID, newBlockNum)
}

// Done makes sure that the stage execution is complete and proceeds to the next state.
// If Done() is not called and the stage `Forward` exits, then the same stage will be called again.
// This side effect is useful for something like block body download.
func (s *StageState) Done() {
s.state.NextStage()
}

// ExecutionAt gets the current state of the "Execution" stage, which block is currently executed.
func (s *StageState) ExecutionAt(db ethdb.KVGetter) (uint64, error) {
execution, err := stages.GetStageProgress(db, stages.Execution)
return execution, err
}

// DoneAndUpdate a convenience method combining both `Done()` and `Update()` calls together.
func (s *StageState) DoneAndUpdate(db ethdb.Putter, newBlockNum uint64) error {
err := stages.SaveStageProgress(db, s.ID, newBlockNum)
s.state.NextStage()
return err
// Unwinder allows the stage to cause an unwind.
type Unwinder interface {
// UnwindTo begins staged sync unwind to the specified block.
UnwindTo(unwindPoint uint64, badBlock common.Hash)
}

// UnwindState contains the information about unwind.
type UnwindState struct {
ID stages.SyncStage
// UnwindPoint is the block to unwind to.
UnwindPoint uint64
CurrentBlockNumber uint64
// If unwind is caused by a bad block, this hash is not empty
BadBlock common.Hash
state *Sync
}

func (u *UnwindState) LogPrefix() string { return u.state.LogPrefix() }

// Done updates the DB state of the stage.
func (u *UnwindState) Done(db ethdb.Putter) error {
return stages.SaveStageProgress(db, u.ID, u.UnwindPoint)
}

type PruneState struct {
ID stages.SyncStage
PrunePoint uint64 // PrunePoint is the block to prune to.
state *Sync
}

func (u *PruneState) LogPrefix() string { return u.state.LogPrefix() }
3 changes: 1 addition & 2 deletions eth/stagedsync/stage_blockhashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func SpawnBlockHashStage(s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx c
}
headHash := rawdb.ReadHeaderByNumber(tx, headNumber).Hash()
if s.BlockNumber == headNumber {
s.Done()
return nil
}

Expand All @@ -75,7 +74,7 @@ func SpawnBlockHashStage(s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx c
); err != nil {
return err
}
if err = s.DoneAndUpdate(tx, headNumber); err != nil {
if err = s.Update(tx, headNumber); err != nil {
return err
}
if !useExternalTx {
Expand Down
3 changes: 1 addition & 2 deletions eth/stagedsync/stage_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func BodiesForward(
}
bodyProgress = s.BlockNumber
if bodyProgress == headerProgress {
s.Done()
return nil
}
logPrefix := s.LogPrefix()
Expand Down Expand Up @@ -198,7 +197,7 @@ Loop:
d6 += time.Since(start)
stageBodiesGauge.Update(int64(bodyProgress))
}
if err := s.DoneAndUpdate(tx, bodyProgress); err != nil {
if err := s.Update(tx, bodyProgress); err != nil {
return err
}
if !useExternalTx {
Expand Down
1 change: 0 additions & 1 deletion eth/stagedsync/stage_bodies_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func StageSnapshotBodiesCfg(db ethdb.RwKV, snapshot ethconfig.Snapshot, client *
}

func SpawnBodiesSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg SnapshotBodiesCfg, ctx context.Context) error {
s.Done()
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions eth/stagedsync/stage_call_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,14 @@ func SpawnCallTraces(s *StageState, tx ethdb.RwTx, cfg CallTracesCfg, ctx contex
return fmt.Errorf("%s: getting last executed block: %w", logPrefix, err)
}
if endBlock == s.BlockNumber {
s.Done()
return nil
}

if err := promoteCallTraces(logPrefix, tx, s.BlockNumber+1, endBlock, bitmapsBufLimit, bitmapsFlushEvery, quit, cfg); err != nil {
return err
}

if err := s.DoneAndUpdate(tx, endBlock); err != nil {
if err := s.Update(tx, endBlock); err != nil {
return err
}
if !useExternalTx {
Expand Down
3 changes: 0 additions & 3 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx ethdb.RwTx, toBlock u
to = min(prevStageProgress, toBlock)
}
if to <= s.BlockNumber {
s.Done()
return nil
}
logPrefix := s.LogPrefix()
Expand Down Expand Up @@ -362,7 +361,6 @@ Loop:
}

log.Info(fmt.Sprintf("[%s] Completed on", logPrefix), "block", stageProgress)
s.Done()
return stoppedErr
}

Expand Down Expand Up @@ -436,7 +434,6 @@ func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, current
func UnwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) {
quit := ctx.Done()
if u.UnwindPoint >= s.BlockNumber {
s.Done()
return nil
}
useExternalTx := tx != nil
Expand Down
3 changes: 1 addition & 2 deletions eth/stagedsync/stage_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func FinishForward(s *StageState, tx ethdb.RwTx, cfg FinishCfg) error {
return err
}
if executionAt <= s.BlockNumber {
s.Done()
return nil
}

Expand All @@ -63,7 +62,7 @@ func FinishForward(s *StageState, tx ethdb.RwTx, cfg FinishCfg) error {
}
}
rawdb.WriteHeadBlockHash(tx, rawdb.ReadHeadHeaderHash(tx))
err = s.DoneAndUpdate(tx, executionAt)
err = s.Update(tx, executionAt)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions eth/stagedsync/stage_hashstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func SpawnHashStateStage(s *StageState, tx ethdb.RwTx, cfg HashStateCfg, ctx con
if s.BlockNumber == to {
// we already did hash check for this block
// we don't do the obvious `if s.BlockNumber > to` to support reorgs more naturally
s.Done()
return nil
}
if s.BlockNumber > to {
Expand All @@ -67,7 +66,7 @@ func SpawnHashStateStage(s *StageState, tx ethdb.RwTx, cfg HashStateCfg, ctx con
}
}

if err = s.DoneAndUpdate(tx, to); err != nil {
if err = s.Update(tx, to); err != nil {
return err
}

Expand Down
4 changes: 1 addition & 3 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func HeadersForward(
return err
}
}
s.Done()
return nil
}

Expand Down Expand Up @@ -191,7 +190,6 @@ func HeadersForward(
return fmt.Errorf("%s: failed to fix canonical chain: %w", logPrefix, err)
}
}
s.Done()
if !useExternalTx {
if err := tx.Commit(); err != nil {
return err
Expand Down Expand Up @@ -313,7 +311,7 @@ func HeadersUnwind(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HeadersCfg)
if err = rawdb.WriteHeadHeaderHash(tx, maxHash); err != nil {
return err
}
if err = s.DoneAndUpdate(tx, maxNum); err != nil {
if err = s.Update(tx, maxNum); err != nil {
return err
}
}
Expand Down
5 changes: 1 addition & 4 deletions eth/stagedsync/stage_headers_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func StageSnapshotHeadersCfg(db ethdb.RwKV, snapshot ethconfig.Snapshot, client
func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg SnapshotHeadersCfg, initial bool, ctx context.Context) error {
//generate snapshot only on initial mode
if !initial {
s.Done()
return nil
}

Expand All @@ -49,7 +48,6 @@ func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Snaps

//it's too early for snapshot
if to < snapshotsync.EpochSize {
s.Done()
return nil
}

Expand All @@ -63,7 +61,6 @@ func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Snaps
//So we have to move headers to snapshot right after headers stage.
//but we don't want to block not initial sync
if snapshotBlock <= currentSnapshotBlock {
s.Done()
return nil
}

Expand All @@ -88,7 +85,7 @@ func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Snaps
if err != nil {
return err
}
err = s.DoneAndUpdate(tx, snapshotBlock)
err = s.Update(tx, snapshotBlock)
if err != nil {
return err
}
Expand Down
6 changes: 2 additions & 4 deletions eth/stagedsync/stage_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func SpawnAccountHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx
return fmt.Errorf("%s: getting last executed block: %w", logPrefix, err)
}
if executionAt <= s.BlockNumber {
s.Done()
return nil
}

Expand All @@ -69,7 +68,7 @@ func SpawnAccountHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx
return fmt.Errorf("[%s] %w", logPrefix, err)
}

if err := s.DoneAndUpdate(tx, executionAt); err != nil {
if err := s.Update(tx, executionAt); err != nil {
return fmt.Errorf("[%s] %w", logPrefix, err)
}

Expand Down Expand Up @@ -99,7 +98,6 @@ func SpawnStorageHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx
return fmt.Errorf("%s: logs index: getting last executed block: %w", logPrefix, err)
}
if executionAt <= s.BlockNumber {
s.Done()
return nil
}

Expand All @@ -113,7 +111,7 @@ func SpawnStorageHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx
return fmt.Errorf("[%s] %w", logPrefix, err)
}

if err := s.DoneAndUpdate(tx, executionAt); err != nil {
if err := s.Update(tx, executionAt); err != nil {
return fmt.Errorf("[%s] %w", logPrefix, err)
}

Expand Down
4 changes: 1 addition & 3 deletions eth/stagedsync/stage_interhashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx ethdb.RwTx, cfg
if s.BlockNumber == to {
// we already did hash check for this block
// we don't do the obvious `if s.BlockNumber > to` to support reorgs more naturally
s.Done()
return trie.EmptyRoot, nil
}

Expand Down Expand Up @@ -96,8 +95,7 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx ethdb.RwTx, cfg
log.Warn("Unwinding due to incorrect root hash", "to", to-1)
u.UnwindTo(to-1, headerHash)
}
s.Done()
} else if err = s.DoneAndUpdate(tx, to); err != nil {
} else if err = s.Update(tx, to); err != nil {
return trie.EmptyRoot, err
}
} else {
Expand Down
3 changes: 1 addition & 2 deletions eth/stagedsync/stage_log_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func SpawnLogIndex(s *StageState, tx ethdb.RwTx, cfg LogIndexCfg, ctx context.Co
return fmt.Errorf("%s: getting last executed block: %w", logPrefix, err)
}
if endBlock == s.BlockNumber {
s.Done()
return nil
}

Expand All @@ -72,7 +71,7 @@ func SpawnLogIndex(s *StageState, tx ethdb.RwTx, cfg LogIndexCfg, ctx context.Co
return err
}

if err := s.DoneAndUpdate(tx, endBlock); err != nil {
if err := s.Update(tx, endBlock); err != nil {
return err
}
if !useExternalTx {
Expand Down
1 change: 0 additions & 1 deletion eth/stagedsync/stage_mining_create_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ func SpawnMiningCreateBlockStage(s *StageState, tx ethdb.RwTx, cfg MiningCreateB

current.LocalTxs = types.NewTransactionsByPriceAndNonce(*signer, localTxs)
current.RemoteTxs = types.NewTransactionsByPriceAndNonce(*signer, remoteTxs)
s.Done()
fmt.Printf("aa: %t, %t,%t\n", current == nil, cfg.miner.MiningBlock == nil, current.Header == nil)
return nil
}
Expand Down
2 changes: 0 additions & 2 deletions eth/stagedsync/stage_mining_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func SpawnMiningExecStage(s *StageState, tx ethdb.RwTx, cfg MiningExecCfg, quit
// sealing in advance without waiting block execution finished.
if !noempty {
log.Info("Commit an empty block", "number", current.Header.Number)
s.Done()
return nil
}

Expand Down Expand Up @@ -141,7 +140,6 @@ func SpawnMiningExecStage(s *StageState, tx ethdb.RwTx, cfg MiningExecCfg, quit
if err := stages.SaveStageProgress(tx, stages.Execution, current.Header.Number.Uint64()); err != nil {
return err
}
s.Done()
return nil
}

Expand Down
2 changes: 0 additions & 2 deletions eth/stagedsync/stage_mining_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func SpawnMiningFinishStage(s *StageState, tx ethdb.RwTx, cfg MiningFinishCfg, q
// Tests may set pre-calculated nonce
if block.Header().Nonce.Uint64() != 0 {
cfg.miningState.MiningResultCh <- block
s.Done()
return nil
}

Expand All @@ -77,6 +76,5 @@ func SpawnMiningFinishStage(s *StageState, tx ethdb.RwTx, cfg MiningFinishCfg, q
log.Warn("Block sealing failed", "err", err)
}

s.Done()
return nil
}
4 changes: 1 addition & 3 deletions eth/stagedsync/stage_senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx ethd
to = min(prevStageProgress, toBlock)
}
if to <= s.BlockNumber {
s.Done()
return nil
}
logPrefix := s.LogPrefix()
Expand Down Expand Up @@ -238,7 +237,6 @@ Loop:
if to > s.BlockNumber {
u.UnwindTo(minBlockNum-1, minBlockHash)
}
s.Done()
} else {
if err := collectorSenders.Load(logPrefix, tx,
dbutils.Senders,
Expand All @@ -252,7 +250,7 @@ Loop:
); err != nil {
return err
}
if err = s.DoneAndUpdate(tx, to); err != nil {
if err = s.Update(tx, to); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 58e22c5

Please sign in to comment.