Skip to content

Commit

Permalink
fix: recover node buffer list trie nodes for graceful kill
Browse files Browse the repository at this point in the history
  • Loading branch information
VM committed Nov 15, 2024
1 parent b208103 commit c04ee8d
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 77 deletions.
18 changes: 12 additions & 6 deletions triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ func (db *Database) loadLayers() layer {
if (errors.Is(err, errMissJournal) || errors.Is(err, errUnmatchedJournal)) && db.fastRecovery &&
db.config.TrieNodeBufferType == NodeBufferList && !db.useBase {
start := time.Now()
if db.freezer == nil {
log.Crit("Use unopened freezer db to recover node buffer list")
}
log.Info("Recover node buffer list from ancient db")

nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0,
Expand Down Expand Up @@ -363,15 +360,24 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp
}
}

log.Info("duh2uh23uh")
// Calculate the internal state transitions by id difference.
nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, id-stored, db.config.ProposeBlockInterval,
db.config.NotifyKeep, db.freezer, true, db.useBase)
log.Info("e23j3e1")
db.config.NotifyKeep, db.freezer, db.fastRecovery, db.useBase)
if err != nil {
log.Error("Failed to new trie node buffer", "error", err)
return nil, err
}

if db.config.TrieNodeBufferType == NodeBufferList {
recoveredRoot, recoveredStateID, _ := nb.getLatestStatus()
if recoveredRoot != root && recoveredStateID != id {
return nil, errors.New("Unmatched root and state id with recovered")
}

log.Info("Finish recovering node buffer list", "latest root hash", recoveredRoot.String(),
"latest state_id", recoveredStateID)
}

base := newDiskLayer(root, id, db, nil, nb)
nb.setClean(base.cleans)
return base, nil
Expand Down
118 changes: 47 additions & 71 deletions triedb/pathdb/nodebufferlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,114 +111,90 @@ func newNodeBufferList(
if nodes == nil {
nodes = make(map[common.Hash]map[string]*trienode.Node)
}
var size = uint64(0)
// for _, subset := range nodes {
// for path, n := range subset {
// size += uint64(len(n.Blob) + len(path))
// }
// }
base := newMultiDifflayer(limit, size, common.Hash{}, nodes, layers)

var (
nf *nodebufferlist
err error
)
log.Info("dm3o2020do3")
if !useBase {
log.Info("nuunf2nu")
nf, err = recoverNodeBufferList(db, freezer, base, wpBlocks, rsevMdNum, dlInMd, limit, keepFunc)
nf := &nodebufferlist{
db: db,
wpBlocks: wpBlocks,
rsevMdNum: rsevMdNum,
dlInMd: dlInMd,
limit: limit,
base: newMultiDifflayer(limit, 0, common.Hash{}, nodes, layers),
persistID: rawdb.ReadPersistentStateID(db),
stopCh: make(chan struct{}),
waitStopCh: make(chan struct{}),
forceKeepCh: make(chan struct{}),
waitForceKeepCh: make(chan struct{}),
keepFunc: keepFunc,
}

if !useBase && fastRecovery {
if freezer == nil {
log.Crit("Use unopened freezer db to recover node buffer list")
}

err := nf.recoverNodeBufferList(db, freezer)
if err != nil {
log.Error("Failed to recover node buffer list", "error", err)
return nil, err
}
} else {
log.Info("fde2mod23")
ele := newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0)
nf = &nodebufferlist{
db: db,
wpBlocks: wpBlocks,
rsevMdNum: rsevMdNum,
dlInMd: dlInMd,
limit: limit,
base: base,
head: ele,
tail: ele,
count: 1,
persistID: rawdb.ReadPersistentStateID(db),
stopCh: make(chan struct{}),
waitStopCh: make(chan struct{}),
forceKeepCh: make(chan struct{}),
waitForceKeepCh: make(chan struct{}),
keepFunc: keepFunc,
}
nf.head = ele
nf.tail = ele
nf.count = 1
}
nf.useBase.Store(useBase)

go nf.loop()

log.Info("new node buffer list", "proposed block interval", nf.wpBlocks,
"reserve multi diff_layers", nf.rsevMdNum, "diff_layers in multi_diff_layer", nf.dlInMd,
"limit", common.StorageSize(limit), "layers", layers, "persist_id", nf.persistID, "base_size", size)
"limit", common.StorageSize(limit), "layers", layers, "persist_id", nf.persistID, "base_size", nf.size)
return nf, nil
}

// recoverNodeBufferList recovers node buffer list
func recoverNodeBufferList(db ethdb.Database, freezer *rawdb.ResettableFreezer, base *multiDifflayer,
wpBlocks, rsevMdNum, dlInMd, limit uint64, keepFunc NotifyKeepFunc) (*nodebufferlist, error) {
nbl := &nodebufferlist{
db: db,
wpBlocks: wpBlocks,
rsevMdNum: rsevMdNum,
dlInMd: dlInMd,
limit: limit,
base: base,
persistID: rawdb.ReadPersistentStateID(db),
stopCh: make(chan struct{}),
waitStopCh: make(chan struct{}),
forceKeepCh: make(chan struct{}),
waitForceKeepCh: make(chan struct{}),
keepFunc: keepFunc,
}
func (nf *nodebufferlist) recoverNodeBufferList(db ethdb.Database, freezer *rawdb.ResettableFreezer) error {
head, err := freezer.Ancients()
if err != nil {
log.Error("Failed to get freezer ancients", "error", err)
return nil, err
return err
}
tail, err := freezer.Tail()
if err != nil {
log.Error("Failed to get freezer tail", "error", err)
return nil, err
return err
}
log.Info("Ancient db meta info", "persistent_state_id", nbl.persistID, "head_state_id", head,
"tail_state_id", tail, "waiting_recover_num", head-nbl.persistID)
log.Info("Ancient db meta info", "persistent_state_id", nf.persistID, "head_state_id", head,
"tail_state_id", tail, "waiting_recover_num", head-nf.persistID)

startStateID := nbl.persistID + 1
startStateID := nf.persistID + 1
startBlock, err := readBlockNumber(freezer, startStateID)
if err != nil {
log.Error("Failed to read start block number", "error", err, "tail_state_id", startStateID)
return nil, err
return err
}
endBlock, err := readBlockNumber(freezer, head)
if err != nil {
log.Error("Failed to read end block number", "error", err, "head_state_id", head)
return nil, err
return err
}
blockIntervals := nbl.createBlockInterval(startBlock, endBlock)
stateIntervals, err := nbl.createStateInterval(freezer, startStateID, head, blockIntervals)
blockIntervals := nf.createBlockInterval(startBlock, endBlock)
stateIntervals, err := nf.createStateInterval(freezer, startStateID, head, blockIntervals)
if err != nil {
return nil, err
return err
}
log.Info("block intervals info", "block_intervals", blockIntervals, "state_intervals", stateIntervals,
"start_block", startBlock, "end_block", endBlock)

var eg errgroup.Group
nbl.linkMultiDiffLayers(len(blockIntervals))
for current, i := nbl.head, 0; current != nil; current, i = current.next, i+1 {
nf.linkMultiDiffLayers(len(blockIntervals))
for current, i := nf.head, 0; current != nil; current, i = current.next, i+1 {
index := i
mdl := current
eg.Go(func() error {
for j := stateIntervals[index][0]; j <= stateIntervals[index][1]; j++ {
h, err := nbl.readStateHistory(freezer, j)
h, err := nf.readStateHistory(freezer, j)
if err != nil {
log.Error("Failed to read state history", "error", err)
return err
Expand All @@ -232,18 +208,18 @@ func recoverNodeBufferList(db ethdb.Database, freezer *rawdb.ResettableFreezer,
})
}
if err = eg.Wait(); err != nil {
return nil, err
return err
}

for current, i := nbl.head, 0; current != nil; current, i = current.next, i+1 {
nbl.size += current.size
nbl.layers += current.layers
for current, i := nf.head, 0; current != nil; current, i = current.next, i+1 {
nf.size += current.size
nf.layers += current.layers
}
nbl.diffToBase()
nf.diffToBase()

log.Info("Succeed to add diff layer", "base_size", nbl.base.size, "tail_state_id", nbl.tail.id,
"head_state_id", nbl.head.id, "nbl_layers", nbl.layers, "base_layers", nbl.base.layers)
return nbl, nil
log.Info("Succeed to add diff layer", "base_size", nf.base.size, "tail_state_id", nf.tail.id,
"head_state_id", nf.head.id, "nbl_layers", nf.layers, "base_layers", nf.base.layers)
return nil
}

// linkMultiDiffLayers links specified amount of multiDiffLayers for recovering
Expand Down

0 comments on commit c04ee8d

Please sign in to comment.