Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: custom block builder (#21) #23

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
336 changes: 330 additions & 6 deletions src/custom_reth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ use reth_node_builder::{
BuilderContext, FullNodeTypes, Node, NodeBuilder, PayloadBuilderConfig,
};
use reth_primitives::revm_primitives::{BlockEnv, CfgEnvWithHandlerCfg};
use reth_primitives::{Address, ChainSpec, Header, Withdrawals, B256};
use reth_primitives::{
proofs, Address, Block, ChainSpec, Header, IntoRecoveredTransaction, Receipt, Receipts,
Withdrawals, B256, EMPTY_OMMER_ROOT_HASH,
};
use std::sync::Arc;

use reth_basic_payload_builder::{
commit_withdrawals, is_better_payload, pre_block_beacon_root_contract_call,
BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig, BuildArguments, BuildOutcome,
PayloadBuilder, PayloadConfig,
PayloadBuilder, PayloadConfig, WithdrawalsOutcome,
};
use reth_db::init_db;
use reth_node_api::{
Expand All @@ -19,10 +23,10 @@ use reth_node_api::{
};
use reth_provider::{
providers::{BlockchainProvider, ProviderFactory},
CanonStateSubscriptions, StateProviderFactory,
BundleStateWithReceipts, CanonStateSubscriptions, StateProviderFactory,
};
use reth_tasks::TaskManager;
use reth_transaction_pool::TransactionPool;
use reth_transaction_pool::{BestTransactionsAttributes, TransactionPool};

use reth_node_ethereum::{
node::{EthereumNetworkBuilder, EthereumPoolBuilder},
Expand Down Expand Up @@ -50,6 +54,7 @@ use crate::custom_reth::eigen::EigenRpcExtApiServer;
use crate::db::Database as RollupDatabase;
use anyhow::{anyhow, Result};
use jsonrpsee::tracing;
use jsonrpsee::tracing::{debug, trace};
use reth_blockchain_tree::{
BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals,
};
Expand All @@ -58,7 +63,16 @@ use reth_interfaces::consensus::Consensus;
use reth_node_core::args::{DevArgs, RpcServerArgs};
use reth_node_core::dirs::{DataDirPath, MaybePlatformPath};
use reth_node_core::node_config::NodeConfig;
use reth_revm::EvmProcessorFactory;
use reth_node_core::primitives::U256;
use reth_primitives::constants::eip4844::MAX_DATA_GAS_PER_BLOCK;
use reth_primitives::constants::BEACON_NONCE;
use reth_primitives::eip4844::calculate_excess_blob_gas;
use reth_primitives::revm::env::tx_env_with_recovered;
use reth_revm::database::StateProviderDatabase;
use reth_revm::db::states::bundle_state::BundleRetention;
use reth_revm::{revm, EvmProcessorFactory, State};
use revm_primitives::db::DatabaseCommit;
use revm_primitives::{EVMError, EnvWithHandlerCfg, InvalidTransaction, ResultAndState};

pub(crate) mod eigen;

Expand Down Expand Up @@ -280,6 +294,8 @@ where
type Attributes = CustomPayloadBuilderAttributes;
type BuiltPayload = EthBuiltPayload;

// When the CL (Consensus Client) creates a new proposal, it accesses the EL (Execution Client) by calling the get_payload_v4 API to get the ExecutionPayload.
// The ExecutionPayload is built here by selecting high gas fee transactions from the transaction pool to construct a new block.
fn try_build(
&self,
args: BuildArguments<Pool, Client, Self::Attributes, Self::BuiltPayload>,
Expand All @@ -303,7 +319,24 @@ where

// This reuses the default EthereumPayloadBuilder to build the payload
// but any custom logic can be implemented here
reth_ethereum_payload_builder::EthereumPayloadBuilder::default().try_build(BuildArguments {
// reth_ethereum_payload_builder::EthereumPayloadBuilder::default().try_build(BuildArguments {
// client,
// pool,
// cached_reads,
// config: PayloadConfig {
// initialized_block_env,
// initialized_cfg,
// parent_block,
// extra_data,
// attributes: attributes.0,
// chain_spec,
// },
// cancel,
// best_payload,
// })

// we can customize the payload builder here, to control the block building process
custom_payload_builder(BuildArguments {
client,
pool,
cached_reads,
Expand Down Expand Up @@ -339,6 +372,297 @@ where
}
}

pub fn custom_payload_builder<Pool, Client>(
args: BuildArguments<Pool, Client, EthPayloadBuilderAttributes, EthBuiltPayload>,
) -> std::result::Result<BuildOutcome<EthBuiltPayload>, PayloadBuilderError>
where
Client: StateProviderFactory,
Pool: TransactionPool,
{
let BuildArguments {
client,
pool,
mut cached_reads,
config,
cancel,
best_payload,
} = args;

let state_provider = client.state_by_block_hash(config.parent_block.hash())?;
let state = StateProviderDatabase::new(&state_provider);
let mut db = State::builder()
.with_database_ref(cached_reads.as_db(&state))
.with_bundle_update()
.build();
let extra_data = config.extra_data();
let PayloadConfig {
initialized_block_env,
initialized_cfg,
parent_block,
attributes,
chain_spec,
..
} = config;

debug!(target: "payload_builder", id=%attributes.id, parent_hash = ?parent_block.hash(), parent_number = parent_block.number, "building new payload");
let mut cumulative_gas_used = 0;
let mut sum_blob_gas_used = 0;
let block_gas_limit: u64 = initialized_block_env
.gas_limit
.try_into()
.unwrap_or(u64::MAX);
let base_fee = initialized_block_env.basefee.to::<u64>();

let mut executed_txs = Vec::new();

let mut best_txs = pool.best_transactions_with_attributes(BestTransactionsAttributes::new(
base_fee,
initialized_block_env
.get_blob_gasprice()
.map(|gasprice| gasprice as u64),
));

let mut total_fees = U256::ZERO;

let block_number = initialized_block_env.number.to::<u64>();

// apply eip-4788 pre block contract call
pre_block_beacon_root_contract_call(
&mut db,
&chain_spec,
block_number,
&initialized_cfg,
&initialized_block_env,
&attributes,
)?;

let mut receipts = Vec::new();
while let Some(pool_tx) = best_txs.next() {
// ensure we still have capacity for this transaction
if cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit {
// we can't fit this transaction into the block, so we need to mark it as invalid
// which also removes all dependent transaction from the iterator before we can
// continue
best_txs.mark_invalid(&pool_tx);
continue;
}

// check if the job was cancelled, if so we can exit early
if cancel.is_cancelled() {
return Ok(BuildOutcome::Cancelled);
}

// convert tx to a signed transaction
let tx = pool_tx.to_recovered_transaction();

// There's only limited amount of blob space available per block, so we need to check if
// the EIP-4844 can still fit in the block
if let Some(blob_tx) = tx.transaction.as_eip4844() {
let tx_blob_gas = blob_tx.blob_gas();
if sum_blob_gas_used + tx_blob_gas > MAX_DATA_GAS_PER_BLOCK {
// we can't fit this _blob_ transaction into the block, so we mark it as
// invalid, which removes its dependent transactions from
// the iterator. This is similar to the gas limit condition
// for regular transactions above.
trace!(target: "payload_builder", tx=?tx.hash, ?sum_blob_gas_used, ?tx_blob_gas, "skipping blob transaction because it would exceed the max data gas per block");
best_txs.mark_invalid(&pool_tx);
continue;
}
}

// Configure the environment for the block.
let mut evm = revm::Evm::builder()
.with_db(&mut db)
.with_env_with_handler_cfg(EnvWithHandlerCfg::new_with_cfg_env(
initialized_cfg.clone(),
initialized_block_env.clone(),
tx_env_with_recovered(&tx),
))
.build();

let ResultAndState { result, state } = match evm.transact() {
Ok(res) => res,
Err(err) => {
match err {
EVMError::Transaction(err) => {
if matches!(err, InvalidTransaction::NonceTooLow { .. }) {
// if the nonce is too low, we can skip this transaction
trace!(target: "payload_builder", %err, ?tx, "skipping nonce too low transaction");
} else {
// if the transaction is invalid, we can skip it and all of its
// descendants
trace!(target: "payload_builder", %err, ?tx, "skipping invalid transaction and its descendants");
best_txs.mark_invalid(&pool_tx);
}

continue;
}
err => {
// this is an error that we should treat as fatal for this attempt
return Err(PayloadBuilderError::EvmExecutionError(err));
}
}
}
};
// drop evm so db is released.
drop(evm);
// commit changes
db.commit(state);

// add to the total blob gas used if the transaction successfully executed
if let Some(blob_tx) = tx.transaction.as_eip4844() {
let tx_blob_gas = blob_tx.blob_gas();
sum_blob_gas_used += tx_blob_gas;

// if we've reached the max data gas per block, we can skip blob txs entirely
if sum_blob_gas_used == MAX_DATA_GAS_PER_BLOCK {
best_txs.skip_blobs();
}
}

let gas_used = result.gas_used();

// add gas used by the transaction to cumulative gas used, before creating the receipt
cumulative_gas_used += gas_used;

// Push transaction changeset and calculate header bloom filter for receipt.
#[allow(clippy::needless_update)] // side-effect of optimism fields
receipts.push(Some(Receipt {
tx_type: tx.tx_type(),
success: result.is_success(),
cumulative_gas_used,
logs: result.into_logs().into_iter().map(Into::into).collect(),
..Default::default()
}));

// update add to total fees
let miner_fee = tx
.effective_tip_per_gas(Some(base_fee))
.expect("fee is always valid; execution succeeded");
total_fees += U256::from(miner_fee) * U256::from(gas_used);

// append transaction to the list of executed transactions
executed_txs.push(tx.into_signed());
}

// check if we have a better block
if !is_better_payload(best_payload.as_ref(), total_fees) {
// can skip building the block
return Ok(BuildOutcome::Aborted {
fees: total_fees,
cached_reads,
});
}

let WithdrawalsOutcome {
withdrawals_root,
withdrawals,
} = commit_withdrawals(
&mut db,
&chain_spec,
attributes.timestamp,
attributes.withdrawals,
)?;

// merge all transitions into bundle state, this would apply the withdrawal balance changes
// and 4788 contract call
db.merge_transitions(BundleRetention::PlainState);

let bundle = BundleStateWithReceipts::new(
db.take_bundle(),
Receipts::from_vec(vec![receipts]),
block_number,
);
let receipts_root = bundle
.receipts_root_slow(block_number)
.expect("Number is in range");
let logs_bloom = bundle
.block_logs_bloom(block_number)
.expect("Number is in range");

// calculate the state root
let state_root = state_provider.state_root(&bundle)?;

// create the block header
let transactions_root = proofs::calculate_transaction_root(&executed_txs);

// initialize empty blob sidecars at first. If cancun is active then this will
let mut blob_sidecars = Vec::new();
let mut excess_blob_gas = None;
let mut blob_gas_used = None;

// only determine cancun fields when active
if chain_spec.is_cancun_active_at_timestamp(attributes.timestamp) {
// grab the blob sidecars from the executed txs
blob_sidecars = pool.get_all_blobs_exact(
executed_txs
.iter()
.filter(|tx| tx.is_eip4844())
.map(|tx| tx.hash)
.collect(),
)?;

excess_blob_gas = if chain_spec.is_cancun_active_at_timestamp(parent_block.timestamp) {
let parent_excess_blob_gas = parent_block.excess_blob_gas.unwrap_or_default();
let parent_blob_gas_used = parent_block.blob_gas_used.unwrap_or_default();
Some(calculate_excess_blob_gas(
parent_excess_blob_gas,
parent_blob_gas_used,
))
} else {
// for the first post-fork block, both parent.blob_gas_used and
// parent.excess_blob_gas are evaluated as 0
Some(calculate_excess_blob_gas(0, 0))
};

blob_gas_used = Some(sum_blob_gas_used);
}

let header = Header {
parent_hash: parent_block.hash(),
ommers_hash: EMPTY_OMMER_ROOT_HASH,
beneficiary: initialized_block_env.coinbase,
state_root,
transactions_root,
receipts_root,
withdrawals_root,
logs_bloom,
timestamp: attributes.timestamp,
mix_hash: attributes.prev_randao,
nonce: BEACON_NONCE,
base_fee_per_gas: Some(base_fee),
number: parent_block.number + 1,
gas_limit: block_gas_limit,
difficulty: U256::ZERO,
gas_used: cumulative_gas_used,
extra_data,
parent_beacon_block_root: attributes.parent_beacon_block_root,
blob_gas_used,
excess_blob_gas,
};

// seal the block
let block = Block {
header,
body: executed_txs,
ommers: vec![],
withdrawals,
};

let sealed_block = block.seal_slow();
debug!(target: "payload_builder", ?sealed_block, "sealed built block");

let mut payload = EthBuiltPayload::new(attributes.id, sealed_block, total_fees);

// extend the payload with the blob sidecars from the executed txs
payload.extend_sidecars(blob_sidecars);

Ok(BuildOutcome::Better {
payload,
cached_reads,
})
}

pub async fn launch_custom_node(
mut stop_rx: tokio::sync::mpsc::Receiver<()>,
reth_started_signal_channel: tokio::sync::mpsc::Sender<()>,
Expand Down
Loading