From 51fef19f92eb7eeac46bf26da4bc7f42d1e8b9eb Mon Sep 17 00:00:00 2001
From: Francesco4203
Date: Tue, 21 May 2024 10:37:39 +0700
Subject: [PATCH 1/4] IntermediateRoot: add flag for threshold to update
 concurrently

Divide the root updating of stateObjects among goroutines when the number of
stateObjects is at least the threshold; this makes IntermediateRoot about
1.55 times faster.

statedb_test.go/TestIntermediateUpdateConcurrently: add a test checking that
the states produced by the sequential and the concurrent paths are identical
---
 cmd/ronin/main.go          |  1 +
 cmd/utils/flags.go         |  7 ++++
 core/blockchain.go         |  4 ++
 core/state/statedb.go      | 40 ++++++++++++++++++++
 core/state/statedb_test.go | 76 ++++++++++++++++++++++++++++++++++++++
 eth/backend.go             | 21 ++++++-----
 eth/ethconfig/config.go    |  3 ++
 7 files changed, 142 insertions(+), 10 deletions(-)

diff --git a/cmd/ronin/main.go b/cmd/ronin/main.go
index 811b631b4d..baaa04f9a4 100644
--- a/cmd/ronin/main.go
+++ b/cmd/ronin/main.go
@@ -178,6 +178,7 @@ var (
 		utils.DisableRoninProtocol,
 		utils.AdditionalChainEventFlag,
 		utils.DBEngineFlag,
+		utils.ConcurrentUpdateThresholdFlag,
 	}
 
 	rpcFlags = []cli.Flag{
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index a7c3b246fe..a37eea05a4 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -1073,6 +1073,13 @@
 		Usage:    "List of mock bls public keys which are reflect 1:1 with mock.validators",
 		Category: flags.MockCategory,
 	}
+
+	ConcurrentUpdateThresholdFlag = &cli.IntFlag{
+		Name:     "concurrent-update-threshold",
+		Usage:    "Minimum number of updated accounts needed to update the state root concurrently, 0 to disable",
+		Value:    0, // disable concurrent update by default
+		Category: flags.EthCategory,
+	}
 )
 
 // MakeDataDir retrieves the currently requested data directory, terminating
diff --git a/core/blockchain.go b/core/blockchain.go
index 9d123fc9e8..17d066cfe7 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -139,6 +139,9 @@ type CacheConfig struct {
 	TriesInMemory int  // The number of tries is kept in memory before pruning
 	SnapshotWait  bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
+
+	// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
+	ConcurrentUpdateThreshold int
 }
 
 // defaultCacheConfig are the default caching values if none are specified by the
@@ -1814,6 +1817,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 			parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
 		}
 		statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
 		if err != nil {
 			return it.index, err
 		}
+		statedb.ConcurrentUpdateThreshold = bc.cacheConfig.ConcurrentUpdateThreshold
diff --git a/core/state/statedb.go b/core/state/statedb.go
index 014e8a0891..227a7a2d3c 100644
--- a/core/state/statedb.go
+++ b/core/state/statedb.go
@@ -21,7 +21,9 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
+	"runtime"
 	"sort"
+	"sync"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -122,6 +124,9 @@ type StateDB struct {
 	StorageUpdated int
 	AccountDeleted int
 	StorageDeleted int
+
+	// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
+	ConcurrentUpdateThreshold int
 }
 
 // New creates a new state from a given trie.
@@ -855,11 +860,46 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 	// the account prefetcher. Instead, let's process all the storage updates
 	// first, giving the account prefeches just a few more milliseconds of time
 	// to pull useful data from disk.
+ + // Get the stateObjects needed to be updated + updateObjs := []*stateObject{} for addr := range s.stateObjectsPending { if obj := s.stateObjects[addr]; !obj.deleted { + updateObjs = append(updateObjs, obj) + } + } + + if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 { + // Update the state objects sequentially + for _, obj := range updateObjs { obj.updateRoot(s.db) } + } else { + // Declare min function + min := func(a, b int) int { + if a < b { + return a + } + return b + } + // Update the state objects using goroutines, with maximum of NumCPU goroutines + nRoutines := min(runtime.NumCPU(), len(updateObjs)) + if nRoutines != 0 { + nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines + wg := sync.WaitGroup{} + wg.Add(nRoutines) + for i := 0; i < len(updateObjs); i += nObjPerRoutine { + go func(objs []*stateObject) { + defer wg.Done() + for _, obj := range objs { + obj.updateRoot(s.db) + } + }(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))]) + } + wg.Wait() + } } + // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie // which has the same root, but also has some content loaded into it. diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index e9576d4dc4..69afe80e08 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -28,6 +28,7 @@ import ( "sync" "testing" "testing/quick" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" @@ -915,3 +916,78 @@ func TestStateDBAccessList(t *testing.T) { t.Fatalf("expected empty, got %d", got) } } + +func TestIntermediateUpdateConcurrently(t *testing.T) { + rng := rand.New(rand.NewSource(time.Now().Unix())) + // Create an empty state + db1 := rawdb.NewMemoryDatabase() + db2 := rawdb.NewMemoryDatabase() + state1, _ := New(common.Hash{}, NewDatabase(db1), nil) + state2, _ := New(common.Hash{}, NewDatabase(db2), nil) + + // Update it with random data + for i := int64(0); i < 1000; i++ { + addr := common.BigToAddress(big.NewInt(i)) + balance := big.NewInt(int64(rng.Int63())) + nonce := rng.Uint64() + key := common.BigToHash(big.NewInt(int64(rng.Int63()))) + value := common.BigToHash(big.NewInt(int64(rng.Int63()))) + code := []byte{byte(rng.Uint64()), byte(rng.Uint64()), byte(rng.Uint64())} + state1.SetBalance(addr, balance) + state2.SetBalance(addr, balance) + state1.SetNonce(addr, nonce) + state2.SetNonce(addr, nonce) + state1.SetState(addr, key, value) + state2.SetState(addr, key, value) + state1.SetCode(addr, code) + state2.SetCode(addr, code) + } + + state1.ConcurrentUpdateThreshold = 0 + state2.ConcurrentUpdateThreshold = 1 + + state1.IntermediateRoot(false) // sequential + state2.IntermediateRoot(false) // concurrent + + root1, err1 := state1.Commit(false) + root2, err2 := state2.Commit(false) + + if err1 != nil { + t.Fatalf("sequential commit failed: %v", err1) + } + if err1 = state1.Database().TrieDB().Commit(root1, false, nil); err1 != nil { + t.Errorf("cannot commit trie %v to persistent database", root1.Hex()) + } + if err2 != nil { + t.Fatalf("concurrent commit failed: %v", err2) + } + if err2 = state2.Database().TrieDB().Commit(root2, false, nil); err2 != nil { + t.Errorf("cannot commit trie %v to persistent database", root2.Hex()) + } + + it1 := db1.NewIterator(nil, nil) + it2 := db2.NewIterator(nil, nil) + for it1.Next() { + if !it2.Next() { + t.Fatalf("concurrent iterator ended prematurely") + } + if 
!bytes.Equal(it1.Key(), it2.Key()) { + t.Fatalf("concurrent iterator key mismatch: " + string(it1.Key()) + " != " + string(it2.Key())) + } + if !bytes.Equal(it1.Value(), it2.Value()) { + t.Fatalf("concurrent iterator value mismatch: " + string(it1.Value()) + " != " + string(it2.Value())) + } + } + if it1.Error() != nil { + t.Fatalf("sequential iterator error: %v", it1.Error()) + } + if it2.Error() != nil { + t.Fatalf("concurrent iterator error: %v", it2.Error()) + } + if it1.Next() { + t.Fatalf("sequential iterator has extra data") + } + if it2.Next() { + t.Fatalf("concurrent iterator has extra data") + } +} diff --git a/eth/backend.go b/eth/backend.go index 4f8c10ec45..060a889ec6 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -187,16 +187,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { EnablePreimageRecording: config.EnablePreimageRecording, } cacheConfig = &core.CacheConfig{ - TrieCleanLimit: config.TrieCleanCache, - TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal), - TrieCleanRejournal: config.TrieCleanCacheRejournal, - TrieCleanNoPrefetch: config.NoPrefetch, - TrieDirtyLimit: config.TrieDirtyCache, - TrieDirtyDisabled: config.NoPruning, - TrieTimeLimit: config.TrieTimeout, - SnapshotLimit: config.SnapshotCache, - Preimages: config.Preimages, - TriesInMemory: config.TriesInMemory, + TrieCleanLimit: config.TrieCleanCache, + TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal), + TrieCleanRejournal: config.TrieCleanCacheRejournal, + TrieCleanNoPrefetch: config.NoPrefetch, + TrieDirtyLimit: config.TrieDirtyCache, + TrieDirtyDisabled: config.NoPruning, + TrieTimeLimit: config.TrieTimeout, + SnapshotLimit: config.SnapshotCache, + Preimages: config.Preimages, + TriesInMemory: config.TriesInMemory, + ConcurrentUpdateThreshold: config.ConcurrentUpdateThreshold, } ) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit) diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 5681bea965..d3c5d42d7c 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -222,6 +222,9 @@ type Config struct { // Send additional chain event EnableAdditionalChainEvent bool + + // Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable + ConcurrentUpdateThreshold int } // CreateConsensusEngine creates a consensus engine for the given chain configuration. 
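
For reference, the chunked fan-out that patch 1 adds to IntermediateRoot looks like this in isolation. This is an illustrative sketch, not code from the patch: parallelFor and process are made-up names, and it calls wg.Add(1) once per spawned chunk so the WaitGroup count always matches the number of goroutines even when the ceiling division yields fewer chunks than workers.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// parallelFor splits items into contiguous chunks and processes each chunk
// in its own goroutine, using at most runtime.NumCPU() workers.
func parallelFor(items []int, process func(int)) {
	n := len(items)
	if n == 0 {
		return
	}
	workers := runtime.NumCPU()
	if workers > n {
		workers = n
	}
	// Ceiling division so every item is covered by some chunk.
	chunk := (n + workers - 1) / workers
	var wg sync.WaitGroup
	for i := 0; i < n; i += chunk {
		end := i + chunk
		if end > n {
			end = n
		}
		wg.Add(1) // one Add per goroutine actually spawned
		go func(part []int) {
			defer wg.Done()
			for _, item := range part {
				process(item)
			}
		}(items[i:end])
	}
	wg.Wait()
}

func main() {
	parallelFor([]int{1, 2, 3, 4, 5}, func(v int) {
		fmt.Println(v * v)
	})
}

In the patch the items are *stateObject values and the per-item work is obj.updateRoot(s.db); since updateRoot also increments the shared StorageUpdated/StorageDeleted counters on the StateDB, the later patches in this series rework what is allowed to run inside the goroutines.
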
From 08fd7c310f3d81dda92fa54e27bb7c144c29cd91 Mon Sep 17 00:00:00 2001
From: Francesco4203
Date: Wed, 22 May 2024 17:58:08 +0700
Subject: [PATCH 2/4] intermediateRoot: update concurrency handling

statedb/intermediateRoot: rework the concurrent option: collect the storage
changes that have not been applied yet, then apply them concurrently
state_object/updateTrie: add a concurrency option: store and return the data
waiting to be applied
---
 cmd/utils/flags.go         |  4 ++
 core/state/state_object.go | 56 ++++++++++++++++++-----
 core/state/statedb.go      | 91 +++++++++++++++++++++++++++++++++-----
 core/state/statedb_test.go | 45 ++-----------------
 4 files changed, 133 insertions(+), 63 deletions(-)

diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index a37eea05a4..85e9548539 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -2036,6 +2036,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
 	if ctx.Bool(MonitorFinalityVoteFlag.Name) {
 		cfg.EnableMonitorFinalityVote = true
 	}
+	// Set concurrent update threshold
+	if ctx.IsSet(ConcurrentUpdateThresholdFlag.Name) {
+		cfg.ConcurrentUpdateThreshold = ctx.Int(ConcurrentUpdateThresholdFlag.Name)
+	}
 }
 
 // SetDNSDiscoveryDefaults configures DNS discovery with the given URL if
diff --git a/core/state/state_object.go b/core/state/state_object.go
index 138fcbdecd..dc1de32c23 100644
--- a/core/state/state_object.go
+++ b/core/state/state_object.go
@@ -319,11 +319,17 @@ func (s *stateObject) finalise(prefetch bool) {
 
 // updateTrie writes cached storage modifications into the object's storage trie.
 // It will return nil if the trie has not been loaded and no changes have been made
-func (s *stateObject) updateTrie(db Database) Trie {
+func (s *stateObject) updateTrie(db Database, concurrent bool) (Trie, []struct {
+	s     *stateObject
+	key   common.Hash
+	value common.Hash
+	v     []byte
+	tr    Trie
+}) {
 	// Make sure all dirty slots are finalized into the pending storage area
 	s.finalise(false) // Don't prefetch anymore, pull directly if need be
 	if len(s.pendingStorage) == 0 {
-		return s.trie
+		return s.trie, nil
 	}
 	// Track the amount of time wasted on updating the storage trie
 	if metrics.EnabledExpensive {
@@ -334,6 +340,14 @@
 	// Insert all the pending updates into the trie
 	tr := s.getTrie(db)
 	hasher := s.db.hasher
+	// Storage saved for later update
+	var savedStorage []struct {
+		s     *stateObject
+		key   common.Hash
+		value common.Hash
+		v     []byte
+		tr    Trie
+	}
 
 	usedStorage := make([][]byte, 0, len(s.pendingStorage))
 	for key, value := range s.pendingStorage {
@@ -344,15 +358,33 @@
 		s.originStorage[key] = value
 
 		var v []byte
-		if (value == common.Hash{}) {
-			s.setError(tr.TryDelete(key[:]))
-			s.db.StorageDeleted += 1
+
+		if concurrent {
+			if (value == common.Hash{}) {
+				s.db.StorageDeleted += 1
+			} else {
+				v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
+				s.db.StorageUpdated += 1
+			}
+			savedStorage = append(savedStorage, struct {
+				s     *stateObject
+				key   common.Hash
+				value common.Hash
+				v     []byte
+				tr    Trie
+			}{s, key, value, v, tr})
 		} else {
-			// Encoding []byte cannot fail, ok to ignore the error.
-			v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
-			s.setError(tr.TryUpdate(key[:], v))
-			s.db.StorageUpdated += 1
+			if (value == common.Hash{}) {
+				s.setError(tr.TryDelete(key[:]))
+				s.db.StorageDeleted += 1
+			} else {
+				// Encoding []byte cannot fail, ok to ignore the error.
+ v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) + s.setError(tr.TryUpdate(key[:], v)) + s.db.StorageUpdated += 1 + } } + // If state snapshotting is active, cache the data til commit if s.db.snap != nil { if storage == nil { @@ -372,13 +404,13 @@ func (s *stateObject) updateTrie(db Database) Trie { if len(s.pendingStorage) > 0 { s.pendingStorage = make(Storage) } - return tr + return tr, savedStorage } // UpdateRoot sets the trie root to the current root hash of func (s *stateObject) updateRoot(db Database) { // If nothing changed, don't bother with hashing anything - if s.updateTrie(db) == nil { + if trie, _ := s.updateTrie(db, false); trie == nil { return } // Track the amount of time wasted on hashing the storage trie @@ -392,7 +424,7 @@ func (s *stateObject) updateRoot(db Database) { // This updates the trie root. func (s *stateObject) CommitTrie(db Database) (int, error) { // If nothing changed, don't bother with hashing anything - if s.updateTrie(db) == nil { + if trie, _ := s.updateTrie(db, false); trie == nil { return 0, nil } if s.dbErr != nil { diff --git a/core/state/statedb.go b/core/state/statedb.go index 227a7a2d3c..61c728cbec 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -363,7 +363,7 @@ func (s *StateDB) StorageTrie(addr common.Address) Trie { return nil } cpy := stateObject.deepCopy(s) - cpy.updateTrie(s.db) + cpy.updateTrie(s.db, false) return cpy.getTrie(s.db) } @@ -875,30 +875,101 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { obj.updateRoot(s.db) } } else { - // Declare min function + var pendingStorage []struct { + s *stateObject + key common.Hash + value common.Hash + v []byte + tr Trie + } + var pendingRootUpdate []*stateObject + + for addr := range s.stateObjectsPending { + if obj := s.stateObjects[addr]; !obj.deleted { + trie, savedStorage := obj.updateTrie(s.db, true) + pendingStorage = append(pendingStorage, savedStorage...) + if trie != nil { + pendingRootUpdate = append(pendingRootUpdate, obj) + } + } + } + + // Update the account trie min := func(a, b int) int { if a < b { return a } return b } - // Update the state objects using goroutines, with maximum of NumCPU goroutines - nRoutines := min(runtime.NumCPU(), len(updateObjs)) + nRoutines := min(runtime.NumCPU(), len(pendingStorage)) if nRoutines != 0 { - nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines + nUpdatePerRoutine := (len(pendingStorage) + nRoutines - 1) / nRoutines wg := sync.WaitGroup{} wg.Add(nRoutines) - for i := 0; i < len(updateObjs); i += nObjPerRoutine { - go func(objs []*stateObject) { + for i := 0; i < len(pendingStorage); i += nUpdatePerRoutine { + go func(stores []struct { + s *stateObject + key common.Hash + value common.Hash + v []byte + tr Trie + }) { defer wg.Done() - for _, obj := range objs { - obj.updateRoot(s.db) + for _, store := range stores { + if (store.value == common.Hash{}) { + s.setError(store.tr.TryDelete(store.key[:])) + } else { + // Encoding []byte cannot fail, ok to ignore the error. 
+ store.v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(store.value[:])) + s.setError(store.tr.TryUpdate(store.key[:], store.v)) + } } - }(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))]) + }(pendingStorage[i:min(i+nUpdatePerRoutine, len(pendingStorage))]) } wg.Wait() } + for _, obj := range pendingRootUpdate { + obj.data.Root = obj.trie.Hash() + } } + // // Get the stateObjects needed to be updated + // updateObjs := []*stateObject{} + // for addr := range s.stateObjectsPending { + // if obj := s.stateObjects[addr]; !obj.deleted { + // updateObjs = append(updateObjs, obj) + // } + // } + + // if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 { + // // Update the state objects sequentially + // for _, obj := range updateObjs { + // obj.updateRoot(s.db) + // } + // } else { + // // Declare min function + // min := func(a, b int) int { + // if a < b { + // return a + // } + // return b + // } + // // Update the state objects using goroutines, with maximum of NumCPU goroutines + // nRoutines := min(runtime.NumCPU(), len(updateObjs)) + // if nRoutines != 0 { + // nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines + // wg := sync.WaitGroup{} + // wg.Add(nRoutines) + // for i := 0; i < len(updateObjs); i += nObjPerRoutine { + // go func(objs []*stateObject) { + // defer wg.Done() + // for _, obj := range objs { + // obj.updateRoot(s.db) + // } + // }(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))]) + // } + // wg.Wait() + // } + // } // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 69afe80e08..d8adc91775 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -946,48 +946,11 @@ func TestIntermediateUpdateConcurrently(t *testing.T) { state1.ConcurrentUpdateThreshold = 0 state2.ConcurrentUpdateThreshold = 1 - state1.IntermediateRoot(false) // sequential - state2.IntermediateRoot(false) // concurrent + root1 := state1.IntermediateRoot(false) // sequential + root2 := state2.IntermediateRoot(false) // concurrent - root1, err1 := state1.Commit(false) - root2, err2 := state2.Commit(false) - - if err1 != nil { - t.Fatalf("sequential commit failed: %v", err1) - } - if err1 = state1.Database().TrieDB().Commit(root1, false, nil); err1 != nil { - t.Errorf("cannot commit trie %v to persistent database", root1.Hex()) - } - if err2 != nil { - t.Fatalf("concurrent commit failed: %v", err2) - } - if err2 = state2.Database().TrieDB().Commit(root2, false, nil); err2 != nil { - t.Errorf("cannot commit trie %v to persistent database", root2.Hex()) + if root1 != root2 { + t.Fatalf("intermediate roots mismatch: %v != %v", root1.Hex(), root2.Hex()) } - it1 := db1.NewIterator(nil, nil) - it2 := db2.NewIterator(nil, nil) - for it1.Next() { - if !it2.Next() { - t.Fatalf("concurrent iterator ended prematurely") - } - if !bytes.Equal(it1.Key(), it2.Key()) { - t.Fatalf("concurrent iterator key mismatch: " + string(it1.Key()) + " != " + string(it2.Key())) - } - if !bytes.Equal(it1.Value(), it2.Value()) { - t.Fatalf("concurrent iterator value mismatch: " + string(it1.Value()) + " != " + string(it2.Value())) - } - } - if it1.Error() != nil { - t.Fatalf("sequential iterator error: %v", it1.Error()) - } - if it2.Error() != nil { - t.Fatalf("concurrent iterator error: %v", it2.Error()) - } - if it1.Next() { - t.Fatalf("sequential iterator has extra data") - } - if it2.Next() { - 
t.Fatalf("concurrent iterator has extra data")
-	}
 }

From f1c23c947e01c298bef92896bdfa06d99d14a358 Mon Sep 17 00:00:00 2001
From: Francesco4203
Date: Mon, 27 May 2024 11:10:43 +0700
Subject: [PATCH 3/4] updateTrieConcurrent: split updateTrie into two parts;
 the second part can run concurrently
---
 core/state/state_object.go |  91 +++++++++++++------------
 core/state/statedb.go      | 132 ++++++++++---------------------------
 2 files changed, 83 insertions(+), 140 deletions(-)

diff --git a/core/state/state_object.go b/core/state/state_object.go
index dc1de32c23..78d7e64b61 100644
--- a/core/state/state_object.go
+++ b/core/state/state_object.go
@@ -319,17 +319,11 @@ func (s *stateObject) finalise(prefetch bool) {
 
 // updateTrie writes cached storage modifications into the object's storage trie.
 // It will return nil if the trie has not been loaded and no changes have been made
-func (s *stateObject) updateTrie(db Database, concurrent bool) (Trie, []struct {
-	s     *stateObject
-	key   common.Hash
-	value common.Hash
-	v     []byte
-	tr    Trie
-}) {
+func (s *stateObject) updateTrie(db Database, concurrent bool) Trie {
 	// Make sure all dirty slots are finalized into the pending storage area
 	s.finalise(false) // Don't prefetch anymore, pull directly if need be
 	if len(s.pendingStorage) == 0 {
-		return s.trie, nil
+		return s.trie
 	}
 	// Track the amount of time wasted on updating the storage trie
 	if metrics.EnabledExpensive {
@@ -338,53 +332,34 @@ func (s *stateObject) updateTrie(db Database, concurrent bool) (Trie, []struct {
 	// The snapshot storage map for the object
 	var storage map[common.Hash][]byte
 	// Insert all the pending updates into the trie
-	tr := s.getTrie(db)
-	hasher := s.db.hasher
-	// Storage saved for later update
-	var savedStorage []struct {
-		s     *stateObject
-		key   common.Hash
-		value common.Hash
-		v     []byte
-		tr    Trie
+	var tr Trie = nil
+	if !concurrent {
+		tr = s.getTrie(db)
 	}
-
+	hasher := s.db.hasher
 	usedStorage := make([][]byte, 0, len(s.pendingStorage))
 	for key, value := range s.pendingStorage {
 		// Skip noop changes, persist actual changes
 		if value == s.originStorage[key] {
 			continue
 		}
-		s.originStorage[key] = value
-
+		if !concurrent {
+			s.originStorage[key] = value
+		}
 		var v []byte
-
-		if concurrent {
-			if (value == common.Hash{}) {
+		if (value == common.Hash{}) {
+			if !concurrent {
+				s.setError(tr.TryDelete(key[:]))
 				s.db.StorageDeleted += 1
-			} else {
-				v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
-				s.db.StorageUpdated += 1
 			}
-			savedStorage = append(savedStorage, struct {
-				s     *stateObject
-				key   common.Hash
-				value common.Hash
-				v     []byte
-				tr    Trie
-			}{s, key, value, v, tr})
 		} else {
-			if (value == common.Hash{}) {
-				s.setError(tr.TryDelete(key[:]))
-				s.db.StorageDeleted += 1
-			} else {
-				// Encoding []byte cannot fail, ok to ignore the error.
+ v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) + if !concurrent { s.setError(tr.TryUpdate(key[:], v)) s.db.StorageUpdated += 1 } } - // If state snapshotting is active, cache the data til commit if s.db.snap != nil { if storage == nil { @@ -401,16 +376,46 @@ func (s *stateObject) updateTrie(db Database, concurrent bool) (Trie, []struct { if s.db.prefetcher != nil { s.db.prefetcher.used(s.data.Root, usedStorage) } + if !concurrent { + if len(s.pendingStorage) > 0 { + s.pendingStorage = make(Storage) + } + } + return tr +} + +func (s *stateObject) updateTrieConcurrent(db Database) { + tr := s.getTrie(db) + for key, value := range s.pendingStorage { + // Skip noop changes, persist actual changes + if value == s.originStorage[key] { + continue + } + s.originStorage[key] = value + + var v []byte + + if (value == common.Hash{}) { + s.setError(tr.TryDelete(key[:])) + s.db.StorageDeleted += 1 + } else { + v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) + s.setError(tr.TryUpdate(key[:], v)) + s.db.StorageUpdated += 1 + } + } if len(s.pendingStorage) > 0 { s.pendingStorage = make(Storage) } - return tr, savedStorage + if tr != nil { + s.data.Root = s.trie.Hash() + } } // UpdateRoot sets the trie root to the current root hash of func (s *stateObject) updateRoot(db Database) { // If nothing changed, don't bother with hashing anything - if trie, _ := s.updateTrie(db, false); trie == nil { + if s.updateTrie(db, false) == nil { return } // Track the amount of time wasted on hashing the storage trie @@ -424,7 +429,7 @@ func (s *stateObject) updateRoot(db Database) { // This updates the trie root. func (s *stateObject) CommitTrie(db Database) (int, error) { // If nothing changed, don't bother with hashing anything - if trie, _ := s.updateTrie(db, false); trie == nil { + if s.updateTrie(db, false) == nil { return 0, nil } if s.dbErr != nil { diff --git a/core/state/statedb.go b/core/state/statedb.go index 61c728cbec..5d19c81bc3 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -127,6 +127,9 @@ type StateDB struct { // Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable ConcurrentUpdateThreshold int + + // Lock for concurrent access + lock sync.RWMutex } // New creates a new state from a given trie. @@ -861,115 +864,50 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // first, giving the account prefeches just a few more milliseconds of time // to pull useful data from disk. + // ---------------------------------------------- DEBUGGING ------------------------------------------------------------------------------------------- + // Get the stateObjects needed to be updated updateObjs := []*stateObject{} for addr := range s.stateObjectsPending { if obj := s.stateObjects[addr]; !obj.deleted { updateObjs = append(updateObjs, obj) + obj.updateTrie(s.db, true) } } - - if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 { - // Update the state objects sequentially - for _, obj := range updateObjs { - obj.updateRoot(s.db) + min := func(a, b int) int { + if a < b { + return a } - } else { - var pendingStorage []struct { - s *stateObject - key common.Hash - value common.Hash - v []byte - tr Trie - } - var pendingRootUpdate []*stateObject - - for addr := range s.stateObjectsPending { - if obj := s.stateObjects[addr]; !obj.deleted { - trie, savedStorage := obj.updateTrie(s.db, true) - pendingStorage = append(pendingStorage, savedStorage...) 
- if trie != nil { - pendingRootUpdate = append(pendingRootUpdate, obj) - } + return b + } + nGoroutines := min(runtime.NumCPU(), len(updateObjs)) + if nGoroutines != 0 { + nObjectsPerRoutine := len(updateObjs) / nGoroutines + nObjectsRemaining := len(updateObjs) % nGoroutines + wg := sync.WaitGroup{} + wg.Add(nGoroutines) + i := 0 + for { + nObjects := nObjectsPerRoutine + if nObjectsRemaining > 0 { + nObjects++ + nObjectsRemaining-- } - } - - // Update the account trie - min := func(a, b int) int { - if a < b { - return a - } - return b - } - nRoutines := min(runtime.NumCPU(), len(pendingStorage)) - if nRoutines != 0 { - nUpdatePerRoutine := (len(pendingStorage) + nRoutines - 1) / nRoutines - wg := sync.WaitGroup{} - wg.Add(nRoutines) - for i := 0; i < len(pendingStorage); i += nUpdatePerRoutine { - go func(stores []struct { - s *stateObject - key common.Hash - value common.Hash - v []byte - tr Trie - }) { - defer wg.Done() - for _, store := range stores { - if (store.value == common.Hash{}) { - s.setError(store.tr.TryDelete(store.key[:])) - } else { - // Encoding []byte cannot fail, ok to ignore the error. - store.v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(store.value[:])) - s.setError(store.tr.TryUpdate(store.key[:], store.v)) - } - } - }(pendingStorage[i:min(i+nUpdatePerRoutine, len(pendingStorage))]) + go func(objs []*stateObject) { + defer wg.Done() + for _, obj := range objs { + obj.updateTrieConcurrent(s.db) + } + }(updateObjs[i : i+nObjects]) + i += nObjects + if i == len(updateObjs) { + break } - wg.Wait() - } - for _, obj := range pendingRootUpdate { - obj.data.Root = obj.trie.Hash() } + wg.Wait() } - // // Get the stateObjects needed to be updated - // updateObjs := []*stateObject{} - // for addr := range s.stateObjectsPending { - // if obj := s.stateObjects[addr]; !obj.deleted { - // updateObjs = append(updateObjs, obj) - // } - // } - - // if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 { - // // Update the state objects sequentially - // for _, obj := range updateObjs { - // obj.updateRoot(s.db) - // } - // } else { - // // Declare min function - // min := func(a, b int) int { - // if a < b { - // return a - // } - // return b - // } - // // Update the state objects using goroutines, with maximum of NumCPU goroutines - // nRoutines := min(runtime.NumCPU(), len(updateObjs)) - // if nRoutines != 0 { - // nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines - // wg := sync.WaitGroup{} - // wg.Add(nRoutines) - // for i := 0; i < len(updateObjs); i += nObjPerRoutine { - // go func(objs []*stateObject) { - // defer wg.Done() - // for _, obj := range objs { - // obj.updateRoot(s.db) - // } - // }(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))]) - // } - // wg.Wait() - // } - // } + + // ---------------------------------------------- DEBUGGING ------------------------------------------------------------------------------------------- // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. 
We can check with the prefetcher, if it can give us a trie

From bb5526e746cafa700e6252861f5f6902a99107da Mon Sep 17 00:00:00 2001
From: Francesco4203
Date: Mon, 27 May 2024 16:30:12 +0700
Subject: [PATCH 4/4] IntermediateRoot: select sequential/concurrent trie
 update based on flag

statedb.go/StateDB struct: remove the lock since it is not used by the
concurrent update
statedb.go/IntermediateRoot: check ConcurrentUpdateThreshold to choose
between the sequential and the concurrent update
state_object.go: rename the "concurrent" parameter to the more descriptive
"skipTrieUpdate"
flags.go/MakeChain: pass the flag when creating a chain
---
 cmd/utils/flags.go         | 15 ++++----
 core/state/state_object.go | 20 +++++------
 core/state/statedb.go      | 73 ++++++++++++++++++++------------------
 3 files changed, 55 insertions(+), 53 deletions(-)

diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 85e9548539..5e614e51b3 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -2261,13 +2261,14 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai
 		Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
 	}
 	cache := &core.CacheConfig{
-		TrieCleanLimit:      ethconfig.Defaults.TrieCleanCache,
-		TrieCleanNoPrefetch: ctx.Bool(CacheNoPrefetchFlag.Name),
-		TrieDirtyLimit:      ethconfig.Defaults.TrieDirtyCache,
-		TrieDirtyDisabled:   ctx.String(GCModeFlag.Name) == "archive",
-		TrieTimeLimit:       ethconfig.Defaults.TrieTimeout,
-		SnapshotLimit:       ethconfig.Defaults.SnapshotCache,
-		Preimages:           ctx.Bool(CachePreimagesFlag.Name),
+		TrieCleanLimit:            ethconfig.Defaults.TrieCleanCache,
+		TrieCleanNoPrefetch:       ctx.Bool(CacheNoPrefetchFlag.Name),
+		TrieDirtyLimit:            ethconfig.Defaults.TrieDirtyCache,
+		TrieDirtyDisabled:         ctx.String(GCModeFlag.Name) == "archive",
+		TrieTimeLimit:             ethconfig.Defaults.TrieTimeout,
+		SnapshotLimit:             ethconfig.Defaults.SnapshotCache,
+		Preimages:                 ctx.Bool(CachePreimagesFlag.Name),
+		ConcurrentUpdateThreshold: ctx.Int(ConcurrentUpdateThresholdFlag.Name),
 	}
 	if cache.TrieDirtyDisabled && !cache.Preimages {
 		cache.Preimages = true
diff --git a/core/state/state_object.go b/core/state/state_object.go
index 78d7e64b61..6ba299a7fe 100644
--- a/core/state/state_object.go
+++ b/core/state/state_object.go
@@ -319,7 +319,7 @@ func (s *stateObject) finalise(prefetch bool) {
 
 // updateTrie writes cached storage modifications into the object's storage trie.
// It will return nil if the trie has not been loaded and no changes have been made -func (s *stateObject) updateTrie(db Database, concurrent bool) Trie { +func (s *stateObject) updateTrie(db Database, skipTrieUpdate bool) Trie { // Make sure all dirty slots are finalized into the pending storage area s.finalise(false) // Don't prefetch anymore, pull directly if need be if len(s.pendingStorage) == 0 { @@ -332,8 +332,8 @@ func (s *stateObject) updateTrie(db Database, concurrent bool) Trie { // The snapshot storage map for the object var storage map[common.Hash][]byte // Insert all the pending updates into the trie - var tr Trie = nil - if !concurrent { + var tr Trie + if !skipTrieUpdate { tr = s.getTrie(db) } hasher := s.db.hasher @@ -343,22 +343,22 @@ func (s *stateObject) updateTrie(db Database, concurrent bool) Trie { if value == s.originStorage[key] { continue } - if !concurrent { + if !skipTrieUpdate { s.originStorage[key] = value } var v []byte if (value == common.Hash{}) { - if !concurrent { + if !skipTrieUpdate { s.setError(tr.TryDelete(key[:])) - s.db.StorageDeleted += 1 } + s.db.StorageDeleted += 1 } else { // Encoding []byte cannot fail, ok to ignore the error. v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) - if !concurrent { + if !skipTrieUpdate { s.setError(tr.TryUpdate(key[:], v)) - s.db.StorageUpdated += 1 } + s.db.StorageUpdated += 1 } // If state snapshotting is active, cache the data til commit if s.db.snap != nil { @@ -376,7 +376,7 @@ func (s *stateObject) updateTrie(db Database, concurrent bool) Trie { if s.db.prefetcher != nil { s.db.prefetcher.used(s.data.Root, usedStorage) } - if !concurrent { + if !skipTrieUpdate { if len(s.pendingStorage) > 0 { s.pendingStorage = make(Storage) } @@ -397,11 +397,9 @@ func (s *stateObject) updateTrieConcurrent(db Database) { if (value == common.Hash{}) { s.setError(tr.TryDelete(key[:])) - s.db.StorageDeleted += 1 } else { v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) s.setError(tr.TryUpdate(key[:], v)) - s.db.StorageUpdated += 1 } } if len(s.pendingStorage) > 0 { diff --git a/core/state/statedb.go b/core/state/statedb.go index 5d19c81bc3..854b97396c 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -127,9 +127,6 @@ type StateDB struct { // Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable ConcurrentUpdateThreshold int - - // Lock for concurrent access - lock sync.RWMutex } // New creates a new state from a given trie. @@ -864,51 +861,57 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // first, giving the account prefeches just a few more milliseconds of time // to pull useful data from disk. 
- // ---------------------------------------------- DEBUGGING ------------------------------------------------------------------------------------------- - // Get the stateObjects needed to be updated updateObjs := []*stateObject{} for addr := range s.stateObjectsPending { if obj := s.stateObjects[addr]; !obj.deleted { updateObjs = append(updateObjs, obj) - obj.updateTrie(s.db, true) } } - min := func(a, b int) int { - if a < b { - return a + + if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 { + // Update the state trie sequentially + for _, obj := range updateObjs { + obj.updateRoot(s.db) } - return b - } - nGoroutines := min(runtime.NumCPU(), len(updateObjs)) - if nGoroutines != 0 { - nObjectsPerRoutine := len(updateObjs) / nGoroutines - nObjectsRemaining := len(updateObjs) % nGoroutines - wg := sync.WaitGroup{} - wg.Add(nGoroutines) - i := 0 - for { - nObjects := nObjectsPerRoutine - if nObjectsRemaining > 0 { - nObjects++ - nObjectsRemaining-- - } - go func(objs []*stateObject) { - defer wg.Done() - for _, obj := range objs { - obj.updateTrieConcurrent(s.db) + } else { + // Update the state trie concurrently + for _, obj := range updateObjs { + obj.updateTrie(s.db, true) + } + + nGoroutines := runtime.NumCPU() + if nGoroutines > len(updateObjs) { + nGoroutines = len(updateObjs) + } + + if nGoroutines != 0 { + nObjectsPerRoutine := len(updateObjs) / nGoroutines + nObjectsRemaining := len(updateObjs) % nGoroutines + wg := sync.WaitGroup{} + wg.Add(nGoroutines) + i := 0 + for { + nObjects := nObjectsPerRoutine + if nObjectsRemaining > 0 { + nObjects++ + nObjectsRemaining-- + } + go func(objs []*stateObject) { + defer wg.Done() + for _, obj := range objs { + obj.updateTrieConcurrent(s.db) + } + }(updateObjs[i : i+nObjects]) + i += nObjects + if i == len(updateObjs) { + break } - }(updateObjs[i : i+nObjects]) - i += nObjects - if i == len(updateObjs) { - break } + wg.Wait() } - wg.Wait() } - // ---------------------------------------------- DEBUGGING ------------------------------------------------------------------------------------------- - // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie // which has the same root, but also has some content loaded into it.
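
As a closing illustration, the even work distribution patch 4 settles on can be sketched in isolation as below. This is illustrative code, not from the patch: split is a made-up helper. It produces exactly g chunks whose sizes differ by at most one, so wg.Add(g) is always matched by exactly g goroutines (the patch additionally guards the nGoroutines == 0 case before entering this path).

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// split returns exactly g [start, end) ranges covering n items; the first
// n%g chunks receive one extra item, mirroring the remainder handling in
// patch 4's IntermediateRoot.
func split(n, g int) [][2]int {
	base, rem := n/g, n%g
	ranges := make([][2]int, 0, g)
	start := 0
	for len(ranges) < g {
		size := base
		if rem > 0 {
			size++
			rem--
		}
		ranges = append(ranges, [2]int{start, start + size})
		start += size
	}
	return ranges
}

func main() {
	items := make([]string, 10)
	g := runtime.NumCPU()
	if g > len(items) { // mirror the min(NumCPU, len(updateObjs)) cap
		g = len(items)
	}
	var wg sync.WaitGroup
	wg.Add(g) // safe: split always yields exactly g chunks
	for _, r := range split(len(items), g) {
		go func(part []string) {
			defer wg.Done()
			for range part {
				// per-item work goes here, e.g. one storage-trie update
			}
		}(items[r[0]:r[1]])
	}
	wg.Wait()
	fmt.Println(split(10, 4)) // [[0 3] [3 6] [6 8] [8 10]]
}

With the threshold flag, this path only runs when a block updates at least the configured number of accounts; for example, the binary built from cmd/ronin could be started with --concurrent-update-threshold 100 (a hypothetical value) to enable it once 100 or more state objects are pending.
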