Skip to content

Commit

Permalink
worker: fix TxDAG generation issues when mining block; (bnb-chain#43)
Browse files Browse the repository at this point in the history
* blockchain: avoid enable txdag generation when pevm is enabled;
mvstates: add timeout timer for async loop;

worker: change append TxDAG position;

worker: fix append TxDAG missing issue;

* blockchain: opt mining txdag generation logic;

---------

Co-authored-by: galaio <galaio@users.noreply.github.com>
  • Loading branch information
2 people authored and sunny2022da committed Sep 25, 2024
1 parent 9c9bdca commit c96af03
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 54 deletions.
6 changes: 1 addition & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2702,11 +2702,7 @@ func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) {
}

func (bc *BlockChain) TxDAGEnabledWhenMine() bool {
return bc.enableTxDAG && bc.txDAGWriteCh == nil
}

func (bc *BlockChain) TxDAGFileOpened() bool {
return bc.txDAGWriteCh != nil
return bc.enableTxDAG && bc.txDAGWriteCh == nil && bc.txDAGReader == nil
}

func (bc *BlockChain) SetupTxDAGGeneration(output string, readFile bool) {
Expand Down
2 changes: 1 addition & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
ProcessBeaconBlockRoot(*beaconRoot, vmenv, statedb)
}
statedb.MarkFullProcessed()
if p.bc.enableTxDAG {
if p.bc.enableTxDAG && !p.bc.vmConfig.EnableParallelExec {
statedb.ResetMVStates(len(block.Transactions()))
}
// Iterate over and process the individual transactions
Expand Down
5 changes: 5 additions & 0 deletions core/types/mvstates.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -354,6 +355,7 @@ func (s *MVStates) stopAsyncDepGen() {
}

func (s *MVStates) asyncDepGenLoop() {
timeout := time.After(3 * time.Second)
for {
select {
case tx := <-s.depsGenChan:
Expand All @@ -362,6 +364,9 @@ func (s *MVStates) asyncDepGenLoop() {
s.lock.Unlock()
case <-s.stopChan:
return
case <-timeout:
log.Warn("asyncDepGenLoop exit by timeout")
return
}
}
}
Expand Down
76 changes: 28 additions & 48 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,36 +912,10 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac
}
var coalescedLogs []*types.Log

//append the tx DAG transaction to the block
appendTxDAG := func() {
// whether enable TxDAG
if !w.chain.TxDAGEnabledWhenMine() {
return
}
// whether export to file
if w.chain.TxDAGFileOpened() {
return
}
// TODO this is a placeholder for the tx DAG data that will be generated by the stateDB
txForDAG, err := w.generateDAGTx(env.signer, env.tcount, env.coinbase)
if err != nil {
log.Warn("failed to generate DAG tx", "err", err)
return
}
logs, err := w.commitTransaction(env, txForDAG)
if err != nil {
log.Warn("failed to commit DAG tx", "err", err)
return
}
coalescedLogs = append(coalescedLogs, logs...)
env.tcount++
}

for {
// Check interruption signal and abort building if it's fired.
if interrupt != nil {
if signal := interrupt.Load(); signal != commitInterruptNone {
appendTxDAG()
return signalToErr(signal)
}
}
Expand Down Expand Up @@ -1040,7 +1014,6 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac
txErrUnknownMeter.Mark(1)
}
}
appendTxDAG()
if !w.isRunning() && len(coalescedLogs) > 0 {
// We don't push the pendingLogsEvent while we are sealing. The reason is that
// when we are sealing, the worker will regenerate a sealing block every 3 seconds.
Expand All @@ -1060,56 +1033,57 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac
}

// generateDAGTx generates a DAG transaction for the block
func (w *worker) generateDAGTx(signer types.Signer, txIndex int, coinbase common.Address) (*types.Transaction, error) {
statedb, err := w.chain.State()
if err != nil {
return nil, fmt.Errorf("failed to get state db, err: %v", err)
func (w *worker) generateDAGTx(env *environment) error {
// get txDAG data from the stateDB
txDAG, err := env.state.ResolveTxDAG(env.tcount, []common.Address{env.coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient})
if txDAG == nil || err != nil {
return err
}
// txIndex is the index of this txDAG transaction
txDAG.SetTxDep(env.tcount, types.TxDep{Flags: &types.NonDependentRelFlag})

if signer == nil {
return nil, fmt.Errorf("current signer is nil")
if env.signer == nil {
return fmt.Errorf("current signer is nil")
}

//privateKey, err := crypto.HexToECDSA(privateKeyHex)
sender := w.config.ParallelTxDAGSenderPriv
receiver := DefaultTxDAGAddress
if sender == nil {
return nil, fmt.Errorf("missing sender private key")
}

// get txDAG data from the stateDB
txDAG, err := statedb.ResolveTxDAG(txIndex, []common.Address{coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient})
if txDAG == nil {
return nil, err
return fmt.Errorf("missing sender private key")
}
// txIndex is the index of this txDAG transaction
txDAG.SetTxDep(txIndex, types.TxDep{Flags: &types.NonDependentRelFlag})

publicKey := sender.Public()
publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
return nil, fmt.Errorf("error casting public key to ECDSA")
return fmt.Errorf("error casting public key to ECDSA")
}
fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA)

// get nonce from the
nonce := statedb.GetNonce(fromAddress)
nonce := env.state.GetNonce(fromAddress)

data, err := types.EncodeTxDAGCalldata(txDAG)
if err != nil {
return nil, fmt.Errorf("failed to encode txDAG, err: %v", err)
return fmt.Errorf("failed to encode txDAG, err: %v", err)
}

// Create the transaction
tx := types.NewTransaction(nonce, receiver, big.NewInt(0), 21100, big.NewInt(0), data)

// Sign the transaction with the private key
signedTx, err := types.SignTx(tx, signer, sender)
signedTx, err := types.SignTx(tx, env.signer, sender)
if err != nil {
return nil, fmt.Errorf("failed to sign transaction, err: %v", err)
return fmt.Errorf("failed to sign transaction, err: %v", err)
}

return signedTx, nil
_, err = w.commitTransaction(env, signedTx)
if err != nil {
log.Warn("failed to commit DAG tx", "err", err)
return err
}
env.tcount++
return nil
}

// generateParams wraps various of settings for generating sealing task.
Expand Down Expand Up @@ -1421,6 +1395,12 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult {
if intr := genParams.interrupt; intr != nil && genParams.isUpdate && intr.Load() != commitInterruptNone {
return &newPayloadResult{err: errInterruptedUpdate}
}
//append the tx DAG transaction to the block
if w.chain.TxDAGEnabledWhenMine() {
if err := w.generateDAGTx(work); err != nil {
log.Warn("failed to generate DAG tx", "err", err)
}
}

start = time.Now()
block, err := w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, nil, work.receipts, genParams.withdrawals)
Expand Down

0 comments on commit c96af03

Please sign in to comment.