Skip to content

Commit

Permalink
core, eth, trie: filter out boundary nodes and remove dangling nodes …
Browse files Browse the repository at this point in the history
…in stacktrie (#28327)

* core, eth, trie: filter out boundary nodes in stacktrie

* eth/protocol/snap: add comments

* Update trie/stacktrie.go

Co-authored-by: Martin Holst Swende <martin@swende.se>

* eth, trie: remove onBoundary callback

* eth/protocols/snap: keep complete boundary nodes

* eth/protocols/snap: skip healing if the storage trie is already complete

* eth, trie: add more metrics

* eth, trie: address comment

---------

Co-authored-by: Martin Holst Swende <martin@swende.se>
  • Loading branch information
2 people authored and Francesco4203 committed Nov 4, 2024
1 parent bea205d commit 3362b40
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 11 deletions.
3 changes: 3 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const (
var (
hashT = reflect.TypeOf(Hash{})
addressT = reflect.TypeOf(Address{})

// MaxHash represents the maximum possible hash value.
MaxHash = HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
)

// Hash represents the 32 byte Keccak256 hash of arbitrary data.
Expand Down
28 changes: 28 additions & 0 deletions eth/protocols/snap/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,32 @@ var (

IngressRegistrationErrorMeter = metrics.NewRegisteredMeter(ingressRegistrationErrorName, nil)
EgressRegistrationErrorMeter = metrics.NewRegisteredMeter(egressRegistrationErrorName, nil)

// deletionGauge is the metric to track how many trie node deletions
// are performed in total during the sync process.
deletionGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/delete", nil)

// lookupGauge is the metric to track how many trie node lookups are
// performed to determine if node needs to be deleted.
lookupGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/lookup", nil)

// boundaryAccountNodesGauge is the metric to track how many boundary trie
// nodes in account trie are met.
boundaryAccountNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/account", nil)

// boundaryAccountNodesGauge is the metric to track how many boundary trie
// nodes in storage tries are met.
boundaryStorageNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/storage", nil)

// smallStorageGauge is the metric to track how many storages are small enough
// to retrieved in one or two request.
smallStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/small", nil)

// largeStorageGauge is the metric to track how many storages are large enough
// to retrieved concurrently.
largeStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/large", nil)

// skipStorageHealingGauge is the metric to track how many storages are retrieved
// in multiple requests but healing is not necessary.
skipStorageHealingGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/noheal", nil)
)
99 changes: 94 additions & 5 deletions eth/protocols/snap/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,19 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
}
}

// cleanPath is used to remove the dangling nodes in the stackTrie.
func (s *Syncer) cleanPath(batch ethdb.Batch, owner common.Hash, path []byte) {
if owner == (common.Hash{}) && rawdb.ExistsAccountTrieNode(s.db, path) {
rawdb.DeleteAccountTrieNode(batch, path)
deletionGauge.Inc(1)
}
if owner != (common.Hash{}) && rawdb.ExistsStorageTrieNode(s.db, owner, path) {
rawdb.DeleteStorageTrieNode(batch, owner, path)
deletionGauge.Inc(1)
}
lookupGauge.Inc(1)
}

// loadSyncStatus retrieves a previously aborted sync status from the database,
// or generates a fresh one if none is available.
func (s *Syncer) loadSyncStatus() {
Expand All @@ -725,6 +738,17 @@ func (s *Syncer) loadSyncStatus() {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(task.genBatch, common.Hash{}, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(task.genBatch, common.Hash{}, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(task.Next != (common.Hash{}), task.Last != common.MaxHash, boundaryAccountNodesGauge)
}
task.genTrie = trie.NewStackTrie(options)

for accountHash, subtasks := range task.SubTasks {
Expand All @@ -743,6 +767,17 @@ func (s *Syncer) loadSyncStatus() {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(subtask.genBatch, owner, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(subtask.Next != common.Hash{}, subtask.Last != common.MaxHash, boundaryStorageNodesGauge)
}
subtask.genTrie = trie.NewStackTrie(options)
}
}
Expand Down Expand Up @@ -796,6 +831,17 @@ func (s *Syncer) loadSyncStatus() {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, common.Hash{}, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(next != common.Hash{}, last != common.MaxHash, boundaryAccountNodesGauge)
}
s.tasks = append(s.tasks, &accountTask{
Next: next,
Last: last,
Expand Down Expand Up @@ -1938,6 +1984,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
res.mainTask.needState[j] = false
res.mainTask.pend--
smallStorageGauge.Inc(1)
}
// If the last contract was chunked, mark it as needing healing
// to avoid writing it out to disk prematurely.
Expand Down Expand Up @@ -1973,7 +2020,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
}
r := newHashRange(lastKey, chunks)

if chunks == 1 {
smallStorageGauge.Inc(1)
} else {
largeStorageGauge.Inc(1)
}
// Our first task is the one that was just filled by this response.
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
Expand All @@ -1986,6 +2037,14 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, owner, path)
})
// Keep the left boundary as it's the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(false, r.End() != common.MaxHash, boundaryStorageNodesGauge)
}
tasks = append(tasks, &storageTask{
Next: common.Hash{},
Last: r.End(),
Expand All @@ -2004,6 +2063,17 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, owner, path)
})
// Skip the left boundary as it's not the first range
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(true, r.End() != common.MaxHash, boundaryStorageNodesGauge)
}
tasks = append(tasks, &storageTask{
Next: r.Start(),
Last: r.End(),
Expand Down Expand Up @@ -2059,6 +2129,17 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, account, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner only in the context of the
// path scheme. Deletion is forbidden in the hash scheme, as it can
// disrupt state completeness.
//
// Notably, boundary nodes can be also kept because the whole storage
// trie is complete.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, account, path)
})
}
tr := trie.NewStackTrie(options)
for j := 0; j < len(res.hashes[i]); j++ {
tr.Update(res.hashes[i][j][:], res.slots[i][j])
Expand All @@ -2082,16 +2163,24 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
if res.subTask != nil {
if res.subTask.done {
root := res.subTask.genTrie.Commit()
if root == res.subTask.root {
// If the chunk's root is an overflown but full delivery, clear the heal request
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()

// If the chunk's root is an overflown but full delivery,
// clear the heal request.
accountHash := res.accounts[len(res.accounts)-1]
if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db, accountHash, nil, root) {
for i, account := range res.mainTask.res.hashes {
if account == res.accounts[len(res.accounts)-1] {
if account == accountHash {
res.mainTask.needHeal[i] = false
skipStorageHealingGauge.Inc(1)
}
}
}
}
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize || res.subTask.done {
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize {
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
Expand Down
4 changes: 4 additions & 0 deletions trie/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ type kv struct {
t bool
}

func (k *kv) cmp(other *kv) int {
return bytes.Compare(k.k, other.k)
}

func TestIteratorLargeData(t *testing.T) {
trie := NewEmpty(NewDatabase(rawdb.NewMemoryDatabase(), nil))
vals := make(map[string]*kv)
Expand Down
86 changes: 80 additions & 6 deletions trie/stacktrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package trie

import (
"bytes"
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

var (
Expand All @@ -32,7 +34,12 @@ var (

// StackTrieOptions contains the configured options for manipulating the stackTrie.
type StackTrieOptions struct {
Writer func(path []byte, hash common.Hash, blob []byte) // The function to commit the dirty nodes
Writer func(path []byte, hash common.Hash, blob []byte) // The function to commit the dirty nodes
Cleaner func(path []byte) // The function to clean up dangling nodes

SkipLeftBoundary bool // Flag whether the nodes on the left boundary are skipped for committing
SkipRightBoundary bool // Flag whether the nodes on the right boundary are skipped for committing
boundaryGauge metrics.Gauge // Gauge to track how many boundary nodes are met
}

// NewStackTrieOptions initializes an empty options for stackTrie.
Expand All @@ -44,13 +51,32 @@ func (o *StackTrieOptions) WithWriter(writer func(path []byte, hash common.Hash,
return o
}

// WithCleaner configures the cleaner in the option for removing dangling nodes.
func (o *StackTrieOptions) WithCleaner(cleaner func(path []byte)) *StackTrieOptions {
o.Cleaner = cleaner
return o
}

// WithSkipBoundary configures whether the left and right boundary nodes are
// filtered for committing, along with a gauge metrics to track how many
// boundary nodes are met.
func (o *StackTrieOptions) WithSkipBoundary(skipLeft, skipRight bool, gauge metrics.Gauge) *StackTrieOptions {
o.SkipLeftBoundary = skipLeft
o.SkipRightBoundary = skipRight
o.boundaryGauge = gauge
return o
}

// StackTrie is a trie implementation that expects keys to be inserted
// in order. Once it determines that a subtree will no longer be inserted
// into, it will hash it and free up the memory it uses.
type StackTrie struct {
options *StackTrieOptions
root *stNode
h *hasher

first []byte // The (hex-encoded without terminator) key of first inserted entry, tracked as left boundary.
last []byte // The (hex-encoded without terminator) key of last inserted entry, tracked as right boundary.
}

// NewStackTrie allocates and initializes an empty trie.
Expand All @@ -77,14 +103,27 @@ func (t *StackTrie) TryUpdate(key, value []byte) error {
if len(value) == 0 {
panic("deletion not supported")
}
t.insert(t.root, k[:len(k)-1], value, nil)
k = k[:len(k)-1] // chop the termination flag

// track the first and last inserted entries.
if t.first == nil {
t.first = append([]byte{}, k...)
}
if t.last == nil {
t.last = append([]byte{}, k...) // allocate key slice
} else {
t.last = append(t.last[:0], k...) // reuse key slice
}
t.insert(t.root, k, value, nil)
return nil
}

// Reset resets the stack trie object to empty state.
func (t *StackTrie) Reset() {
t.options = NewStackTrieOptions()
t.root = stPool.Get().(*stNode)
t.first = nil
t.last = nil
}

// stNode represents a node within a StackTrie
Expand Down Expand Up @@ -308,7 +347,10 @@ func (t *StackTrie) hash(st *stNode, path []byte) {
// claim an instance until all children are done with their hashing,
// and we actually need one

var blob []byte // RLP-encoded node blob
var (
blob []byte // RLP-encoded node blob
internal [][]byte // List of node paths covered by the extension node
)

switch st.typ {
case branchNode:
Expand All @@ -335,6 +377,15 @@ func (t *StackTrie) hash(st *stNode, path []byte) {
// recursively hash and commit child as the first step
t.hash(st.children[0], append(path, st.key...))

// Collect the path of internal nodes between shortNode and its **in disk**
// child. This is essential in the case of path mode scheme to avoid leaving
// danging nodes within the range of this internal path on disk, which would
// break the guarantee for state healing.
if len(st.children[0].val) >= 32 && t.options.Cleaner != nil {
for i := 1; i < len(st.key); i++ {
internal = append(internal, append(path, st.key[:i]...))
}
}
// encode the extension node
sz := hexToCompactInPlace(st.key)
n := shortNode{Key: st.key[:sz]}
Expand Down Expand Up @@ -381,10 +432,33 @@ func (t *StackTrie) hash(st *stNode, path []byte) {

st.val = t.h.hashData(blob)

// Commit the trie node if the writer is configured.
if t.options.Writer != nil {
t.options.Writer(path, common.BytesToHash(st.val), blob)
// Short circuit if the stack trie is not configured for writing.
if t.options.Writer == nil {
return
}
// Skip committing if the node is on the left boundary and stackTrie is
// configured to filter the boundary.
if t.options.SkipLeftBoundary && bytes.HasPrefix(t.first, path) {
if t.options.boundaryGauge != nil {
t.options.boundaryGauge.Inc(1)
}
return
}
// Skip committing if the node is on the right boundary and stackTrie is
// configured to filter the boundary.
if t.options.SkipRightBoundary && bytes.HasPrefix(t.last, path) {
if t.options.boundaryGauge != nil {
t.options.boundaryGauge.Inc(1)
}
return
}
// Clean up the internal dangling nodes covered by the extension node.
// This should be done before writing the node to adhere to the committing
// order from bottom to top.
for _, path := range internal {
t.options.Cleaner(path)
}
t.options.Writer(path, common.BytesToHash(st.val), blob)
}

// Hash will firstly hash the entire trie if it's still not hashed and then commit
Expand Down
Loading

0 comments on commit 3362b40

Please sign in to comment.