-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IntermediateRoot: add flag for threshold to update concurrently #453
base: master
Are you sure you want to change the base?
Changes from 3 commits
51fef19
08fd7c3
f1c23c9
bb5526e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -319,7 +319,7 @@ func (s *stateObject) finalise(prefetch bool) { | |
|
||
// updateTrie writes cached storage modifications into the object's storage trie. | ||
// It will return nil if the trie has not been loaded and no changes have been made | ||
func (s *stateObject) updateTrie(db Database) Trie { | ||
func (s *stateObject) updateTrie(db Database, concurrent bool) Trie { | ||
// Make sure all dirty slots are finalized into the pending storage area | ||
s.finalise(false) // Don't prefetch anymore, pull directly if need be | ||
if len(s.pendingStorage) == 0 { | ||
|
@@ -332,26 +332,33 @@ func (s *stateObject) updateTrie(db Database) Trie { | |
// The snapshot storage map for the object | ||
var storage map[common.Hash][]byte | ||
// Insert all the pending updates into the trie | ||
tr := s.getTrie(db) | ||
var tr Trie = nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: don't need to initialize to nil here, the default value is nil already |
||
if !concurrent { | ||
tr = s.getTrie(db) | ||
} | ||
hasher := s.db.hasher | ||
|
||
usedStorage := make([][]byte, 0, len(s.pendingStorage)) | ||
for key, value := range s.pendingStorage { | ||
// Skip noop changes, persist actual changes | ||
if value == s.originStorage[key] { | ||
continue | ||
} | ||
s.originStorage[key] = value | ||
|
||
if !concurrent { | ||
s.originStorage[key] = value | ||
} | ||
var v []byte | ||
if (value == common.Hash{}) { | ||
s.setError(tr.TryDelete(key[:])) | ||
s.db.StorageDeleted += 1 | ||
if !concurrent { | ||
s.setError(tr.TryDelete(key[:])) | ||
s.db.StorageDeleted += 1 | ||
} | ||
} else { | ||
// Encoding []byte cannot fail, ok to ignore the error. | ||
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) | ||
s.setError(tr.TryUpdate(key[:], v)) | ||
s.db.StorageUpdated += 1 | ||
if !concurrent { | ||
s.setError(tr.TryUpdate(key[:], v)) | ||
s.db.StorageUpdated += 1 | ||
} | ||
} | ||
// If state snapshotting is active, cache the data til commit | ||
if s.db.snap != nil { | ||
|
@@ -369,16 +376,46 @@ func (s *stateObject) updateTrie(db Database) Trie { | |
if s.db.prefetcher != nil { | ||
s.db.prefetcher.used(s.data.Root, usedStorage) | ||
} | ||
if !concurrent { | ||
if len(s.pendingStorage) > 0 { | ||
s.pendingStorage = make(Storage) | ||
} | ||
} | ||
return tr | ||
} | ||
|
||
func (s *stateObject) updateTrieConcurrent(db Database) { | ||
tr := s.getTrie(db) | ||
for key, value := range s.pendingStorage { | ||
// Skip noop changes, persist actual changes | ||
if value == s.originStorage[key] { | ||
continue | ||
} | ||
s.originStorage[key] = value | ||
|
||
var v []byte | ||
|
||
if (value == common.Hash{}) { | ||
s.setError(tr.TryDelete(key[:])) | ||
s.db.StorageDeleted += 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is shared resource, not safe to update concurrently. Moreover, this is updated in updateTrie already |
||
} else { | ||
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) | ||
s.setError(tr.TryUpdate(key[:], v)) | ||
s.db.StorageUpdated += 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above comment |
||
} | ||
} | ||
if len(s.pendingStorage) > 0 { | ||
s.pendingStorage = make(Storage) | ||
} | ||
return tr | ||
if tr != nil { | ||
s.data.Root = s.trie.Hash() | ||
} | ||
} | ||
|
||
// UpdateRoot sets the trie root to the current root hash of | ||
func (s *stateObject) updateRoot(db Database) { | ||
// If nothing changed, don't bother with hashing anything | ||
if s.updateTrie(db) == nil { | ||
if s.updateTrie(db, false) == nil { | ||
return | ||
} | ||
// Track the amount of time wasted on hashing the storage trie | ||
|
@@ -392,7 +429,7 @@ func (s *stateObject) updateRoot(db Database) { | |
// This updates the trie root. | ||
func (s *stateObject) CommitTrie(db Database) (int, error) { | ||
// If nothing changed, don't bother with hashing anything | ||
if s.updateTrie(db) == nil { | ||
if s.updateTrie(db, false) == nil { | ||
return 0, nil | ||
} | ||
if s.dbErr != nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,9 @@ import ( | |
"errors" | ||
"fmt" | ||
"math/big" | ||
"runtime" | ||
"sort" | ||
"sync" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
|
@@ -122,6 +124,12 @@ type StateDB struct { | |
StorageUpdated int | ||
AccountDeleted int | ||
StorageDeleted int | ||
|
||
// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable | ||
ConcurrentUpdateThreshold int | ||
|
||
// Lock for concurrent access | ||
lock sync.RWMutex | ||
} | ||
|
||
// New creates a new state from a given trie. | ||
|
@@ -358,7 +366,7 @@ func (s *StateDB) StorageTrie(addr common.Address) Trie { | |
return nil | ||
} | ||
cpy := stateObject.deepCopy(s) | ||
cpy.updateTrie(s.db) | ||
cpy.updateTrie(s.db, false) | ||
return cpy.getTrie(s.db) | ||
} | ||
|
||
|
@@ -855,11 +863,52 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { | |
// the account prefetcher. Instead, let's process all the storage updates | ||
// first, giving the account prefeches just a few more milliseconds of time | ||
// to pull useful data from disk. | ||
|
||
// ---------------------------------------------- DEBUGGING ------------------------------------------------------------------------------------------- | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please clean this up |
||
|
||
// Get the stateObjects needed to be updated | ||
updateObjs := []*stateObject{} | ||
for addr := range s.stateObjectsPending { | ||
if obj := s.stateObjects[addr]; !obj.deleted { | ||
obj.updateRoot(s.db) | ||
updateObjs = append(updateObjs, obj) | ||
obj.updateTrie(s.db, true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use |
||
} | ||
} | ||
min := func(a, b int) int { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the new way of calculating number of objects per routine, this function is only called once, so I think removing this function and using if for that one is better |
||
if a < b { | ||
return a | ||
} | ||
return b | ||
} | ||
nGoroutines := min(runtime.NumCPU(), len(updateObjs)) | ||
if nGoroutines != 0 { | ||
nObjectsPerRoutine := len(updateObjs) / nGoroutines | ||
nObjectsRemaining := len(updateObjs) % nGoroutines | ||
wg := sync.WaitGroup{} | ||
wg.Add(nGoroutines) | ||
i := 0 | ||
for { | ||
nObjects := nObjectsPerRoutine | ||
if nObjectsRemaining > 0 { | ||
nObjects++ | ||
nObjectsRemaining-- | ||
} | ||
go func(objs []*stateObject) { | ||
defer wg.Done() | ||
for _, obj := range objs { | ||
obj.updateTrieConcurrent(s.db) | ||
} | ||
}(updateObjs[i : i+nObjects]) | ||
i += nObjects | ||
if i == len(updateObjs) { | ||
break | ||
} | ||
} | ||
wg.Wait() | ||
} | ||
|
||
// ---------------------------------------------- DEBUGGING ------------------------------------------------------------------------------------------- | ||
|
||
// Now we're about to start to write changes to the trie. The trie is so far | ||
// _untouched_. We can check with the prefetcher, if it can give us a trie | ||
// which has the same root, but also has some content loaded into it. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think change this variable to skipTrieUpdate is more reasonable