From 10ee5b5ea4ac33270861b386afaac3052e5dc1e9 Mon Sep 17 00:00:00 2001 From: will-2012 <117156346+will-2012@users.noreply.github.com> Date: Thu, 4 Jul 2024 14:55:36 +0800 Subject: [PATCH] perf: speedup pbss trienode read (#122) Co-authored-by: will@2012 --- eth/backend.go | 18 +++- internal/build/util.go | 3 +- trie/triedb/pathdb/database.go | 17 ++-- trie/triedb/pathdb/difflayer.go | 155 ++++++++++++++++++++++++++++++++ trie/triedb/pathdb/disklayer.go | 3 + trie/triedb/pathdb/layertree.go | 53 +++++++++++ trie/triedb/pathdb/metrics.go | 7 ++ 7 files changed, 244 insertions(+), 12 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index c432bd85f0..d5b59984fa 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -59,6 +59,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/trie/triedb/pathdb" ) const ( @@ -139,8 +140,21 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } config.TrieDirtyCache = 0 } - log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024) - + // Optimize memory distribution by reallocating surplus allowance from the + // dirty cache to the clean cache. 
+ if config.StateScheme == rawdb.PathScheme && config.TrieDirtyCache > pathdb.MaxBufferSize/1024/1024 { + log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024, + "adjusted", common.StorageSize(pathdb.MaxBufferSize)) + log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024, + "adjusted", common.StorageSize(config.TrieCleanCache+config.TrieDirtyCache-pathdb.MaxBufferSize/1024/1024)*1024*1024) + config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxBufferSize/1024/1024 + config.TrieDirtyCache = pathdb.MaxBufferSize / 1024 / 1024 + } + log.Info("Allocated memory caches", + "state_scheme", config.StateScheme, + "trie_clean_cache", common.StorageSize(config.TrieCleanCache)*1024*1024, + "trie_dirty_cache", common.StorageSize(config.TrieDirtyCache)*1024*1024, + "snapshot_cache", common.StorageSize(config.SnapshotCache)*1024*1024) // Assemble the Ethereum object chainDb, err := stack.OpenDatabaseWithFreezer(ChainData, config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, ChainDBNamespace, false) diff --git a/internal/build/util.go b/internal/build/util.go index b41014a16f..249ba2810c 100644 --- a/internal/build/util.go +++ b/internal/build/util.go @@ -40,11 +40,12 @@ var DryRunFlag = flag.Bool("n", false, "dry run, don't execute commands") // MustRun executes the given command and exits the host process for // any error. 
func MustRun(cmd *exec.Cmd) { - fmt.Println(">>>", printArgs(cmd.Args)) + fmt.Println("start_cmd >>>", printArgs(cmd.Args)) if !*DryRunFlag { cmd.Stderr = os.Stderr cmd.Stdout = os.Stdout if err := cmd.Run(); err != nil { + fmt.Println("return_err >>>", err) log.Fatal(err) } } diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go index 107523c49c..4139dfc8b3 100644 --- a/trie/triedb/pathdb/database.go +++ b/trie/triedb/pathdb/database.go @@ -42,11 +42,11 @@ const ( // defaultCleanSize is the default memory allowance of clean cache. defaultCleanSize = 16 * 1024 * 1024 - // maxBufferSize is the maximum memory allowance of node buffer. + // MaxBufferSize is the maximum memory allowance of node buffer. // Too large nodebuffer will cause the system to pause for a long // time when write happens. Also, the largest batch that pebble can // support is 4GB, node will panic if batch size exceeds this limit. - maxBufferSize = 256 * 1024 * 1024 + MaxBufferSize = 256 * 1024 * 1024 // DefaultBufferSize is the default memory allowance of node buffer // that aggregates the writes from above until it's flushed into the @@ -118,10 +118,9 @@ type Config struct { // unreasonable or unworkable. 
func (c *Config) sanitize() *Config { conf := *c - if conf.DirtyCacheSize > maxBufferSize { - conf.CleanCacheSize = conf.DirtyCacheSize - maxBufferSize - log.Warn("Sanitizing invalid node buffer size", "provided", common.StorageSize(conf.DirtyCacheSize), "updated", common.StorageSize(maxBufferSize)) - conf.DirtyCacheSize = maxBufferSize + if conf.DirtyCacheSize > MaxBufferSize { + log.Warn("Sanitizing invalid node buffer size", "provided", common.StorageSize(conf.DirtyCacheSize), "updated", common.StorageSize(MaxBufferSize)) + conf.DirtyCacheSize = MaxBufferSize } return &conf } @@ -508,9 +507,9 @@ func (db *Database) SetBufferSize(size int) error { db.lock.Lock() defer db.lock.Unlock() - if size > maxBufferSize { - log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(maxBufferSize)) - size = maxBufferSize + if size > MaxBufferSize { + log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(MaxBufferSize)) + size = MaxBufferSize } db.bufferSize = size return db.tree.bottom().setBufferSize(db.bufferSize) diff --git a/trie/triedb/pathdb/difflayer.go b/trie/triedb/pathdb/difflayer.go index 10567715d2..dded3a0b4f 100644 --- a/trie/triedb/pathdb/difflayer.go +++ b/trie/triedb/pathdb/difflayer.go @@ -26,6 +26,106 @@ import ( "github.com/ethereum/go-ethereum/trie/triestate" ) +type RefTrieNode struct { + refCount uint32 + node *trienode.Node +} + +type HashNodeCache struct { + lock sync.RWMutex + cache map[common.Hash]*RefTrieNode +} + +func (h *HashNodeCache) length() int { + if h == nil { + return 0 + } + h.lock.RLock() + defer h.lock.RUnlock() + return len(h.cache) +} + +func (h *HashNodeCache) set(hash common.Hash, node *trienode.Node) { + if h == nil { + return + } + h.lock.Lock() + defer h.lock.Unlock() + if n, ok := h.cache[hash]; ok { + n.refCount++ + } else { + h.cache[hash] = &RefTrieNode{1, node} + } +} + +func (h *HashNodeCache) Get(hash common.Hash) 
*trienode.Node { + if h == nil { + return nil + } + h.lock.RLock() + defer h.lock.RUnlock() + if n, ok := h.cache[hash]; ok { + return n.node + } + return nil +} + +func (h *HashNodeCache) del(hash common.Hash) { + if h == nil { + return + } + h.lock.Lock() + defer h.lock.Unlock() + n, ok := h.cache[hash] + if !ok { + return + } + if n.refCount > 0 { + n.refCount-- + } + if n.refCount == 0 { + delete(h.cache, hash) + } +} + +func (h *HashNodeCache) Add(ly layer) { + if h == nil { + return + } + dl, ok := ly.(*diffLayer) + if !ok { + return + } + beforeAdd := h.length() + for _, subset := range dl.nodes { + for _, node := range subset { + h.set(node.Hash, node) + } + } + diffHashCacheLengthGauge.Update(int64(h.length())) + log.Debug("Add difflayer to hash map", "root", ly.rootHash(), "block_number", dl.block, "map_len", h.length(), "add_delta", h.length()-beforeAdd) +} + +func (h *HashNodeCache) Remove(ly layer) { + if h == nil { + return + } + dl, ok := ly.(*diffLayer) + if !ok { + return + } + go func() { + beforeDel := h.length() + for _, subset := range dl.nodes { + for _, node := range subset { + h.del(node.Hash) + } + } + diffHashCacheLengthGauge.Update(int64(h.length())) + log.Debug("Remove difflayer from hash map", "root", ly.rootHash(), "block_number", dl.block, "map_len", h.length(), "del_delta", beforeDel-h.length()) + }() +} + // diffLayer represents a collection of modifications made to the in-memory tries // along with associated state changes after running a block on top. // @@ -39,7 +139,10 @@ type diffLayer struct { nodes map[common.Hash]map[string]*trienode.Node // Cached trie nodes indexed by owner and path states *triestate.Set // Associated state change set for building history memory uint64 // Approximate guess as to how much memory we use + cache *HashNodeCache // Trie node cache keyed by node hash. The cache itself is immutable, but its items can be added/deleted. 
+ // mutables + origin *diskLayer // The disklayer underlying the current difflayer; it is updated during cap. parent layer // Parent layer modified by this one, never nil, **can be changed** lock sync.RWMutex // Lock used to protect parent } @@ -58,6 +161,20 @@ func newDiffLayer(parent layer, root common.Hash, id uint64, block uint64, nodes states: states, parent: parent, } + + switch l := parent.(type) { + case *diskLayer: + dl.origin = l + dl.cache = &HashNodeCache{ + cache: make(map[common.Hash]*RefTrieNode), + } + case *diffLayer: + dl.origin = l.originDiskLayer() + dl.cache = l.cache + default: + panic("unknown parent type") + } + for _, subset := range nodes { for path, n := range subset { dl.memory += uint64(n.Size() + len(path)) @@ -75,6 +192,18 @@ func newDiffLayer(parent layer, root common.Hash, id uint64, block uint64, nodes return dl } +func (dl *diffLayer) originDiskLayer() *diskLayer { + dl.lock.RLock() + defer dl.lock.RUnlock() + return dl.origin +} + +func (dl *diffLayer) updateOriginDiskLayer(persistLayer *diskLayer) { + dl.lock.Lock() + defer dl.lock.Unlock() + dl.origin = persistLayer +} + // rootHash implements the layer interface, returning the root hash of // corresponding state. func (dl *diffLayer) rootHash() common.Hash { @@ -133,6 +262,32 @@ func (dl *diffLayer) node(owner common.Hash, path []byte, hash common.Hash, dept // Node implements the layer interface, retrieving the trie node blob with the // provided node information. No error will be returned if the node is not found. func (dl *diffLayer) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) { + if n := dl.cache.Get(hash); n != nil { + // Querying the hash map is the fast path, + // avoiding a recursive query of 128 difflayers. 
+ diffHashCacheHitMeter.Mark(1) + diffHashCacheReadMeter.Mark(int64(len(n.Blob))) + return n.Blob, nil + } + diffHashCacheMissMeter.Mark(1) + + persistLayer := dl.originDiskLayer() + if persistLayer != nil { + blob, err := persistLayer.Node(owner, path, hash) + if err != nil { + // This is a failure case with a very low probability. + // Reads/writes of the difflayer cache and of the disklayer are not under the same lock, + // so in extreme cases both the difflayer cache read and the disklayer read may fail, e.g. when the disklayer is stale. + // In this case, fall back to the original 128-layer recursive difflayer query path. + diffHashCacheSlowPathMeter.Mark(1) + log.Debug("Retry difflayer due to query origin failed", "owner", owner, "path", path, "hash", hash.String(), "error", err) + return dl.node(owner, path, hash, 0) + } else { // This is the fast path. + return blob, nil + } + } + diffHashCacheSlowPathMeter.Mark(1) + log.Debug("Retry difflayer due to origin is nil", "owner", owner, "path", path, "hash", hash.String()) return dl.node(owner, path, hash, 0) } diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go index 8d935e9422..26219ad58d 100644 --- a/trie/triedb/pathdb/disklayer.go +++ b/trie/triedb/pathdb/disklayer.go @@ -334,6 +334,9 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { } log.Debug("Pruned state history", "items", pruned, "tailid", oldest) } + + // The bottom difflayer has been absorbed by the disklayer, so release its entries from the hash cache. + bottom.cache.Remove(bottom) return ndl, nil } diff --git a/trie/triedb/pathdb/layertree.go b/trie/triedb/pathdb/layertree.go index 7c59bffb4d..47f5a034ae 100644 --- a/trie/triedb/pathdb/layertree.go +++ b/trie/triedb/pathdb/layertree.go @@ -51,9 +51,20 @@ func (tree *layerTree) reset(head layer) { tree.lock.Lock() defer tree.lock.Unlock() + for _, ly := range tree.layers { + if dl, ok := ly.(*diffLayer); ok { + // Clean up the hash cache of difflayers due to reset. 
+ dl.cache.Remove(dl) + } + } + var layers = make(map[common.Hash]layer) for head != nil { layers[head.rootHash()] = head + if dl, ok := head.(*diffLayer); ok { + // Add the nodes of the new difflayers to the hash cache after reset. + dl.cache.Add(dl) + } head = head.parentLayer() } tree.layers = layers @@ -98,12 +109,23 @@ func (tree *layerTree) add(root common.Hash, parentRoot common.Hash, block uint6 if root == parentRoot { return errors.New("layer cycle") } + parent := tree.get(parentRoot) if parent == nil { return fmt.Errorf("triedb parent [%#x] layer missing", parentRoot) } l := parent.update(root, parent.stateID()+1, block, nodes.Flatten(), states) + // Update the hash cache before adding the layer to the layertree. + l.cache.Add(l) + + if old := tree.get(l.rootHash()); old != nil { + if oldDiff, ok := old.(*diffLayer); ok { + oldDiff.cache.Remove(oldDiff) + log.Info("remove repeated difflayer cache", "root", l.rootHash(), "block_id", l.block) + } + } + tree.lock.Lock() tree.layers[l.rootHash()] = l tree.lock.Unlock() @@ -132,8 +154,15 @@ func (tree *layerTree) cap(root common.Hash, layers int) error { if err != nil { return err } + for _, ly := range tree.layers { + if dl, ok := ly.(*diffLayer); ok { + dl.cache.Remove(dl) + log.Debug("Cleanup difflayer hash cache due to cap all", "diff_root", dl.root.String(), "diff_block_number", dl.block) + } + } // Replace the entire layer tree with the flat base tree.layers = map[common.Hash]layer{base.rootHash(): base} + log.Debug("Cap all difflayers to disklayer", "disk_root", base.rootHash().String()) return nil } // Dive until we run out of layers or reach the persistent database @@ -146,6 +175,7 @@ func (tree *layerTree) cap(root common.Hash, layers int) error { return nil } } + var persisted *diskLayer // We're out of layers, flatten anything below, stopping if it's the disk or if // the memory limit is not yet exceeded. 
switch parent := diff.parentLayer().(type) { @@ -166,6 +196,7 @@ func (tree *layerTree) cap(root common.Hash, layers int) error { diff.parent = base diff.lock.Unlock() + persisted = base.(*diskLayer) default: panic(fmt.Sprintf("unknown data layer in triedb: %T", parent)) @@ -180,6 +211,13 @@ func (tree *layerTree) cap(root common.Hash, layers int) error { } var remove func(root common.Hash) remove = func(root common.Hash) { + if df, exist := tree.layers[root]; exist { + if dl, ok := df.(*diffLayer); ok { + // Clean up the hash cache of the child difflayer corresponding to the stale parent, include the re-org case. + dl.cache.Remove(dl) + log.Debug("Cleanup difflayer hash cache due to reorg", "diff_root", dl.root.String(), "diff_block_number", dl.block) + } + } delete(tree.layers, root) for _, child := range children[root] { remove(child) @@ -189,8 +227,23 @@ func (tree *layerTree) cap(root common.Hash, layers int) error { for root, layer := range tree.layers { if dl, ok := layer.(*diskLayer); ok && dl.isStale() { remove(root) + log.Debug("Remove stale the disklayer", "disk_root", dl.root.String()) } } + + if persisted != nil { + var updateOriginFunc func(root common.Hash) + updateOriginFunc = func(root common.Hash) { + if diff, ok := tree.layers[root].(*diffLayer); ok { + diff.updateOriginDiskLayer(persisted) + } + for _, child := range children[root] { + updateOriginFunc(child) + } + } + updateOriginFunc(persisted.root) + } + return nil } diff --git a/trie/triedb/pathdb/metrics.go b/trie/triedb/pathdb/metrics.go index ee91e5dc20..59191d5d92 100644 --- a/trie/triedb/pathdb/metrics.go +++ b/trie/triedb/pathdb/metrics.go @@ -61,4 +61,11 @@ var ( baseNodeBufferDifflayerAvgSize = metrics.NewRegisteredGauge("pathdb/basenodebuffer/difflayeravgsize", nil) proposedBlockReaderSuccess = metrics.NewRegisteredMeter("pathdb/nodebufferlist/proposedblockreader/success", nil) proposedBlockReaderMismatch = 
metrics.NewRegisteredMeter("pathdb/nodebufferlist/proposedblockreader/mismatch", nil) + + // pbss difflayer cache + diffHashCacheHitMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/hit", nil) + diffHashCacheReadMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/read", nil) + diffHashCacheMissMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/miss", nil) + diffHashCacheSlowPathMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/slowpath", nil) + diffHashCacheLengthGauge = metrics.NewRegisteredGauge("pathdb/difflayer/hashcache/size", nil) )