Skip to content

Commit

Permalink
txdag: support write txdag to file;
Browse files Browse the repository at this point in the history
  • Loading branch information
galaio authored and welkin22 committed Nov 13, 2024
1 parent 43a4419 commit b6a55c3
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 6 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ var (
utils.RollupHaltOnIncompatibleProtocolVersionFlag,
utils.RollupSuperchainUpgradesFlag,
utils.ParallelTxDAGFlag,
utils.ParallelTxDAGFileFlag,
utils.ParallelTxDAGSenderPrivFlag,
configFileFlag,
utils.LogDebugFlag,
Expand Down
10 changes: 10 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,13 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Category: flags.VMCategory,
}

ParallelTxDAGFileFlag = &cli.StringFlag{
Name: "parallel.txdagfile",
Usage: "It indicates the TxDAG file path",
Value: "./parallel-txdag-output.csv",
Category: flags.VMCategory,
}

VMOpcodeOptimizeFlag = &cli.BoolFlag{
Name: "vm.opcode.optimize",
Usage: "enable opcode optimization",
Expand Down Expand Up @@ -1999,6 +2006,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(ParallelTxDAGFlag.Name) {
cfg.EnableParallelTxDAG = ctx.Bool(ParallelTxDAGFlag.Name)
}
if ctx.IsSet(ParallelTxDAGFileFlag.Name) {
cfg.ParallelTxDAGFile = ctx.String(ParallelTxDAGFileFlag.Name)
}

if ctx.IsSet(ParallelTxDAGSenderPrivFlag.Name) {
priHex := ctx.String(ParallelTxDAGSenderPrivFlag.Name)
Expand Down
55 changes: 52 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
package core

import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"math/big"
"os"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -299,7 +303,8 @@ type BlockChain struct {
vmConfig vm.Config

// parallel EVM related
enableTxDAG bool
enableTxDAG bool
txDAGWriteCh chan TxDAGOutputItem
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -2640,7 +2645,51 @@ func (bc *BlockChain) TxDAGEnabledWhenMine() bool {
return bc.enableTxDAG
}

func (bc *BlockChain) SetupTxDAGGeneration() {
log.Info("node enable TxDAG feature")
func (bc *BlockChain) SetupTxDAGGeneration(output string) {
log.Info("node enable TxDAG feature", "output", output)
bc.enableTxDAG = true
if len(output) == 0 {
return
}

// write handler
go func() {
writeHandle, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Error("OpenFile when open the txDAG output file", "file", output, "err", err)
return
}
bc.txDAGWriteCh = make(chan TxDAGOutputItem, 10000)
defer writeHandle.Close()
for {
select {
case <-bc.quit:
return
case item := <-bc.txDAGWriteCh:
if err := writeTxDAGToFile(writeHandle, item); err != nil {
log.Error("encode TxDAG err in OutputHandler", "err", err)
continue
}
}
}
}()
}

type TxDAGOutputItem struct {
blockNumber uint64
txDAG types.TxDAG
}

func writeTxDAGToFile(writeHandle *os.File, item TxDAGOutputItem) error {
var buf bytes.Buffer
buf.WriteString(strconv.FormatUint(item.blockNumber, 10))
buf.WriteByte(',')
enc, err := types.EncodeTxDAG(item.txDAG)
if err != nil {
return err
}
buf.WriteString(hex.EncodeToString(enc))
buf.WriteByte('\n')
_, err = writeHandle.Write(buf.Bytes())
return err
}
7 changes: 7 additions & 0 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
if metrics.EnabledExpensive {
go types.EvaluateTxDAGPerformance(dag)
}
// try to write txDAG into file
if p.bc.txDAGWriteCh != nil && dag != nil {
p.bc.txDAGWriteCh <- TxDAGOutputItem{
blockNumber: block.NumberU64(),
txDAG: dag,
}
}
} else {
log.Error("ResolveTxDAG err", "block", block.NumberU64(), "tx", len(block.Transactions()), "err", err)
}
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
if config.EnableParallelTxDAG {
eth.blockchain.SetupTxDAGGeneration()
eth.blockchain.SetupTxDAGGeneration(config.ParallelTxDAGFile)
}
if chainConfig := eth.blockchain.Config(); chainConfig.Optimism != nil { // config.Genesis.Config.ChainID cannot be used because it's based on CLI flags only, thus default to mainnet L1
config.NetworkId = chainConfig.ChainID.Uint64() // optimism defaults eth network ID to chain ID
Expand Down
1 change: 1 addition & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ type Config struct {

EnableOpcodeOptimizing bool
EnableParallelTxDAG bool
ParallelTxDAGFile string
}

// CreateConsensusEngine creates a consensus engine for the given chain config.
Expand Down
2 changes: 1 addition & 1 deletion miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func generateTxDAGGaslessBlock(t *testing.T, enableMev, enableTxDAG bool) {
w, b := newTestWorker(t, &config, engine, db, 0, &cfg, &vmConfig)
defer w.close()
if enableTxDAG {
w.chain.SetupTxDAGGeneration()
w.chain.SetupTxDAGGeneration("")
}

// Ignore empty commit here for less noise.
Expand Down
2 changes: 1 addition & 1 deletion tests/block_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, tracer vm.EVMLogger, en
}
defer chain.Stop()
if enableTxDAG {
chain.SetupTxDAGGeneration()
chain.SetupTxDAGGeneration("")
}

validBlocks, err := t.insertBlocks(chain)
Expand Down

0 comments on commit b6a55c3

Please sign in to comment.