From ea31b60827027009fd5cd01006cad4d26b6b347a Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Tue, 29 Oct 2024 12:38:28 +0100 Subject: [PATCH] nonce management: query nonce on-chain before relaying tx (#1508) * nonce management: query nonce on-chain before relaying tx * fix comments * fmt * fix comment --- src/bin/hive_chain.rs | 14 +- src/main.rs | 2 +- src/pool/mempool.rs | 129 ++++-------------- .../eth_provider/starknet/relayer.rs | 50 ++++--- src/test_utils/eoa.rs | 49 ++----- tests/tests/eth_provider.rs | 63 ++------- 6 files changed, 85 insertions(+), 222 deletions(-) diff --git a/src/bin/hive_chain.rs b/src/bin/hive_chain.rs index 1c8ce0798..859ae257c 100644 --- a/src/bin/hive_chain.rs +++ b/src/bin/hive_chain.rs @@ -4,9 +4,8 @@ use alloy_primitives::bytes::{Buf, BytesMut}; use alloy_rlp::Decodable; use clap::Parser; use kakarot_rpc::{ - constants::STARKNET_CHAIN_ID, into_via_try_wrapper, - providers::{eth_provider::starknet::relayer::LockedRelayer, sn_provider::StarknetProvider}, + providers::{eth_provider::starknet::relayer::Relayer, sn_provider::StarknetProvider}, }; use reth_primitives::{Block, BlockBody}; use starknet::{ @@ -14,7 +13,7 @@ use starknet::{ providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider}, }; use std::{path::PathBuf, str::FromStr}; -use tokio::{fs::File, io::AsyncReadExt, sync::Mutex}; +use tokio::{fs::File, io::AsyncReadExt}; use tokio_stream::StreamExt; use tokio_util::codec::{Decoder, FramedRead}; use url::Url; @@ -72,13 +71,10 @@ async fn main() -> eyre::Result<()> { let relayer_balance = starknet_provider.balance_at(args.relayer_address, BlockId::Tag(BlockTag::Latest)).await?; let relayer_balance = into_via_try_wrapper!(relayer_balance)?; - let current_nonce = Mutex::new(Felt::ZERO); - let mut relayer = LockedRelayer::new( - current_nonce.lock().await, + let relayer = Relayer::new( args.relayer_address, relayer_balance, JsonRpcClient::new(HttpTransport::new(Url::from_str(STARKNET_RPC_URL)?)), - *STARKNET_CHAIN_ID, ); // Read the rlp file @@ -107,10 +103,6 @@ async fn main() -> eyre::Result<()> { for transaction in &body.transactions { relayer.relay_transaction(transaction).await?; tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - // Increase the relayer's nonce - let nonce = relayer.nonce_mut(); - *nonce += Felt::ONE; } } diff --git a/src/main.rs b/src/main.rs index f5342c21a..6ccf61e5f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,7 +63,7 @@ async fn main() -> Result<()> { // Start the relayer manager let addresses = var("RELAYERS_ADDRESSES")?.split(',').filter_map(|addr| Felt::from_str(addr).ok()).collect::>(); - AccountManager::from_addresses(addresses, Arc::clone(ð_client)).await?.start(); + AccountManager::new(addresses, Arc::clone(ð_client)).start(); // Start the maintenance of the mempool maintain_transaction_pool(Arc::clone(ð_client), PRUNE_DURATION); diff --git a/src/pool/mempool.rs b/src/pool/mempool.rs index 468fe7d0a..4e42bfff2 100644 --- a/src/pool/mempool.rs +++ b/src/pool/mempool.rs @@ -6,10 +6,9 @@ use crate::{ constants::KAKAROT_RPC_CONFIG, into_via_try_wrapper, pool::constants::ONE_TENTH_ETH, - providers::eth_provider::{database::state::EthDatabase, starknet::relayer::LockedRelayer, BlockProvider}, + providers::eth_provider::{database::state::EthDatabase, starknet::relayer::Relayer, BlockProvider}, }; use alloy_primitives::{Address, U256}; -use futures::future::select_all; use rand::{seq::SliceRandom, SeedableRng}; use reth_chainspec::ChainSpec; use reth_execution_types::ChangedAccount; @@ -20,11 +19,11 @@ use reth_transaction_pool::{ TransactionOrigin, TransactionPool, TransactionPoolExt, }; use starknet::{ - core::types::{requests::GetNonceRequest, BlockId, BlockTag, Felt}, - providers::{jsonrpc::HttpTransport, JsonRpcClient, ProviderRequestData, ProviderResponseData}, + core::types::{BlockTag, Felt}, + providers::{jsonrpc::HttpTransport, JsonRpcClient}, }; use std::{collections::HashMap, sync::Arc, time::Duration}; -use tokio::{sync::Mutex, time::Instant}; +use tokio::time::Instant; use tracing::instrument; /// A type alias for the Kakarot Transaction Validator. @@ -38,34 +37,22 @@ pub type TransactionOrdering = CoinbaseTipOrdering; /// A type alias for the Kakarot Sequencer Mempool. pub type KakarotPool = Pool, TransactionOrdering, NoopBlobStore>; -/// Manages a collection of accounts and their associated nonces, interfacing with an Ethereum client. +/// Manages a collection of accounts addresses, interfacing with an Ethereum client. /// /// This struct provides functionality to initialize account data from a file, monitor account balances, /// and process transactions for accounts with sufficient balance. #[derive(Debug)] pub struct AccountManager { - /// A shared, mutable collection of accounts and their nonces. - accounts: HashMap>>, + /// A collection of account addresses. + accounts: Vec, /// The Ethereum client used to interact with the blockchain. eth_client: Arc>, } impl AccountManager { /// Initialize the account manager with a set of passed accounts. - pub async fn from_addresses(addresses: Vec, eth_client: Arc>) -> eyre::Result { - let mut accounts = HashMap::new(); - - for add in addresses { - // Query the initial account_nonce for the account from the provider - let nonce = eth_client - .starknet_provider() - .get_nonce(starknet::core::types::BlockId::Tag(BlockTag::Pending), add) - .await - .unwrap_or_default(); - accounts.insert(add, Arc::new(Mutex::new(nonce))); - } - - Ok(Self { accounts, eth_client }) + pub const fn new(accounts: Vec, eth_client: Arc>) -> Self { + Self { accounts, eth_client } } /// Starts the account manager task that periodically checks account balances and processes transactions. @@ -73,9 +60,6 @@ impl AccountM pub fn start(self) { let this = Arc::new(self); - // Start the nonce updater in a separate task - this.clone().start_nonce_updater(); - tokio::spawn(async move { loop { // TODO: add a listener on the pool and only try to call [`best_transaction`] @@ -99,7 +83,7 @@ impl AccountM let manager = this.clone(); tokio::spawn(async move { // Lock the relayer account - let maybe_relayer = manager.lock_account().await; + let maybe_relayer = manager.get_relayer().await; if maybe_relayer.is_err() { // If we fail to fetch a relayer, we need to re-insert the transaction in the pool tracing::error!(target: "account_manager", err = ?maybe_relayer.unwrap_err(), "failed to fetch relayer"); @@ -110,10 +94,11 @@ impl AccountM .await; return; } - let mut relayer = maybe_relayer.expect("maybe_lock is not error"); + let relayer = maybe_relayer.expect("not error"); // Send the Ethereum transaction using the relayer let transaction_signed = transaction.to_recovered_transaction().into_signed(); + let res = relayer.relay_transaction(&transaction_signed).await; if res.is_err() { // If the relayer failed to relay the transaction, we need to reposition it in the mempool @@ -127,10 +112,6 @@ impl AccountM } tracing::info!(target: "account_manager", starknet_hash = ?res.expect("not error"), ethereum_hash = ?transaction_signed.hash()); - - // Increment account_nonce after sending a transaction - let nonce = relayer.nonce_mut(); - *nonce = *nonce + 1; }); } @@ -140,58 +121,43 @@ impl AccountM } /// Returns the next available account from the manager. - pub async fn lock_account(&self) -> eyre::Result>> + pub async fn get_relayer(&self) -> eyre::Result>> where SP: starknet::providers::Provider + Send + Sync + Clone + 'static, { // Use `StdRng` instead of `ThreadRng` as it is `Send` let mut rng = rand::rngs::StdRng::from_entropy(); - // Collect the accounts into a vector for shuffling - let mut accounts: Vec<_> = self.accounts.iter().collect(); + // Shuffle indices of accounts randomly + let mut account_indices: Vec<_> = (0..self.accounts.len()).collect(); + account_indices.shuffle(&mut rng); - // Shuffle the accounts randomly before iterating - accounts.shuffle(&mut rng); + for index in account_indices { + let account_address = self.accounts[index]; - loop { - if accounts.is_empty() { - return Err(eyre::eyre!("failed to fetch funded account")); - } - - // Create the future locks with indices for more efficient removal - // use [`select_all`] to poll an iterator over impl Future)> - // We use Box::pin because this Future doesn't implement `Unpin`. - let fut_locks = accounts - .iter() - .enumerate() - .map(|(index, (address, nonce))| Box::pin(async move { (index, *address, nonce.lock().await) })); - - // Select the first account that gets unlocked - let ((index, account_address, guard), _, _) = select_all(fut_locks).await; + // Retrieve the balance of the selected account + let balance = self.get_balance(account_address).await?; - // Fetch the balance of the selected account - let balance = self.get_balance(*account_address).await?; - - // If the balance is lower than the threshold, remove the account using swap_remove + // Skip accounts with insufficient balance if balance < U256::from(ONE_TENTH_ETH) { - accounts.swap_remove(index); continue; } + // Convert the balance to `Felt` let balance = into_via_try_wrapper!(balance)?; - let chain_id = self.eth_client.starknet_provider().chain_id().await?; - let account = LockedRelayer::new( - guard, - *account_address, + // Construct the `Relayer` with the account address and other relevant data + let account = Relayer::new( + account_address, balance, JsonRpcClient::new(HttpTransport::new(KAKAROT_RPC_CONFIG.network_url.clone())), - chain_id, ); - // Return the account address and the guard on the nonce + // Return the locked relayer instance return Ok(account); } + + Err(eyre::eyre!("failed to fetch funded account")) } /// Retrieves the balance of the specified account address for the [`BlockTag::Pending`] @@ -203,45 +169,6 @@ impl AccountM .await .map_err(Into::into) } - - /// Update the nonces for all accounts every minute. - pub fn start_nonce_updater(self: Arc) { - tokio::spawn(async move { - loop { - // Convert the account addresses into batch requests - let requests = self - .accounts - .keys() - .map(|add| { - ProviderRequestData::GetNonce(GetNonceRequest { - contract_address: *add, - block_id: BlockId::Tag(BlockTag::Pending), - }) - }) - .collect::>(); - - // Try to make the request to the provider. If it fails, display error and retry 1 minute later. - let maybe_new_nonces = self.eth_client.starknet_provider().batch_requests(requests).await; - if maybe_new_nonces.is_err() { - tracing::error!(target: "account_manager", err = ?maybe_new_nonces.unwrap_err(), "failed to get nonces"); - // Sleep for 1 minute before the next update - tokio::time::sleep(Duration::from_secs(60)).await; - continue; - } - let new_nonces = maybe_new_nonces.expect("not error"); - - for ((address, old_nonce), new_nonce) in self.accounts.iter().zip(new_nonces) { - if let ProviderResponseData::GetNonce(new_nonce) = new_nonce { - *old_nonce.lock().await = new_nonce; - tracing::info!(target: "account_manager", ?address, ?new_nonce); - }; - } - - // Sleep for 1 minute before the next update - tokio::time::sleep(Duration::from_secs(60)).await; - } - }); - } } #[derive(Default)] diff --git a/src/providers/eth_provider/starknet/relayer.rs b/src/providers/eth_provider/starknet/relayer.rs index 6b797607b..b090bc237 100644 --- a/src/providers/eth_provider/starknet/relayer.rs +++ b/src/providers/eth_provider/starknet/relayer.rs @@ -1,4 +1,5 @@ use crate::{ + constants::STARKNET_CHAIN_ID, models::transaction::transaction_data_to_starknet_calldata, providers::eth_provider::{ error::{SignatureError, TransactionError}, @@ -8,13 +9,12 @@ use crate::{ }; use reth_primitives::TransactionSigned; use starknet::{ - accounts::{Account, ExecutionEncoding, ExecutionV1, SingleOwnerAccount}, - core::types::{Felt, NonZeroFelt}, + accounts::{Account, ConnectedAccount, ExecutionEncoding, ExecutionV1, SingleOwnerAccount}, + core::types::{BlockTag, Felt, NonZeroFelt}, providers::Provider, signers::{LocalWallet, SigningKey}, }; use std::{env::var, ops::Deref, str::FromStr, sync::LazyLock}; -use tokio::sync::MutexGuard; /// Signer for all relayers static RELAYER_SIGNER: LazyLock = LazyLock::new(|| { @@ -24,31 +24,38 @@ static RELAYER_SIGNER: LazyLock = LazyLock::new(|| { )) }); -/// A relayer holding a lock on a mutex on an account and connected to the Starknet network. +/// A relayer holding an account and a balance. +/// /// The relayer is used to sign transactions and broadcast them on the network. #[derive(Debug)] -pub struct LockedRelayer<'a, SP: Provider + Send + Sync> { +pub struct Relayer { /// The account used to sign and broadcast the transaction account: SingleOwnerAccount, /// The balance of the relayer balance: Felt, - /// The locked nonce held by the relayer - nonce: MutexGuard<'a, Felt>, } -impl<'a, SP> LockedRelayer<'a, SP> +impl Relayer where SP: Provider + Send + Sync, { - /// Create a new relayer with the provided Starknet provider, address, balance and nonce. - pub fn new(lock: MutexGuard<'a, Felt>, address: Felt, balance: Felt, provider: SP, chain_id: Felt) -> Self { - let relayer = - SingleOwnerAccount::new(provider, RELAYER_SIGNER.clone(), address, chain_id, ExecutionEncoding::New); + /// Create a new relayer with the provided Starknet provider, address, balance. + pub fn new(address: Felt, balance: Felt, provider: SP) -> Self { + let relayer = SingleOwnerAccount::new( + provider, + RELAYER_SIGNER.clone(), + address, + *STARKNET_CHAIN_ID, + ExecutionEncoding::New, + ); - Self { account: relayer, balance, nonce: lock } + Self { account: relayer, balance } } /// Relay the provided Ethereum transaction on the Starknet network. + /// The relayer nonce is directly fetched from the chain to have the most up-to-date value. + /// This is a way to avoid nonce issues. + /// /// Returns the corresponding Starknet transaction hash. pub async fn relay_transaction(&self, transaction: &TransactionSigned) -> EthApiResult { // Transform the transaction's data to Starknet calldata @@ -62,7 +69,16 @@ where // Construct the call let call = starknet::core::types::Call { to: eoa_address, selector: *EXECUTE_FROM_OUTSIDE, calldata }; let mut execution = ExecutionV1::new(vec![call], &self.account); - execution = execution.nonce(*self.nonce); + + // Fetch the relayer nonce from the Starknet provider + let relayer_nonce = self + .account + .provider() + .get_nonce(starknet::core::types::BlockId::Tag(BlockTag::Pending), relayer_address) + .await + .unwrap_or_default(); + + execution = execution.nonce(relayer_nonce); // We set the max fee to the balance of the account / 5. This means that the account could // send up to 5 transactions before hitting a feeder gateway error. @@ -74,12 +90,12 @@ where Ok(res.transaction_hash) } - pub fn nonce_mut(&mut self) -> &mut Felt { - &mut self.nonce + pub fn address(&self) -> Felt { + self.account.address() } } -impl<'a, SP> Deref for LockedRelayer<'a, SP> +impl Deref for Relayer where SP: Provider + Send + Sync, { diff --git a/src/test_utils/eoa.rs b/src/test_utils/eoa.rs index f1592c0ea..6ff87a814 100644 --- a/src/test_utils/eoa.rs +++ b/src/test_utils/eoa.rs @@ -2,7 +2,7 @@ use crate::{ client::{EthClient, KakarotTransactions}, into_via_try_wrapper, providers::eth_provider::{ - starknet::{kakarot_core::starknet_address, relayer::LockedRelayer}, + starknet::{kakarot_core::starknet_address, relayer::Relayer}, ChainProvider, TransactionProvider, }, test_utils::{ @@ -28,7 +28,6 @@ use starknet::{ signers::LocalWallet, }; use std::sync::Arc; -use tokio::sync::Mutex; pub const TX_GAS_LIMIT: u64 = 5_000_000; pub const TX_GAS_PRICE: u64 = 10; @@ -157,26 +156,11 @@ impl KakarotEOA

{ .await?; let relayer_balance = into_via_try_wrapper!(relayer_balance)?; - let nonce = self - .eth_client - .starknet_provider() - .get_nonce(BlockId::Tag(BlockTag::Latest), self.relayer.address()) - .await - .unwrap_or_default(); - - let current_nonce = Mutex::new(nonce); - // Relay the transaction - let starknet_transaction_hash = LockedRelayer::new( - current_nonce.lock().await, - self.relayer.address(), - relayer_balance, - self.starknet_provider(), - self.starknet_provider().chain_id().await.expect("Failed to get chain id"), - ) - .relay_transaction(&tx_signed) - .await - .expect("Failed to relay transaction"); + let starknet_transaction_hash = Relayer::new(self.relayer.address(), relayer_balance, self.starknet_provider()) + .relay_transaction(&tx_signed) + .await + .expect("Failed to relay transaction"); watch_tx( self.eth_client.eth_provider().starknet_provider_inner(), @@ -241,26 +225,11 @@ impl KakarotEOA

{ .await?; let relayer_balance = into_via_try_wrapper!(relayer_balance)?; - let nonce = self - .eth_client - .starknet_provider() - .get_nonce(BlockId::Tag(BlockTag::Latest), self.relayer.address()) - .await - .unwrap_or_default(); - - let current_nonce = Mutex::new(nonce); - // Relay the transaction - let starknet_transaction_hash = LockedRelayer::new( - current_nonce.lock().await, - self.relayer.address(), - relayer_balance, - self.starknet_provider(), - self.starknet_provider().chain_id().await.expect("Failed to get chain id"), - ) - .relay_transaction(&tx_signed) - .await - .expect("Failed to relay transaction"); + let starknet_transaction_hash = Relayer::new(self.relayer.address(), relayer_balance, self.starknet_provider()) + .relay_transaction(&tx_signed) + .await + .expect("Failed to relay transaction"); watch_tx( self.eth_client.eth_provider().starknet_provider_inner(), diff --git a/tests/tests/eth_provider.rs b/tests/tests/eth_provider.rs index ec4934850..a4754fbfe 100644 --- a/tests/tests/eth_provider.rs +++ b/tests/tests/eth_provider.rs @@ -20,7 +20,7 @@ use kakarot_rpc::{ constant::{MAX_LOGS, STARKNET_MODULUS}, database::{ethereum::EthereumTransactionStore, types::transaction::StoredTransaction}, provider::EthereumProvider, - starknet::relayer::LockedRelayer, + starknet::relayer::Relayer, BlockProvider, ChainProvider, GasProvider, LogProvider, ReceiptProvider, StateProvider, TransactionProvider, }, test_utils::{ @@ -38,10 +38,8 @@ use rstest::*; use starknet::{ accounts::Account, core::types::{BlockId, BlockTag, Felt}, - providers::Provider, }; use std::sync::Arc; -use tokio::sync::Mutex; #[rstest] #[awt] @@ -748,25 +746,11 @@ async fn test_send_raw_transaction(#[future] katana_empty: Katana, _setup: ()) { .expect("Failed to get relayer balance"); let relayer_balance = into_via_try_wrapper!(relayer_balance).expect("Failed to convert balance"); - let nonce = eth_client - .starknet_provider() - .get_nonce(BlockId::Tag(BlockTag::Latest), katana.eoa.relayer.address()) - .await - .unwrap_or_default(); - - let current_nonce = Mutex::new(nonce); - // Relay the transaction - let _ = LockedRelayer::new( - current_nonce.lock().await, - katana.eoa.relayer.address(), - relayer_balance, - &(*(*eth_client.starknet_provider())), - eth_client.starknet_provider().chain_id().await.expect("Failed to get chain id"), - ) - .relay_transaction(&transaction_signed) - .await - .expect("Failed to relay transaction"); + let _ = Relayer::new(katana.eoa.relayer.address(), relayer_balance, &(*(*eth_client.starknet_provider()))) + .relay_transaction(&transaction_signed) + .await + .expect("Failed to relay transaction"); // Retrieve the current size of the mempool let mempool_size_after_send = eth_client.mempool().pool_size(); @@ -1005,26 +989,12 @@ async fn test_send_raw_transaction_pre_eip_155(#[future] katana_empty: Katana, _ .expect("Failed to get relayer balance"); let relayer_balance = into_via_try_wrapper!(relayer_balance).expect("Failed to convert balance"); - let nonce = katana - .eth_client - .starknet_provider() - .get_nonce(BlockId::Tag(BlockTag::Latest), katana.eoa.relayer.address()) - .await - .unwrap_or_default(); - - let current_nonce = Mutex::new(nonce); - // Relay the transaction - let starknet_transaction_hash = LockedRelayer::new( - current_nonce.lock().await, - katana.eoa.relayer.address(), - relayer_balance, - &(*(*katana.eth_client.starknet_provider())), - katana.eth_client.starknet_provider().chain_id().await.expect("Failed to get chain id"), - ) - .relay_transaction(&transaction_signed) - .await - .expect("Failed to relay transaction"); + let starknet_transaction_hash = + Relayer::new(katana.eoa.relayer.address(), relayer_balance, &(*(*katana.eth_client.starknet_provider()))) + .relay_transaction(&transaction_signed) + .await + .expect("Failed to relay transaction"); watch_tx( eth_provider.starknet_provider_inner(), @@ -1367,22 +1337,11 @@ async fn test_transaction_by_hash(#[future] katana_empty: Katana, _setup: ()) { .expect("Failed to get relayer balance"); let relayer_balance = into_via_try_wrapper!(relayer_balance).expect("Failed to convert balance"); - let nonce = katana_empty - .eth_client - .starknet_provider() - .get_nonce(BlockId::Tag(BlockTag::Latest), katana_empty.eoa.relayer.address()) - .await - .unwrap_or_default(); - - let current_nonce = Mutex::new(nonce); - // Relay the transaction - let _ = LockedRelayer::new( - current_nonce.lock().await, + let _ = Relayer::new( katana_empty.eoa.relayer.address(), relayer_balance, &(*(*katana_empty.eth_client.starknet_provider())), - katana_empty.eth_client.starknet_provider().chain_id().await.expect("Failed to get chain id"), ) .relay_transaction(&transaction_signed) .await