diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ccf3d848f..ebe04b22e 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -2030,6 +2030,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { cfg.ParallelTxUnorderedMerge = ctx.Bool(ParallelTxUnorderedMergeFlag.Name) } + if ctx.IsSet(ParallelTxNumFlag.Name) { + cfg.ParallelTxNum = ctx.Int(ParallelTxNumFlag.Name) + } + if ctx.IsSet(ParallelTxDAGFlag.Name) { cfg.EnableParallelTxDAG = ctx.Bool(ParallelTxDAGFlag.Name) } diff --git a/core/parallel_state_scheduler.go b/core/parallel_state_scheduler.go index 60d607064..963a39ba8 100644 --- a/core/parallel_state_scheduler.go +++ b/core/parallel_state_scheduler.go @@ -13,10 +13,12 @@ import ( var runner chan func() -func init() { - cpuNum := runtime.NumCPU() - runner = make(chan func(), cpuNum) - for i := 0; i < cpuNum; i++ { +func initParallelRunner(targetNum int) { + if targetNum == 0 { + targetNum = runtime.GOMAXPROCS(0) + } + runner = make(chan func(), targetNum) + for i := 0; i < targetNum; i++ { go func() { for f := range runner { f() @@ -187,6 +189,8 @@ func (cq *confirmQueue) rerun(i int, execute func(*PEVMTxRequest) *PEVMTxResult, return nil } +var goMaxProcs = runtime.GOMAXPROCS(0) + // run runs the transactions in parallel // execute must return a non-nil result, otherwise it panics. func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error, unorderedMerge bool) (error, int) { @@ -207,7 +211,7 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func maxLevelTxCount = len(txLevel) } wait := sync.WaitGroup{} - trunks := txLevel.Split(runtime.NumCPU()) + trunks := txLevel.Split(goMaxProcs) wait.Add(len(trunks)) // split tx into chunks, to save the cost of channel communication for _, txs := range trunks { diff --git a/core/pevm_processor.go b/core/pevm_processor.go index 3acfbcae8..83edcb56e 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -34,8 +34,9 @@ func newPEVMProcessor(config *params.ChainConfig, bc *BlockChain, engine consens StateProcessor: *NewStateProcessor(config, bc, engine), unorderedMerge: bc.vmConfig.EnableParallelUnorderedMerge, } + initParallelRunner(bc.vmConfig.ParallelTxNum) log.Info("Parallel execution mode is enabled", "Parallel Num", ParallelNum(), - "CPUNum", runtime.NumCPU(), "unorderedMerge", processor.unorderedMerge) + "CPUNum", runtime.GOMAXPROCS(0), "unorderedMerge", processor.unorderedMerge) return processor }