Skip to content

Commit

Permalink
nonce management: query nonce on-chain before relaying tx (#1508)
Browse files Browse the repository at this point in the history
* nonce management: query nonce on-chain before relaying tx

* fix comments

* fmt

* fix comment
  • Loading branch information
tcoratger authored Oct 29, 2024
1 parent 6b6f25c commit ea31b60
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 222 deletions.
14 changes: 3 additions & 11 deletions src/bin/hive_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ use alloy_primitives::bytes::{Buf, BytesMut};
use alloy_rlp::Decodable;
use clap::Parser;
use kakarot_rpc::{
constants::STARKNET_CHAIN_ID,
into_via_try_wrapper,
providers::{eth_provider::starknet::relayer::LockedRelayer, sn_provider::StarknetProvider},
providers::{eth_provider::starknet::relayer::Relayer, sn_provider::StarknetProvider},
};
use reth_primitives::{Block, BlockBody};
use starknet::{
core::types::{BlockId, BlockTag, Felt},
providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider},
};
use std::{path::PathBuf, str::FromStr};
use tokio::{fs::File, io::AsyncReadExt, sync::Mutex};
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, FramedRead};
use url::Url;
Expand Down Expand Up @@ -72,13 +71,10 @@ async fn main() -> eyre::Result<()> {
let relayer_balance = starknet_provider.balance_at(args.relayer_address, BlockId::Tag(BlockTag::Latest)).await?;
let relayer_balance = into_via_try_wrapper!(relayer_balance)?;

let current_nonce = Mutex::new(Felt::ZERO);
let mut relayer = LockedRelayer::new(
current_nonce.lock().await,
let relayer = Relayer::new(
args.relayer_address,
relayer_balance,
JsonRpcClient::new(HttpTransport::new(Url::from_str(STARKNET_RPC_URL)?)),
*STARKNET_CHAIN_ID,
);

// Read the rlp file
Expand Down Expand Up @@ -107,10 +103,6 @@ async fn main() -> eyre::Result<()> {
for transaction in &body.transactions {
relayer.relay_transaction(transaction).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// Increase the relayer's nonce
let nonce = relayer.nonce_mut();
*nonce += Felt::ONE;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn main() -> Result<()> {
// Start the relayer manager
let addresses =
var("RELAYERS_ADDRESSES")?.split(',').filter_map(|addr| Felt::from_str(addr).ok()).collect::<Vec<_>>();
AccountManager::from_addresses(addresses, Arc::clone(&eth_client)).await?.start();
AccountManager::new(addresses, Arc::clone(&eth_client)).start();

// Start the maintenance of the mempool
maintain_transaction_pool(Arc::clone(&eth_client), PRUNE_DURATION);
Expand Down
129 changes: 28 additions & 101 deletions src/pool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use crate::{
constants::KAKAROT_RPC_CONFIG,
into_via_try_wrapper,
pool::constants::ONE_TENTH_ETH,
providers::eth_provider::{database::state::EthDatabase, starknet::relayer::LockedRelayer, BlockProvider},
providers::eth_provider::{database::state::EthDatabase, starknet::relayer::Relayer, BlockProvider},
};
use alloy_primitives::{Address, U256};
use futures::future::select_all;
use rand::{seq::SliceRandom, SeedableRng};
use reth_chainspec::ChainSpec;
use reth_execution_types::ChangedAccount;
Expand All @@ -20,11 +19,11 @@ use reth_transaction_pool::{
TransactionOrigin, TransactionPool, TransactionPoolExt,
};
use starknet::{
core::types::{requests::GetNonceRequest, BlockId, BlockTag, Felt},
providers::{jsonrpc::HttpTransport, JsonRpcClient, ProviderRequestData, ProviderResponseData},
core::types::{BlockTag, Felt},
providers::{jsonrpc::HttpTransport, JsonRpcClient},
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{sync::Mutex, time::Instant};
use tokio::time::Instant;
use tracing::instrument;

/// A type alias for the Kakarot Transaction Validator.
Expand All @@ -38,44 +37,29 @@ pub type TransactionOrdering = CoinbaseTipOrdering<EthPooledTransaction>;
/// A type alias for the Kakarot Sequencer Mempool.
pub type KakarotPool<Client> = Pool<Validator<Client>, TransactionOrdering, NoopBlobStore>;

/// Manages a collection of accounts and their associated nonces, interfacing with an Ethereum client.
/// Manages a collection of accounts addresses, interfacing with an Ethereum client.
///
/// This struct provides functionality to initialize account data from a file, monitor account balances,
/// and process transactions for accounts with sufficient balance.
#[derive(Debug)]
pub struct AccountManager<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> {
/// A shared, mutable collection of accounts and their nonces.
accounts: HashMap<Felt, Arc<Mutex<Felt>>>,
/// A collection of account addresses.
accounts: Vec<Felt>,
/// The Ethereum client used to interact with the blockchain.
eth_client: Arc<EthClient<SP>>,
}

impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountManager<SP> {
/// Initialize the account manager with a set of passed accounts.
pub async fn from_addresses(addresses: Vec<Felt>, eth_client: Arc<EthClient<SP>>) -> eyre::Result<Self> {
let mut accounts = HashMap::new();

for add in addresses {
// Query the initial account_nonce for the account from the provider
let nonce = eth_client
.starknet_provider()
.get_nonce(starknet::core::types::BlockId::Tag(BlockTag::Pending), add)
.await
.unwrap_or_default();
accounts.insert(add, Arc::new(Mutex::new(nonce)));
}

Ok(Self { accounts, eth_client })
pub const fn new(accounts: Vec<Felt>, eth_client: Arc<EthClient<SP>>) -> Self {
Self { accounts, eth_client }
}

/// Starts the account manager task that periodically checks account balances and processes transactions.
#[instrument(skip_all, name = "mempool")]
pub fn start(self) {
let this = Arc::new(self);

// Start the nonce updater in a separate task
this.clone().start_nonce_updater();

tokio::spawn(async move {
loop {
// TODO: add a listener on the pool and only try to call [`best_transaction`]
Expand All @@ -99,7 +83,7 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
let manager = this.clone();
tokio::spawn(async move {
// Lock the relayer account
let maybe_relayer = manager.lock_account().await;
let maybe_relayer = manager.get_relayer().await;
if maybe_relayer.is_err() {
// If we fail to fetch a relayer, we need to re-insert the transaction in the pool
tracing::error!(target: "account_manager", err = ?maybe_relayer.unwrap_err(), "failed to fetch relayer");
Expand All @@ -110,10 +94,11 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
.await;
return;
}
let mut relayer = maybe_relayer.expect("maybe_lock is not error");
let relayer = maybe_relayer.expect("not error");

// Send the Ethereum transaction using the relayer
let transaction_signed = transaction.to_recovered_transaction().into_signed();

let res = relayer.relay_transaction(&transaction_signed).await;
if res.is_err() {
// If the relayer failed to relay the transaction, we need to reposition it in the mempool
Expand All @@ -127,10 +112,6 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
}

tracing::info!(target: "account_manager", starknet_hash = ?res.expect("not error"), ethereum_hash = ?transaction_signed.hash());

// Increment account_nonce after sending a transaction
let nonce = relayer.nonce_mut();
*nonce = *nonce + 1;
});
}

Expand All @@ -140,58 +121,43 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
}

/// Returns the next available account from the manager.
pub async fn lock_account(&self) -> eyre::Result<LockedRelayer<'_, JsonRpcClient<HttpTransport>>>
pub async fn get_relayer(&self) -> eyre::Result<Relayer<JsonRpcClient<HttpTransport>>>
where
SP: starknet::providers::Provider + Send + Sync + Clone + 'static,
{
// Use `StdRng` instead of `ThreadRng` as it is `Send`
let mut rng = rand::rngs::StdRng::from_entropy();

// Collect the accounts into a vector for shuffling
let mut accounts: Vec<_> = self.accounts.iter().collect();
// Shuffle indices of accounts randomly
let mut account_indices: Vec<_> = (0..self.accounts.len()).collect();
account_indices.shuffle(&mut rng);

// Shuffle the accounts randomly before iterating
accounts.shuffle(&mut rng);
for index in account_indices {
let account_address = self.accounts[index];

loop {
if accounts.is_empty() {
return Err(eyre::eyre!("failed to fetch funded account"));
}

// Create the future locks with indices for more efficient removal
// use [`select_all`] to poll an iterator over impl Future<Output = (Felt, MutexGuard<Felt>)>
// We use Box::pin because this Future doesn't implement `Unpin`.
let fut_locks = accounts
.iter()
.enumerate()
.map(|(index, (address, nonce))| Box::pin(async move { (index, *address, nonce.lock().await) }));

// Select the first account that gets unlocked
let ((index, account_address, guard), _, _) = select_all(fut_locks).await;
// Retrieve the balance of the selected account
let balance = self.get_balance(account_address).await?;

// Fetch the balance of the selected account
let balance = self.get_balance(*account_address).await?;

// If the balance is lower than the threshold, remove the account using swap_remove
// Skip accounts with insufficient balance
if balance < U256::from(ONE_TENTH_ETH) {
accounts.swap_remove(index);
continue;
}

// Convert the balance to `Felt`
let balance = into_via_try_wrapper!(balance)?;
let chain_id = self.eth_client.starknet_provider().chain_id().await?;

let account = LockedRelayer::new(
guard,
*account_address,
// Construct the `Relayer` with the account address and other relevant data
let account = Relayer::new(
account_address,
balance,
JsonRpcClient::new(HttpTransport::new(KAKAROT_RPC_CONFIG.network_url.clone())),
chain_id,
);

// Return the account address and the guard on the nonce
// Return the locked relayer instance
return Ok(account);
}

Err(eyre::eyre!("failed to fetch funded account"))
}

/// Retrieves the balance of the specified account address for the [`BlockTag::Pending`]
Expand All @@ -203,45 +169,6 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
.await
.map_err(Into::into)
}

/// Update the nonces for all accounts every minute.
pub fn start_nonce_updater(self: Arc<Self>) {
tokio::spawn(async move {
loop {
// Convert the account addresses into batch requests
let requests = self
.accounts
.keys()
.map(|add| {
ProviderRequestData::GetNonce(GetNonceRequest {
contract_address: *add,
block_id: BlockId::Tag(BlockTag::Pending),
})
})
.collect::<Vec<_>>();

// Try to make the request to the provider. If it fails, display error and retry 1 minute later.
let maybe_new_nonces = self.eth_client.starknet_provider().batch_requests(requests).await;
if maybe_new_nonces.is_err() {
tracing::error!(target: "account_manager", err = ?maybe_new_nonces.unwrap_err(), "failed to get nonces");
// Sleep for 1 minute before the next update
tokio::time::sleep(Duration::from_secs(60)).await;
continue;
}
let new_nonces = maybe_new_nonces.expect("not error");

for ((address, old_nonce), new_nonce) in self.accounts.iter().zip(new_nonces) {
if let ProviderResponseData::GetNonce(new_nonce) = new_nonce {
*old_nonce.lock().await = new_nonce;
tracing::info!(target: "account_manager", ?address, ?new_nonce);
};
}

// Sleep for 1 minute before the next update
tokio::time::sleep(Duration::from_secs(60)).await;
}
});
}
}

#[derive(Default)]
Expand Down
50 changes: 33 additions & 17 deletions src/providers/eth_provider/starknet/relayer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
constants::STARKNET_CHAIN_ID,
models::transaction::transaction_data_to_starknet_calldata,
providers::eth_provider::{
error::{SignatureError, TransactionError},
Expand All @@ -8,13 +9,12 @@ use crate::{
};
use reth_primitives::TransactionSigned;
use starknet::{
accounts::{Account, ExecutionEncoding, ExecutionV1, SingleOwnerAccount},
core::types::{Felt, NonZeroFelt},
accounts::{Account, ConnectedAccount, ExecutionEncoding, ExecutionV1, SingleOwnerAccount},
core::types::{BlockTag, Felt, NonZeroFelt},
providers::Provider,
signers::{LocalWallet, SigningKey},
};
use std::{env::var, ops::Deref, str::FromStr, sync::LazyLock};
use tokio::sync::MutexGuard;

/// Signer for all relayers
static RELAYER_SIGNER: LazyLock<LocalWallet> = LazyLock::new(|| {
Expand All @@ -24,31 +24,38 @@ static RELAYER_SIGNER: LazyLock<LocalWallet> = LazyLock::new(|| {
))
});

/// A relayer holding a lock on a mutex on an account and connected to the Starknet network.
/// A relayer holding an account and a balance.
///
/// The relayer is used to sign transactions and broadcast them on the network.
#[derive(Debug)]
pub struct LockedRelayer<'a, SP: Provider + Send + Sync> {
pub struct Relayer<SP: Provider + Send + Sync> {
/// The account used to sign and broadcast the transaction
account: SingleOwnerAccount<SP, LocalWallet>,
/// The balance of the relayer
balance: Felt,
/// The locked nonce held by the relayer
nonce: MutexGuard<'a, Felt>,
}

impl<'a, SP> LockedRelayer<'a, SP>
impl<SP> Relayer<SP>
where
SP: Provider + Send + Sync,
{
/// Create a new relayer with the provided Starknet provider, address, balance and nonce.
pub fn new(lock: MutexGuard<'a, Felt>, address: Felt, balance: Felt, provider: SP, chain_id: Felt) -> Self {
let relayer =
SingleOwnerAccount::new(provider, RELAYER_SIGNER.clone(), address, chain_id, ExecutionEncoding::New);
/// Create a new relayer with the provided Starknet provider, address, balance.
pub fn new(address: Felt, balance: Felt, provider: SP) -> Self {
let relayer = SingleOwnerAccount::new(
provider,
RELAYER_SIGNER.clone(),
address,
*STARKNET_CHAIN_ID,
ExecutionEncoding::New,
);

Self { account: relayer, balance, nonce: lock }
Self { account: relayer, balance }
}

/// Relay the provided Ethereum transaction on the Starknet network.
/// The relayer nonce is directly fetched from the chain to have the most up-to-date value.
/// This is a way to avoid nonce issues.
///
/// Returns the corresponding Starknet transaction hash.
pub async fn relay_transaction(&self, transaction: &TransactionSigned) -> EthApiResult<Felt> {
// Transform the transaction's data to Starknet calldata
Expand All @@ -62,7 +69,16 @@ where
// Construct the call
let call = starknet::core::types::Call { to: eoa_address, selector: *EXECUTE_FROM_OUTSIDE, calldata };
let mut execution = ExecutionV1::new(vec![call], &self.account);
execution = execution.nonce(*self.nonce);

// Fetch the relayer nonce from the Starknet provider
let relayer_nonce = self
.account
.provider()
.get_nonce(starknet::core::types::BlockId::Tag(BlockTag::Pending), relayer_address)
.await
.unwrap_or_default();

execution = execution.nonce(relayer_nonce);

// We set the max fee to the balance of the account / 5. This means that the account could
// send up to 5 transactions before hitting a feeder gateway error.
Expand All @@ -74,12 +90,12 @@ where
Ok(res.transaction_hash)
}

pub fn nonce_mut(&mut self) -> &mut Felt {
&mut self.nonce
pub fn address(&self) -> Felt {
self.account.address()
}
}

impl<'a, SP> Deref for LockedRelayer<'a, SP>
impl<SP> Deref for Relayer<SP>
where
SP: Provider + Send + Sync,
{
Expand Down
Loading

0 comments on commit ea31b60

Please sign in to comment.