From e2517b3cd7f710893ec0b20794e51a70af1ccd75 Mon Sep 17 00:00:00 2001 From: galaio <12880651+galaio@users.noreply.github.com> Date: Thu, 18 Jul 2024 09:38:40 +0800 Subject: [PATCH] TxDAG: support PEVM static dispatch; (#6) * dag: add merge execute path method; pevm: support dispatch with TxDAG; * dag: add merge execute path method; pevm: support dispatch with TxDAG; * dag: clean code; * statedb: fix some broken uts; * pevm: support disable slot steal; --------- Co-authored-by: galaio --- core/parallel_state_processor.go | 73 +++++++++++++++++++++---- core/state/statedb.go | 30 ++++++----- core/types/dag.go | 91 +++++++++++++++++++++++++++----- core/types/dag_test.go | 57 ++++++++++++++++++++ core/types/mvstates.go | 45 ++++++++-------- 5 files changed, 236 insertions(+), 60 deletions(-) diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 0968f74e02..e0338f28c9 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -46,6 +46,7 @@ type ParallelStateProcessor struct { inConfirmStage2 bool targetStage2Count int // when executed txNUM reach it, enter stage2 RT confirm nextStage2TxIndex int + disableStealTx bool } func NewParallelStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine, parallelNum int) *ParallelStateProcessor { @@ -174,6 +175,37 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) { p.nextStage2TxIndex = 0 } +// doStaticDispatchV2 could dispatch by TxDAG metadata +// txReqs must order by TxIndex +// txDAG must convert to dependency relation +// 1. The TxDAG generates parallel execution merge paths that will ignore cross slot tx dep; +// 2. It will dispatch the most hungry slot for every isolate execution path; +// 3. TODO(galaio) it need to schedule the slow dep tx path properly; +// 4. TODO(galaio) it is unfriendly for cross slot deps, maybe we can delay dispatch when tx cross in slots, it may increase PEVM parallelism; +func (p *ParallelStateProcessor) doStaticDispatchV2(txReqs []*ParallelTxRequest, txDAG types.TxDAG) { + // only support PlainTxDAG dispatch now. + if txDAG == nil || txDAG.Type() != types.PlainTxDAGType { + p.doStaticDispatch(txReqs) + return + } + + // resolve isolate execution paths from TxDAG, it indicates the tx dispatch + paths := types.MergeTxDAGExecutionPaths(txDAG) + log.Info("doStaticDispatchV2 merge parallel execution paths", "slots", len(p.slotState), "paths", len(paths)) + + for _, path := range paths { + slotIndex := p.mostHungrySlot() + for _, index := range path { + txReqs[index].staticSlotIndex = slotIndex // txReq is better to be executed in this slot + slot := p.slotState[slotIndex] + slot.pendingTxReqList = append(slot.pendingTxReqList, txReqs[index]) + } + } + + // it's unnecessary to enable slot steal mechanism, opt the steal mechanism later; + p.disableStealTx = true +} + // Benefits of StaticDispatch: // // ** try best to make Txs with same From() in same slot @@ -197,14 +229,7 @@ func (p *ParallelStateProcessor) doStaticDispatch(txReqs []*ParallelTxRequest) { // not found, dispatch to most hungry slot if slotIndex == -1 { - var workload = len(p.slotState[0].pendingTxReqList) - slotIndex = 0 - for i, slot := range p.slotState { // can start from index 1 - if len(slot.pendingTxReqList) < workload { - slotIndex = i - workload = len(slot.pendingTxReqList) - } - } + slotIndex = p.mostHungrySlot() } // update fromSlotMap[txReq.msg.From] = slotIndex @@ -218,6 +243,24 @@ func (p *ParallelStateProcessor) doStaticDispatch(txReqs []*ParallelTxRequest) { } } +func (p *ParallelStateProcessor) mostHungrySlot() int { + var ( + workload = len(p.slotState[0].pendingTxReqList) + slotIndex = 0 + ) + for i, slot := range p.slotState { // can start from index 1 + if len(slot.pendingTxReqList) < workload { + slotIndex = i + workload = len(slot.pendingTxReqList) + } + // just return the first slot with 0 workload + if workload == 0 { + return slotIndex + } + } + return slotIndex +} + // do conflict detect func (p *ParallelStateProcessor) hasConflict(txResult *ParallelTxResult, isStage2 bool) bool { slotDB := txResult.slotDB @@ -465,7 +508,7 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) { // fmt.Printf("Dav -- runInLoop, - loopbody tail - TxREQ: %d\n", txReq.txIndex) } // switched to the other slot. - if interrupted { + if interrupted || p.disableStealTx { continue } @@ -688,8 +731,18 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat p.targetStage2Count = p.targetStage2Count - stage2AheadNum } + var ( + txDAG types.TxDAG + err error + ) + if len(block.TxDAG()) != 0 { + txDAG, err = types.DecodeTxDAG(block.TxDAG()) + if err != nil { + return nil, nil, 0, err + } + } // From now on, entering parallel execution. - p.doStaticDispatch(p.allTxReqs) // todo: put txReqs in unit? + p.doStaticDispatchV2(p.allTxReqs, txDAG) // todo: put txReqs in unit? // after static dispatch, we notify the slot to work. for _, slot := range p.slotState { diff --git a/core/state/statedb.go b/core/state/statedb.go index a87cb5f67c..b11bf723c9 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1031,6 +1031,15 @@ func (s *StateDB) CreateAccount(addr common.Address) { newObj.setBalance(new(uint256.Int).Set(preBalance)) // new big.Int for newObj } +// CopyWithMvStates will copy state with MVStates +func (s *StateDB) CopyWithMvStates(doPrefetch bool) *StateDB { + state := s.copyInternal(doPrefetch) + if s.mvStates != nil { + state.mvStates = s.mvStates + } + return state +} + // Copy creates a deep, independent copy of the state. // Snapshots of the copied state cannot be applied to the copy. func (s *StateDB) Copy() *StateDB { @@ -1150,11 +1159,6 @@ func (s *StateDB) copyInternal(doPrefetch bool) *StateDB { state.prefetcher = s.prefetcher.copy() } - // parallel EVM related - if s.mvStates != nil { - state.mvStates = s.mvStates - } - return state } @@ -2371,9 +2375,15 @@ func (s *StateDB) FinaliseRWSet() error { if s.mvStates == nil || s.rwSet == nil { return nil } + ver := types.StateVersion{ + TxIndex: s.txIndex, + } + if ver != s.rwSet.Version() { + return errors.New("you finalize a wrong ver of RWSet") + } + // finalise stateObjectsDestruct - for addr, acc := range s.stateObjectsDestructDirty { - s.stateObjectsDestruct[addr] = acc + for addr := range s.stateObjectsDestructDirty { s.RecordWrite(types.AccountStateKey(addr, types.AccountSuicide), struct{}{}) } for addr := range s.journal.dirties { @@ -2394,12 +2404,6 @@ func (s *StateDB) FinaliseRWSet() error { obj.finaliseRWSet() } } - ver := types.StateVersion{ - TxIndex: s.txIndex, - } - if ver != s.rwSet.Version() { - return errors.New("you finalize a wrong ver of RWSet") - } return s.mvStates.FulfillRWSet(s.rwSet, s.es) } diff --git a/core/types/dag.go b/core/types/dag.go index a4d111458b..b407c2bc77 100644 --- a/core/types/dag.go +++ b/core/types/dag.go @@ -28,6 +28,7 @@ type TxDAG interface { DelayGasDistribution() bool // TxDep query TxDeps from TxDAG + // TODO(galaio): txDAG must convert to dependency relation TxDep(int) TxDep // TxCount return tx count @@ -136,7 +137,7 @@ func NewPlainTxDAG(txLen int) *PlainTxDAG { func (d *PlainTxDAG) String() string { builder := strings.Builder{} - exePaths := travelExecutionPaths(d) + exePaths := travelTxDAGExecutionPaths(d) for _, path := range exePaths { builder.WriteString(fmt.Sprintf("%v\n", path)) } @@ -151,15 +152,67 @@ func (d *PlainTxDAG) Size() int { return len(enc) } -func travelExecutionPaths(d TxDAG) [][]uint64 { +// MergeTxDAGExecutionPaths will merge duplicate tx path for scheduling parallel. +// Any tx cannot exist in >= 2 paths. +func MergeTxDAGExecutionPaths(d TxDAG) [][]uint64 { + mergeMap := make(map[uint64][]uint64, d.TxCount()) + txMap := make(map[uint64]uint64, d.TxCount()) + for i := d.TxCount() - 1; i >= 0; i-- { + index, merge := uint64(i), uint64(i) + deps := d.TxDep(i).TxIndexes + if oldIdx, exist := findTxPathIndex(deps, index, txMap); exist { + merge = oldIdx + } + for _, tx := range deps { + txMap[tx] = merge + } + txMap[index] = merge + } + + // result by index order + for f, t := range txMap { + if mergeMap[t] == nil { + mergeMap[t] = make([]uint64, 0) + } + mergeMap[t] = append(mergeMap[t], f) + } + mergePaths := make([][]uint64, 0, len(mergeMap)) + for i := 0; i < d.TxCount(); i++ { + path, ok := mergeMap[uint64(i)] + if !ok { + continue + } + slices.Sort(path) + mergePaths = append(mergePaths, path) + } + + return mergePaths +} + +func findTxPathIndex(path []uint64, cur uint64, txMap map[uint64]uint64) (uint64, bool) { + if old, ok := txMap[cur]; ok { + return old, true + } + + for _, index := range path { + if old, ok := txMap[index]; ok { + return old, true + } + } + + return 0, false +} + +// travelTxDAGExecutionPaths will print all tx execution path +func travelTxDAGExecutionPaths(d TxDAG) [][]uint64 { txCount := d.TxCount() deps := make([]TxDep, txCount) for i := 0; i < txCount; i++ { dep := d.TxDep(i) if dep.Relation == 0 { deps[i] = dep + continue } - // recover to relation 0 for j := 0; j < i; j++ { if !dep.Exist(j) { @@ -171,7 +224,7 @@ func travelExecutionPaths(d TxDAG) [][]uint64 { exePaths := make([][]uint64, 0) // travel tx deps with BFS for i := uint64(0); i < uint64(txCount); i++ { - exePaths = append(exePaths, travelTargetPath(deps, i)) + exePaths = append(exePaths, travelTxDAGTargetPath(deps, i)) } return exePaths } @@ -199,6 +252,17 @@ func (d *TxDep) Exist(i int) bool { return false } +func (d *TxDep) Count() int { + return len(d.TxIndexes) +} + +func (d *TxDep) Last() int { + if d.Count() == 0 { + return -1 + } + return int(d.TxIndexes[len(d.TxIndexes)-1]) +} + var ( longestTimeTimer = metrics.NewRegisteredTimer("dag/longesttime", nil) longestGasTimer = metrics.NewRegisteredTimer("dag/longestgas", nil) @@ -225,7 +289,7 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) string { // sb.WriteString(fmt.Sprintf("%v: %v\n", i, dep.TxIndexes)) //} //sb.WriteString("Parallel Execution Path:\n") - paths := travelExecutionPaths(dag) + paths := travelTxDAGExecutionPaths(dag) // Attention: this is based on best schedule, it will reduce a lot by executing previous txs in parallel // It assumes that there is no parallel thread limit txCount := dag.TxCount() @@ -327,23 +391,24 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) string { return sb.String() } -func travelTargetPath(deps []TxDep, from uint64) []uint64 { - q := make([]uint64, 0, len(deps)) +// travelTxDAGTargetPath will print target execution path +func travelTxDAGTargetPath(deps []TxDep, from uint64) []uint64 { + queue := make([]uint64, 0, len(deps)) path := make([]uint64, 0, len(deps)) - q = append(q, from) + queue = append(queue, from) path = append(path, from) - for len(q) > 0 { - t := make([]uint64, 0, len(deps)) - for _, i := range q { + for len(queue) > 0 { + next := make([]uint64, 0, len(deps)) + for _, i := range queue { for _, dep := range deps[i].TxIndexes { if !slices.Contains(path, dep) { path = append(path, dep) - t = append(t, dep) + next = append(next, dep) } } } - q = t + queue = next } slices.Sort(path) return path diff --git a/core/types/dag_test.go b/core/types/dag_test.go index e86fff110c..bf12324246 100644 --- a/core/types/dag_test.go +++ b/core/types/dag_test.go @@ -1,6 +1,7 @@ package types import ( + "github.com/cometbft/cometbft/libs/rand" "testing" "time" @@ -172,6 +173,36 @@ func TestIsEqualRWVal(t *testing.T) { } } +func TestMergeTxDAGExecutionPaths_Simple(t *testing.T) { + paths := MergeTxDAGExecutionPaths(mockSimpleDAG()) + require.Equal(t, [][]uint64{ + {0, 3, 4}, + {1, 2, 5, 6, 7}, + {8, 9}, + }, paths) +} + +func TestMergeTxDAGExecutionPaths_Random(t *testing.T) { + dag := mockRandomDAG(10000) + paths := MergeTxDAGExecutionPaths(dag) + txMap := make(map[uint64]uint64, dag.TxCount()) + for _, path := range paths { + for _, index := range path { + old, ok := txMap[index] + require.False(t, ok, index, path, old) + txMap[index] = path[0] + } + } + require.Equal(t, dag.TxCount(), len(txMap)) +} + +func BenchmarkMergeTxDAGExecutionPaths(b *testing.B) { + dag := mockRandomDAG(100000) + for i := 0; i < b.N; i++ { + MergeTxDAGExecutionPaths(dag) + } +} + func mockSimpleDAG() TxDAG { dag := NewPlainTxDAG(10) dag.TxDeps[0].TxIndexes = []uint64{} @@ -187,6 +218,32 @@ func mockSimpleDAG() TxDAG { return dag } +func mockRandomDAG(txLen int) TxDAG { + dag := NewPlainTxDAG(txLen) + for i := 0; i < txLen; i++ { + var deps []uint64 + if i == 0 || rand.Bool() { + dag.TxDeps[i].TxIndexes = deps + continue + } + depCnt := rand.Int()%i + 1 + for j := 0; j < depCnt; j++ { + var dep uint64 + if j > 0 && deps[j-1]+1 == uint64(i) { + break + } + if j > 0 { + dep = uint64(rand.Int())%(uint64(i)-deps[j-1]-1) + deps[j-1] + 1 + } else { + dep = uint64(rand.Int() % i) + } + deps = append(deps, dep) + } + dag.TxDeps[i].TxIndexes = deps + } + return dag +} + func mockSystemTxDAG() TxDAG { dag := NewPlainTxDAG(12) dag.TxDeps[0].TxIndexes = []uint64{} diff --git a/core/types/mvstates.go b/core/types/mvstates.go index d5b9ef8b39..4db7586727 100644 --- a/core/types/mvstates.go +++ b/core/types/mvstates.go @@ -355,6 +355,22 @@ func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error { s.stats[index] = stat } + s.resolveDepsCache(index, rwSet) + // append to pending write set + for k, v := range rwSet.writeSet { + // TODO(galaio): this action is only for testing, it can be removed in production mode. + // ignore no changed write record + checkRWSetInconsistent(index, k, rwSet.readSet, rwSet.writeSet) + if _, exist := s.pendingWriteSet[k]; !exist { + s.pendingWriteSet[k] = NewPendingWrites() + } + s.pendingWriteSet[k].Append(NewPendingWrite(rwSet.ver, v)) + } + s.rwSets[index] = rwSet + return nil +} + +func (s *MVStates) resolveDepsCache(index int, rwSet *RWSet) { // analysis dep, if the previous transaction is not executed/validated, re-analysis is required if _, ok := s.depsCache[index]; !ok { s.depsCache[index] = NewTxDeps(0) @@ -365,6 +381,8 @@ func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error { if _, ok := s.rwSets[prev]; !ok { continue } + // TODO: check if there are RW with system address for gas delay calculation + // check if there has written op before i if checkDependency(s.rwSets[prev].writeSet, rwSet.readSet) { s.depsCache[index].add(prev) // clear redundancy deps compared with prev @@ -375,19 +393,6 @@ func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error { } } } - - // append to pending write set - for k, v := range rwSet.writeSet { - // TODO(galaio): this action is only for testing, it can be removed in production mode. - // ignore no changed write record - checkRWSetInconsistent(index, k, rwSet.readSet, rwSet.writeSet) - if _, exist := s.pendingWriteSet[k]; !exist { - s.pendingWriteSet[k] = NewPendingWrites() - } - s.pendingWriteSet[k].Append(NewPendingWrite(rwSet.ver, v)) - } - s.rwSets[index] = rwSet - return nil } func checkRWSetInconsistent(index int, k RWKey, readSet map[RWKey]*ReadRecord, writeSet map[RWKey]*WriteRecord) bool { @@ -423,18 +428,10 @@ func (s *MVStates) ResolveTxDAG() TxDAG { txDAG.TxDeps[i].Relation = 1 continue } - if s.depsCache[i] != nil { - txDAG.TxDeps[i].TxIndexes = s.depsCache[i].toArray() - continue - } - readSet := rwSets[i].ReadSet() - // TODO: check if there are RW with system address - // check if there has written op before i - for j := 0; j < i; j++ { - if checkDependency(rwSets[j].writeSet, readSet) { - txDAG.TxDeps[i].AppendDep(j) - } + if s.depsCache[i] == nil { + s.resolveDepsCache(i, rwSets[i]) } + txDAG.TxDeps[i].TxIndexes = s.depsCache[i].toArray() } return txDAG