silence logs for witness generation and add generation time log (#976)
hexoscott authored Aug 19, 2024
1 parent 7a8ab91 commit d3353f4
Showing 11 changed files with 63 additions and 40 deletions.
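
At a glance, the change threads an explicit quiet/silent flag through the unwind entry points (UnwindHashStateStage, UnwindZkIntermediateHashesStage, unwindZkSMT, Promoter.Unwind, ProgressPrinter) so the witness generator can run them without per-unwind log noise, and replaces the old witness timer with one summary log of the generation time. A minimal sketch of the gating pattern, assuming nothing beyond the standard library (the repository itself uses log/v3 with key/value pairs):

```go
package main

import "log"

// unwindSketch mirrors the shape of the change: the unwind work is untouched,
// but informational logging is skipped when the caller asks for quiet.
// Names here are illustrative, not the repository's.
func unwindSketch(logPrefix string, from, to uint64, quiet bool) error {
	if !quiet {
		log.Printf("[%s] Unwinding started from=%d to=%d", logPrefix, from, to)
	}
	// ... unwind work would go here ...
	return nil
}

func main() {
	// Regular staged sync keeps its logs.
	_ = unwindSketch("HashState", 100, 50, false)
	// Witness generation runs the same unwind silently.
	_ = unwindSketch("HashState", 100, 50, true)
}
```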
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
@@ -884,7 +884,7 @@ func stageHashState(db kv.RwDB, ctx context.Context) error {
cfg := stagedsync.StageHashStateCfg(db, dirs, historyV3, agg)
if unwind > 0 {
u := sync.NewUnwindState(stages.HashState, s.BlockNumber-unwind, s.BlockNumber)
- err = stagedsync.UnwindHashStateStage(u, s, tx, cfg, ctx)
+ err = stagedsync.UnwindHashStateStage(u, s, tx, cfg, ctx, false)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion cmd/integration/commands/state_stages.go
@@ -448,7 +448,7 @@ func loopIh(db kv.RwDB, ctx context.Context, unwind uint64) error {
to := execStage.BlockNumber - unwind
_ = sync.SetCurrentStage(stages.HashState)
u := &stagedsync.UnwindState{ID: stages.HashState, UnwindPoint: to}
- if err = stagedsync.UnwindHashStateStage(u, stage(sync, tx, nil, stages.HashState), tx, stagedsync.StageHashStateCfg(db, dirs, historyV3, agg), ctx); err != nil {
+ if err = stagedsync.UnwindHashStateStage(u, stage(sync, tx, nil, stages.HashState), tx, stagedsync.StageHashStateCfg(db, dirs, historyV3, agg), ctx, false); err != nil {
return err
}
_ = sync.SetCurrentStage(stages.IntermediateHashes)
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/eth_call.go
@@ -363,7 +363,7 @@ func (api *APIImpl) GetProof(ctx context.Context, address libcommon.Address, sto
stageState := &stagedsync.StageState{BlockNumber: latestBlock}

hashStageCfg := stagedsync.StageHashStateCfg(nil, api.dirs, api.historyV3(batch), api._agg)
- if err := stagedsync.UnwindHashStateStage(unwindState, stageState, batch, hashStageCfg, ctx); err != nil {
+ if err := stagedsync.UnwindHashStateStage(unwindState, stageState, batch, hashStageCfg, ctx, true); err != nil {
return nil, err
}

4 changes: 2 additions & 2 deletions eth/stagedsync/default_stages.go
@@ -100,7 +100,7 @@ func DefaultStages(ctx context.Context, snapshots SnapshotsCfg, headers HeadersC
return SpawnHashStateStage(s, tx, hashState, ctx, quiet)
},
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
- return UnwindHashStateStage(u, s, tx, hashState, ctx)
+ return UnwindHashStateStage(u, s, tx, hashState, ctx, false)
},
Prune: func(firstCycle bool, p *PruneState, tx kv.RwTx) error {
return PruneHashStateStage(p, tx, hashState, ctx)
@@ -274,7 +274,7 @@ func StateStages(ctx context.Context, headers HeadersCfg, bodies BodiesCfg, bloc
return SpawnHashStateStage(s, tx, hashState, ctx, quiet)
},
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
- return UnwindHashStateStage(u, s, tx, hashState, ctx)
+ return UnwindHashStateStage(u, s, tx, hashState, ctx, false)
},
},
{
18 changes: 10 additions & 8 deletions eth/stagedsync/stage_hashstate.go
@@ -113,7 +113,7 @@ func SpawnHashStateStage(s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx contex
return nil
}

- func UnwindHashStateStage(u *UnwindState, s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx context.Context) (err error) {
+ func UnwindHashStateStage(u *UnwindState, s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx context.Context, quiet bool) (err error) {
useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
@@ -124,7 +124,7 @@ func UnwindHashStateStage(u *UnwindState, s *StageState, tx kv.RwTx, cfg HashSta
}

logPrefix := u.LogPrefix()
- if err = unwindHashStateStageImpl(logPrefix, u, s, tx, cfg, ctx); err != nil {
+ if err = unwindHashStateStageImpl(logPrefix, u, s, tx, cfg, ctx, quiet); err != nil {
return err
}
if err = u.Done(tx); err != nil {
@@ -138,7 +138,7 @@ func UnwindHashStateStage(u *UnwindState, s *StageState, tx kv.RwTx, cfg HashSta
return nil
}

- func unwindHashStateStageImpl(logPrefix string, u *UnwindState, s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx context.Context) error {
+ func unwindHashStateStageImpl(logPrefix string, u *UnwindState, s *StageState, tx kv.RwTx, cfg HashStateCfg, ctx context.Context, quiet bool) error {
// Currently it does not require unwinding because it does not create any Intermediate Hash records
// and recomputes the state root from scratch
prom := NewPromoter(tx, cfg.dirs, ctx)
@@ -155,13 +155,13 @@ func unwindHashStateStageImpl(logPrefix string, u *UnwindState, s *StageState, t
}
return nil
}
- if err := prom.Unwind(logPrefix, s, u, false /* storage */, true /* codes */); err != nil {
+ if err := prom.Unwind(logPrefix, s, u, false /* storage */, true /* codes */, quiet); err != nil {
return err
}
- if err := prom.Unwind(logPrefix, s, u, false /* storage */, false /* codes */); err != nil {
+ if err := prom.Unwind(logPrefix, s, u, false /* storage */, false /* codes */, quiet); err != nil {
return err
}
- if err := prom.Unwind(logPrefix, s, u, true /* storage */, false /* codes */); err != nil {
+ if err := prom.Unwind(logPrefix, s, u, true /* storage */, false /* codes */, quiet); err != nil {
return err
}
return nil
@@ -852,7 +852,7 @@ func (p *Promoter) UnwindOnHistoryV3(logPrefix string, agg *state.AggregatorV3,
return collector.Load(p.tx, kv.HashedAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: p.ctx.Done()})
}

- func (p *Promoter) Unwind(logPrefix string, s *StageState, u *UnwindState, storage bool, codes bool) error {
+ func (p *Promoter) Unwind(logPrefix string, s *StageState, u *UnwindState, storage bool, codes bool, quiet bool) error {
var changeSetBucket string
if storage {
changeSetBucket = kv.StorageChangeSet
@@ -862,7 +862,9 @@ func (p *Promoter) Unwind(logPrefix string, s *StageState, u *UnwindState, stora
from := s.BlockNumber
to := u.UnwindPoint

- log.Info(fmt.Sprintf("[%s] Unwinding started", logPrefix), "from", from, "to", to, "storage", storage, "codes", codes)
+ if !quiet {
+     log.Info(fmt.Sprintf("[%s] Unwinding started", logPrefix), "from", from, "to", to, "storage", storage, "codes", codes)
+ }

startkey := hexutility.EncodeTs(to + 1)

4 changes: 2 additions & 2 deletions eth/stagedsync/stage_hashstate_test.go
@@ -88,7 +88,7 @@ func TestUnwindHashed(t *testing.T) {
}
u := &UnwindState{UnwindPoint: 50}
s := &StageState{BlockNumber: 100}
- err = unwindHashStateStageImpl("logPrefix", u, s, tx2, StageHashStateCfg(db2, dirs, historyV3, nil), context.Background())
+ err = unwindHashStateStageImpl("logPrefix", u, s, tx2, StageHashStateCfg(db2, dirs, historyV3, nil), context.Background(), false)
if err != nil {
t.Errorf("error while unwind state: %v", err)
}
@@ -198,7 +198,7 @@ func TestUnwindHashStateShutdown(t *testing.T) {

u := &UnwindState{UnwindPoint: 5}
s := &StageState{BlockNumber: 10}
- if err = unwindHashStateStageImpl("logPrefix", u, s, tx, cfg, ctx); !errors.Is(err, tc.errExp) {
+ if err = unwindHashStateStageImpl("logPrefix", u, s, tx, cfg, ctx, false); !errors.Is(err, tc.errExp) {
t.Errorf("error does not match expected error while shutdown unwindHashStateStageImpl, got: %v, expected: %v", err, tc.errExp)
}

30 changes: 19 additions & 11 deletions zk/stages/stage_interhashes.go
@@ -196,7 +196,7 @@ func SpawnZkIntermediateHashesStage(s *stagedsync.StageState, u stagedsync.Unwin
return root, err
}

- func UnwindZkIntermediateHashesStage(u *stagedsync.UnwindState, s *stagedsync.StageState, tx kv.RwTx, cfg ZkInterHashesCfg, ctx context.Context) (err error) {
+ func UnwindZkIntermediateHashesStage(u *stagedsync.UnwindState, s *stagedsync.StageState, tx kv.RwTx, cfg ZkInterHashesCfg, ctx context.Context, silent bool) (err error) {
quit := ctx.Done()
useExternalTx := tx != nil
if !useExternalTx {
@@ -206,7 +206,9 @@ func UnwindZkIntermediateHashesStage(u *stagedsync.UnwindState, s *stagedsync.St
}
defer tx.Rollback()
}
- log.Debug(fmt.Sprintf("[%s] Unwinding intermediate hashes", s.LogPrefix()), "from", s.BlockNumber, "to", u.UnwindPoint)
+ if !silent {
+     log.Debug(fmt.Sprintf("[%s] Unwinding intermediate hashes", s.LogPrefix()), "from", s.BlockNumber, "to", u.UnwindPoint)
+ }

var expectedRootHash common.Hash
syncHeadHeader := rawdb.ReadHeaderByNumber(tx, u.UnwindPoint)
@@ -219,7 +221,7 @@ func UnwindZkIntermediateHashesStage(u *stagedsync.UnwindState, s *stagedsync.St
expectedRootHash = syncHeadHeader.Root
}

- root, err := unwindZkSMT(ctx, s.LogPrefix(), s.BlockNumber, u.UnwindPoint, tx, cfg.checkRoot, &expectedRootHash, quit)
+ root, err := unwindZkSMT(ctx, s.LogPrefix(), s.BlockNumber, u.UnwindPoint, tx, cfg.checkRoot, &expectedRootHash, silent, quit)
if err != nil {
return err
}
@@ -457,14 +459,18 @@ func zkIncrementIntermediateHashes(ctx context.Context, logPrefix string, s *sta
return hash, nil
}

- func unwindZkSMT(ctx context.Context, logPrefix string, from, to uint64, db kv.RwTx, checkRoot bool, expectedRootHash *common.Hash, quit <-chan struct{}) (common.Hash, error) {
-     log.Info(fmt.Sprintf("[%s] Unwind trie hashes started", logPrefix))
-     defer log.Info(fmt.Sprintf("[%s] Unwind ended", logPrefix))
+ func unwindZkSMT(ctx context.Context, logPrefix string, from, to uint64, db kv.RwTx, checkRoot bool, expectedRootHash *common.Hash, quiet bool, quit <-chan struct{}) (common.Hash, error) {
+     if !quiet {
+         log.Info(fmt.Sprintf("[%s] Unwind trie hashes started", logPrefix))
+         defer log.Info(fmt.Sprintf("[%s] Unwind ended", logPrefix))
+     }

eridb := db2.NewEriDb(db)
dbSmt := smt.NewSMT(eridb, false)

- log.Info(fmt.Sprintf("[%s]", logPrefix), "last root", common.BigToHash(dbSmt.LastRoot()))
+ if !quiet {
+     log.Info(fmt.Sprintf("[%s]", logPrefix), "last root", common.BigToHash(dbSmt.LastRoot()))
+ }

if quit == nil {
log.Warn("quit channel is nil, creating a new one")
@@ -492,7 +498,7 @@ func unwindZkSMT(ctx context.Context, logPrefix string, from, to uint64, db kv.R

total := uint64(math.Abs(float64(from) - float64(to) + 1))
printerStopped := false
- progressChan, stopPrinter := zk.ProgressPrinter(fmt.Sprintf("[%s] Progress unwinding", logPrefix), total)
+ progressChan, stopPrinter := zk.ProgressPrinter(fmt.Sprintf("[%s] Progress unwinding", logPrefix), total, quiet)
defer func() {
if !printerStopped {
stopPrinter()
@@ -616,7 +622,7 @@ func unwindZkSMT(ctx context.Context, logPrefix string, from, to uint64, db kv.R
return trie.EmptyRoot, err
}

- if err := verifyLastHash(dbSmt, expectedRootHash, checkRoot, logPrefix); err != nil {
+ if err := verifyLastHash(dbSmt, expectedRootHash, checkRoot, logPrefix, quiet); err != nil {
log.Error("failed to verify hash")
eridb.RollbackBatch()
return trie.EmptyRoot, err
@@ -632,13 +638,15 @@ func unwindZkSMT(ctx context.Context, logPrefix string, from, to uint64, db kv.R
return hash, nil
}

- func verifyLastHash(dbSmt *smt.SMT, expectedRootHash *common.Hash, checkRoot bool, logPrefix string) error {
+ func verifyLastHash(dbSmt *smt.SMT, expectedRootHash *common.Hash, checkRoot bool, logPrefix string, quiet bool) error {
hash := common.BigToHash(dbSmt.LastRoot())

if checkRoot && hash != *expectedRootHash {
panic(fmt.Sprintf("[%s] Wrong trie root: %x, expected (from header): %x", logPrefix, hash, expectedRootHash))
}
- log.Info(fmt.Sprintf("[%s] Trie root matches", logPrefix), "hash", hash.Hex())
+ if !quiet {
+     log.Info(fmt.Sprintf("[%s] Trie root matches", logPrefix), "hash", hash.Hex())
+ }
return nil
}

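
One Go detail in the unwindZkSMT hunk above: the deferred "Unwind ended" log is now registered inside the if !quiet block, and a defer is scoped to the enclosing function, so it still fires when unwindZkSMT returns (and only when logging was requested), not at the end of the if block. A tiny illustration with hypothetical names:

```go
package main

import "fmt"

// workSketch mimics unwindZkSMT's new logging: the "ended" message is only
// registered when logging is wanted, and because defer is function-scoped it
// runs at function return, not at the end of the if block.
func workSketch(quiet bool) {
	if !quiet {
		fmt.Println("unwind started")
		defer fmt.Println("unwind ended") // fires when workSketch returns
	}
	fmt.Println("unwinding...")
}

func main() {
	workSketch(false) // prints: unwind started, unwinding..., unwind ended
	workSketch(true)  // prints: unwinding...
}
```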
2 changes: 1 addition & 1 deletion zk/stages/stage_sequencer_interhashes.go
@@ -29,7 +29,7 @@ func UnwindSequencerInterhashsStage(
ctx context.Context,
cfg ZkInterHashesCfg,
) error {
- return UnwindZkIntermediateHashesStage(u, s, tx, cfg, ctx)
+ return UnwindZkIntermediateHashesStage(u, s, tx, cfg, ctx, false)
}

func PruneSequencerInterhashesStage(
6 changes: 3 additions & 3 deletions zk/stages/stages.go
@@ -134,7 +134,7 @@ func SequencerZkStages(
return stages.SpawnHashStateStage(s, tx, hashState, ctx, quiet)
},
Unwind: func(firstCycle bool, u *stages.UnwindState, s *stages.StageState, tx kv.RwTx) error {
- return stages.UnwindHashStateStage(u, s, tx, hashState, ctx)
+ return stages.UnwindHashStateStage(u, s, tx, hashState, ctx, false)
},
Prune: func(firstCycle bool, p *stages.PruneState, tx kv.RwTx) error {
return stages.PruneHashStateStage(p, tx, hashState, ctx)
@@ -354,7 +354,7 @@ func DefaultZkStages(
return stages.SpawnHashStateStage(s, tx, hashState, ctx, quiet)
},
Unwind: func(firstCycle bool, u *stages.UnwindState, s *stages.StageState, tx kv.RwTx) error {
- return stages.UnwindHashStateStage(u, s, tx, hashState, ctx)
+ return stages.UnwindHashStateStage(u, s, tx, hashState, ctx, false)
},
Prune: func(firstCycle bool, p *stages.PruneState, tx kv.RwTx) error {
return stages.PruneHashStateStage(p, tx, hashState, ctx)
@@ -369,7 +369,7 @@ func DefaultZkStages(
return err
},
Unwind: func(firstCycle bool, u *stages.UnwindState, s *stages.StageState, tx kv.RwTx) error {
- return UnwindZkIntermediateHashesStage(u, s, tx, zkInterHashesCfg, ctx)
+ return UnwindZkIntermediateHashesStage(u, s, tx, zkInterHashesCfg, ctx, false)
},
Prune: func(firstCycle bool, p *stages.PruneState, tx kv.RwTx) error {
// TODO: implement this in zk interhashes
4 changes: 2 additions & 2 deletions zk/utils.go
@@ -12,7 +12,7 @@ var ErrLimboState = errors.New("Calculating limbo state")

// prints progress every 10 seconds
// returns a channel to send progress to, and a function to stop the printer routine
- func ProgressPrinter(message string, total uint64) (chan uint64, func()) {
+ func ProgressPrinter(message string, total uint64, quiet bool) (chan uint64, func()) {
progress := make(chan uint64)
ctDone := make(chan bool)

@@ -34,7 +34,7 @@ func ProgressPrinter(message string, total uint64) (chan uint64, func()) {
pct = (pc * 100) / total
}
case <-ticker.C:
- if pc > 0 {
+ if pc > 0 && !quiet {
log.Info(fmt.Sprintf("%s: %d/%d (%d%%)", message, pc, total, pct))
}
case <-ctDone:
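
ProgressPrinter is a small ticker-driven goroutine; with the new flag it keeps draining progress updates but skips the periodic log line when quiet. A self-contained sketch of that pattern, simplified from the real helper (standard-library logging, a 1-second ticker instead of 10, and illustrative names):

```go
package main

import (
	"log"
	"time"
)

// progressPrinter returns a channel for progress updates and a stop function.
// When quiet is true it still consumes updates but never logs them.
func progressPrinter(message string, total uint64, quiet bool) (chan uint64, func()) {
	progress := make(chan uint64)
	done := make(chan struct{})

	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		var count, pct uint64
		for {
			select {
			case step := <-progress:
				count += step
				if total > 0 {
					pct = count * 100 / total
				}
			case <-ticker.C:
				if count > 0 && !quiet {
					log.Printf("%s: %d/%d (%d%%)", message, count, total, pct)
				}
			case <-done:
				return
			}
		}
	}()

	return progress, func() { close(done) }
}

func main() {
	progress, stop := progressPrinter("unwinding", 50, false)
	defer stop()
	for i := 0; i < 50; i++ {
		progress <- 1
		time.Sleep(50 * time.Millisecond)
	}
}
```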
29 changes: 21 additions & 8 deletions zk/witness/witness.go
@@ -34,6 +34,7 @@ import (
zkStages "github.com/ledgerwatch/erigon/zk/stages"
zkUtils "github.com/ledgerwatch/erigon/zk/utils"
"github.com/ledgerwatch/log/v3"
"time"
)

var (
@@ -126,7 +127,7 @@ func (g *Generator) GetWitnessByBatch(tx kv.Tx, ctx context.Context, batchNum ui
blocks[i] = block
}

- return g.generateWitness(tx, ctx, blocks, debug, witnessFull)
+ return g.generateWitness(tx, ctx, batchNum, blocks, debug, witnessFull)
} else {
blockNumbers, err := reader.GetL2BlockNosByBatch(batchNum)
if err != nil {
@@ -145,7 +146,7 @@ func (g *Generator) GetWitnessByBatch(tx kv.Tx, ctx context.Context, batchNum ui
blocks[idx] = block
idx++
}
- return g.generateWitness(tx, ctx, blocks, debug, witnessFull)
+ return g.generateWitness(tx, ctx, batchNum, blocks, debug, witnessFull)
}
}

@@ -160,23 +161,35 @@ func (g *Generator) GetWitnessByBlockRange(tx kv.Tx, ctx context.Context, startB
witness := trie.NewWitness([]trie.WitnessOperator{})
return getWitnessBytes(witness, debug)
}
+ hermezDb := hermez_db.NewHermezDbReader(tx)
idx := 0
blocks := make([]*eritypes.Block, endBlock-startBlock+1)
+ var firstBatch uint64 = 0
for blockNum := startBlock; blockNum <= endBlock; blockNum++ {
block, err := rawdb.ReadBlockByNumber(tx, blockNum)
if err != nil {
return nil, err
}
+ firstBatch, err = hermezDb.GetBatchNoByL2Block(block.NumberU64())
+ if err != nil {
+     return nil, err
+ }
blocks[idx] = block
idx++
}

- return g.generateWitness(tx, ctx, blocks, debug, witnessFull)
+ return g.generateWitness(tx, ctx, firstBatch, blocks, debug, witnessFull)
}

- func (g *Generator) generateWitness(tx kv.Tx, ctx context.Context, blocks []*eritypes.Block, debug, witnessFull bool) ([]byte, error) {
-     t := zkUtils.StartTimer("witness", "generatewitness")
-     defer t.LogTimer()
+ func (g *Generator) generateWitness(tx kv.Tx, ctx context.Context, batchNum uint64, blocks []*eritypes.Block, debug, witnessFull bool) ([]byte, error) {
+     now := time.Now()
+     defer func() {
+         diff := time.Since(now)
+         if len(blocks) == 0 {
+             return
+         }
+         log.Info("Generating witness timing", "batch", batchNum, "blockFrom", blocks[0].NumberU64(), "blockTo", blocks[len(blocks)-1].NumberU64(), "taken", diff)
+     }()

endBlock := blocks[len(blocks)-1].NumberU64()
startBlock := blocks[0].NumberU64()
@@ -211,13 +224,13 @@

hashStageCfg := stagedsync.StageHashStateCfg(nil, g.dirs, g.historyV3, g.agg)
hashStageCfg.SetQuiet(true)
- if err := stagedsync.UnwindHashStateStage(unwindState, stageState, batch, hashStageCfg, ctx); err != nil {
+ if err := stagedsync.UnwindHashStateStage(unwindState, stageState, batch, hashStageCfg, ctx, true); err != nil {
return nil, fmt.Errorf("unwind hash state: %w", err)
}

interHashStageCfg := zkStages.StageZkInterHashesCfg(nil, true, true, false, g.dirs.Tmp, g.blockReader, nil, g.historyV3, g.agg, nil)

- if err = zkStages.UnwindZkIntermediateHashesStage(unwindState, stageState, batch, interHashStageCfg, ctx); err != nil {
+ if err = zkStages.UnwindZkIntermediateHashesStage(unwindState, stageState, batch, interHashStageCfg, ctx, true); err != nil {
return nil, fmt.Errorf("unwind intermediate hashes: %w", err)
}

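
On the witness side, the old StartTimer/LogTimer helper in generateWitness is replaced by an inline measurement: capture time.Now, defer a single summary log so every return path is covered, and label the line with the batch number plus the first and last block. A minimal sketch of that defer-based timing pattern (illustrative names, standard-library logging):

```go
package main

import (
	"log"
	"time"
)

// generateWitnessSketch shows the timing pattern: record a start time, defer
// one summary log so every return path is measured, and include the batch and
// block range so the line is easy to correlate with a request.
func generateWitnessSketch(batchNum uint64, blocks []uint64) ([]byte, error) {
	start := time.Now()
	defer func() {
		if len(blocks) == 0 {
			return
		}
		log.Printf("Generating witness timing batch=%d blockFrom=%d blockTo=%d taken=%v",
			batchNum, blocks[0], blocks[len(blocks)-1], time.Since(start))
	}()

	// The real function unwinds state to the target block and builds the
	// witness here; we just simulate some work.
	time.Sleep(50 * time.Millisecond)
	return []byte("witness"), nil
}

func main() {
	if _, err := generateWitnessSketch(42, []uint64{100, 101, 102}); err != nil {
		log.Fatal(err)
	}
}
```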
