Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nonce management: query nonce on-chain before relaying tx #1508

Merged
merged 4 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions src/bin/hive_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ use alloy_primitives::bytes::{Buf, BytesMut};
use alloy_rlp::Decodable;
use clap::Parser;
use kakarot_rpc::{
constants::STARKNET_CHAIN_ID,
into_via_try_wrapper,
providers::{eth_provider::starknet::relayer::LockedRelayer, sn_provider::StarknetProvider},
providers::{eth_provider::starknet::relayer::Relayer, sn_provider::StarknetProvider},
};
use reth_primitives::{Block, BlockBody};
use starknet::{
core::types::{BlockId, BlockTag, Felt},
providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider},
};
use std::{path::PathBuf, str::FromStr};
use tokio::{fs::File, io::AsyncReadExt, sync::Mutex};
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, FramedRead};
use url::Url;
Expand Down Expand Up @@ -72,13 +71,10 @@ async fn main() -> eyre::Result<()> {
let relayer_balance = starknet_provider.balance_at(args.relayer_address, BlockId::Tag(BlockTag::Latest)).await?;
let relayer_balance = into_via_try_wrapper!(relayer_balance)?;

let current_nonce = Mutex::new(Felt::ZERO);
let mut relayer = LockedRelayer::new(
current_nonce.lock().await,
let relayer = Relayer::new(
args.relayer_address,
relayer_balance,
JsonRpcClient::new(HttpTransport::new(Url::from_str(STARKNET_RPC_URL)?)),
*STARKNET_CHAIN_ID,
);

// Read the rlp file
Expand Down Expand Up @@ -107,10 +103,6 @@ async fn main() -> eyre::Result<()> {
for transaction in &body.transactions {
relayer.relay_transaction(transaction).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// Increase the relayer's nonce
let nonce = relayer.nonce_mut();
*nonce += Felt::ONE;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn main() -> Result<()> {
// Start the relayer manager
let addresses =
var("RELAYERS_ADDRESSES")?.split(',').filter_map(|addr| Felt::from_str(addr).ok()).collect::<Vec<_>>();
AccountManager::from_addresses(addresses, Arc::clone(&eth_client)).await?.start();
AccountManager::new(addresses, Arc::clone(&eth_client)).start();

// Start the maintenance of the mempool
maintain_transaction_pool(Arc::clone(&eth_client), PRUNE_DURATION);
Expand Down
129 changes: 28 additions & 101 deletions src/pool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use crate::{
constants::KAKAROT_RPC_CONFIG,
into_via_try_wrapper,
pool::constants::ONE_TENTH_ETH,
providers::eth_provider::{database::state::EthDatabase, starknet::relayer::LockedRelayer, BlockProvider},
providers::eth_provider::{database::state::EthDatabase, starknet::relayer::Relayer, BlockProvider},
};
use alloy_primitives::{Address, U256};
use futures::future::select_all;
use rand::{seq::SliceRandom, SeedableRng};
use reth_chainspec::ChainSpec;
use reth_execution_types::ChangedAccount;
Expand All @@ -20,11 +19,11 @@ use reth_transaction_pool::{
TransactionOrigin, TransactionPool, TransactionPoolExt,
};
use starknet::{
core::types::{requests::GetNonceRequest, BlockId, BlockTag, Felt},
providers::{jsonrpc::HttpTransport, JsonRpcClient, ProviderRequestData, ProviderResponseData},
core::types::{BlockTag, Felt},
providers::{jsonrpc::HttpTransport, JsonRpcClient},
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{sync::Mutex, time::Instant};
use tokio::time::Instant;
use tracing::instrument;

/// A type alias for the Kakarot Transaction Validator.
Expand All @@ -38,44 +37,29 @@ pub type TransactionOrdering = CoinbaseTipOrdering<EthPooledTransaction>;
/// A type alias for the Kakarot Sequencer Mempool.
pub type KakarotPool<Client> = Pool<Validator<Client>, TransactionOrdering, NoopBlobStore>;

/// Manages a collection of accounts and their associated nonces, interfacing with an Ethereum client.
/// Manages a collection of accounts addresses, interfacing with an Ethereum client.
///
/// This struct provides functionality to initialize account data from a file, monitor account balances,
/// and process transactions for accounts with sufficient balance.
#[derive(Debug)]
pub struct AccountManager<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> {
/// A shared, mutable collection of accounts and their nonces.
accounts: HashMap<Felt, Arc<Mutex<Felt>>>,
/// A collection of account addresses.
accounts: Vec<Felt>,
/// The Ethereum client used to interact with the blockchain.
eth_client: Arc<EthClient<SP>>,
}

impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountManager<SP> {
/// Initialize the account manager with a set of passed accounts.
pub async fn from_addresses(addresses: Vec<Felt>, eth_client: Arc<EthClient<SP>>) -> eyre::Result<Self> {
let mut accounts = HashMap::new();

for add in addresses {
// Query the initial account_nonce for the account from the provider
let nonce = eth_client
.starknet_provider()
.get_nonce(starknet::core::types::BlockId::Tag(BlockTag::Pending), add)
.await
.unwrap_or_default();
accounts.insert(add, Arc::new(Mutex::new(nonce)));
}

Ok(Self { accounts, eth_client })
pub const fn new(accounts: Vec<Felt>, eth_client: Arc<EthClient<SP>>) -> Self {
Self { accounts, eth_client }
}

/// Starts the account manager task that periodically checks account balances and processes transactions.
#[instrument(skip_all, name = "mempool")]
pub fn start(self) {
let this = Arc::new(self);

// Start the nonce updater in a separate task
this.clone().start_nonce_updater();

tokio::spawn(async move {
loop {
// TODO: add a listener on the pool and only try to call [`best_transaction`]
Expand All @@ -99,7 +83,7 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
let manager = this.clone();
tokio::spawn(async move {
// Lock the relayer account
let maybe_relayer = manager.lock_account().await;
let maybe_relayer = manager.get_relayer().await;
if maybe_relayer.is_err() {
// If we fail to fetch a relayer, we need to re-insert the transaction in the pool
tracing::error!(target: "account_manager", err = ?maybe_relayer.unwrap_err(), "failed to fetch relayer");
Expand All @@ -110,10 +94,11 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
.await;
return;
}
let mut relayer = maybe_relayer.expect("maybe_lock is not error");
let relayer = maybe_relayer.expect("not error");

// Send the Ethereum transaction using the relayer
let transaction_signed = transaction.to_recovered_transaction().into_signed();

let res = relayer.relay_transaction(&transaction_signed).await;
if res.is_err() {
// If the relayer failed to relay the transaction, we need to reposition it in the mempool
Expand All @@ -127,10 +112,6 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
}

tracing::info!(target: "account_manager", starknet_hash = ?res.expect("not error"), ethereum_hash = ?transaction_signed.hash());

// Increment account_nonce after sending a transaction
let nonce = relayer.nonce_mut();
*nonce = *nonce + 1;
});
}

Expand All @@ -140,58 +121,43 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
}

/// Returns the next available account from the manager.
pub async fn lock_account(&self) -> eyre::Result<LockedRelayer<'_, JsonRpcClient<HttpTransport>>>
pub async fn get_relayer(&self) -> eyre::Result<Relayer<JsonRpcClient<HttpTransport>>>
where
SP: starknet::providers::Provider + Send + Sync + Clone + 'static,
{
// Use `StdRng` instead of `ThreadRng` as it is `Send`
let mut rng = rand::rngs::StdRng::from_entropy();

// Collect the accounts into a vector for shuffling
let mut accounts: Vec<_> = self.accounts.iter().collect();
// Shuffle indices of accounts randomly
let mut account_indices: Vec<_> = (0..self.accounts.len()).collect();
account_indices.shuffle(&mut rng);

// Shuffle the accounts randomly before iterating
accounts.shuffle(&mut rng);
for index in account_indices {
let account_address = self.accounts[index];

loop {
if accounts.is_empty() {
return Err(eyre::eyre!("failed to fetch funded account"));
}

// Create the future locks with indices for more efficient removal
// use [`select_all`] to poll an iterator over impl Future<Output = (Felt, MutexGuard<Felt>)>
// We use Box::pin because this Future doesn't implement `Unpin`.
let fut_locks = accounts
.iter()
.enumerate()
.map(|(index, (address, nonce))| Box::pin(async move { (index, *address, nonce.lock().await) }));

// Select the first account that gets unlocked
let ((index, account_address, guard), _, _) = select_all(fut_locks).await;
// Retrieve the balance of the selected account
let balance = self.get_balance(account_address).await?;

// Fetch the balance of the selected account
let balance = self.get_balance(*account_address).await?;

// If the balance is lower than the threshold, remove the account using swap_remove
// Skip accounts with insufficient balance
if balance < U256::from(ONE_TENTH_ETH) {
accounts.swap_remove(index);
continue;
}

// Convert the balance to `Felt`
let balance = into_via_try_wrapper!(balance)?;
let chain_id = self.eth_client.starknet_provider().chain_id().await?;

let account = LockedRelayer::new(
guard,
*account_address,
// Construct the `Relayer` with the account address and other relevant data
let account = Relayer::new(
account_address,
balance,
JsonRpcClient::new(HttpTransport::new(KAKAROT_RPC_CONFIG.network_url.clone())),
chain_id,
);

// Return the account address and the guard on the nonce
// Return the locked relayer instance
return Ok(account);
}

Err(eyre::eyre!("failed to fetch funded account"))
}

/// Retrieves the balance of the specified account address for the [`BlockTag::Pending`]
Expand All @@ -203,45 +169,6 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
.await
.map_err(Into::into)
}

/// Update the nonces for all accounts every minute.
pub fn start_nonce_updater(self: Arc<Self>) {
tokio::spawn(async move {
loop {
// Convert the account addresses into batch requests
let requests = self
.accounts
.keys()
.map(|add| {
ProviderRequestData::GetNonce(GetNonceRequest {
contract_address: *add,
block_id: BlockId::Tag(BlockTag::Pending),
})
})
.collect::<Vec<_>>();

// Try to make the request to the provider. If it fails, display error and retry 1 minute later.
let maybe_new_nonces = self.eth_client.starknet_provider().batch_requests(requests).await;
if maybe_new_nonces.is_err() {
tracing::error!(target: "account_manager", err = ?maybe_new_nonces.unwrap_err(), "failed to get nonces");
// Sleep for 1 minute before the next update
tokio::time::sleep(Duration::from_secs(60)).await;
continue;
}
let new_nonces = maybe_new_nonces.expect("not error");

for ((address, old_nonce), new_nonce) in self.accounts.iter().zip(new_nonces) {
if let ProviderResponseData::GetNonce(new_nonce) = new_nonce {
*old_nonce.lock().await = new_nonce;
tracing::info!(target: "account_manager", ?address, ?new_nonce);
};
}

// Sleep for 1 minute before the next update
tokio::time::sleep(Duration::from_secs(60)).await;
}
});
}
}

#[derive(Default)]
Expand Down
50 changes: 33 additions & 17 deletions src/providers/eth_provider/starknet/relayer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
constants::STARKNET_CHAIN_ID,
models::transaction::transaction_data_to_starknet_calldata,
providers::eth_provider::{
error::{SignatureError, TransactionError},
Expand All @@ -8,13 +9,12 @@ use crate::{
};
use reth_primitives::TransactionSigned;
use starknet::{
accounts::{Account, ExecutionEncoding, ExecutionV1, SingleOwnerAccount},
core::types::{Felt, NonZeroFelt},
accounts::{Account, ConnectedAccount, ExecutionEncoding, ExecutionV1, SingleOwnerAccount},
core::types::{BlockTag, Felt, NonZeroFelt},
providers::Provider,
signers::{LocalWallet, SigningKey},
};
use std::{env::var, ops::Deref, str::FromStr, sync::LazyLock};
use tokio::sync::MutexGuard;

/// Signer for all relayers
static RELAYER_SIGNER: LazyLock<LocalWallet> = LazyLock::new(|| {
Expand All @@ -24,31 +24,38 @@ static RELAYER_SIGNER: LazyLock<LocalWallet> = LazyLock::new(|| {
))
});

/// A relayer holding a lock on a mutex on an account and connected to the Starknet network.
/// A relayer holding an account and a balance.
///
/// The relayer is used to sign transactions and broadcast them on the network.
#[derive(Debug)]
pub struct LockedRelayer<'a, SP: Provider + Send + Sync> {
pub struct Relayer<SP: Provider + Send + Sync> {
/// The account used to sign and broadcast the transaction
account: SingleOwnerAccount<SP, LocalWallet>,
/// The balance of the relayer
balance: Felt,
/// The locked nonce held by the relayer
nonce: MutexGuard<'a, Felt>,
}

impl<'a, SP> LockedRelayer<'a, SP>
impl<SP> Relayer<SP>
where
SP: Provider + Send + Sync,
{
/// Create a new relayer with the provided Starknet provider, address, balance and nonce.
pub fn new(lock: MutexGuard<'a, Felt>, address: Felt, balance: Felt, provider: SP, chain_id: Felt) -> Self {
let relayer =
SingleOwnerAccount::new(provider, RELAYER_SIGNER.clone(), address, chain_id, ExecutionEncoding::New);
/// Create a new relayer with the provided Starknet provider, address, balance.
pub fn new(address: Felt, balance: Felt, provider: SP) -> Self {
let relayer = SingleOwnerAccount::new(
provider,
RELAYER_SIGNER.clone(),
address,
*STARKNET_CHAIN_ID,
ExecutionEncoding::New,
);

Self { account: relayer, balance, nonce: lock }
Self { account: relayer, balance }
}

/// Relay the provided Ethereum transaction on the Starknet network.
/// The relayer nonce is directly fetched from the chain to have the most up-to-date value.
/// This is a way to avoid nonce issues.
///
/// Returns the corresponding Starknet transaction hash.
pub async fn relay_transaction(&self, transaction: &TransactionSigned) -> EthApiResult<Felt> {
// Transform the transaction's data to Starknet calldata
Expand All @@ -62,7 +69,16 @@ where
// Construct the call
let call = starknet::core::types::Call { to: eoa_address, selector: *EXECUTE_FROM_OUTSIDE, calldata };
let mut execution = ExecutionV1::new(vec![call], &self.account);
execution = execution.nonce(*self.nonce);

// Fetch the relayer nonce from the Starknet provider
let relayer_nonce = self
.account
.provider()
.get_nonce(starknet::core::types::BlockId::Tag(BlockTag::Pending), relayer_address)
.await
.unwrap_or_default();

execution = execution.nonce(relayer_nonce);

// We set the max fee to the balance of the account / 5. This means that the account could
// send up to 5 transactions before hitting a feeder gateway error.
Expand All @@ -74,12 +90,12 @@ where
Ok(res.transaction_hash)
}

pub fn nonce_mut(&mut self) -> &mut Felt {
&mut self.nonce
pub fn address(&self) -> Felt {
self.account.address()
}
}

impl<'a, SP> Deref for LockedRelayer<'a, SP>
impl<SP> Deref for Relayer<SP>
where
SP: Provider + Send + Sync,
{
Expand Down
Loading
Loading