trie: pbss fix release v1.13.5 continue (#621)
* trie: refactor stacktrie (#28233)

This change refactors stacktrie to separate the stacktrie itself from the
internal representation of nodes: a stacktrie is not a recursive structure
of stacktries, but rather a framework for representing and operating on a set of nodes.
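
For orientation, here is a minimal, hash-only usage sketch of the external API the refactor leaves in place (mirroring the `NewStackTrie(nil)` call visible in the `trie/proof.go` hunk below). The ascending key order is a requirement of stacktrie, which hashes and discards everything to the left of the most recently inserted key.

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/trie"
)

func main() {
	// nil options: hash-only usage, no nodes are written anywhere.
	st := trie.NewStackTrie(nil)

	// Keys must be inserted in ascending order.
	st.Update(common.HexToHash("0x01").Bytes(), []byte("a"))
	st.Update(common.HexToHash("0x02").Bytes(), []byte("b"))

	fmt.Println("root:", st.Hash())
}
```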

---------

Co-authored-by: Gary Rong <garyrong0905@gmail.com>

* trie: remove owner and binary marshaling from stacktrie (#28291)

This change
  - Removes the owner notion from the stacktrie; the owner is only ever needed for committing to the database, but the commit function (`writeFn`) is provided by the caller, so the caller can simply bind the owner inside `writeFn` instead of passing it through the stacktrie (sketched below).
  - Removes the `encoding.BinaryMarshaler`/`encoding.BinaryUnmarshaler` interface from stacktrie. We're not using it, and it is doubtful whether anyone downstream is either.
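
As a hedged sketch of the first point, the owner can now be bound into the writer closure by the caller; `newStorageStackTrie`, `batch` and `scheme` are illustrative names for this sketch only, while the real wiring lives in the `eth/protocols/snap/sync.go` hunks below.

```go
package example

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/trie"
)

// newStorageStackTrie is a hypothetical helper: the owner (account hash) is
// captured by the writer closure instead of being stored on the stacktrie.
func newStorageStackTrie(batch ethdb.Batch, owner common.Hash, scheme string) *trie.StackTrie {
	options := trie.NewStackTrieOptions()
	options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
		// The owner travels through the closure, not through the stacktrie.
		rawdb.WriteTrieNode(batch, owner, path, hash, blob, scheme)
	})
	return trie.NewStackTrie(options)
}
```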

* core, trie, eth: refactor stacktrie constructor

This change enhances the stacktrie constructor by introducing an option struct. It also simplifies the `Hash` and `Commit` operations, getting rid of the special handling around the root node.
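
A minimal sketch of the resulting call pattern, assuming only the names visible elsewhere in this diff (`NewStackTrieOptions`, `WithWriter`, and a `Commit` that returns the root hash directly rather than a `(root, error)` pair):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/trie"
)

func main() {
	db := rawdb.NewMemoryDatabase()

	// Construct the stacktrie through the option struct instead of
	// positional constructor arguments.
	options := trie.NewStackTrieOptions()
	options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
		rawdb.WriteTrieNode(db, common.Hash{}, path, hash, blob, rawdb.HashScheme)
	})
	st := trie.NewStackTrie(options)

	st.Update(common.HexToHash("0x01").Bytes(), []byte("a"))
	st.Update(common.HexToHash("0x02").Bytes(), []byte("b"))

	// Commit returns the root directly; no error value, no special-cased root.
	fmt.Println("root:", st.Commit())
}
```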

* core, eth, trie: filter out boundary nodes and remove dangling nodes in stacktrie (#28327)

* core, eth, trie: filter out boundary nodes in stacktrie

* eth/protocol/snap: add comments

* Update trie/stacktrie.go

Co-authored-by: Martin Holst Swende <martin@swende.se>

* eth, trie: remove onBoundary callback

* eth/protocols/snap: keep complete boundary nodes

* eth/protocols/snap: skip healing if the storage trie is already complete

* eth, trie: add more metrics

* eth, trie: address comment
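
Condensing the points above, a hedged sketch of how the new cleaner and boundary-skipping options fit together for a path-scheme range; `newRangeStackTrie` and its parameters are illustrative stand-ins for the surrounding sync state, and `cleanPath` stands in for the `Syncer.cleanPath` helper added in the `sync.go` hunks below.

```go
package example

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/trie"
)

// newRangeStackTrie is a hypothetical helper condensing the pattern used for
// account and storage range tasks in the snap sync code.
func newRangeStackTrie(batch ethdb.Batch, owner, next, last common.Hash, scheme string,
	cleanPath func(ethdb.Batch, common.Hash, []byte), gauge metrics.Gauge) *trie.StackTrie {

	options := trie.NewStackTrieOptions()
	options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
		rawdb.WriteTrieNode(batch, owner, path, hash, blob, scheme)
	})
	if scheme == rawdb.PathScheme {
		// Dangling-node cleanup and boundary filtering are path-scheme only;
		// deleting nodes in the hash scheme could break state completeness.
		options = options.WithCleaner(func(path []byte) {
			cleanPath(batch, owner, path)
		})
		// Skip the left boundary unless this is the first range, and the
		// right boundary unless this is the last range.
		options = options.WithSkipBoundary(next != (common.Hash{}), last != common.MaxHash, gauge)
	}
	return trie.NewStackTrie(options)
}
```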

---------

Co-authored-by: Martin Holst Swende <martin@swende.se>

---------

Co-authored-by: Martin Holst Swende <martin@swende.se>
Co-authored-by: Gary Rong <garyrong0905@gmail.com>
3 people authored Nov 8, 2024
1 parent a44ecb8 commit d97cf83
Showing 11 changed files with 495 additions and 355 deletions.
3 changes: 3 additions & 0 deletions common/types.go
@@ -49,6 +49,9 @@ const (
var (
hashT = reflect.TypeOf(Hash{})
addressT = reflect.TypeOf(Address{})

// MaxHash represents the maximum possible hash value.
MaxHash = HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
)

// Hash represents the 32 byte Keccak256 hash of arbitrary data.
16 changes: 5 additions & 11 deletions core/state/snapshot/conversion.go
@@ -363,22 +363,16 @@ func generateTrieRoot(db ethdb.KeyValueWriter, scheme string, it Iterator, accou

func stackTrieGenerate(db ethdb.KeyValueWriter, scheme string, owner common.Hash, in chan trieKV, out chan common.Hash) {

var nodeWriter trie.NodeWriteFunc
options := trie.NewStackTrieOptions()
// Implement the node writer in case a database is given, otherwise leave it nil.
if db != nil {
nodeWriter = func(owner common.Hash, path []byte, hash common.Hash, blob []byte) {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(db, owner, path, hash, blob, scheme)
}
})
}
t := trie.NewStackTrieWithOwner(nodeWriter, owner)
t := trie.NewStackTrie(options)
for leaf := range in {
t.TryUpdate(leaf.key[:], leaf.value)
}
var root common.Hash
if db == nil {
root = t.Hash()
} else {
root, _ = t.Commit()
}
out <- root
out <- t.Commit()
}
28 changes: 28 additions & 0 deletions eth/protocols/snap/metrics.go
@@ -8,4 +8,32 @@ var (

IngressRegistrationErrorMeter = metrics.NewRegisteredMeter(ingressRegistrationErrorName, nil)
EgressRegistrationErrorMeter = metrics.NewRegisteredMeter(egressRegistrationErrorName, nil)

// deletionGauge is the metric to track how many trie node deletions
// are performed in total during the sync process.
deletionGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/delete", nil)

// lookupGauge is the metric to track how many trie node lookups are
// performed to determine if a node needs to be deleted.
lookupGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/lookup", nil)

// boundaryAccountNodesGauge is the metric to track how many boundary trie
// nodes in the account trie are encountered.
boundaryAccountNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/account", nil)

// boundaryStorageNodesGauge is the metric to track how many boundary trie
// nodes in storage tries are encountered.
boundaryStorageNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/storage", nil)

// smallStorageGauge is the metric to track how many storages are small enough
// to be retrieved in one or two requests.
smallStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/small", nil)

// largeStorageGauge is the metric to track how many storages are large enough
// to be retrieved concurrently.
largeStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/large", nil)

// skipStorageHealingGauge is the metric to track how many storages are retrieved
// in multiple requests but healing is not necessary.
skipStorageHealingGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/noheal", nil)
)
156 changes: 129 additions & 27 deletions eth/protocols/snap/sync.go
@@ -699,6 +699,19 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
}
}

// cleanPath is used to remove the dangling nodes in the stackTrie.
func (s *Syncer) cleanPath(batch ethdb.Batch, owner common.Hash, path []byte) {
if owner == (common.Hash{}) && rawdb.ExistsAccountTrieNode(s.db, path) {
rawdb.DeleteAccountTrieNode(batch, path)
deletionGauge.Inc(1)
}
if owner != (common.Hash{}) && rawdb.ExistsStorageTrieNode(s.db, owner, path) {
rawdb.DeleteStorageTrieNode(batch, owner, path)
deletionGauge.Inc(1)
}
lookupGauge.Inc(1)
}

// loadSyncStatus retrieves a previously aborted sync status from the database,
// or generates a fresh one if none is available.
func (s *Syncer) loadSyncStatus() {
@@ -721,9 +734,22 @@ func (s *Syncer) loadSyncStatus() {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
task.genTrie = trie.NewStackTrie(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(task.genBatch, owner, path, hash, val, s.scheme)
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(task.genBatch, common.Hash{}, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(task.genBatch, common.Hash{}, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(task.Next != (common.Hash{}), task.Last != common.MaxHash, boundaryAccountNodesGauge)
}
task.genTrie = trie.NewStackTrie(options)

for accountHash, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
@@ -735,9 +761,24 @@ func (s *Syncer) loadSyncStatus() {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
subtask.genTrie = trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, val, s.scheme)
}, accountHash)
owner := accountHash // local assignment for stacktrie writer closure
options := trie.NewStackTrieOptions()

options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(subtask.genBatch, owner, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(subtask.Next != common.Hash{}, subtask.Last != common.MaxHash, boundaryStorageNodesGauge)
}
subtask.genTrie = trie.NewStackTrie(options)
}
}
}
@@ -786,14 +827,27 @@ func (s *Syncer) loadSyncStatus() {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, common.Hash{}, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(next != common.Hash{}, last != common.MaxHash, boundaryAccountNodesGauge)
}
s.tasks = append(s.tasks, &accountTask{
Next: next,
Last: last,
SubTasks: make(map[common.Hash][]*storageTask),
genBatch: batch,
genTrie: trie.NewStackTrie(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme)
}),
genTrie: trie.NewStackTrie(options),
})
log.Debug("Created account sync task", "from", next, "last", last)
next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
@@ -1930,6 +1984,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
res.mainTask.needState[j] = false
res.mainTask.pend--
smallStorageGauge.Inc(1)
}
// If the last contract was chunked, mark it as needing healing
// to avoid writing it out to disk prematurely.
@@ -1965,22 +2020,37 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
}
r := newHashRange(lastKey, chunks)

if chunks == 1 {
smallStorageGauge.Inc(1)
} else {
largeStorageGauge.Inc(1)
}
// Our first task is the one that was just filled by this response.
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
owner := account // local assignment for stacktrie writer closure
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, owner, path)
})
// Keep the left boundary as it's the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(false, r.End() != common.MaxHash, boundaryStorageNodesGauge)
}
tasks = append(tasks, &storageTask{
Next: common.Hash{},
Last: r.End(),
root: acc.Root,
genBatch: batch,
genTrie: trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme)
}, account),
genTrie: trie.NewStackTrie(options),
})
for r.Next() {
batch := ethdb.HookedBatch{
@@ -1989,14 +2059,27 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, owner, path)
})
// Skip the left boundary as it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(true, r.End() != common.MaxHash, boundaryStorageNodesGauge)
}
tasks = append(tasks, &storageTask{
Next: r.Start(),
Last: r.End(),
root: acc.Root,
genBatch: batch,
genTrie: trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme)
}, account),
genTrie: trie.NewStackTrie(options),
})
}
for _, task := range tasks {
@@ -2041,9 +2124,23 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
slots += len(res.hashes[i])

if i < len(res.hashes)-1 || res.subTask == nil {
tr := trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme)
}, account)
// no need to make local reassignment of account: this closure does not outlive the loop
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, account, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner only in the context of the
// path scheme. Deletion is forbidden in the hash scheme, as it can
// disrupt state completeness.
//
// Notably, boundary nodes can be also kept because the whole storage
// trie is complete.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, account, path)
})
}
tr := trie.NewStackTrie(options)
for j := 0; j < len(res.hashes[i]); j++ {
tr.Update(res.hashes[i][j][:], res.slots[i][j])
}
@@ -2065,18 +2162,25 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// Large contracts could have generated new trie nodes, flush them to disk
if res.subTask != nil {
if res.subTask.done {
if root, err := res.subTask.genTrie.Commit(); err != nil {
log.Error("Failed to commit stack slots", "err", err)
} else if root == res.subTask.root {
// If the chunk's root is an overflown but full delivery, clear the heal request
root := res.subTask.genTrie.Commit()
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()

// If the chunk's root is an overflown but full delivery,
// clear the heal request.
accountHash := res.accounts[len(res.accounts)-1]
if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db, accountHash, nil, root) {
for i, account := range res.mainTask.res.hashes {
if account == res.accounts[len(res.accounts)-1] {
if account == accountHash {
res.mainTask.needHeal[i] = false
skipStorageHealingGauge.Inc(1)
}
}
}
}
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize || res.subTask.done {
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize {
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
@@ -2283,9 +2387,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
// flush after finalizing task.done. It's fine even if we crash and lose this
// write as it will only cause more data to be downloaded during heal.
if task.done {
if _, err := task.genTrie.Commit(); err != nil {
log.Error("Failed to commit stack account", "err", err)
}
task.genTrie.Commit()
}
if task.genBatch.ValueSize() > ethdb.IdealBatchSize || task.done {
if err := task.genBatch.Write(); err != nil {
19 changes: 8 additions & 11 deletions tests/fuzzers/stacktrie/trie_fuzzer.go
@@ -153,9 +153,10 @@ func (f *fuzzer) fuzz() int {
trieA = trie.NewEmpty(dbA)
spongeB = &spongeDb{sponge: sha3.NewLegacyKeccak256()}
dbB = trie.NewDatabase(rawdb.NewDatabase(spongeB), nil)
trieB = trie.NewStackTrie(func(owner common.Hash, path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(spongeB, owner, path, hash, blob, dbB.Scheme())
options = trie.NewStackTrieOptions().WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(spongeB, common.Hash{}, path, hash, blob, dbB.Scheme())
})
trieB = trie.NewStackTrie(options)
vals kvs
useful bool
maxElements = 10000
@@ -203,9 +204,7 @@ func (f *fuzzer) fuzz() int {
trieB.Update(kv.k, kv.v)
}
rootB := trieB.Hash()
if _, err := trieB.Commit(); err != nil {
panic(err)
}
trieB.Commit()
if rootA != rootB {
panic(fmt.Sprintf("roots differ: (trie) %x != %x (stacktrie)", rootA, rootB))
}
@@ -217,22 +216,20 @@ func (f *fuzzer) fuzz() int {
// Ensure all the nodes are persisted correctly
// Need tracked deleted nodes.
var (
nodeset = make(map[string][]byte) // path -> blob
trieC = trie.NewStackTrie(func(owner common.Hash, path []byte, hash common.Hash, blob []byte) {
nodeset = make(map[string][]byte) // path -> blob
optionsC = trie.NewStackTrieOptions().WithWriter(func(path []byte, hash common.Hash, blob []byte) {
if crypto.Keccak256Hash(blob) != hash {
panic("invalid node blob")
}
if owner != (common.Hash{}) {
panic("invalid node owner")
}
nodeset[string(path)] = common.CopyBytes(blob)
})
trieC = trie.NewStackTrie(optionsC)
checked int
)
for _, kv := range vals {
trieC.Update(kv.k, kv.v)
}
rootC, _ := trieC.Commit()
rootC := trieC.Commit()
if rootA != rootC {
panic(fmt.Sprintf("roots differ: (trie) %x != %x (stacktrie)", rootA, rootC))
}
4 changes: 4 additions & 0 deletions trie/iterator_test.go
@@ -86,6 +86,10 @@ type kv struct {
t bool
}

func (k *kv) cmp(other *kv) int {
return bytes.Compare(k.k, other.k)
}

func TestIteratorLargeData(t *testing.T) {
trie := NewEmpty(NewDatabase(rawdb.NewMemoryDatabase(), nil))
vals := make(map[string]*kv)
2 changes: 1 addition & 1 deletion trie/proof.go
@@ -502,7 +502,7 @@ func VerifyRangeProof(rootHash common.Hash, firstKey []byte, lastKey []byte, key
if proof == nil {
tr := NewStackTrie(nil)
for index, key := range keys {
tr.TryUpdate(key, values[index])
tr.Update(key, values[index])
}
if have, want := tr.Hash(), rootHash; have != want {
return false, fmt.Errorf("invalid proof, want hash %x, got %x", want, have)
