Skip to content

Commit

Permalink
TxDAG: support PEVM static dispatch; (#6)
Browse files Browse the repository at this point in the history
* dag: add merge execute path method;
pevm: support dispatch with TxDAG;

* dag: add merge execute path method;
pevm: support dispatch with TxDAG;

* dag: clean code;

* statedb: fix some broken uts;

* pevm: support disable slot steal;

---------

Co-authored-by: galaio <galaio@users.noreply.github.com>
  • Loading branch information
2 people authored and sunny2022da committed Sep 25, 2024
1 parent 0f9be14 commit e2517b3
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 60 deletions.
73 changes: 63 additions & 10 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ParallelStateProcessor struct {
inConfirmStage2 bool
targetStage2Count int // when executed txNUM reach it, enter stage2 RT confirm
nextStage2TxIndex int
disableStealTx bool
}

func NewParallelStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine, parallelNum int) *ParallelStateProcessor {
Expand Down Expand Up @@ -174,6 +175,37 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) {
p.nextStage2TxIndex = 0
}

// doStaticDispatchV2 could dispatch by TxDAG metadata
// txReqs must order by TxIndex
// txDAG must convert to dependency relation
// 1. The TxDAG generates parallel execution merge paths that will ignore cross slot tx dep;
// 2. It will dispatch the most hungry slot for every isolate execution path;
// 3. TODO(galaio) it need to schedule the slow dep tx path properly;
// 4. TODO(galaio) it is unfriendly for cross slot deps, maybe we can delay dispatch when tx cross in slots, it may increase PEVM parallelism;
func (p *ParallelStateProcessor) doStaticDispatchV2(txReqs []*ParallelTxRequest, txDAG types.TxDAG) {
// only support PlainTxDAG dispatch now.
if txDAG == nil || txDAG.Type() != types.PlainTxDAGType {
p.doStaticDispatch(txReqs)
return
}

// resolve isolate execution paths from TxDAG, it indicates the tx dispatch
paths := types.MergeTxDAGExecutionPaths(txDAG)
log.Info("doStaticDispatchV2 merge parallel execution paths", "slots", len(p.slotState), "paths", len(paths))

for _, path := range paths {
slotIndex := p.mostHungrySlot()
for _, index := range path {
txReqs[index].staticSlotIndex = slotIndex // txReq is better to be executed in this slot
slot := p.slotState[slotIndex]
slot.pendingTxReqList = append(slot.pendingTxReqList, txReqs[index])
}
}

// it's unnecessary to enable slot steal mechanism, opt the steal mechanism later;
p.disableStealTx = true
}

// Benefits of StaticDispatch:
//
// ** try best to make Txs with same From() in same slot
Expand All @@ -197,14 +229,7 @@ func (p *ParallelStateProcessor) doStaticDispatch(txReqs []*ParallelTxRequest) {

// not found, dispatch to most hungry slot
if slotIndex == -1 {
var workload = len(p.slotState[0].pendingTxReqList)
slotIndex = 0
for i, slot := range p.slotState { // can start from index 1
if len(slot.pendingTxReqList) < workload {
slotIndex = i
workload = len(slot.pendingTxReqList)
}
}
slotIndex = p.mostHungrySlot()
}
// update
fromSlotMap[txReq.msg.From] = slotIndex
Expand All @@ -218,6 +243,24 @@ func (p *ParallelStateProcessor) doStaticDispatch(txReqs []*ParallelTxRequest) {
}
}

func (p *ParallelStateProcessor) mostHungrySlot() int {
var (
workload = len(p.slotState[0].pendingTxReqList)
slotIndex = 0
)
for i, slot := range p.slotState { // can start from index 1
if len(slot.pendingTxReqList) < workload {
slotIndex = i
workload = len(slot.pendingTxReqList)
}
// just return the first slot with 0 workload
if workload == 0 {
return slotIndex
}
}
return slotIndex
}

// do conflict detect
func (p *ParallelStateProcessor) hasConflict(txResult *ParallelTxResult, isStage2 bool) bool {
slotDB := txResult.slotDB
Expand Down Expand Up @@ -465,7 +508,7 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
// fmt.Printf("Dav -- runInLoop, - loopbody tail - TxREQ: %d\n", txReq.txIndex)
}
// switched to the other slot.
if interrupted {
if interrupted || p.disableStealTx {
continue
}

Expand Down Expand Up @@ -688,8 +731,18 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
p.targetStage2Count = p.targetStage2Count - stage2AheadNum
}

var (
txDAG types.TxDAG
err error
)
if len(block.TxDAG()) != 0 {
txDAG, err = types.DecodeTxDAG(block.TxDAG())
if err != nil {
return nil, nil, 0, err
}
}
// From now on, entering parallel execution.
p.doStaticDispatch(p.allTxReqs) // todo: put txReqs in unit?
p.doStaticDispatchV2(p.allTxReqs, txDAG) // todo: put txReqs in unit?

// after static dispatch, we notify the slot to work.
for _, slot := range p.slotState {
Expand Down
30 changes: 17 additions & 13 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,15 @@ func (s *StateDB) CreateAccount(addr common.Address) {
newObj.setBalance(new(uint256.Int).Set(preBalance)) // new big.Int for newObj
}

// CopyWithMvStates will copy state with MVStates
func (s *StateDB) CopyWithMvStates(doPrefetch bool) *StateDB {
state := s.copyInternal(doPrefetch)
if s.mvStates != nil {
state.mvStates = s.mvStates
}
return state
}

// Copy creates a deep, independent copy of the state.
// Snapshots of the copied state cannot be applied to the copy.
func (s *StateDB) Copy() *StateDB {
Expand Down Expand Up @@ -1150,11 +1159,6 @@ func (s *StateDB) copyInternal(doPrefetch bool) *StateDB {
state.prefetcher = s.prefetcher.copy()
}

// parallel EVM related
if s.mvStates != nil {
state.mvStates = s.mvStates
}

return state
}

Expand Down Expand Up @@ -2371,9 +2375,15 @@ func (s *StateDB) FinaliseRWSet() error {
if s.mvStates == nil || s.rwSet == nil {
return nil
}
ver := types.StateVersion{
TxIndex: s.txIndex,
}
if ver != s.rwSet.Version() {
return errors.New("you finalize a wrong ver of RWSet")
}

// finalise stateObjectsDestruct
for addr, acc := range s.stateObjectsDestructDirty {
s.stateObjectsDestruct[addr] = acc
for addr := range s.stateObjectsDestructDirty {
s.RecordWrite(types.AccountStateKey(addr, types.AccountSuicide), struct{}{})
}
for addr := range s.journal.dirties {
Expand All @@ -2394,12 +2404,6 @@ func (s *StateDB) FinaliseRWSet() error {
obj.finaliseRWSet()
}
}
ver := types.StateVersion{
TxIndex: s.txIndex,
}
if ver != s.rwSet.Version() {
return errors.New("you finalize a wrong ver of RWSet")
}

return s.mvStates.FulfillRWSet(s.rwSet, s.es)
}
Expand Down
91 changes: 78 additions & 13 deletions core/types/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type TxDAG interface {
DelayGasDistribution() bool

// TxDep query TxDeps from TxDAG
// TODO(galaio): txDAG must convert to dependency relation
TxDep(int) TxDep

// TxCount return tx count
Expand Down Expand Up @@ -136,7 +137,7 @@ func NewPlainTxDAG(txLen int) *PlainTxDAG {

func (d *PlainTxDAG) String() string {
builder := strings.Builder{}
exePaths := travelExecutionPaths(d)
exePaths := travelTxDAGExecutionPaths(d)
for _, path := range exePaths {
builder.WriteString(fmt.Sprintf("%v\n", path))
}
Expand All @@ -151,15 +152,67 @@ func (d *PlainTxDAG) Size() int {
return len(enc)
}

func travelExecutionPaths(d TxDAG) [][]uint64 {
// MergeTxDAGExecutionPaths will merge duplicate tx path for scheduling parallel.
// Any tx cannot exist in >= 2 paths.
func MergeTxDAGExecutionPaths(d TxDAG) [][]uint64 {
mergeMap := make(map[uint64][]uint64, d.TxCount())
txMap := make(map[uint64]uint64, d.TxCount())
for i := d.TxCount() - 1; i >= 0; i-- {
index, merge := uint64(i), uint64(i)
deps := d.TxDep(i).TxIndexes
if oldIdx, exist := findTxPathIndex(deps, index, txMap); exist {
merge = oldIdx
}
for _, tx := range deps {
txMap[tx] = merge
}
txMap[index] = merge
}

// result by index order
for f, t := range txMap {
if mergeMap[t] == nil {
mergeMap[t] = make([]uint64, 0)
}
mergeMap[t] = append(mergeMap[t], f)
}
mergePaths := make([][]uint64, 0, len(mergeMap))
for i := 0; i < d.TxCount(); i++ {
path, ok := mergeMap[uint64(i)]
if !ok {
continue
}
slices.Sort(path)
mergePaths = append(mergePaths, path)
}

return mergePaths
}

func findTxPathIndex(path []uint64, cur uint64, txMap map[uint64]uint64) (uint64, bool) {
if old, ok := txMap[cur]; ok {
return old, true
}

for _, index := range path {
if old, ok := txMap[index]; ok {
return old, true
}
}

return 0, false
}

// travelTxDAGExecutionPaths will print all tx execution path
func travelTxDAGExecutionPaths(d TxDAG) [][]uint64 {
txCount := d.TxCount()
deps := make([]TxDep, txCount)
for i := 0; i < txCount; i++ {
dep := d.TxDep(i)
if dep.Relation == 0 {
deps[i] = dep
continue
}

// recover to relation 0
for j := 0; j < i; j++ {
if !dep.Exist(j) {
Expand All @@ -171,7 +224,7 @@ func travelExecutionPaths(d TxDAG) [][]uint64 {
exePaths := make([][]uint64, 0)
// travel tx deps with BFS
for i := uint64(0); i < uint64(txCount); i++ {
exePaths = append(exePaths, travelTargetPath(deps, i))
exePaths = append(exePaths, travelTxDAGTargetPath(deps, i))
}
return exePaths
}
Expand Down Expand Up @@ -199,6 +252,17 @@ func (d *TxDep) Exist(i int) bool {
return false
}

func (d *TxDep) Count() int {
return len(d.TxIndexes)
}

func (d *TxDep) Last() int {
if d.Count() == 0 {
return -1
}
return int(d.TxIndexes[len(d.TxIndexes)-1])
}

var (
longestTimeTimer = metrics.NewRegisteredTimer("dag/longesttime", nil)
longestGasTimer = metrics.NewRegisteredTimer("dag/longestgas", nil)
Expand All @@ -225,7 +289,7 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) string {
// sb.WriteString(fmt.Sprintf("%v: %v\n", i, dep.TxIndexes))
//}
//sb.WriteString("Parallel Execution Path:\n")
paths := travelExecutionPaths(dag)
paths := travelTxDAGExecutionPaths(dag)
// Attention: this is based on best schedule, it will reduce a lot by executing previous txs in parallel
// It assumes that there is no parallel thread limit
txCount := dag.TxCount()
Expand Down Expand Up @@ -327,23 +391,24 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) string {
return sb.String()
}

func travelTargetPath(deps []TxDep, from uint64) []uint64 {
q := make([]uint64, 0, len(deps))
// travelTxDAGTargetPath will print target execution path
func travelTxDAGTargetPath(deps []TxDep, from uint64) []uint64 {
queue := make([]uint64, 0, len(deps))
path := make([]uint64, 0, len(deps))

q = append(q, from)
queue = append(queue, from)
path = append(path, from)
for len(q) > 0 {
t := make([]uint64, 0, len(deps))
for _, i := range q {
for len(queue) > 0 {
next := make([]uint64, 0, len(deps))
for _, i := range queue {
for _, dep := range deps[i].TxIndexes {
if !slices.Contains(path, dep) {
path = append(path, dep)
t = append(t, dep)
next = append(next, dep)
}
}
}
q = t
queue = next
}
slices.Sort(path)
return path
Expand Down
Loading

0 comments on commit e2517b3

Please sign in to comment.