diff --git a/common/types.go b/common/types.go index 0f5340698..7779e800a 100644 --- a/common/types.go +++ b/common/types.go @@ -49,6 +49,9 @@ const ( var ( hashT = reflect.TypeOf(Hash{}) addressT = reflect.TypeOf(Address{}) + + // MaxHash represents the maximum possible hash value. + MaxHash = HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") ) // Hash represents the 32 byte Keccak256 hash of arbitrary data. diff --git a/eth/protocols/snap/metrics.go b/eth/protocols/snap/metrics.go index a8ea143b5..ffc9a6a5f 100644 --- a/eth/protocols/snap/metrics.go +++ b/eth/protocols/snap/metrics.go @@ -8,4 +8,32 @@ var ( IngressRegistrationErrorMeter = metrics.NewRegisteredMeter(ingressRegistrationErrorName, nil) EgressRegistrationErrorMeter = metrics.NewRegisteredMeter(egressRegistrationErrorName, nil) + + // deletionGauge is the metric to track how many trie node deletions + // are performed in total during the sync process. + deletionGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/delete", nil) + + // lookupGauge is the metric to track how many trie node lookups are + // performed to determine if node needs to be deleted. + lookupGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/lookup", nil) + + // boundaryAccountNodesGauge is the metric to track how many boundary trie + // nodes in account trie are met. + boundaryAccountNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/account", nil) + + // boundaryAccountNodesGauge is the metric to track how many boundary trie + // nodes in storage tries are met. + boundaryStorageNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/storage", nil) + + // smallStorageGauge is the metric to track how many storages are small enough + // to retrieved in one or two request. + smallStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/small", nil) + + // largeStorageGauge is the metric to track how many storages are large enough + // to retrieved concurrently. + largeStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/large", nil) + + // skipStorageHealingGauge is the metric to track how many storages are retrieved + // in multiple requests but healing is not necessary. + skipStorageHealingGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/noheal", nil) ) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index d127e8cba..e1d9406f2 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -699,6 +699,19 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { } } +// cleanPath is used to remove the dangling nodes in the stackTrie. +func (s *Syncer) cleanPath(batch ethdb.Batch, owner common.Hash, path []byte) { + if owner == (common.Hash{}) && rawdb.ExistsAccountTrieNode(s.db, path) { + rawdb.DeleteAccountTrieNode(batch, path) + deletionGauge.Inc(1) + } + if owner != (common.Hash{}) && rawdb.ExistsStorageTrieNode(s.db, owner, path) { + rawdb.DeleteStorageTrieNode(batch, owner, path) + deletionGauge.Inc(1) + } + lookupGauge.Inc(1) +} + // loadSyncStatus retrieves a previously aborted sync status from the database, // or generates a fresh one if none is available. func (s *Syncer) loadSyncStatus() { @@ -725,6 +738,17 @@ func (s *Syncer) loadSyncStatus() { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(task.genBatch, common.Hash{}, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + // Configure the dangling node cleaner and also filter out boundary nodes + // only in the context of the path scheme. Deletion is forbidden in the + // hash scheme, as it can disrupt state completeness. + options = options.WithCleaner(func(path []byte) { + s.cleanPath(task.genBatch, common.Hash{}, path) + }) + // Skip the left boundary if it's not the first range. + // Skip the right boundary if it's not the last range. + options = options.WithSkipBoundary(task.Next != (common.Hash{}), task.Last != common.MaxHash, boundaryAccountNodesGauge) + } task.genTrie = trie.NewStackTrie(options) for accountHash, subtasks := range task.SubTasks { @@ -743,6 +767,17 @@ func (s *Syncer) loadSyncStatus() { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + // Configure the dangling node cleaner and also filter out boundary nodes + // only in the context of the path scheme. Deletion is forbidden in the + // hash scheme, as it can disrupt state completeness. + options = options.WithCleaner(func(path []byte) { + s.cleanPath(subtask.genBatch, owner, path) + }) + // Skip the left boundary if it's not the first range. + // Skip the right boundary if it's not the last range. + options = options.WithSkipBoundary(subtask.Next != common.Hash{}, subtask.Last != common.MaxHash, boundaryStorageNodesGauge) + } subtask.genTrie = trie.NewStackTrie(options) } } @@ -796,6 +831,17 @@ func (s *Syncer) loadSyncStatus() { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + // Configure the dangling node cleaner and also filter out boundary nodes + // only in the context of the path scheme. Deletion is forbidden in the + // hash scheme, as it can disrupt state completeness. + options = options.WithCleaner(func(path []byte) { + s.cleanPath(batch, common.Hash{}, path) + }) + // Skip the left boundary if it's not the first range. + // Skip the right boundary if it's not the last range. + options = options.WithSkipBoundary(next != common.Hash{}, last != common.MaxHash, boundaryAccountNodesGauge) + } s.tasks = append(s.tasks, &accountTask{ Next: next, Last: last, @@ -1938,6 +1984,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) { res.mainTask.needState[j] = false res.mainTask.pend-- + smallStorageGauge.Inc(1) } // If the last contract was chunked, mark it as needing healing // to avoid writing it out to disk prematurely. @@ -1973,7 +2020,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks) } r := newHashRange(lastKey, chunks) - + if chunks == 1 { + smallStorageGauge.Inc(1) + } else { + largeStorageGauge.Inc(1) + } // Our first task is the one that was just filled by this response. batch := ethdb.HookedBatch{ Batch: s.db.NewBatch(), @@ -1986,6 +2037,14 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + options = options.WithCleaner(func(path []byte) { + s.cleanPath(batch, owner, path) + }) + // Keep the left boundary as it's the first range. + // Skip the right boundary if it's not the last range. + options = options.WithSkipBoundary(false, r.End() != common.MaxHash, boundaryStorageNodesGauge) + } tasks = append(tasks, &storageTask{ Next: common.Hash{}, Last: r.End(), @@ -2004,6 +2063,17 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + // Configure the dangling node cleaner and also filter out boundary nodes + // only in the context of the path scheme. Deletion is forbidden in the + // hash scheme, as it can disrupt state completeness. + options = options.WithCleaner(func(path []byte) { + s.cleanPath(batch, owner, path) + }) + // Skip the left boundary as it's not the first range + // Skip the right boundary if it's not the last range. + options = options.WithSkipBoundary(true, r.End() != common.MaxHash, boundaryStorageNodesGauge) + } tasks = append(tasks, &storageTask{ Next: r.Start(), Last: r.End(), @@ -2059,6 +2129,17 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { rawdb.WriteTrieNode(batch, account, path, hash, blob, s.scheme) }) + if s.scheme == rawdb.PathScheme { + // Configure the dangling node cleaner only in the context of the + // path scheme. Deletion is forbidden in the hash scheme, as it can + // disrupt state completeness. + // + // Notably, boundary nodes can be also kept because the whole storage + // trie is complete. + options = options.WithCleaner(func(path []byte) { + s.cleanPath(batch, account, path) + }) + } tr := trie.NewStackTrie(options) for j := 0; j < len(res.hashes[i]); j++ { tr.Update(res.hashes[i][j][:], res.slots[i][j]) @@ -2082,16 +2163,24 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { if res.subTask != nil { if res.subTask.done { root := res.subTask.genTrie.Commit() - if root == res.subTask.root { - // If the chunk's root is an overflown but full delivery, clear the heal request + if err := res.subTask.genBatch.Write(); err != nil { + log.Error("Failed to persist stack slots", "err", err) + } + res.subTask.genBatch.Reset() + + // If the chunk's root is an overflown but full delivery, + // clear the heal request. + accountHash := res.accounts[len(res.accounts)-1] + if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db, accountHash, nil, root) { for i, account := range res.mainTask.res.hashes { - if account == res.accounts[len(res.accounts)-1] { + if account == accountHash { res.mainTask.needHeal[i] = false + skipStorageHealingGauge.Inc(1) } } } } - if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize || res.subTask.done { + if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize { if err := res.subTask.genBatch.Write(); err != nil { log.Error("Failed to persist stack slots", "err", err) } diff --git a/trie/iterator_test.go b/trie/iterator_test.go index 240aa2528..f173e3d45 100644 --- a/trie/iterator_test.go +++ b/trie/iterator_test.go @@ -86,6 +86,10 @@ type kv struct { t bool } +func (k *kv) cmp(other *kv) int { + return bytes.Compare(k.k, other.k) +} + func TestIteratorLargeData(t *testing.T) { trie := NewEmpty(NewDatabase(rawdb.NewMemoryDatabase(), nil)) vals := make(map[string]*kv) diff --git a/trie/stacktrie.go b/trie/stacktrie.go index bc8b7ea2f..52c9ff3e3 100644 --- a/trie/stacktrie.go +++ b/trie/stacktrie.go @@ -17,12 +17,14 @@ package trie import ( + "bytes" "fmt" "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" ) var ( @@ -32,7 +34,12 @@ var ( // StackTrieOptions contains the configured options for manipulating the stackTrie. type StackTrieOptions struct { - Writer func(path []byte, hash common.Hash, blob []byte) // The function to commit the dirty nodes + Writer func(path []byte, hash common.Hash, blob []byte) // The function to commit the dirty nodes + Cleaner func(path []byte) // The function to clean up dangling nodes + + SkipLeftBoundary bool // Flag whether the nodes on the left boundary are skipped for committing + SkipRightBoundary bool // Flag whether the nodes on the right boundary are skipped for committing + boundaryGauge metrics.Gauge // Gauge to track how many boundary nodes are met } // NewStackTrieOptions initializes an empty options for stackTrie. @@ -44,6 +51,22 @@ func (o *StackTrieOptions) WithWriter(writer func(path []byte, hash common.Hash, return o } +// WithCleaner configures the cleaner in the option for removing dangling nodes. +func (o *StackTrieOptions) WithCleaner(cleaner func(path []byte)) *StackTrieOptions { + o.Cleaner = cleaner + return o +} + +// WithSkipBoundary configures whether the left and right boundary nodes are +// filtered for committing, along with a gauge metrics to track how many +// boundary nodes are met. +func (o *StackTrieOptions) WithSkipBoundary(skipLeft, skipRight bool, gauge metrics.Gauge) *StackTrieOptions { + o.SkipLeftBoundary = skipLeft + o.SkipRightBoundary = skipRight + o.boundaryGauge = gauge + return o +} + // StackTrie is a trie implementation that expects keys to be inserted // in order. Once it determines that a subtree will no longer be inserted // into, it will hash it and free up the memory it uses. @@ -51,6 +74,9 @@ type StackTrie struct { options *StackTrieOptions root *stNode h *hasher + + first []byte // The (hex-encoded without terminator) key of first inserted entry, tracked as left boundary. + last []byte // The (hex-encoded without terminator) key of last inserted entry, tracked as right boundary. } // NewStackTrie allocates and initializes an empty trie. @@ -77,7 +103,18 @@ func (t *StackTrie) TryUpdate(key, value []byte) error { if len(value) == 0 { panic("deletion not supported") } - t.insert(t.root, k[:len(k)-1], value, nil) + k = k[:len(k)-1] // chop the termination flag + + // track the first and last inserted entries. + if t.first == nil { + t.first = append([]byte{}, k...) + } + if t.last == nil { + t.last = append([]byte{}, k...) // allocate key slice + } else { + t.last = append(t.last[:0], k...) // reuse key slice + } + t.insert(t.root, k, value, nil) return nil } @@ -85,6 +122,8 @@ func (t *StackTrie) TryUpdate(key, value []byte) error { func (t *StackTrie) Reset() { t.options = NewStackTrieOptions() t.root = stPool.Get().(*stNode) + t.first = nil + t.last = nil } // stNode represents a node within a StackTrie @@ -308,7 +347,10 @@ func (t *StackTrie) hash(st *stNode, path []byte) { // claim an instance until all children are done with their hashing, // and we actually need one - var blob []byte // RLP-encoded node blob + var ( + blob []byte // RLP-encoded node blob + internal [][]byte // List of node paths covered by the extension node + ) switch st.typ { case branchNode: @@ -335,6 +377,15 @@ func (t *StackTrie) hash(st *stNode, path []byte) { // recursively hash and commit child as the first step t.hash(st.children[0], append(path, st.key...)) + // Collect the path of internal nodes between shortNode and its **in disk** + // child. This is essential in the case of path mode scheme to avoid leaving + // danging nodes within the range of this internal path on disk, which would + // break the guarantee for state healing. + if len(st.children[0].val) >= 32 && t.options.Cleaner != nil { + for i := 1; i < len(st.key); i++ { + internal = append(internal, append(path, st.key[:i]...)) + } + } // encode the extension node sz := hexToCompactInPlace(st.key) n := shortNode{Key: st.key[:sz]} @@ -381,10 +432,33 @@ func (t *StackTrie) hash(st *stNode, path []byte) { st.val = t.h.hashData(blob) - // Commit the trie node if the writer is configured. - if t.options.Writer != nil { - t.options.Writer(path, common.BytesToHash(st.val), blob) + // Short circuit if the stack trie is not configured for writing. + if t.options.Writer == nil { + return + } + // Skip committing if the node is on the left boundary and stackTrie is + // configured to filter the boundary. + if t.options.SkipLeftBoundary && bytes.HasPrefix(t.first, path) { + if t.options.boundaryGauge != nil { + t.options.boundaryGauge.Inc(1) + } + return + } + // Skip committing if the node is on the right boundary and stackTrie is + // configured to filter the boundary. + if t.options.SkipRightBoundary && bytes.HasPrefix(t.last, path) { + if t.options.boundaryGauge != nil { + t.options.boundaryGauge.Inc(1) + } + return + } + // Clean up the internal dangling nodes covered by the extension node. + // This should be done before writing the node to adhere to the committing + // order from bottom to top. + for _, path := range internal { + t.options.Cleaner(path) } + t.options.Writer(path, common.BytesToHash(st.val), blob) } // Hash will firstly hash the entire trie if it's still not hashed and then commit diff --git a/trie/stacktrie_test.go b/trie/stacktrie_test.go index c33a71584..6f93c1f53 100644 --- a/trie/stacktrie_test.go +++ b/trie/stacktrie_test.go @@ -19,11 +19,14 @@ package trie import ( "bytes" "math/big" + "math/rand" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/trie/testutil" + "golang.org/x/exp/slices" ) func TestStackTrieInsertAndHash(t *testing.T) { @@ -348,3 +351,87 @@ func TestStacktrieNotModifyValues(t *testing.T) { } } + +func buildPartialTree(entries []*kv, t *testing.T) map[string]common.Hash { + var ( + options = NewStackTrieOptions() + nodes = make(map[string]common.Hash) + ) + var ( + first int + last = len(entries) - 1 + + noLeft bool + noRight bool + ) + // Enter split mode if there are at least two elements + if rand.Intn(5) != 0 { + for { + first = rand.Intn(len(entries)) + last = rand.Intn(len(entries)) + if first <= last { + break + } + } + if first != 0 { + noLeft = true + } + if last != len(entries)-1 { + noRight = true + } + } + options = options.WithSkipBoundary(noLeft, noRight, nil) + options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) { + nodes[string(path)] = hash + }) + tr := NewStackTrie(options) + + for i := first; i <= last; i++ { + tr.TryUpdate(entries[i].k, entries[i].v) + } + tr.Commit() + return nodes +} + +func TestPartialStackTrie(t *testing.T) { + for round := 0; round < 100; round++ { + var ( + n = rand.Intn(100) + 1 + entries []*kv + ) + for i := 0; i < n; i++ { + var val []byte + if rand.Intn(3) == 0 { + val = testutil.RandBytes(3) + } else { + val = testutil.RandBytes(32) + } + entries = append(entries, &kv{ + k: testutil.RandBytes(32), + v: val, + }) + } + slices.SortFunc(entries, (*kv).cmp) + + var ( + nodes = make(map[string]common.Hash) + options = NewStackTrieOptions().WithWriter(func(path []byte, hash common.Hash, blob []byte) { + nodes[string(path)] = hash + }) + ) + tr := NewStackTrie(options) + + for i := 0; i < len(entries); i++ { + tr.TryUpdate(entries[i].k, entries[i].v) + } + tr.Commit() + + for j := 0; j < 100; j++ { + for path, hash := range buildPartialTree(entries, t) { + if nodes[path] != hash { + t.Errorf("%v, want %x, got %x", []byte(path), nodes[path], hash) + } + } + } + } +} diff --git a/trie/sync.go b/trie/sync.go index 2b257fe2f..195395f73 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -49,6 +49,18 @@ var ( // lookupGauge is the metric to track how many trie node lookups are // performed to determine if node needs to be deleted. lookupGauge = metrics.NewRegisteredGauge("trie/sync/lookup", nil) + + // accountNodeSyncedGauge is the metric to track how many account trie + // node are written during the sync. + accountNodeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/nodes/account", nil) + + // storageNodeSyncedGauge is the metric to track how many account trie + // node are written during the sync. + storageNodeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/nodes/storage", nil) + + // codeSyncedGauge is the metric to track how many contract codes are + // written during the sync. + codeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/codes", nil) ) // SyncPath is a path tuple identifying a particular trie node either in a single @@ -373,14 +385,26 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error { // storage, returning any occurred error. func (s *Sync) Commit(dbw ethdb.Batch) error { // Flush the pending node writes into database batch. + var ( + account int + storage int + ) for path, value := range s.membatch.nodes { owner, inner := ResolvePath([]byte(path)) + if owner == (common.Hash{}) { + account += 1 + } else { + storage += 1 + } rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme) hash := s.membatch.hashes[path] if s.bloom != nil { s.bloom.Add(hash[:]) } } + accountNodeSyncedGauge.Inc(int64(account)) + storageNodeSyncedGauge.Inc(int64(storage)) + // Flush the pending node deletes into the database batch. // Please note that each written and deleted node has a // unique path, ensuring no duplication occurs. @@ -395,6 +419,8 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { s.bloom.Add(hash[:]) } } + codeSyncedGauge.Inc(int64(len(s.membatch.codes))) + s.membatch = newSyncMemBatch() // reset the batch return nil }