Skip to content

Commit

Permalink
Merge branch 'main' into feat/transaction-propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
greged93 authored Oct 29, 2024
2 parents 0cae07e + ea31b60 commit 13283e7
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 257 deletions.
38 changes: 9 additions & 29 deletions docker-compose.staging.yaml
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
services:
starknet:
image: nethermind/juno:v0.11.1
image: nethermind/juno:v0.12.2
ports:
- 6060:6060
volumes:
- ${HOME}/code/kkrt-labs/snapshots/juno_sepolia:/var/lib/juno
- $HOME/snapshots/juno_sepolia:/var/lib/juno
command: >
--pending-poll-interval "1s" --http --http-host 0.0.0.0 --http-port 6060
--db-path /var/lib/juno --eth-node ${ETH_NODE_WS} --cn-name KKRT_BETA
--cn-feeder-url https://gateway-beta.kakarot.sw-dev.io/feeder_gateway/
--cn-gateway-url https://gateway-beta.kakarot.sw-dev.io/gateway/
--cn-l1-chain-id 11155111 --cn-l2-chain-id kkrt --cn-core-contract-address
0xc7c9ea7fD0921Cb6EDd9a3184F88cF1b821aA82B --cn-unverifiable-range
0,1000000
--db-path /var/lib/juno --eth-node ${ETH_NODE_WS:?} --network sepolia
networks:
- internal_staging
starknet-explorer:
Expand Down Expand Up @@ -115,29 +110,14 @@ services:
# These values are unique to Sepolia Testnet
# They'll need to be manually changed in case the testnet is reset
# To add robustness: parse the `deployments/starknet-sepolia` folder in `kkrt-labs/kakarot` repo
- KAKAROT_ADDRESS=0x2824d6ed6759ac4c4a54a39b78d04c0e48be8937237026bf8c3bf46a8bea722
- UNINITIALIZED_ACCOUNT_CLASS_HASH=0x600f6862938312a05a0cfecba0dcaf37693efc9e4075a6adfb62e196022678e
- ACCOUNT_CONTRACT_CLASS_HASH=0x1276d0b017701646f8646b69de6c3b3584edce71879678a679f28c07a9971cf
- KAKAROT_ADDRESS=0x48fc2888aad166304e63af35a48e00e32da9831e49fe30a22148fdecdb7e66f
- UNINITIALIZED_ACCOUNT_CLASS_HASH=0x7b2de5e73ff7eb338d76c967dd5aa3f3004574d326b8c1402bb819d4983b8b6
- ACCOUNT_CONTRACT_CLASS_HASH=0x25d4b4889979f3df8171991f6a0163b42b756daebf313bcd0dec74cacf903f9
- MAX_FELTS_IN_CALLDATA=30000
- MAX_LOGS=10000
- WHITE_LISTED_EIP_155_TRANSACTION_HASHES=0xeddf9e61fb9d8f5111840daef55e5fde0041f5702856532cdbb5a02998033d26,0xb6274b80bc7cda162df89894c7748a5cb7ba2eaa6004183c41a1837c3b072f1e,0x07471adfe8f4ec553c1199f495be97fc8be8e0626ae307281c22534460184ed1,0xb95343413e459a0f97461812111254163ae53467855c0d73e0f1e7c5b8442fa3
- RELAYER_PRIVATE_KEY=0x1234
- RELAYERS_ADDRESSES:>
0x6f644133991e779509cb319fd4c9416f949950f07a8bc3cb10916966ec1dca,0xc6d61dd26ec6175554977f61198b28f58a7c5f46228b7fb7b44f6f619d800,
0x6faca0d5ce2437933257a91d95536c7cf3bfa12808f314741efd104e9a0c2d9,0x2cced3d1628b867e1b69216df87d869e7f5c8dd066a9114d44e2d1af3fbf2c4,
0x30ee97c478d8a845d0d6ec0c447650ee6161817199f3d5c2a84f8a033b960b2,0x13b09830d3bef686db99c570df79ddfd813502cfa1d7c640ed871a0de6637ba,
0x1e8f2ef3d53eb800432031fc3000b69b5132a2c4c0df821c20173ac8a91de6d,0x2523ffb01dad3d8803a4bae7e2990d74d92b201f5425f921cec2ccb92ee6f18,
0xf6413b17b8ae645ce13e635df7dcb573086ff9558d8d9d7d2e343a250e4885,0x765e5f1ae2f6d796f359ea7c47c88387046d371aef1f02337921fe326fbacc2,
0x21466caac9f9e2605100c76a0ab15e2cbc72942b876f4665fa490dd83f7a00f,0x28d165b48e0b523044bd790bae4cc7096d8747565b1d58691683662e12b6402,
0x7297a3b129e3d3101bb9956338c9c64b7055556146c7ff2497caad4a831c11d,0x8d9da3628af39d3bb9a2155b4eb63af7791da1fd3864f93bb57130fa2b59d2,
0x4dc2ebca0fa6de9e7f67e821e425a0f9a53cb300cc44cbd93d69c307ac536eb,0x6ea69b53281261da1cfde205a5e8c02db26bfa51df77419fef71014c53e03f1,
0x3709159b0a92aefacba33401098c5b3c09240b67dd5c1d5ee9a778e6205028b,0x3ea725fcff7ddaffbdc1134be5ae164bcbf8155135de1d8663c5efb106c9b08,
0x3135b4bb94b1b7c0d7a7074291d0f1d92ab57d1f61bddd0b7195f70207752ae,0x1eb1e028ffa31652cecb7d18311aac1a281610a535f20e7a3a82c13db226b0c,
0x4f57bf04228bf2d2db2e0a8573c6594558a5f0b487c1229b5fe7723473afa58,0x2f24013ebb0911d5f27a2a2c3d07b8c73b2f36239d8325187ed25db1ce794b5,
0x10e3a8574c22d19951d8e8715cc98a26b9ed9fe39f8e002cc82f49c189ea56c,0x6f823719688a9c3e54f7528d4e2a464773170365c2e8b97d699aa2cc519875e,
0x669774f839f462c3267b1d28e9c31264e1436a48033ee0f71e77bef153cfb81,0x6ad46e2c0b5f52412560bb12ad77e2958d53e5f30fc5bc995101dd700e22597,
0xe07332a9bf6f9d3ced6f3f1cb60607d01fba1f6be9da1b3650c58c8d683e06,0x3c320f3726b90a8e21d200eeb956f863c59c9d3744e5f5d9280c648225c7775,
0xdc68dd15efb6b43a02a8679e4a21a9b8195cd44e6134d5034131520c55dbfa,0x594aa6e8d8d5ea9ea4a86681222f65975ed38c4e0f20e8493c0ad9887b3c3f4
- RELAYER_PRIVATE_KEY=${RELAYERS_PRIVATE_KEY:?}
- RELAYERS_ADDRESSES=${RELAYERS_ADDRESSES:?}
restart: on-failure
volumes:
# Mount the indexer code
Expand Down Expand Up @@ -196,7 +176,7 @@ services:
- STARKNET_NETWORK=http://starknet:6060
- ALLOW_NET=
- MONGO_REPLACE_DATA_INSIDE_TRANSACTION=false
- KAKAROT_ADDRESS=0x2824d6ed6759ac4c4a54a39b78d04c0e48be8937237026bf8c3bf46a8bea722
- KAKAROT_ADDRESS=0x48fc2888aad166304e63af35a48e00e32da9831e49fe30a22148fdecdb7e66f
- DEFAULT_BLOCK_GAS_LIMIT=7000000
restart: on-failure
volumes:
Expand Down
13 changes: 7 additions & 6 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ services:
restart: on-failure

starknet:
image: ghcr.io/dojoengine/dojo:v1.0.0-alpha.14
image: ghcr.io/dojoengine/dojo:v1.0.0-alpha.16
command:
- katana
- --disable-fee
- --host
- 0.0.0.0
- --validate-max-steps
- "16777216"
- --invoke-max-steps
Expand All @@ -40,7 +42,7 @@ services:
restart: on-failure

kakarot-deployer:
image: ghcr.io/kkrt-labs/kakarot/deployer:v0.8.5
image: ghcr.io/kkrt-labs/kakarot/deployer:v0.9.4
# Always pull the latest image, until we use release tags
pull_policy: always
environment:
Expand All @@ -58,8 +60,7 @@ services:
starknet:
condition: service_started
restart: on-failure
networks:
- internal
network_mode: service:anvil

deployments-parser:
image: apteno/alpine-jq:2023-07-24
Expand All @@ -70,8 +71,8 @@ services:
# First line overrides an existing .env, if any.
# This is to make sure that it is clean even though docker volume was not cleaned.
- |
echo "KAKAROT_ADDRESS=$$(jq -r '.kakarot.address' /deployments/katana/deployments.json)" > /deployments/.env;
echo "DEPLOYER_ACCOUNT_ADDRESS=$$(jq -r '.deployer_account.address' /deployments/katana/deployments.json)" >> /deployments/.env;
echo "KAKAROT_ADDRESS=$$(jq -r '.kakarot' /deployments/katana/deployments.json)" > /deployments/.env;
echo "DEPLOYER_ACCOUNT_ADDRESS=$$(jq -r '.deployer_account' /deployments/katana/deployments.json)" >> /deployments/.env;
echo "UNINITIALIZED_ACCOUNT_CLASS_HASH=$$(jq -r '.uninitialized_account' /deployments/katana/declarations.json)" >> /deployments/.env
echo "ACCOUNT_CONTRACT_CLASS_HASH=$$(jq -r '.account_contract' /deployments/katana/declarations.json)" >> /deployments/.env
volumes:
Expand Down
14 changes: 3 additions & 11 deletions src/bin/hive_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ use alloy_primitives::bytes::{Buf, BytesMut};
use alloy_rlp::Decodable;
use clap::Parser;
use kakarot_rpc::{
constants::STARKNET_CHAIN_ID,
into_via_try_wrapper,
providers::{eth_provider::starknet::relayer::LockedRelayer, sn_provider::StarknetProvider},
providers::{eth_provider::starknet::relayer::Relayer, sn_provider::StarknetProvider},
};
use reth_primitives::{Block, BlockBody};
use starknet::{
core::types::{BlockId, BlockTag, Felt},
providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider},
};
use std::{path::PathBuf, str::FromStr};
use tokio::{fs::File, io::AsyncReadExt, sync::Mutex};
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, FramedRead};
use url::Url;
Expand Down Expand Up @@ -72,13 +71,10 @@ async fn main() -> eyre::Result<()> {
let relayer_balance = starknet_provider.balance_at(args.relayer_address, BlockId::Tag(BlockTag::Latest)).await?;
let relayer_balance = into_via_try_wrapper!(relayer_balance)?;

let current_nonce = Mutex::new(Felt::ZERO);
let mut relayer = LockedRelayer::new(
current_nonce.lock().await,
let relayer = Relayer::new(
args.relayer_address,
relayer_balance,
JsonRpcClient::new(HttpTransport::new(Url::from_str(STARKNET_RPC_URL)?)),
*STARKNET_CHAIN_ID,
);

// Read the rlp file
Expand Down Expand Up @@ -107,10 +103,6 @@ async fn main() -> eyre::Result<()> {
for transaction in &body.transactions {
relayer.relay_transaction(transaction).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// Increase the relayer's nonce
let nonce = relayer.nonce_mut();
*nonce += Felt::ONE;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn main() -> Result<()> {
// Start the relayer manager
let addresses =
var("RELAYERS_ADDRESSES")?.split(',').filter_map(|addr| Felt::from_str(addr).ok()).collect::<Vec<_>>();
AccountManager::from_addresses(addresses, Arc::clone(&eth_client)).await?.start();
AccountManager::new(addresses, Arc::clone(&eth_client)).start();

// Start the maintenance of the mempool
maintain_transaction_pool(Arc::clone(&eth_client), PRUNE_DURATION);
Expand Down
129 changes: 28 additions & 101 deletions src/pool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use crate::{
constants::KAKAROT_RPC_CONFIG,
into_via_try_wrapper,
pool::constants::ONE_TENTH_ETH,
providers::eth_provider::{database::state::EthDatabase, starknet::relayer::LockedRelayer, BlockProvider},
providers::eth_provider::{database::state::EthDatabase, starknet::relayer::Relayer, BlockProvider},
};
use alloy_primitives::{Address, U256};
use futures::future::select_all;
use rand::{seq::SliceRandom, SeedableRng};
use reth_chainspec::ChainSpec;
use reth_execution_types::ChangedAccount;
Expand All @@ -20,11 +19,11 @@ use reth_transaction_pool::{
TransactionOrigin, TransactionPool, TransactionPoolExt,
};
use starknet::{
core::types::{requests::GetNonceRequest, BlockId, BlockTag, Felt},
providers::{jsonrpc::HttpTransport, JsonRpcClient, ProviderRequestData, ProviderResponseData},
core::types::{BlockTag, Felt},
providers::{jsonrpc::HttpTransport, JsonRpcClient},
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{sync::Mutex, time::Instant};
use tokio::time::Instant;
use tracing::instrument;

/// A type alias for the Kakarot Transaction Validator.
Expand All @@ -38,44 +37,29 @@ pub type TransactionOrdering = CoinbaseTipOrdering<EthPooledTransaction>;
/// A type alias for the Kakarot Sequencer Mempool.
pub type KakarotPool<Client> = Pool<Validator<Client>, TransactionOrdering, NoopBlobStore>;

/// Manages a collection of accounts and their associated nonces, interfacing with an Ethereum client.
/// Manages a collection of accounts addresses, interfacing with an Ethereum client.
///
/// This struct provides functionality to initialize account data from a file, monitor account balances,
/// and process transactions for accounts with sufficient balance.
#[derive(Debug)]
pub struct AccountManager<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> {
/// A shared, mutable collection of accounts and their nonces.
accounts: HashMap<Felt, Arc<Mutex<Felt>>>,
/// A collection of account addresses.
accounts: Vec<Felt>,
/// The Ethereum client used to interact with the blockchain.
eth_client: Arc<EthClient<SP>>,
}

impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountManager<SP> {
/// Initialize the account manager with a set of passed accounts.
pub async fn from_addresses(addresses: Vec<Felt>, eth_client: Arc<EthClient<SP>>) -> eyre::Result<Self> {
let mut accounts = HashMap::new();

for add in addresses {
// Query the initial account_nonce for the account from the provider
let nonce = eth_client
.starknet_provider()
.get_nonce(starknet::core::types::BlockId::Tag(BlockTag::Pending), add)
.await
.unwrap_or_default();
accounts.insert(add, Arc::new(Mutex::new(nonce)));
}

Ok(Self { accounts, eth_client })
pub const fn new(accounts: Vec<Felt>, eth_client: Arc<EthClient<SP>>) -> Self {
Self { accounts, eth_client }
}

/// Starts the account manager task that periodically checks account balances and processes transactions.
#[instrument(skip_all, name = "mempool")]
pub fn start(self) {
let this = Arc::new(self);

// Start the nonce updater in a separate task
this.clone().start_nonce_updater();

tokio::spawn(async move {
loop {
// TODO: add a listener on the pool and only try to call [`best_transaction`]
Expand All @@ -99,7 +83,7 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
let manager = this.clone();
tokio::spawn(async move {
// Lock the relayer account
let maybe_relayer = manager.lock_account().await;
let maybe_relayer = manager.get_relayer().await;
if maybe_relayer.is_err() {
// If we fail to fetch a relayer, we need to re-insert the transaction in the pool
tracing::error!(target: "account_manager", err = ?maybe_relayer.unwrap_err(), "failed to fetch relayer");
Expand All @@ -110,10 +94,11 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
.await;
return;
}
let mut relayer = maybe_relayer.expect("maybe_lock is not error");
let relayer = maybe_relayer.expect("not error");

// Send the Ethereum transaction using the relayer
let transaction_signed = transaction.to_recovered_transaction().into_signed();

let res = relayer.relay_transaction(&transaction_signed).await;
if res.is_err() {
// If the relayer failed to relay the transaction, we need to reposition it in the mempool
Expand All @@ -127,10 +112,6 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
}

tracing::info!(target: "account_manager", starknet_hash = ?res.expect("not error"), ethereum_hash = ?transaction_signed.hash());

// Increment account_nonce after sending a transaction
let nonce = relayer.nonce_mut();
*nonce = *nonce + 1;
});
}

Expand All @@ -140,58 +121,43 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
}

/// Returns the next available account from the manager.
pub async fn lock_account(&self) -> eyre::Result<LockedRelayer<'_, JsonRpcClient<HttpTransport>>>
pub async fn get_relayer(&self) -> eyre::Result<Relayer<JsonRpcClient<HttpTransport>>>
where
SP: starknet::providers::Provider + Send + Sync + Clone + 'static,
{
// Use `StdRng` instead of `ThreadRng` as it is `Send`
let mut rng = rand::rngs::StdRng::from_entropy();

// Collect the accounts into a vector for shuffling
let mut accounts: Vec<_> = self.accounts.iter().collect();
// Shuffle indices of accounts randomly
let mut account_indices: Vec<_> = (0..self.accounts.len()).collect();
account_indices.shuffle(&mut rng);

// Shuffle the accounts randomly before iterating
accounts.shuffle(&mut rng);
for index in account_indices {
let account_address = self.accounts[index];

loop {
if accounts.is_empty() {
return Err(eyre::eyre!("failed to fetch funded account"));
}

// Create the future locks with indices for more efficient removal
// use [`select_all`] to poll an iterator over impl Future<Output = (Felt, MutexGuard<Felt>)>
// We use Box::pin because this Future doesn't implement `Unpin`.
let fut_locks = accounts
.iter()
.enumerate()
.map(|(index, (address, nonce))| Box::pin(async move { (index, *address, nonce.lock().await) }));

// Select the first account that gets unlocked
let ((index, account_address, guard), _, _) = select_all(fut_locks).await;
// Retrieve the balance of the selected account
let balance = self.get_balance(account_address).await?;

// Fetch the balance of the selected account
let balance = self.get_balance(*account_address).await?;

// If the balance is lower than the threshold, remove the account using swap_remove
// Skip accounts with insufficient balance
if balance < U256::from(ONE_TENTH_ETH) {
accounts.swap_remove(index);
continue;
}

// Convert the balance to `Felt`
let balance = into_via_try_wrapper!(balance)?;
let chain_id = self.eth_client.starknet_provider().chain_id().await?;

let account = LockedRelayer::new(
guard,
*account_address,
// Construct the `Relayer` with the account address and other relevant data
let account = Relayer::new(
account_address,
balance,
JsonRpcClient::new(HttpTransport::new(KAKAROT_RPC_CONFIG.network_url.clone())),
chain_id,
);

// Return the account address and the guard on the nonce
// Return the locked relayer instance
return Ok(account);
}

Err(eyre::eyre!("failed to fetch funded account"))
}

/// Retrieves the balance of the specified account address for the [`BlockTag::Pending`]
Expand All @@ -203,45 +169,6 @@ impl<SP: starknet::providers::Provider + Send + Sync + Clone + 'static> AccountM
.await
.map_err(Into::into)
}

/// Update the nonces for all accounts every minute.
pub fn start_nonce_updater(self: Arc<Self>) {
tokio::spawn(async move {
loop {
// Convert the account addresses into batch requests
let requests = self
.accounts
.keys()
.map(|add| {
ProviderRequestData::GetNonce(GetNonceRequest {
contract_address: *add,
block_id: BlockId::Tag(BlockTag::Pending),
})
})
.collect::<Vec<_>>();

// Try to make the request to the provider. If it fails, display error and retry 1 minute later.
let maybe_new_nonces = self.eth_client.starknet_provider().batch_requests(requests).await;
if maybe_new_nonces.is_err() {
tracing::error!(target: "account_manager", err = ?maybe_new_nonces.unwrap_err(), "failed to get nonces");
// Sleep for 1 minute before the next update
tokio::time::sleep(Duration::from_secs(60)).await;
continue;
}
let new_nonces = maybe_new_nonces.expect("not error");

for ((address, old_nonce), new_nonce) in self.accounts.iter().zip(new_nonces) {
if let ProviderResponseData::GetNonce(new_nonce) = new_nonce {
*old_nonce.lock().await = new_nonce;
tracing::info!(target: "account_manager", ?address, ?new_nonce);
};
}

// Sleep for 1 minute before the next update
tokio::time::sleep(Duration::from_secs(60)).await;
}
});
}
}

#[derive(Default)]
Expand Down
Loading

0 comments on commit 13283e7

Please sign in to comment.