From a340a72e79eada1951184295109432c9088cf1ff Mon Sep 17 00:00:00 2001 From: welkin22 Date: Thu, 10 Oct 2024 16:30:50 +0800 Subject: [PATCH 01/12] fix parallel merge --- core/parallel_state_scheduler.go | 2 +- core/pevm_processor.go | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/parallel_state_scheduler.go b/core/parallel_state_scheduler.go index 615b56f2d..63ce58b93 100644 --- a/core/parallel_state_scheduler.go +++ b/core/parallel_state_scheduler.go @@ -193,7 +193,7 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func confirmed: -1, } - trustDAG := false + trustDAG := true // execute all transactions in parallel for _, txLevel := range tls { diff --git a/core/pevm_processor.go b/core/pevm_processor.go index f1fe0adb4..defbfe13f 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -320,7 +320,17 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg p.engine.Finalize(p.bc, header, statedb, p.commonTxs, block.Uncles(), withdrawals) var allLogs []*types.Log + var lindex = 0 + var cumulativeGasUsed uint64 for _, receipt := range p.receipts { + // reset the log index + for _, log := range receipt.Logs { + log.Index = uint(lindex) + lindex++ + } + // re-calculate the cumulativeGasUsed + cumulativeGasUsed += receipt.GasUsed + receipt.CumulativeGasUsed = cumulativeGasUsed allLogs = append(allLogs, receipt.Logs...) } return p.receipts, allLogs, *usedGas, nil From b745c88557eca301c844e0c005d05ae090281573 Mon Sep 17 00:00:00 2001 From: welkin22 Date: Fri, 11 Oct 2024 14:57:27 +0800 Subject: [PATCH 02/12] remove conflict --- core/pevm_processor.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/pevm_processor.go b/core/pevm_processor.go index defbfe13f..99b43c99f 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -120,10 +120,10 @@ func (p *PEVMProcessor) executeInSlot(maindb *state.StateDB, txReq *PEVMTxReques // if it is in Stage 2 it is a likely result, not 100% sure func (p *PEVMProcessor) toConfirmTxIndexResult(txResult *PEVMTxResult) error { txReq := txResult.txReq - if err := p.hasConflict(txResult); err != nil { - log.Info(fmt.Sprintf("HasConflict!! block: %d, txIndex: %d\n", txResult.txReq.block.NumberU64(), txResult.txReq.txIndex)) - return err - } + //if err := p.hasConflict(txResult); err != nil { + // log.Info(fmt.Sprintf("HasConflict!! block: %d, txIndex: %d\n", txResult.txReq.block.NumberU64(), txResult.txReq.txIndex)) + // return err + //} // goroutine unsafe operation will be handled from here for safety gasConsumed := txReq.gasLimit - txResult.gpSlot.Gas() From b77be539c25e627b9eaa3439ffd64788300c5d11 Mon Sep 17 00:00:00 2001 From: welkin22 Date: Fri, 11 Oct 2024 19:48:06 +0800 Subject: [PATCH 03/12] TxDAGEnabledWhenMine add condition --- core/blockchain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/blockchain.go b/core/blockchain.go index 8845c8842..365c6edb8 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2751,7 +2751,7 @@ func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) { } func (bc *BlockChain) TxDAGEnabledWhenMine() bool { - return bc.enableTxDAG && bc.txDAGWriteCh == nil && bc.txDAGReader == nil + return bc.enableTxDAG && bc.txDAGWriteCh == nil && bc.txDAGReader == nil && !bc.vmConfig.EnableParallelExec && !bc.vmConfig.EnableParallelExecV2 } func (bc *BlockChain) SetupTxDAGGeneration(output string, readFile bool) { From d2dceaa4d959d1879ca079e4100ce6b693ec7741 Mon Sep 17 00:00:00 2001 From: welkin22 Date: Fri, 11 Oct 2024 14:19:17 +0800 Subject: [PATCH 04/12] add txlevel metrics --- core/blockchain.go | 2 ++ core/parallel_state_scheduler.go | 3 +++ core/pevm_processor.go | 2 ++ core/types/dag.go | 1 - 4 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/blockchain.go b/core/blockchain.go index 365c6edb8..ef3c6d252 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -100,6 +100,8 @@ var ( parallelTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/txs", nil) parallelConflictTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/conflicttxs", nil) + parallelTxLevelsSizeMeter = metrics.NewRegisteredMeter("chain/parallel/txlevel/size", nil) + parallelTxLevelTxSizeMeter = metrics.NewRegisteredMeter("chain/parallel/txlevel/txsize", nil) blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil) mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil) diff --git a/core/parallel_state_scheduler.go b/core/parallel_state_scheduler.go index 63ce58b93..b474b9744 100644 --- a/core/parallel_state_scheduler.go +++ b/core/parallel_state_scheduler.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" ) var runner chan func() @@ -197,6 +198,8 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func // execute all transactions in parallel for _, txLevel := range tls { + log.Debug("txLevel tx count:%d", len(txLevel)) + parallelTxLevelTxSizeMeter.Mark(int64(len(txLevel))) wait := sync.WaitGroup{} trunks := txLevel.Split(runtime.NumCPU()) wait.Add(len(trunks)) diff --git a/core/pevm_processor.go b/core/pevm_processor.go index 99b43c99f..c34bd0713 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -253,6 +253,8 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg // parallel execution start := time.Now() txLevels := NewTxLevels(p.allTxReqs, txDAG) + log.Debug("txLevels size:%d", len(txLevels)) + parallelTxLevelsSizeMeter.Mark(int64(len(txLevels))) buildLevelsDuration := time.Since(start) var executeDurations, confirmDurations int64 = 0, 0 err, txIndex := txLevels.Run(func(pr *PEVMTxRequest) (res *PEVMTxResult) { diff --git a/core/types/dag.go b/core/types/dag.go index b3d5f0f08..7dc0d89e9 100644 --- a/core/types/dag.go +++ b/core/types/dag.go @@ -176,7 +176,6 @@ func ValidatePlainTxDAG(d TxDAG, txCnt int) error { return fmt.Errorf("PlainTxDAG contains unordered dependency, tx: %v", i) } } - } return nil } From e676fa1ee2f28e8d2cb0c9a3391f809fb95cc5a4 Mon Sep 17 00:00:00 2001 From: welkin22 Date: Fri, 11 Oct 2024 14:58:24 +0800 Subject: [PATCH 05/12] fix log --- core/parallel_state_scheduler.go | 2 +- core/pevm_processor.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/parallel_state_scheduler.go b/core/parallel_state_scheduler.go index b474b9744..237909731 100644 --- a/core/parallel_state_scheduler.go +++ b/core/parallel_state_scheduler.go @@ -198,7 +198,7 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func // execute all transactions in parallel for _, txLevel := range tls { - log.Debug("txLevel tx count:%d", len(txLevel)) + log.Debug("txLevel tx count", "tx count", len(txLevel)) parallelTxLevelTxSizeMeter.Mark(int64(len(txLevel))) wait := sync.WaitGroup{} trunks := txLevel.Split(runtime.NumCPU()) diff --git a/core/pevm_processor.go b/core/pevm_processor.go index c34bd0713..4804d0979 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -253,7 +253,7 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg // parallel execution start := time.Now() txLevels := NewTxLevels(p.allTxReqs, txDAG) - log.Debug("txLevels size:%d", len(txLevels)) + log.Debug("txLevels size", "txLevels size", len(txLevels)) parallelTxLevelsSizeMeter.Mark(int64(len(txLevels))) buildLevelsDuration := time.Since(start) var executeDurations, confirmDurations int64 = 0, 0 From 680b6e6b45fd330cc032ad791f3e422f0c509f13 Mon Sep 17 00:00:00 2001 From: welkin22 Date: Fri, 11 Oct 2024 15:06:09 +0800 Subject: [PATCH 06/12] change metrics type --- core/blockchain.go | 4 ++-- core/parallel_state_scheduler.go | 2 +- core/pevm_processor.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index ef3c6d252..a59d5f2c8 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -100,8 +100,8 @@ var ( parallelTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/txs", nil) parallelConflictTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/conflicttxs", nil) - parallelTxLevelsSizeMeter = metrics.NewRegisteredMeter("chain/parallel/txlevel/size", nil) - parallelTxLevelTxSizeMeter = metrics.NewRegisteredMeter("chain/parallel/txlevel/txsize", nil) + parallelTxLevelsSizeMeter = metrics.NewRegisteredGauge("chain/parallel/txlevel/size", nil) + parallelTxLevelTxSizeMeter = metrics.NewRegisteredGauge("chain/parallel/txlevel/txsize", nil) blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil) mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil) diff --git a/core/parallel_state_scheduler.go b/core/parallel_state_scheduler.go index 237909731..0a5a64765 100644 --- a/core/parallel_state_scheduler.go +++ b/core/parallel_state_scheduler.go @@ -199,7 +199,7 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func // execute all transactions in parallel for _, txLevel := range tls { log.Debug("txLevel tx count", "tx count", len(txLevel)) - parallelTxLevelTxSizeMeter.Mark(int64(len(txLevel))) + parallelTxLevelTxSizeMeter.Update(int64(len(txLevel))) wait := sync.WaitGroup{} trunks := txLevel.Split(runtime.NumCPU()) wait.Add(len(trunks)) diff --git a/core/pevm_processor.go b/core/pevm_processor.go index 4804d0979..020355563 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -254,7 +254,7 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg start := time.Now() txLevels := NewTxLevels(p.allTxReqs, txDAG) log.Debug("txLevels size", "txLevels size", len(txLevels)) - parallelTxLevelsSizeMeter.Mark(int64(len(txLevels))) + parallelTxLevelsSizeMeter.Update(int64(len(txLevels))) buildLevelsDuration := time.Since(start) var executeDurations, confirmDurations int64 = 0, 0 err, txIndex := txLevels.Run(func(pr *PEVMTxRequest) (res *PEVMTxResult) { From 704c821d6ab5f51519242bc73e6c9b6fc5e49feb Mon Sep 17 00:00:00 2001 From: welkin22 Date: Sat, 12 Oct 2024 15:12:02 +0800 Subject: [PATCH 07/12] add metrics/ignore conflict --- core/blockchain.go | 2 ++ core/parallel_state_scheduler.go | 9 +++++++++ core/pevm_processor.go | 1 + core/state/pevm_statedb.go | 6 +++--- core/types/dag_test.go | 19 +++++++++++++++++++ 5 files changed, 34 insertions(+), 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index a59d5f2c8..eecf24d83 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -100,6 +100,8 @@ var ( parallelTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/txs", nil) parallelConflictTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/conflicttxs", nil) + parallelExecutionTimer = metrics.NewRegisteredTimer("chain/parallel/exec", nil) + parallelConfirmTimer = metrics.NewRegisteredTimer("chain/parallel/confirm", nil) parallelTxLevelsSizeMeter = metrics.NewRegisteredGauge("chain/parallel/txlevel/size", nil) parallelTxLevelTxSizeMeter = metrics.NewRegisteredGauge("chain/parallel/txlevel/txsize", nil) diff --git a/core/parallel_state_scheduler.go b/core/parallel_state_scheduler.go index 0a5a64765..45624aa68 100644 --- a/core/parallel_state_scheduler.go +++ b/core/parallel_state_scheduler.go @@ -4,6 +4,7 @@ import ( "fmt" "runtime" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -195,9 +196,12 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func } trustDAG := true + totalExecutionTime := int64(0) + totalConfirmTime := int64(0) // execute all transactions in parallel for _, txLevel := range tls { + start := time.Now() log.Debug("txLevel tx count", "tx count", len(txLevel)) parallelTxLevelTxSizeMeter.Update(int64(len(txLevel))) wait := sync.WaitGroup{} @@ -218,6 +222,8 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func runner <- run } wait.Wait() + totalExecutionTime += time.Since(start).Nanoseconds() + start = time.Now() // all transactions of current level are executed, now try to confirm. if trustDAG { if err, txIndex := toConfirm.confirmWithTrust(txLevel, execute, confirm); err != nil { @@ -230,7 +236,10 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func return err, txIndex } } + totalConfirmTime += time.Since(start).Nanoseconds() } + parallelExecutionTimer.Update(time.Duration(totalExecutionTime)) + parallelConfirmTimer.Update(time.Duration(totalConfirmTime)) return nil, 0 } diff --git a/core/pevm_processor.go b/core/pevm_processor.go index 020355563..8862f57d7 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -276,6 +276,7 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg atomic.AddUint64(&p.debugConflictRedoNum, 1) } }(time.Now()) + log.Debug("pevm confirm", "txIndex", pr.txReq.txIndex) return p.confirmTxResult(statedb, gp, pr) }) parallelRunDuration := time.Since(start) - buildLevelsDuration diff --git a/core/state/pevm_statedb.go b/core/state/pevm_statedb.go index f511b0c65..38cde4b53 100644 --- a/core/state/pevm_statedb.go +++ b/core/state/pevm_statedb.go @@ -486,9 +486,9 @@ func (pst *UncommittedDB) Merge(deleteEmptyObjects bool) error { // so we don't need to merge anything. return nil } - if err := pst.conflictsToMaindb(); err != nil { - return err - } + //if err := pst.conflictsToMaindb(); err != nil { + // return err + //} // 0. set the TxContext pst.maindb.SetTxContext(pst.txHash, pst.txIndex) diff --git a/core/types/dag_test.go b/core/types/dag_test.go index cc50f2e5d..36a3403c9 100644 --- a/core/types/dag_test.go +++ b/core/types/dag_test.go @@ -2,9 +2,11 @@ package types import ( "encoding/hex" + "log" "testing" "time" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/golang/snappy" "github.com/cometbft/cometbft/libs/rand" @@ -31,6 +33,23 @@ func TestEncodeTxDAGCalldata(t *testing.T) { assert.NotEqual(t, nil, err) } +func TestDecodeCalldata(t *testing.T) { + calldata := "0x5517ed8c000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000001b201f901aef901abc2c002c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c2c152c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c2c0010000000000000000000000000000" + decode, err := hexutil.Decode(calldata) + if err != nil { + return + } + dagCalldata, err := DecodeTxDAGCalldata(decode) + if err != nil { + t.Errorf("Error decoding calldata: %s", err) + return + } + for i := 0; i < dagCalldata.TxCount(); i++ { + dep := dagCalldata.TxDep(i) + log.Printf("idx:%d,dep:%v", i, dep.TxIndexes) + } +} + func TestTxDAG_SetTxDep(t *testing.T) { dag := mockSimpleDAG() require.NoError(t, dag.SetTxDep(9, NewTxDep(nil, NonDependentRelFlag))) From d07960dad8dfd12919fe2aa6a10d403ce0a10823 Mon Sep 17 00:00:00 2001 From: welkin22 Date: Mon, 21 Oct 2024 14:36:02 +0800 Subject: [PATCH 08/12] fix parallelTxLevelTxSizeMeter --- core/parallel_state_scheduler.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/parallel_state_scheduler.go b/core/parallel_state_scheduler.go index 45624aa68..b3def0db7 100644 --- a/core/parallel_state_scheduler.go +++ b/core/parallel_state_scheduler.go @@ -198,12 +198,15 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func trustDAG := true totalExecutionTime := int64(0) totalConfirmTime := int64(0) + maxLevelTxCount := 0 // execute all transactions in parallel for _, txLevel := range tls { start := time.Now() log.Debug("txLevel tx count", "tx count", len(txLevel)) - parallelTxLevelTxSizeMeter.Update(int64(len(txLevel))) + if len(txLevel) > maxLevelTxCount { + maxLevelTxCount = len(txLevel) + } wait := sync.WaitGroup{} trunks := txLevel.Split(runtime.NumCPU()) wait.Add(len(trunks)) @@ -238,6 +241,7 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func } totalConfirmTime += time.Since(start).Nanoseconds() } + parallelTxLevelTxSizeMeter.Update(int64(maxLevelTxCount)) parallelExecutionTimer.Update(time.Duration(totalExecutionTime)) parallelConfirmTimer.Update(time.Duration(totalConfirmTime)) return nil, 0 From 1d693cc2c9822340653db97ab0b033709db2600b Mon Sep 17 00:00:00 2001 From: welkin22 Date: Mon, 21 Oct 2024 15:20:57 +0800 Subject: [PATCH 09/12] add ParallelTxUnorderedMergeFlag --- cmd/geth/main.go | 1 + cmd/utils/flags.go | 13 +++++++++++++ core/blockchain.go | 2 +- core/parallel_state_scheduler.go | 9 ++++----- core/parallel_state_scheduler_test.go | 16 +++++++--------- core/pevm_processor.go | 17 ++++++++++++----- core/vm/interpreter.go | 21 +++++++++++---------- eth/backend.go | 11 ++++++----- eth/ethconfig/config.go | 13 +++++++------ 9 files changed, 62 insertions(+), 41 deletions(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 9944682ba..24788ec6d 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -171,6 +171,7 @@ var ( utils.RollupSuperchainUpgradesFlag, utils.ParallelTxLegacyFlag, utils.ParallelTxFlag, + utils.ParallelTxUnorderedMergeFlag, utils.ParallelTxNumFlag, utils.ParallelTxDAGFlag, utils.ParallelTxDAGFileFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 49eeb9666..101293ddd 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1107,6 +1107,12 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. Category: flags.VMCategory, } + ParallelTxUnorderedMergeFlag = &cli.BoolFlag{ + Name: "parallel.unordered-merge", + Usage: "Enable unordered merge mode, during the parallel confirm phase, merge transaction execution results without following the transaction order.", + Category: flags.VMCategory, + } + ParallelTxNumFlag = &cli.IntFlag{ Name: "parallel.num", Usage: "Number of slot for transaction execution, only valid in parallel mode (runtime calculated, no fixed default value)", @@ -2049,6 +2055,13 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { cfg.ParallelTxMode = ctx.Bool(ParallelTxFlag.Name) } + if ctx.IsSet(ParallelTxUnorderedMergeFlag.Name) { + cfg.ParallelTxUnorderedMerge = ctx.Bool(ParallelTxUnorderedMergeFlag.Name) + if ctx.IsSet(ParallelTxLegacyFlag.Name) && ctx.Bool(ParallelTxLegacyFlag.Name) { + log.Warn("ParallelTxUnorderedMergeFlag does not have any effect in ParallelTxLegacy mode") + } + } + if ctx.IsSet(ParallelTxDAGFlag.Name) { cfg.EnableParallelTxDAG = ctx.Bool(ParallelTxDAGFlag.Name) } diff --git a/core/blockchain.go b/core/blockchain.go index eecf24d83..03ff7f80c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2755,7 +2755,7 @@ func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) { } func (bc *BlockChain) TxDAGEnabledWhenMine() bool { - return bc.enableTxDAG && bc.txDAGWriteCh == nil && bc.txDAGReader == nil && !bc.vmConfig.EnableParallelExec && !bc.vmConfig.EnableParallelExecV2 + return bc.enableTxDAG && bc.txDAGWriteCh == nil && bc.txDAGReader == nil && !bc.vmConfig.EnableParallelExec && !bc.vmConfig.EnableParallelExecLegacy } func (bc *BlockChain) SetupTxDAGGeneration(output string, readFile bool) { diff --git a/core/parallel_state_scheduler.go b/core/parallel_state_scheduler.go index b3def0db7..60d607064 100644 --- a/core/parallel_state_scheduler.go +++ b/core/parallel_state_scheduler.go @@ -101,7 +101,7 @@ func (cq *confirmQueue) collect(result *PEVMTxResult) error { return nil } -func (cq *confirmQueue) confirmWithTrust(level TxLevel, execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error) (error, int) { +func (cq *confirmQueue) confirmWithUnordered(level TxLevel, execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error) (error, int) { // find all able-to-confirm transactions, and try to confirm them for _, tx := range level { i := tx.txIndex @@ -189,13 +189,12 @@ func (cq *confirmQueue) rerun(i int, execute func(*PEVMTxRequest) *PEVMTxResult, // run runs the transactions in parallel // execute must return a non-nil result, otherwise it panics. -func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error) (error, int) { +func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error, unorderedMerge bool) (error, int) { toConfirm := &confirmQueue{ queue: make([]confirmation, tls.txCount()), confirmed: -1, } - trustDAG := true totalExecutionTime := int64(0) totalConfirmTime := int64(0) maxLevelTxCount := 0 @@ -228,8 +227,8 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func totalExecutionTime += time.Since(start).Nanoseconds() start = time.Now() // all transactions of current level are executed, now try to confirm. - if trustDAG { - if err, txIndex := toConfirm.confirmWithTrust(txLevel, execute, confirm); err != nil { + if unorderedMerge { + if err, txIndex := toConfirm.confirmWithUnordered(txLevel, execute, confirm); err != nil { // something very wrong, stop the process return err, txIndex } diff --git a/core/parallel_state_scheduler_test.go b/core/parallel_state_scheduler_test.go index 9b2496cc0..6c181e532 100644 --- a/core/parallel_state_scheduler_test.go +++ b/core/parallel_state_scheduler_test.go @@ -263,9 +263,7 @@ func setTxIndex(allReq []*PEVMTxRequest) { func TestTxLevelRun(t *testing.T) { // case 1: empty txs case1 := func() { - levels([]uint64{}, [][]int{}).Run( - func(*PEVMTxRequest) *PEVMTxResult { return nil }, - func(*PEVMTxResult) error { return nil }) + levels([]uint64{}, [][]int{}).Run(func(*PEVMTxRequest) *PEVMTxResult { return nil }, func(*PEVMTxResult) error { return nil }, false) } // case 2: 4 txs with no dependencies, no conflicts case2 := func() { @@ -287,7 +285,7 @@ func TestTxLevelRun(t *testing.T) { nil, nil, nil, nil, }) caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)} - err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm) + err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm, false) ok := checkMainDB(map[int]int{1: 0, 2: 0, 3: 0, 4: 0, 5: 11, 6: 21, 7: 31, 8: 41}) if err != nil { t.Fatalf("failed, err:%v", err) @@ -323,7 +321,7 @@ func TestTxLevelRun(t *testing.T) { nil, nil, {0}, {1}, }) caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)} - err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm) + err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm, false) ok := checkMainDB(map[int]int{1: 0, 2: 0, 3: 0, 4: 0, 5: 11, 6: 21}) if err != nil { t.Fatalf("failed, err:%v", err) @@ -359,7 +357,7 @@ func TestTxLevelRun(t *testing.T) { {0}, nil, {-1}, {-1}, }) caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)} - err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm) + err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm, false) ok := checkMainDB(map[int]int{1: 0, 2: 0, 3: 0, 4: 0, 5: 11, 6: 21}) if err != nil { t.Fatalf("failed, err:%v", err) @@ -407,7 +405,7 @@ func TestTxLevelRun(t *testing.T) { res[i+2000] = i } caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)} - err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm) + err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm, false) ok := checkMainDB(res) if err != nil { t.Fatalf("failed, err:%v", err) @@ -433,7 +431,7 @@ func TestTxLevelRun(t *testing.T) { } setTxIndex(allReqs) caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)} - err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm) + err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm, false) ok := checkMainDB(map[int]int{1: 5, 2: 20, 3: 10}) if err != nil { t.Fatalf("failed, err:%v", err) @@ -462,7 +460,7 @@ func TestTxLevelRun(t *testing.T) { } caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)} setTxIndex(allReqs) - err, _ := NewTxLevels(allReqs, dag).Run(caller.execute, caller.confirm) + err, _ := NewTxLevels(allReqs, dag).Run(caller.execute, caller.confirm, false) ok := checkMainDB(map[int]int{1: 5, 2: 20, 3: 10}) if err != nil { t.Fatalf("failed, err:%v", err) diff --git a/core/pevm_processor.go b/core/pevm_processor.go index 8862f57d7..d16046339 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -26,11 +26,13 @@ type PEVMProcessor struct { commonTxs []*types.Transaction receipts types.Receipts debugConflictRedoNum uint64 + unorderedMerge bool } func newPEVMProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *PEVMProcessor { processor := &PEVMProcessor{ StateProcessor: *NewStateProcessor(config, bc, engine), + unorderedMerge: bc.vmConfig.EnableParallelUnorderedMerge, } log.Info("Parallel execution mode is enabled", "Parallel Num", ParallelNum(), "CPUNum", runtime.NumCPU()) @@ -120,10 +122,15 @@ func (p *PEVMProcessor) executeInSlot(maindb *state.StateDB, txReq *PEVMTxReques // if it is in Stage 2 it is a likely result, not 100% sure func (p *PEVMProcessor) toConfirmTxIndexResult(txResult *PEVMTxResult) error { txReq := txResult.txReq - //if err := p.hasConflict(txResult); err != nil { - // log.Info(fmt.Sprintf("HasConflict!! block: %d, txIndex: %d\n", txResult.txReq.block.NumberU64(), txResult.txReq.txIndex)) - // return err - //} + if !p.unorderedMerge || !txReq.useDAG { + // If we do not use a DAG, then we need to check for conflicts to ensure correct execution. + // When we perform an unordered merge, we cannot conduct conflict checks + // and can only choose to trust that the DAG is correct and that conflicts do not exist. + if err := p.hasConflict(txResult); err != nil { + log.Info(fmt.Sprintf("HasConflict!! block: %d, txIndex: %d\n", txResult.txReq.block.NumberU64(), txResult.txReq.txIndex)) + return err + } + } // goroutine unsafe operation will be handled from here for safety gasConsumed := txReq.gasLimit - txResult.gpSlot.Gas() @@ -278,7 +285,7 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg }(time.Now()) log.Debug("pevm confirm", "txIndex", pr.txReq.txIndex) return p.confirmTxResult(statedb, gp, pr) - }) + }, p.unorderedMerge) parallelRunDuration := time.Since(start) - buildLevelsDuration if err != nil { tx := allTxs[txIndex] diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go index d8f56ff2a..2678f7460 100644 --- a/core/vm/interpreter.go +++ b/core/vm/interpreter.go @@ -30,16 +30,17 @@ type PrecompileOverrides func(params.Rules, PrecompiledContract, common.Address) // Config are the configuration options for the Interpreter type Config struct { - Tracer EVMLogger // Opcode logger - NoBaseFee bool // Forces the EIP-1559 baseFee to 0 (needed for 0 price calls) - EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages - ExtraEips []int // Additional EIPS that are to be enabled - EnableParallelExecLegacy bool // Whether to execute transaction in parallel mode when do full sync - EnableParallelExec bool // Whether to execute transaction in parallel mode when do full sync - ParallelTxNum int // Number of slot for transaction execution - OptimismPrecompileOverrides PrecompileOverrides // Precompile overrides for Optimism - EnableOpcodeOptimizations bool // Enable opcode optimization - TxDAG types.TxDAG + Tracer EVMLogger // Opcode logger + NoBaseFee bool // Forces the EIP-1559 baseFee to 0 (needed for 0 price calls) + EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages + ExtraEips []int // Additional EIPS that are to be enabled + EnableParallelExecLegacy bool // Whether to execute transaction in parallel mode when do full sync + EnableParallelExec bool // Whether to execute transaction in parallel mode when do full sync + ParallelTxNum int // Number of slot for transaction execution + OptimismPrecompileOverrides PrecompileOverrides // Precompile overrides for Optimism + EnableOpcodeOptimizations bool // Enable opcode optimization + TxDAG types.TxDAG + EnableParallelUnorderedMerge bool // Whether to enable unordered merge in parallel mode } // ScopeContext contains the things that are per-call, such as stack and memory, diff --git a/eth/backend.go b/eth/backend.go index fa536dc98..05956e60d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -223,11 +223,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { journalFilePath := fmt.Sprintf("%s/%s", stack.ResolvePath(ChainData), JournalFileName) var ( vmConfig = vm.Config{ - EnablePreimageRecording: config.EnablePreimageRecording, - EnableParallelExecLegacy: config.ParallelTxLegacyMode, - EnableParallelExec: config.ParallelTxMode, - ParallelTxNum: config.ParallelTxNum, - EnableOpcodeOptimizations: config.EnableOpcodeOptimizing, + EnablePreimageRecording: config.EnablePreimageRecording, + EnableParallelExecLegacy: config.ParallelTxLegacyMode, + EnableParallelExec: config.ParallelTxMode, + EnableParallelUnorderedMerge: config.ParallelTxUnorderedMerge, + ParallelTxNum: config.ParallelTxNum, + EnableOpcodeOptimizations: config.EnableOpcodeOptimizing, } cacheConfig = &core.CacheConfig{ TrieCleanLimit: config.TrieCleanCache, diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 1b6f1f345..39738a882 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -219,12 +219,13 @@ type Config struct { RollupDisableTxPoolAdmission bool RollupHaltOnIncompatibleProtocolVersion string - ParallelTxLegacyMode bool // Whether to execute transaction in parallel mode when do full sync - ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync - ParallelTxNum int // Number of slot for transaction execution - EnableOpcodeOptimizing bool - EnableParallelTxDAG bool - ParallelTxDAGFile string + ParallelTxLegacyMode bool // Whether to execute transaction in parallel mode when do full sync + ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync + ParallelTxNum int // Number of slot for transaction execution + EnableOpcodeOptimizing bool + EnableParallelTxDAG bool + ParallelTxDAGFile string + ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode } // CreateConsensusEngine creates a consensus engine for the given chain config. From 18dbe52061546125684133c714a9cf929417791c Mon Sep 17 00:00:00 2001 From: welkin22 Date: Mon, 21 Oct 2024 15:39:50 +0800 Subject: [PATCH 10/12] change ut --- core/types/dag_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/types/dag_test.go b/core/types/dag_test.go index 36a3403c9..f59ca51cc 100644 --- a/core/types/dag_test.go +++ b/core/types/dag_test.go @@ -2,7 +2,6 @@ package types import ( "encoding/hex" - "log" "testing" "time" @@ -23,11 +22,13 @@ var ( func TestEncodeTxDAGCalldata(t *testing.T) { tg := mockSimpleDAG() + originTg := tg data, err := EncodeTxDAGCalldata(tg) assert.Equal(t, nil, err) tg, err = DecodeTxDAGCalldata(data) assert.Equal(t, nil, err) assert.Equal(t, true, tg.TxCount() > 0) + assert.Equal(t, originTg, tg) _, err = DecodeTxDAGCalldata(nil) assert.NotEqual(t, nil, err) @@ -44,10 +45,12 @@ func TestDecodeCalldata(t *testing.T) { t.Errorf("Error decoding calldata: %s", err) return } - for i := 0; i < dagCalldata.TxCount(); i++ { - dep := dagCalldata.TxDep(i) - log.Printf("idx:%d,dep:%v", i, dep.TxIndexes) - } + //for i := 0; i < dagCalldata.TxCount(); i++ { + // dep := dagCalldata.TxDep(i) + // log.Printf("idx:%d,dep:%v", i, dep.TxIndexes) + //} + assert.Equal(t, true, dagCalldata.TxDep(186).Exist(82)) + assert.Equal(t, 0, dagCalldata.TxDep(187).Count()) } func TestTxDAG_SetTxDep(t *testing.T) { From 19035852710708f746320835ef5a1d37af931638 Mon Sep 17 00:00:00 2001 From: welkin22 Date: Mon, 21 Oct 2024 16:34:05 +0800 Subject: [PATCH 11/12] if txDAG is nil, disable unordered merge --- core/pevm_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/pevm_processor.go b/core/pevm_processor.go index d16046339..af030f931 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -285,7 +285,7 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg }(time.Now()) log.Debug("pevm confirm", "txIndex", pr.txReq.txIndex) return p.confirmTxResult(statedb, gp, pr) - }, p.unorderedMerge) + }, p.unorderedMerge && txDAG != nil) parallelRunDuration := time.Since(start) - buildLevelsDuration if err != nil { tx := allTxs[txIndex] From c5b6cee8b1fd7fda7dd75c199515f73b63174d69 Mon Sep 17 00:00:00 2001 From: welkin22 Date: Mon, 21 Oct 2024 18:03:47 +0800 Subject: [PATCH 12/12] add flag log --- core/pevm_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/pevm_processor.go b/core/pevm_processor.go index af030f931..339e4d717 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -35,7 +35,7 @@ func newPEVMProcessor(config *params.ChainConfig, bc *BlockChain, engine consens unorderedMerge: bc.vmConfig.EnableParallelUnorderedMerge, } log.Info("Parallel execution mode is enabled", "Parallel Num", ParallelNum(), - "CPUNum", runtime.NumCPU()) + "CPUNum", runtime.NumCPU(), "unorderedMerge", processor.unorderedMerge) return processor }