diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index a37eea05a4..85e9548539 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -2036,6 +2036,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.Bool(MonitorFinalityVoteFlag.Name) { cfg.EnableMonitorFinalityVote = true } + // Set concurrent update threshold + if ctx.IsSet(ConcurrentUpdateThresholdFlag.Name) { + cfg.ConcurrentUpdateThreshold = ctx.Int(ConcurrentUpdateThresholdFlag.Name) + } } // SetDNSDiscoveryDefaults configures DNS discovery with the given URL if diff --git a/core/state/state_object.go b/core/state/state_object.go index 138fcbdecd..dc1de32c23 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -319,11 +319,17 @@ func (s *stateObject) finalise(prefetch bool) { // updateTrie writes cached storage modifications into the object's storage trie. // It will return nil if the trie has not been loaded and no changes have been made -func (s *stateObject) updateTrie(db Database) Trie { +func (s *stateObject) updateTrie(db Database, concurrent bool) (Trie, []struct { + s *stateObject + key common.Hash + value common.Hash + v []byte + tr Trie +}) { // Make sure all dirty slots are finalized into the pending storage area s.finalise(false) // Don't prefetch anymore, pull directly if need be if len(s.pendingStorage) == 0 { - return s.trie + return s.trie, nil } // Track the amount of time wasted on updating the storage trie if metrics.EnabledExpensive { @@ -334,6 +340,14 @@ func (s *stateObject) updateTrie(db Database) Trie { // Insert all the pending updates into the trie tr := s.getTrie(db) hasher := s.db.hasher + // Storage saved for later update + var savedStorage []struct { + s *stateObject + key common.Hash + value common.Hash + v []byte + tr Trie + } usedStorage := make([][]byte, 0, len(s.pendingStorage)) for key, value := range s.pendingStorage { @@ -344,15 +358,33 @@ func (s *stateObject) updateTrie(db Database) Trie { s.originStorage[key] = value var v []byte - if (value == common.Hash{}) { - s.setError(tr.TryDelete(key[:])) - s.db.StorageDeleted += 1 + + if concurrent { + if (value == common.Hash{}) { + s.db.StorageDeleted += 1 + } else { + v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) + s.db.StorageUpdated += 1 + } + savedStorage = append(savedStorage, struct { + s *stateObject + key common.Hash + value common.Hash + v []byte + tr Trie + }{s, key, value, v, tr}) } else { - // Encoding []byte cannot fail, ok to ignore the error. - v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) - s.setError(tr.TryUpdate(key[:], v)) - s.db.StorageUpdated += 1 + if (value == common.Hash{}) { + s.setError(tr.TryDelete(key[:])) + s.db.StorageDeleted += 1 + } else { + // Encoding []byte cannot fail, ok to ignore the error. + v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) + s.setError(tr.TryUpdate(key[:], v)) + s.db.StorageUpdated += 1 + } } + // If state snapshotting is active, cache the data til commit if s.db.snap != nil { if storage == nil { @@ -372,13 +404,13 @@ func (s *stateObject) updateTrie(db Database) Trie { if len(s.pendingStorage) > 0 { s.pendingStorage = make(Storage) } - return tr + return tr, savedStorage } // UpdateRoot sets the trie root to the current root hash of func (s *stateObject) updateRoot(db Database) { // If nothing changed, don't bother with hashing anything - if s.updateTrie(db) == nil { + if trie, _ := s.updateTrie(db, false); trie == nil { return } // Track the amount of time wasted on hashing the storage trie @@ -392,7 +424,7 @@ func (s *stateObject) updateRoot(db Database) { // This updates the trie root. func (s *stateObject) CommitTrie(db Database) (int, error) { // If nothing changed, don't bother with hashing anything - if s.updateTrie(db) == nil { + if trie, _ := s.updateTrie(db, false); trie == nil { return 0, nil } if s.dbErr != nil { diff --git a/core/state/statedb.go b/core/state/statedb.go index 227a7a2d3c..61c728cbec 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -363,7 +363,7 @@ func (s *StateDB) StorageTrie(addr common.Address) Trie { return nil } cpy := stateObject.deepCopy(s) - cpy.updateTrie(s.db) + cpy.updateTrie(s.db, false) return cpy.getTrie(s.db) } @@ -875,30 +875,101 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { obj.updateRoot(s.db) } } else { - // Declare min function + var pendingStorage []struct { + s *stateObject + key common.Hash + value common.Hash + v []byte + tr Trie + } + var pendingRootUpdate []*stateObject + + for addr := range s.stateObjectsPending { + if obj := s.stateObjects[addr]; !obj.deleted { + trie, savedStorage := obj.updateTrie(s.db, true) + pendingStorage = append(pendingStorage, savedStorage...) + if trie != nil { + pendingRootUpdate = append(pendingRootUpdate, obj) + } + } + } + + // Update the account trie min := func(a, b int) int { if a < b { return a } return b } - // Update the state objects using goroutines, with maximum of NumCPU goroutines - nRoutines := min(runtime.NumCPU(), len(updateObjs)) + nRoutines := min(runtime.NumCPU(), len(pendingStorage)) if nRoutines != 0 { - nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines + nUpdatePerRoutine := (len(pendingStorage) + nRoutines - 1) / nRoutines wg := sync.WaitGroup{} wg.Add(nRoutines) - for i := 0; i < len(updateObjs); i += nObjPerRoutine { - go func(objs []*stateObject) { + for i := 0; i < len(pendingStorage); i += nUpdatePerRoutine { + go func(stores []struct { + s *stateObject + key common.Hash + value common.Hash + v []byte + tr Trie + }) { defer wg.Done() - for _, obj := range objs { - obj.updateRoot(s.db) + for _, store := range stores { + if (store.value == common.Hash{}) { + s.setError(store.tr.TryDelete(store.key[:])) + } else { + // Encoding []byte cannot fail, ok to ignore the error. + store.v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(store.value[:])) + s.setError(store.tr.TryUpdate(store.key[:], store.v)) + } } - }(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))]) + }(pendingStorage[i:min(i+nUpdatePerRoutine, len(pendingStorage))]) } wg.Wait() } + for _, obj := range pendingRootUpdate { + obj.data.Root = obj.trie.Hash() + } } + // // Get the stateObjects needed to be updated + // updateObjs := []*stateObject{} + // for addr := range s.stateObjectsPending { + // if obj := s.stateObjects[addr]; !obj.deleted { + // updateObjs = append(updateObjs, obj) + // } + // } + + // if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 { + // // Update the state objects sequentially + // for _, obj := range updateObjs { + // obj.updateRoot(s.db) + // } + // } else { + // // Declare min function + // min := func(a, b int) int { + // if a < b { + // return a + // } + // return b + // } + // // Update the state objects using goroutines, with maximum of NumCPU goroutines + // nRoutines := min(runtime.NumCPU(), len(updateObjs)) + // if nRoutines != 0 { + // nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines + // wg := sync.WaitGroup{} + // wg.Add(nRoutines) + // for i := 0; i < len(updateObjs); i += nObjPerRoutine { + // go func(objs []*stateObject) { + // defer wg.Done() + // for _, obj := range objs { + // obj.updateRoot(s.db) + // } + // }(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))]) + // } + // wg.Wait() + // } + // } // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 69afe80e08..d8adc91775 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -946,48 +946,11 @@ func TestIntermediateUpdateConcurrently(t *testing.T) { state1.ConcurrentUpdateThreshold = 0 state2.ConcurrentUpdateThreshold = 1 - state1.IntermediateRoot(false) // sequential - state2.IntermediateRoot(false) // concurrent + root1 := state1.IntermediateRoot(false) // sequential + root2 := state2.IntermediateRoot(false) // concurrent - root1, err1 := state1.Commit(false) - root2, err2 := state2.Commit(false) - - if err1 != nil { - t.Fatalf("sequential commit failed: %v", err1) - } - if err1 = state1.Database().TrieDB().Commit(root1, false, nil); err1 != nil { - t.Errorf("cannot commit trie %v to persistent database", root1.Hex()) - } - if err2 != nil { - t.Fatalf("concurrent commit failed: %v", err2) - } - if err2 = state2.Database().TrieDB().Commit(root2, false, nil); err2 != nil { - t.Errorf("cannot commit trie %v to persistent database", root2.Hex()) + if root1 != root2 { + t.Fatalf("intermediate roots mismatch: %v != %v", root1.Hex(), root2.Hex()) } - it1 := db1.NewIterator(nil, nil) - it2 := db2.NewIterator(nil, nil) - for it1.Next() { - if !it2.Next() { - t.Fatalf("concurrent iterator ended prematurely") - } - if !bytes.Equal(it1.Key(), it2.Key()) { - t.Fatalf("concurrent iterator key mismatch: " + string(it1.Key()) + " != " + string(it2.Key())) - } - if !bytes.Equal(it1.Value(), it2.Value()) { - t.Fatalf("concurrent iterator value mismatch: " + string(it1.Value()) + " != " + string(it2.Value())) - } - } - if it1.Error() != nil { - t.Fatalf("sequential iterator error: %v", it1.Error()) - } - if it2.Error() != nil { - t.Fatalf("concurrent iterator error: %v", it2.Error()) - } - if it1.Next() { - t.Fatalf("sequential iterator has extra data") - } - if it2.Next() { - t.Fatalf("concurrent iterator has extra data") - } }