Skip to content

Commit

Permalink
Mining fixes brought to 2.60 (#1528)
Browse files Browse the repository at this point in the history
* Fix/txpool pending (#1477)

* no overflow when configured and mining logic change

* remove mined transactions from the pool per block

* discard transactions from the pool that error during execution (#1488)

* mined transactions removed from inclusion list (#1503)

* do not build empty blocks on detecting an overflow (#1522)

also stop processing transactions when the block timer has ticked to
give more consistent block times
  • Loading branch information
hexoscott authored Dec 3, 2024
1 parent 64fa041 commit 7f77d5f
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 23 deletions.
4 changes: 4 additions & 0 deletions core/vm/zk_batch_counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ func (bcc *BatchCounterCollector) processBatchLevelData() error {

// CheckForOverflow returns true in the case that any counter has less than 0 remaining
func (bcc *BatchCounterCollector) CheckForOverflow(verifyMerkleProof bool) (bool, error) {
// unlimited counters shouldn't overflow
if bcc.unlimitedCounters {
return false, nil
}
combined, err := bcc.CombineCollectors(verifyMerkleProof)
if err != nil {
return false, err
Expand Down
12 changes: 12 additions & 0 deletions zk/debug_tools/test-contracts/contracts/GasBurner.sol
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.15;

contract GasBurner {
constructor() {
//dynamic array
uint[] memory a = new uint[](12000);
for (uint i = 0; i < 2000; i++) {
a[i%10000] = i;
}
}
}
3 changes: 2 additions & 1 deletion zk/debug_tools/test-contracts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
"chainCall:local": "npx hardhat compile && npx hardhat run scripts/chain-call.js --network local",
"chainCall:sepolia": "npx hardhat compile && npx hardhat run scripts/chain-call.js --network sepolia",
"create:local": "npx hardhat compile && npx hardhat run scripts/create.js --network local",
"keccak:local": "npx hardhat compile && npx hardhat run scripts/keccak-loop.js --network local"
"keccak:local": "npx hardhat compile && npx hardhat run scripts/keccak-loop.js --network local",
"gasBurner:local": "npx hardhat compile && npx hardhat run scripts/gas-burner.js --network local"
},
"keywords": [],
"author": "",
Expand Down
26 changes: 26 additions & 0 deletions zk/debug_tools/test-contracts/scripts/gas-burner.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
async function main() {
try {
// Get the ContractFactory of your BigLoopContract
const GasBurnerContract = await hre.ethers.getContractFactory("GasBurner");

// Deploy the contract
const contract = await GasBurnerContract.deploy();
// Wait for the deployment transaction to be mined
await contract.waitForDeployment();

console.log(`GasBurner deployed to: ${await contract.getAddress()}`);

// const result = await contract.bigLoop(10000);
// console.log(result);
} catch (error) {
console.error(error);
process.exit(1);
}
}

main()
.then(() => process.exit(0))
.catch(error => {
console.error(error);
process.exit(1);
});
96 changes: 84 additions & 12 deletions zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,16 +307,22 @@ func sequencingBatchStep(
log.Info(fmt.Sprintf("[%s] Waiting for txs from the pool...", logPrefix))
}

LOOP_TRANSACTIONS:
innerBreak := false
emptyBlockOverflow := false

OuterLoopTransactions:
for {
if innerBreak {
break
}
select {
case <-logTicker.C:
if !batchState.isAnyRecovery() {
log.Info(fmt.Sprintf("[%s] Waiting some more for txs from the pool...", logPrefix))
}
case <-blockTicker.C:
if !batchState.isAnyRecovery() {
break LOOP_TRANSACTIONS
break OuterLoopTransactions
}
case <-batchTicker.C:
if !batchState.isAnyRecovery() {
Expand Down Expand Up @@ -346,12 +352,21 @@ func sequencingBatchStep(
return err
}
} else if !batchState.isL1Recovery() {

var allConditionsOK bool
batchState.blockState.transactionsForInclusion, allConditionsOK, err = getNextPoolTransactions(ctx, cfg, executionAt, batchState.forkId, batchState.yieldedTransactions)
var newTransactions []types.Transaction
var newIds []common.Hash

newTransactions, newIds, allConditionsOK, err = getNextPoolTransactions(ctx, cfg, executionAt, batchState.forkId, batchState.yieldedTransactions)
if err != nil {
return err
}

batchState.blockState.transactionsForInclusion = append(batchState.blockState.transactionsForInclusion, newTransactions...)
for idx, tx := range newTransactions {
batchState.blockState.transactionHashesToSlots[tx.Hash()] = newIds[idx]
}

if len(batchState.blockState.transactionsForInclusion) == 0 {
if allConditionsOK {
time.Sleep(batchContext.cfg.zk.SequencerTimeoutOnEmptyTxPool)
Expand All @@ -369,7 +384,21 @@ func sequencingBatchStep(
log.Trace(fmt.Sprintf("[%s] Yielded transactions from the pool", logPrefix), "txCount", len(batchState.blockState.transactionsForInclusion))
}

badTxHashes := make([]common.Hash, 0)
minedTxHashes := make([]common.Hash, 0)

InnerLoopTransactions:
for i, transaction := range batchState.blockState.transactionsForInclusion {
// quick check if we should stop handling transactions
select {
case <-blockTicker.C:
if !batchState.isAnyRecovery() {
innerBreak = true
break InnerLoopTransactions
}
default:
}

txHash := transaction.Hash()
effectiveGas := batchState.blockState.getL1EffectiveGases(cfg, i)

Expand Down Expand Up @@ -403,9 +432,12 @@ func sequencingBatchStep(
continue
}

// if running in normal operation mode and error != nil then just allow the code to continue
// It is safe because this approach ensures that the problematic transaction (the one that caused err != nil to be returned) is kept in yielded
// Each transaction in yielded will be reevaluated at the end of each batch
// if we have an error at this point something has gone wrong, either in the pool or otherwise
// to stop the pool growing and hampering further processing of good transactions here
// we mark it for being discarded
log.Warn(fmt.Sprintf("[%s] error adding transaction to batch, discarding from pool", logPrefix), "hash", txHash, "err", err)
badTxHashes = append(badTxHashes, txHash)
batchState.blockState.transactionsToDiscard = append(batchState.blockState.transactionsToDiscard, batchState.blockState.transactionHashesToSlots[txHash])
}

switch anyOverflow {
Expand Down Expand Up @@ -443,7 +475,10 @@ func sequencingBatchStep(
if batchState.reachedOverflowTransactionLimit() || cfg.zk.SealBatchImmediatelyOnOverflow {
log.Info(fmt.Sprintf("[%s] closing batch due to counters", logPrefix), "counters: ", batchState.overflowTransactions, "immediate", cfg.zk.SealBatchImmediatelyOnOverflow)
runLoopBlocks = false
break LOOP_TRANSACTIONS
if len(batchState.blockState.builtBlockElements.transactions) == 0 {
emptyBlockOverflow = true
}
break OuterLoopTransactions
}
}

Expand All @@ -460,13 +495,14 @@ func sequencingBatchStep(
}
log.Info(fmt.Sprintf("[%s] gas overflowed adding transaction to block", logPrefix), "block", blockNumber, "tx-hash", txHash)
runLoopBlocks = false
break LOOP_TRANSACTIONS
break OuterLoopTransactions
case overflowNone:
}

if err == nil {
blockDataSizeChecker = &backupDataSizeChecker
batchState.onAddedTransaction(transaction, receipt, execResult, effectiveGas)
minedTxHashes = append(minedTxHashes, txHash)
}

// We will only update the processed index in resequence job if there isn't overflow
Expand All @@ -479,40 +515,69 @@ func sequencingBatchStep(
if len(batchState.blockState.transactionsForInclusion) == 0 {
// We need to jump to the next block here if there are no transactions in current block
batchState.resequenceBatchJob.UpdateLastProcessedTx(batchState.resequenceBatchJob.CurrentBlock().L2Blockhash)
break LOOP_TRANSACTIONS
break OuterLoopTransactions
}

if batchState.resequenceBatchJob.AtNewBlockBoundary() {
// We need to jump to the next block here if we are at the end of the current block
break LOOP_TRANSACTIONS
break OuterLoopTransactions
} else {
if cfg.zk.SequencerResequenceStrict {
return fmt.Errorf("strict mode enabled, but resequenced batch %d has transactions that overflowed counters or failed transactions", batchState.batchNumber)
}
}
}

// remove bad and mined transactions from the list for inclusion
for i := len(batchState.blockState.transactionsForInclusion) - 1; i >= 0; i-- {
tx := batchState.blockState.transactionsForInclusion[i]
hash := tx.Hash()
for _, badHash := range badTxHashes {
if badHash == hash {
batchState.blockState.transactionsForInclusion = removeInclusionTransaction(batchState.blockState.transactionsForInclusion, i)
break
}
}

for _, minedHash := range minedTxHashes {
if minedHash == hash {
batchState.blockState.transactionsForInclusion = removeInclusionTransaction(batchState.blockState.transactionsForInclusion, i)
break
}
}
}

if batchState.isL1Recovery() {
// just go into the normal loop waiting for new transactions to signal that the recovery
// has finished as far as it can go
if !batchState.isThereAnyTransactionsToRecover() {
log.Info(fmt.Sprintf("[%s] L1 recovery no more transactions to recover", logPrefix))
}

break LOOP_TRANSACTIONS
break OuterLoopTransactions
}

if batchState.isLimboRecovery() {
runLoopBlocks = false
break LOOP_TRANSACTIONS
break OuterLoopTransactions
}
}
}

// we do not want to commit this block if it has no transactions and we detected an overflow - essentially the batch is too
// full to get any more transactions in it and we don't want to commit an empty block
if emptyBlockOverflow {
log.Info(fmt.Sprintf("[%s] Block %d overflow detected with no transactions added, skipping block for next batch", logPrefix, blockNumber))
break
}

if block, err = doFinishBlockAndUpdateState(batchContext, ibs, header, parentBlock, batchState, ger, l1BlockHash, l1TreeUpdateIndex, infoTreeIndexProgress, batchCounters); err != nil {
return err
}

cfg.txPool.RemoveMinedTransactions(batchState.blockState.builtBlockElements.txSlots)
cfg.txPool.RemoveMinedTransactions(batchState.blockState.transactionsToDiscard)

if batchState.isLimboRecovery() {
stateRoot := block.Root()
cfg.txPool.UpdateLimboRootByTxHash(batchState.limboRecoveryData.limboTxHash, &stateRoot)
Expand Down Expand Up @@ -588,3 +653,10 @@ func sequencingBatchStep(

return sdb.tx.Commit()
}

func removeInclusionTransaction(orig []types.Transaction, index int) []types.Transaction {
if index < 0 || index >= len(orig) {
return orig
}
return append(orig[:index], orig[index+1:]...)
}
16 changes: 13 additions & 3 deletions zk/stages/stage_sequence_execute_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ func (bs *BatchState) getCoinbase(cfg *SequenceBlockCfg) common.Address {
}

func (bs *BatchState) onAddedTransaction(transaction types.Transaction, receipt *types.Receipt, execResult *core.ExecutionResult, effectiveGas uint8) {
bs.blockState.builtBlockElements.onFinishAddingTransaction(transaction, receipt, execResult, effectiveGas)
slotId, ok := bs.blockState.transactionHashesToSlots[transaction.Hash()]
if !ok {
log.Warn("[batchState] transaction hash not found in transaction hashes to slots map", "hash", transaction.Hash())
}
bs.blockState.builtBlockElements.onFinishAddingTransaction(transaction, receipt, execResult, effectiveGas, slotId)
bs.hasAnyTransactionsInThisBatch = true
}

Expand Down Expand Up @@ -250,12 +254,16 @@ func newLimboRecoveryData(limboHeaderTimestamp uint64, limboTxHash *common.Hash)
// TYPE BLOCK STATE
type BlockState struct {
transactionsForInclusion []types.Transaction
transactionHashesToSlots map[common.Hash]common.Hash
builtBlockElements BuiltBlockElements
blockL1RecoveryData *zktx.DecodedBatchL2Data
transactionsToDiscard []common.Hash
}

func newBlockState() *BlockState {
return &BlockState{}
return &BlockState{
transactionHashesToSlots: make(map[common.Hash]common.Hash),
}
}

func (bs *BlockState) hasAnyTransactionForInclusion() bool {
Expand Down Expand Up @@ -294,6 +302,7 @@ type BuiltBlockElements struct {
receipts types.Receipts
effectiveGases []uint8
executionResults []*core.ExecutionResult
txSlots []common.Hash
}

func (bbe *BuiltBlockElements) resetBlockBuildingArrays() {
Expand All @@ -303,11 +312,12 @@ func (bbe *BuiltBlockElements) resetBlockBuildingArrays() {
bbe.executionResults = []*core.ExecutionResult{}
}

func (bbe *BuiltBlockElements) onFinishAddingTransaction(transaction types.Transaction, receipt *types.Receipt, execResult *core.ExecutionResult, effectiveGas uint8) {
func (bbe *BuiltBlockElements) onFinishAddingTransaction(transaction types.Transaction, receipt *types.Receipt, execResult *core.ExecutionResult, effectiveGas uint8, slotId common.Hash) {
bbe.transactions = append(bbe.transactions, transaction)
bbe.receipts = append(bbe.receipts, receipt)
bbe.executionResults = append(bbe.executionResults, execResult)
bbe.effectiveGases = append(bbe.effectiveGases, effectiveGas)
bbe.txSlots = append(bbe.txSlots, slotId)
}

type resequenceTxMetadata struct {
Expand Down
18 changes: 11 additions & 7 deletions zk/stages/stage_sequence_execute_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
"github.com/ledgerwatch/log/v3"
)

func getNextPoolTransactions(ctx context.Context, cfg SequenceBlockCfg, executionAt, forkId uint64, alreadyYielded mapset.Set[[32]byte]) ([]types.Transaction, bool, error) {
func getNextPoolTransactions(ctx context.Context, cfg SequenceBlockCfg, executionAt, forkId uint64, alreadyYielded mapset.Set[[32]byte]) ([]types.Transaction, []common.Hash, bool, error) {
cfg.txPool.LockFlusher()
defer cfg.txPool.UnlockFlusher()

var ids []common.Hash
var transactions []types.Transaction
var allConditionsOk bool
var err error
Expand All @@ -37,20 +38,21 @@ func getNextPoolTransactions(ctx context.Context, cfg SequenceBlockCfg, executio
if allConditionsOk, _, err = cfg.txPool.YieldBest(cfg.yieldSize, &slots, poolTx, executionAt, gasLimit, 0, alreadyYielded); err != nil {
return err
}
yieldedTxs, toRemove, err := extractTransactionsFromSlot(&slots)
yieldedTxs, yieldedIds, toRemove, err := extractTransactionsFromSlot(&slots)
if err != nil {
return err
}
for _, txId := range toRemove {
cfg.txPool.MarkForDiscardFromPendingBest(txId)
}
transactions = append(transactions, yieldedTxs...)
ids = append(ids, yieldedIds...)
return nil
}); err != nil {
return nil, allConditionsOk, err
return nil, nil, allConditionsOk, err
}

return transactions, allConditionsOk, err
return transactions, ids, allConditionsOk, err
}

func getLimboTransaction(ctx context.Context, cfg SequenceBlockCfg, txHash *common.Hash) ([]types.Transaction, error) {
Expand All @@ -68,7 +70,7 @@ func getLimboTransaction(ctx context.Context, cfg SequenceBlockCfg, txHash *comm
if slots != nil {
// ignore the toRemove value here, we know the RLP will be sound as we had to read it from the pool
// in the first place to get it into limbo
transactions, _, err = extractTransactionsFromSlot(slots)
transactions, _, _, err = extractTransactionsFromSlot(slots)
if err != nil {
return err
}
Expand All @@ -82,7 +84,8 @@ func getLimboTransaction(ctx context.Context, cfg SequenceBlockCfg, txHash *comm
return transactions, nil
}

func extractTransactionsFromSlot(slot *types2.TxsRlp) ([]types.Transaction, []common.Hash, error) {
func extractTransactionsFromSlot(slot *types2.TxsRlp) ([]types.Transaction, []common.Hash, []common.Hash, error) {
ids := make([]common.Hash, 0, len(slot.TxIds))
transactions := make([]types.Transaction, 0, len(slot.Txs))
toRemove := make([]common.Hash, 0)
for idx, txBytes := range slot.Txs {
Expand All @@ -101,8 +104,9 @@ func extractTransactionsFromSlot(slot *types2.TxsRlp) ([]types.Transaction, []co
copy(sender[:], slot.Senders.At(idx))
transaction.SetSender(sender)
transactions = append(transactions, transaction)
ids = append(ids, slot.TxIds[idx])
}
return transactions, toRemove, nil
return transactions, ids, toRemove, nil
}

type overflowType uint8
Expand Down
Loading

0 comments on commit 7f77d5f

Please sign in to comment.