pevm: unordered merge mode flag #202

Merged · 12 commits · Oct 22, 2024
1 change: 1 addition & 0 deletions cmd/geth/main.go
@@ -171,6 +171,7 @@ var (
     utils.RollupSuperchainUpgradesFlag,
     utils.ParallelTxLegacyFlag,
     utils.ParallelTxFlag,
+    utils.ParallelTxUnorderedMergeFlag,
     utils.ParallelTxNumFlag,
     utils.ParallelTxDAGFlag,
     utils.ParallelTxDAGFileFlag,
13 changes: 13 additions & 0 deletions cmd/utils/flags.go
@@ -1107,6 +1107,12 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
         Category: flags.VMCategory,
     }
 
+    ParallelTxUnorderedMergeFlag = &cli.BoolFlag{
+        Name:     "parallel.unordered-merge",
+        Usage:    "Enable unordered merge mode: during the parallel confirm phase, merge transaction execution results without following the transaction order.",
+        Category: flags.VMCategory,
+    }
+
     ParallelTxNumFlag = &cli.IntFlag{
         Name:     "parallel.num",
         Usage:    "Number of slot for transaction execution, only valid in parallel mode (runtime calculated, no fixed default value)",
@@ -2049,6 +2055,13 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
         cfg.ParallelTxMode = ctx.Bool(ParallelTxFlag.Name)
     }
 
+    if ctx.IsSet(ParallelTxUnorderedMergeFlag.Name) {
+        cfg.ParallelTxUnorderedMerge = ctx.Bool(ParallelTxUnorderedMergeFlag.Name)
+        if ctx.IsSet(ParallelTxLegacyFlag.Name) && ctx.Bool(ParallelTxLegacyFlag.Name) {
+            log.Warn("ParallelTxUnorderedMergeFlag does not have any effect in ParallelTxLegacy mode")
+        }
+    }
+
     if ctx.IsSet(ParallelTxDAGFlag.Name) {
         cfg.EnableParallelTxDAG = ctx.Bool(ParallelTxDAGFlag.Name)
     }
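
Worth noting about the plumbing above: cfg.ParallelTxUnorderedMerge is only written when the flag is passed explicitly, since ctx.IsSet distinguishes an absent flag from one set to false, and combining it with the legacy parallel mode only produces a warning rather than an error. A minimal, self-contained sketch of that IsSet/Bool pattern (urfave/cli/v2 assumed, as used by geth's cmd/utils; the demo program itself is hypothetical):

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	unordered := &cli.BoolFlag{Name: "parallel.unordered-merge"}
	app := &cli.App{
		Flags: []cli.Flag{unordered},
		Action: func(ctx *cli.Context) error {
			// Mirrors SetEthConfig: the config default survives unless the
			// user passed the flag on the command line.
			if ctx.IsSet(unordered.Name) {
				fmt.Println("unordered merge:", ctx.Bool(unordered.Name))
			}
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```
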
6 changes: 5 additions & 1 deletion core/blockchain.go
@@ -100,6 +100,10 @@ var (
 
     parallelTxNumMeter         = metrics.NewRegisteredMeter("chain/parallel/txs", nil)
     parallelConflictTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/conflicttxs", nil)
+    parallelExecutionTimer     = metrics.NewRegisteredTimer("chain/parallel/exec", nil)
+    parallelConfirmTimer       = metrics.NewRegisteredTimer("chain/parallel/confirm", nil)
+    parallelTxLevelsSizeMeter  = metrics.NewRegisteredGauge("chain/parallel/txlevel/size", nil)
+    parallelTxLevelTxSizeMeter = metrics.NewRegisteredGauge("chain/parallel/txlevel/txsize", nil)
 
     blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil)
     mgaspsGauge       = metrics.NewRegisteredGauge("chain/mgas/ps", nil)
@@ -2751,7 +2755,7 @@ func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) {
 }
 
 func (bc *BlockChain) TxDAGEnabledWhenMine() bool {
-    return bc.enableTxDAG && bc.txDAGWriteCh == nil && bc.txDAGReader == nil
+    return bc.enableTxDAG && bc.txDAGWriteCh == nil && bc.txDAGReader == nil && !bc.vmConfig.EnableParallelExec && !bc.vmConfig.EnableParallelExecLegacy
 }
 
 func (bc *BlockChain) SetupTxDAGGeneration(output string, readFile bool) {
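
Two side effects in this file are easy to miss: the four new chain/parallel metrics expose per-block execution time, confirm time, and the TxDAG level shape, and TxDAGEnabledWhenMine now additionally requires that neither parallel execution mode is active, so TxDAG generation while mining is suppressed whenever pevm (or the legacy parallel mode) is running.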
25 changes: 20 additions & 5 deletions core/parallel_state_scheduler.go
@@ -4,9 +4,11 @@ import (
     "fmt"
     "runtime"
     "sync"
+    "time"
 
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core/types"
+    "github.com/ethereum/go-ethereum/log"
 )
 
 var runner chan func()
@@ -99,7 +101,7 @@ func (cq *confirmQueue) collect(result *PEVMTxResult) error {
     return nil
 }
 
-func (cq *confirmQueue) confirmWithTrust(level TxLevel, execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error) (error, int) {
+func (cq *confirmQueue) confirmWithUnordered(level TxLevel, execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error) (error, int) {
     // find all able-to-confirm transactions, and try to confirm them
     for _, tx := range level {
         i := tx.txIndex
@@ -187,16 +189,23 @@ func (cq *confirmQueue) rerun(i int, execute func(*PEVMTxRequest) *PEVMTxResult,
 
 // run runs the transactions in parallel
 // execute must return a non-nil result, otherwise it panics.
-func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error) (error, int) {
+func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error, unorderedMerge bool) (error, int) {
     toConfirm := &confirmQueue{
         queue:     make([]confirmation, tls.txCount()),
         confirmed: -1,
     }
 
-    trustDAG := false
+    totalExecutionTime := int64(0)
+    totalConfirmTime := int64(0)
+    maxLevelTxCount := 0
 
     // execute all transactions in parallel
     for _, txLevel := range tls {
+        start := time.Now()
+        log.Debug("txLevel tx count", "tx count", len(txLevel))
+        if len(txLevel) > maxLevelTxCount {
+            maxLevelTxCount = len(txLevel)
+        }
         wait := sync.WaitGroup{}
         trunks := txLevel.Split(runtime.NumCPU())
         wait.Add(len(trunks))
@@ -215,9 +224,11 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func
             runner <- run
         }
         wait.Wait()
+        totalExecutionTime += time.Since(start).Nanoseconds()
+        start = time.Now()
         // all transactions of current level are executed, now try to confirm.
-        if trustDAG {
-            if err, txIndex := toConfirm.confirmWithTrust(txLevel, execute, confirm); err != nil {
+        if unorderedMerge {
+            if err, txIndex := toConfirm.confirmWithUnordered(txLevel, execute, confirm); err != nil {
                 // something very wrong, stop the process
                 return err, txIndex
             }
@@ -227,7 +238,11 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func
                 return err, txIndex
             }
         }
+        totalConfirmTime += time.Since(start).Nanoseconds()
     }
+    parallelTxLevelTxSizeMeter.Update(int64(maxLevelTxCount))
+    parallelExecutionTimer.Update(time.Duration(totalExecutionTime))
+    parallelConfirmTimer.Update(time.Duration(totalConfirmTime))
     return nil, 0
 }

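
What the new parameter changes in behavior: the ordered path advances the confirm queue strictly by transaction index, re-running transactions when a conflict is detected, while the unordered path merges every result of a level as soon as the level finishes executing, relying on the TxDAG guarantee that transactions within a level are independent. A toy sketch of the two strategies (the types and the confirm callback are simplified stand-ins, not the confirmQueue/PEVMTxResult machinery):

```go
// confirmOrdered advances a watermark strictly by tx index: a result can
// only be merged once every earlier transaction has been merged.
func confirmOrdered(executed map[int]bool, confirmed *int, confirm func(int) error) error {
	for executed[*confirmed+1] {
		if err := confirm(*confirmed + 1); err != nil {
			return err
		}
		*confirmed++
	}
	return nil
}

// confirmUnordered merges every transaction of a level in whatever order
// it appears, trusting the TxDAG claim that same-level transactions
// cannot conflict with each other.
func confirmUnordered(level []int, confirm func(int) error) error {
	for _, txIndex := range level {
		if err := confirm(txIndex); err != nil {
			return err
		}
	}
	return nil
}
```
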
16 changes: 7 additions & 9 deletions core/parallel_state_scheduler_test.go
@@ -263,9 +263,7 @@ func setTxIndex(allReq []*PEVMTxRequest) {
 func TestTxLevelRun(t *testing.T) {
     // case 1: empty txs
     case1 := func() {
-        levels([]uint64{}, [][]int{}).Run(
-            func(*PEVMTxRequest) *PEVMTxResult { return nil },
-            func(*PEVMTxResult) error { return nil })
+        levels([]uint64{}, [][]int{}).Run(func(*PEVMTxRequest) *PEVMTxResult { return nil }, func(*PEVMTxResult) error { return nil }, false)
     }
     // case 2: 4 txs with no dependencies, no conflicts
     case2 := func() {
@@ -287,7 +285,7 @@ func TestTxLevelRun(t *testing.T) {
             nil, nil, nil, nil,
         })
         caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
-        err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm)
+        err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm, false)
         ok := checkMainDB(map[int]int{1: 0, 2: 0, 3: 0, 4: 0, 5: 11, 6: 21, 7: 31, 8: 41})
         if err != nil {
             t.Fatalf("failed, err:%v", err)
@@ -323,7 +321,7 @@ func TestTxLevelRun(t *testing.T) {
             nil, nil, {0}, {1},
         })
         caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
-        err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm)
+        err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm, false)
         ok := checkMainDB(map[int]int{1: 0, 2: 0, 3: 0, 4: 0, 5: 11, 6: 21})
         if err != nil {
             t.Fatalf("failed, err:%v", err)
@@ -359,7 +357,7 @@ func TestTxLevelRun(t *testing.T) {
             {0}, nil, {-1}, {-1},
         })
         caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
-        err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm)
+        err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm, false)
         ok := checkMainDB(map[int]int{1: 0, 2: 0, 3: 0, 4: 0, 5: 11, 6: 21})
         if err != nil {
             t.Fatalf("failed, err:%v", err)
@@ -407,7 +405,7 @@ func TestTxLevelRun(t *testing.T) {
             res[i+2000] = i
         }
         caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
-        err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm)
+        err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm, false)
         ok := checkMainDB(res)
         if err != nil {
             t.Fatalf("failed, err:%v", err)
@@ -433,7 +431,7 @@ func TestTxLevelRun(t *testing.T) {
         }
         setTxIndex(allReqs)
         caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
-        err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm)
+        err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm, false)
         ok := checkMainDB(map[int]int{1: 5, 2: 20, 3: 10})
         if err != nil {
             t.Fatalf("failed, err:%v", err)
@@ -462,7 +460,7 @@ func TestTxLevelRun(t *testing.T) {
         }
         caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
         setTxIndex(allReqs)
-        err, _ := NewTxLevels(allReqs, dag).Run(caller.execute, caller.confirm)
+        err, _ := NewTxLevels(allReqs, dag).Run(caller.execute, caller.confirm, false)
         ok := checkMainDB(map[int]int{1: 5, 2: 20, 3: 10})
         if err != nil {
             t.Fatalf("failed, err:%v", err)
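
All of the updated call sites above pass false for the new parameter, so the existing tests continue to exercise the ordered confirm path; the unordered path is only reached in production when the processor passes p.unorderedMerge && txDAG != nil (see pevm_processor.go below). A hypothetical extra case in the same style as this file's fixtures, not part of the PR:

```go
// Hypothetical: same harness as the cases above, but taking the unordered path.
caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
err, txIndex := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm, true)
if err != nil {
	t.Fatalf("unordered merge failed at tx %d, err:%v", txIndex, err)
}
```
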
30 changes: 25 additions & 5 deletions core/pevm_processor.go
@@ -26,14 +26,16 @@ type PEVMProcessor struct {
     commonTxs            []*types.Transaction
     receipts             types.Receipts
     debugConflictRedoNum uint64
+    unorderedMerge       bool
 }
 
 func newPEVMProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *PEVMProcessor {
     processor := &PEVMProcessor{
         StateProcessor: *NewStateProcessor(config, bc, engine),
+        unorderedMerge: bc.vmConfig.EnableParallelUnorderedMerge,
     }
     log.Info("Parallel execution mode is enabled", "Parallel Num", ParallelNum(),
-        "CPUNum", runtime.NumCPU())
+        "CPUNum", runtime.NumCPU(), "unorderedMerge", processor.unorderedMerge)
     return processor
 }

@@ -120,9 +122,14 @@ func (p *PEVMProcessor) executeInSlot(maindb *state.StateDB, txReq *PEVMTxReques
 // if it is in Stage 2 it is a likely result, not 100% sure
 func (p *PEVMProcessor) toConfirmTxIndexResult(txResult *PEVMTxResult) error {
     txReq := txResult.txReq
-    if err := p.hasConflict(txResult); err != nil {
-        log.Info(fmt.Sprintf("HasConflict!! block: %d, txIndex: %d\n", txResult.txReq.block.NumberU64(), txResult.txReq.txIndex))
-        return err
+    if !p.unorderedMerge || !txReq.useDAG {
+        // Without a DAG we must run conflict detection to guarantee correct execution.
+        // With an unordered merge we cannot run conflict checks at all, and can only
+        // trust that the DAG is correct and that no conflicts exist.
+        if err := p.hasConflict(txResult); err != nil {
+            log.Info(fmt.Sprintf("HasConflict!! block: %d, txIndex: %d\n", txResult.txReq.block.NumberU64(), txResult.txReq.txIndex))
+            return err
+        }
     }
 
     // goroutine unsafe operation will be handled from here for safety
@@ -253,6 +260,8 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
     // parallel execution
     start := time.Now()
     txLevels := NewTxLevels(p.allTxReqs, txDAG)
+    log.Debug("txLevels size", "txLevels size", len(txLevels))
+    parallelTxLevelsSizeMeter.Update(int64(len(txLevels)))
     buildLevelsDuration := time.Since(start)
     var executeDurations, confirmDurations int64 = 0, 0
     err, txIndex := txLevels.Run(func(pr *PEVMTxRequest) (res *PEVMTxResult) {
@@ -274,8 +283,9 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
                 atomic.AddUint64(&p.debugConflictRedoNum, 1)
             }
         }(time.Now())
+        log.Debug("pevm confirm", "txIndex", pr.txReq.txIndex)
         return p.confirmTxResult(statedb, gp, pr)
-    })
+    }, p.unorderedMerge && txDAG != nil)
     parallelRunDuration := time.Since(start) - buildLevelsDuration
     if err != nil {
         tx := allTxs[txIndex]
@@ -320,7 +330,17 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
     p.engine.Finalize(p.bc, header, statedb, p.commonTxs, block.Uncles(), withdrawals)
 
     var allLogs []*types.Log
+    var lindex = 0
+    var cumulativeGasUsed uint64
     for _, receipt := range p.receipts {
+        // reset the log index
+        for _, log := range receipt.Logs {
+            log.Index = uint(lindex)
+            lindex++
+        }
+        // re-calculate the cumulativeGasUsed
+        cumulativeGasUsed += receipt.GasUsed
+        receipt.CumulativeGasUsed = cumulativeGasUsed
         allLogs = append(allLogs, receipt.Logs...)
     }
     return p.receipts, allLogs, *usedGas, nil
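
The receipt loop added at the end of Process is the flip side of skipping conflict checks: when results can merge out of transaction order, block-wide log indices and per-receipt cumulative gas are no longer guaranteed to be consistent, so they are rebuilt in one sequential pass. The same fix-up as a standalone helper (types.Receipt fields as in go-ethereum; the helper name and package are illustrative):

```go
package pevmsketch

import "github.com/ethereum/go-ethereum/core/types"

// fixupReceipts re-assigns block-wide log indices and recomputes
// CumulativeGasUsed in transaction order, mirroring the inline loop above.
func fixupReceipts(receipts types.Receipts) {
	logIndex := uint(0)
	var cumulativeGas uint64
	for _, r := range receipts {
		for _, l := range r.Logs {
			l.Index = logIndex
			logIndex++
		}
		cumulativeGas += r.GasUsed
		r.CumulativeGasUsed = cumulativeGas
	}
}
```
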
6 changes: 3 additions & 3 deletions core/state/pevm_statedb.go
@@ -486,9 +486,9 @@ func (pst *UncommittedDB) Merge(deleteEmptyObjects bool) error {
         // so we don't need to merge anything.
         return nil
     }
-    if err := pst.conflictsToMaindb(); err != nil {
-        return err
-    }
+    //if err := pst.conflictsToMaindb(); err != nil {
+    //    return err
+    //}
A collaborator commented on the lines above:
we can keep this uncommented, and make separate PR later for this when it is confirmed to be OK


// 0. set the TxContext
pst.maindb.SetTxContext(pst.txHash, pst.txIndex)
1 change: 0 additions & 1 deletion core/types/dag.go
@@ -176,7 +176,6 @@ func ValidatePlainTxDAG(d TxDAG, txCnt int) error {
             return fmt.Errorf("PlainTxDAG contains unordered dependency, tx: %v", i)
             }
         }
-
     }
     return nil
 }
22 changes: 22 additions & 0 deletions core/types/dag_test.go
@@ -5,6 +5,7 @@ import (
     "testing"
     "time"
 
+    "github.com/ethereum/go-ethereum/common/hexutil"
     "github.com/golang/snappy"
 
     "github.com/cometbft/cometbft/libs/rand"
@@ -21,16 +22,37 @@
 
 func TestEncodeTxDAGCalldata(t *testing.T) {
     tg := mockSimpleDAG()
+    originTg := tg
     data, err := EncodeTxDAGCalldata(tg)
     assert.Equal(t, nil, err)
     tg, err = DecodeTxDAGCalldata(data)
     assert.Equal(t, nil, err)
     assert.Equal(t, true, tg.TxCount() > 0)
+    assert.Equal(t, originTg, tg)
 
     _, err = DecodeTxDAGCalldata(nil)
     assert.NotEqual(t, nil, err)
 }
+
+func TestDecodeCalldata(t *testing.T) {
+    calldata := "0x5517ed8c000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000001b201f901aef901abc2c002c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c2c152c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c2c0010000000000000000000000000000"
+    decode, err := hexutil.Decode(calldata)
+    if err != nil {
+        return
+    }
+    dagCalldata, err := DecodeTxDAGCalldata(decode)
+    if err != nil {
+        t.Errorf("Error decoding calldata: %s", err)
+        return
+    }
+    //for i := 0; i < dagCalldata.TxCount(); i++ {
+    //    dep := dagCalldata.TxDep(i)
+    //    log.Printf("idx:%d,dep:%v", i, dep.TxIndexes)
+    //}
+    assert.Equal(t, true, dagCalldata.TxDep(186).Exist(82))
+    assert.Equal(t, 0, dagCalldata.TxDep(187).Count())
+}
 
 func TestTxDAG_SetTxDep(t *testing.T) {
     dag := mockSimpleDAG()
     require.NoError(t, dag.SetTxDep(9, NewTxDep(nil, NonDependentRelFlag)))
21 changes: 11 additions & 10 deletions core/vm/interpreter.go
@@ -30,16 +30,17 @@ type PrecompileOverrides func(params.Rules, PrecompiledContract, common.Address)
 
 // Config are the configuration options for the Interpreter
 type Config struct {
-    Tracer                      EVMLogger           // Opcode logger
-    NoBaseFee                   bool                // Forces the EIP-1559 baseFee to 0 (needed for 0 price calls)
-    EnablePreimageRecording     bool                // Enables recording of SHA3/keccak preimages
-    ExtraEips                   []int               // Additional EIPS that are to be enabled
-    EnableParallelExecLegacy    bool                // Whether to execute transaction in parallel mode when do full sync
-    EnableParallelExec          bool                // Whether to execute transaction in parallel mode when do full sync
-    ParallelTxNum               int                 // Number of slot for transaction execution
-    OptimismPrecompileOverrides PrecompileOverrides // Precompile overrides for Optimism
-    EnableOpcodeOptimizations   bool                // Enable opcode optimization
-    TxDAG                       types.TxDAG
+    Tracer                       EVMLogger           // Opcode logger
+    NoBaseFee                    bool                // Forces the EIP-1559 baseFee to 0 (needed for 0 price calls)
+    EnablePreimageRecording      bool                // Enables recording of SHA3/keccak preimages
+    ExtraEips                    []int               // Additional EIPS that are to be enabled
+    EnableParallelExecLegacy     bool                // Whether to execute transaction in parallel mode when do full sync
+    EnableParallelExec           bool                // Whether to execute transaction in parallel mode when do full sync
+    ParallelTxNum                int                 // Number of slot for transaction execution
+    OptimismPrecompileOverrides  PrecompileOverrides // Precompile overrides for Optimism
+    EnableOpcodeOptimizations    bool                // Enable opcode optimization
+    TxDAG                        types.TxDAG
+    EnableParallelUnorderedMerge bool                // Whether to enable unordered merge in parallel mode
 }
 
 // ScopeContext contains the things that are per-call, such as stack and memory,
11 changes: 6 additions & 5 deletions eth/backend.go
@@ -223,11 +223,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
     journalFilePath := fmt.Sprintf("%s/%s", stack.ResolvePath(ChainData), JournalFileName)
     var (
         vmConfig = vm.Config{
-            EnablePreimageRecording:   config.EnablePreimageRecording,
-            EnableParallelExecLegacy:  config.ParallelTxLegacyMode,
-            EnableParallelExec:        config.ParallelTxMode,
-            ParallelTxNum:             config.ParallelTxNum,
-            EnableOpcodeOptimizations: config.EnableOpcodeOptimizing,
+            EnablePreimageRecording:      config.EnablePreimageRecording,
+            EnableParallelExecLegacy:     config.ParallelTxLegacyMode,
+            EnableParallelExec:           config.ParallelTxMode,
+            EnableParallelUnorderedMerge: config.ParallelTxUnorderedMerge,
+            ParallelTxNum:                config.ParallelTxNum,
+            EnableOpcodeOptimizations:    config.EnableOpcodeOptimizing,
         }
         cacheConfig = &core.CacheConfig{
             TrieCleanLimit: config.TrieCleanCache,
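
End to end, the setting travels --parallel.unordered-merge → ethconfig.Config.ParallelTxUnorderedMerge → vm.Config.EnableParallelUnorderedMerge → PEVMProcessor.unorderedMerge. A condensed sketch of the middle hop (field names are from this PR's fork, not upstream go-ethereum; the helper itself is illustrative):

```go
package pevmsketch

import (
	"github.com/ethereum/go-ethereum/core/vm"
	"github.com/ethereum/go-ethereum/eth/ethconfig"
)

// buildVMConfig mirrors the vm.Config wiring in eth/backend.go above.
func buildVMConfig(cfg *ethconfig.Config) vm.Config {
	return vm.Config{
		EnablePreimageRecording:      cfg.EnablePreimageRecording,
		EnableParallelExecLegacy:     cfg.ParallelTxLegacyMode,
		EnableParallelExec:           cfg.ParallelTxMode,
		EnableParallelUnorderedMerge: cfg.ParallelTxUnorderedMerge,
		ParallelTxNum:                cfg.ParallelTxNum,
	}
}
```
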
13 changes: 7 additions & 6 deletions eth/ethconfig/config.go
@@ -219,12 +219,13 @@ type Config struct {
     RollupDisableTxPoolAdmission            bool
     RollupHaltOnIncompatibleProtocolVersion string
 
-    ParallelTxLegacyMode   bool // Whether to execute transaction in parallel mode when do full sync
-    ParallelTxMode         bool // Whether to execute transaction in parallel mode when do full sync
-    ParallelTxNum          int  // Number of slot for transaction execution
-    EnableOpcodeOptimizing bool
-    EnableParallelTxDAG    bool
-    ParallelTxDAGFile      string
+    ParallelTxLegacyMode     bool // Whether to execute transaction in parallel mode when do full sync
+    ParallelTxMode           bool // Whether to execute transaction in parallel mode when do full sync
+    ParallelTxNum            int  // Number of slot for transaction execution
+    EnableOpcodeOptimizing   bool
+    EnableParallelTxDAG      bool
+    ParallelTxDAGFile        string
+    ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode
 }
 
 // CreateConsensusEngine creates a consensus engine for the given chain config.