Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[iota-faucet] Enable the use of batched mode on faucet #4110

Merged
merged 7 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions crates/iota-faucet/src/faucet/simple_faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ impl SimpleFaucet {
.map(|q| GasCoin::try_from(&q.1).unwrap())
.filter(|coin| coin.0.balance.value() >= (config.amount * config.num_coins as u64))
.collect::<Vec<GasCoin>>();

if coins.is_empty() {
return Err(FaucetError::NoGasCoinAvailable);
}

let metrics = FaucetMetrics::new(prometheus_registry);

let wal = WriteAheadLog::open(wal_path);
Expand All @@ -131,16 +136,19 @@ impl SimpleFaucet {
config.max_request_queue_length as usize,
);

// This is to handle the case where there is only 1 coin, we want it to go to
// the normal queue
let split_point = if coins.len() > 10 {
coins.len() / 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samuel-rufi could you elaborate on how the gas pool handles gas tokens? Why were there multiple gas coins put into it before and why do we only put one now? Is the faucet capable of issuing concurrent transactions with different gas coins?

Copy link
Member Author

@samuel-rufi samuel-rufi Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question @lzpap !

So first of all each user request to the gas pool will be handled in a dedicated task spawn in

async fn request_gas(
.

Each task, will try to get a free gas object from the queue (protected by an Mutex to make sure each request gets a different gas object). The gas pool can serve multiple requests using multiple gas objects basically at the same time.

So how does it work in more detail? For each TX, it will try to get an available coin from this queue. This coin will be locked and unlocked (also called "recycled") once the transaction was successful. If the transaction fails, the coin will stay locked and the queue with available objects shrinks.

So if batch is activated, only one gas object is allocated to the normal gas pool and all others are added to the batch pool. If batch is deactivated, all gas coins are added to the gas pool. Do you think we should change this splitting logic?

In case we don't want to use batch mode, but still want to be able to serve multiple faucet requests (basically at the same time) we can make use of the gas pool, but need to make sure that it was initialized with multiple gas coins. Also for the batch mode, multiple gas coins are needed in order to handle multiple batches at the same time.

--

In case a request comes in and there is no gas coin available (for both modes), it will wait some seconds and then return an error if the queue is still empty.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the answers!
So currently on the testnet we have the non-batch mode and 2 objects in the gas pool (obj 1) and (obj 2), right?
@junwei0117 next time the faucet gets topped up would be nice to send the coins in smaller batches to it, so it has 5-10 coins and can serve concurrent requests as well.

// Split the coins eventually into two pools: one for the gas pool and one for
// the batch pool. The batch pool will only be populated if the batch feature is
// enabled.
let split_point = if config.batch_enabled {
if coins.len() > 1 {
1 // At least one coin goes to the gas pool the rest to the batch pool
Alex6323 marked this conversation as resolved.
Show resolved Hide resolved
} else {
0 // Only one coin available, all coins go to the batch pool. This is safe as we have already checked above that `coins` is not empty.
}
} else {
coins.len()
coins.len() // All coins go to the gas pool if batch is disabled
};
// Put half of the coins in the old faucet impl queue, and put half in the other
// queue for batch coins. In the test cases we create an account with 5
// coins so we just let this run with a minimum of 5 coins

for (coins_processed, coin) in coins.iter().enumerate() {
let coin_id = *coin.id();
if let Some(write_ahead_log::Entry {
Expand Down Expand Up @@ -946,6 +954,7 @@ impl Faucet for SimpleFaucet {
{
return Err(FaucetError::BatchSendQueueFull);
}

let mut task_map = self.task_id_cache.lock().await;
task_map.insert(
id,
Expand Down Expand Up @@ -1035,6 +1044,7 @@ pub async fn batch_transfer_gases(
"Batch transfer attempted of size: {:?}", total_requests
);
let total_iota_needed: u64 = requests.iter().flat_map(|(_, _, amounts)| amounts).sum();

// This loop is utilized to grab a coin that is large enough for the request
loop {
let gas_coin_response = faucet
Expand Down Expand Up @@ -1292,7 +1302,10 @@ mod tests {
#[tokio::test]
async fn test_batch_transfer_interface() {
let test_cluster = TestClusterBuilder::new().build().await;
let config: FaucetConfig = Default::default();
let config: FaucetConfig = FaucetConfig {
batch_enabled: true,
..Default::default()
};
let coin_amount = config.amount;
let prom_registry = Registry::new();
let tmp = tempfile::tempdir().unwrap();
Expand Down Expand Up @@ -1892,7 +1905,10 @@ mod tests {
#[tokio::test]
async fn test_amounts_transferred_on_batch() {
let test_cluster = TestClusterBuilder::new().build().await;
let config: FaucetConfig = Default::default();
let config: FaucetConfig = FaucetConfig {
batch_enabled: true,
..Default::default()
};
let address = test_cluster.get_address_0();
let mut context = test_cluster.wallet;
let gas_coins = context
Expand Down
270 changes: 267 additions & 3 deletions crates/iota/tests/cli_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ use iota_test_transaction_builder::batch_make_transfer_transactions;
use iota_types::{
base_types::{IotaAddress, ObjectID},
crypto::{
Ed25519IotaSignature, IotaKeyPair, IotaSignatureInner, Secp256k1IotaSignature,
SignatureScheme, get_key_pair,
AccountKeyPair, Ed25519IotaSignature, IotaKeyPair, IotaSignatureInner,
Secp256k1IotaSignature, SignatureScheme, get_key_pair,
},
error::IotaObjectResponseError,
gas_coin::GasCoin,
Expand Down Expand Up @@ -4168,8 +4168,10 @@ async fn test_faucet() -> Result<(), anyhow::Error> {
let wallet_config = test_cluster.swarm.dir().join(IOTA_CLIENT_CONFIG);
let mut context = WalletContext::new(&wallet_config, None, None)?;

let (address, _): (_, AccountKeyPair) = get_key_pair();

let faucet_result = IotaClientCommands::Faucet {
address: None,
address: Some(KeyIdentity::Address(address)),
url: Some("http://127.0.0.1:5003/gas".to_string()),
}
.execute(&mut context)
Expand All @@ -4180,6 +4182,268 @@ async fn test_faucet() -> Result<(), anyhow::Error> {
unreachable!("Invalid response");
};

sleep(Duration::from_secs(5)).await;

let gas_objects_after = context
.get_gas_objects_owned_by_address(address, None)
.await
.unwrap()
.len();
assert_eq!(gas_objects_after, 1);

samuel-rufi marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}

#[sim_test]
async fn test_faucet_batch() -> Result<(), anyhow::Error> {
let test_cluster = TestClusterBuilder::new()
.with_fullnode_rpc_port(9000)
.build()
.await;

let context = test_cluster.wallet;

let tmp = tempfile::tempdir().unwrap();
let prom_registry = prometheus::Registry::new();
let config = iota_faucet::FaucetConfig {
batch_enabled: true,
..Default::default()
};

let prometheus_registry = prometheus::Registry::new();
let app_state = std::sync::Arc::new(iota_faucet::AppState {
faucet: iota_faucet::SimpleFaucet::new(
context,
&prometheus_registry,
&tmp.path().join("faucet.wal"),
config.clone(),
)
.await
.unwrap(),
config,
});
tokio::spawn(async move { iota_faucet::start_faucet(app_state, 10, &prom_registry).await });

// Wait for the faucet to be up
sleep(Duration::from_secs(1)).await;
let wallet_config = test_cluster.swarm.dir().join(IOTA_CLIENT_CONFIG);
let mut context = WalletContext::new(&wallet_config, None, None)?;

let (address_1, _): (_, AccountKeyPair) = get_key_pair();
let (address_2, _): (_, AccountKeyPair) = get_key_pair();
let (address_3, _): (_, AccountKeyPair) = get_key_pair();

assert_ne!(address_1, address_2);
assert_ne!(address_1, address_3);
assert_ne!(address_2, address_3);

for address in [address_1, address_2, address_3].iter() {
let gas_objects_after = context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len();
assert_eq!(gas_objects_after, 0);
}

for address in [address_1, address_2, address_3].iter() {
let faucet_result = IotaClientCommands::Faucet {
address: Some(KeyIdentity::Address(*address)),
url: Some("http://127.0.0.1:5003/v1/gas".to_string()),
}
.execute(&mut context)
.await?;

if let IotaClientCommandResult::NoOutput = faucet_result {
} else {
unreachable!("Invalid response");
};
}

// we need to wait a minimum of 10 seconds for gathering the batch + some time
// for transaction to be sequenced
sleep(Duration::from_secs(15)).await;

for address in [address_1, address_2, address_3].iter() {
let gas_objects_after = context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len();
assert_eq!(gas_objects_after, 1);
}

// try with a new batch
let (address_4, _): (_, AccountKeyPair) = get_key_pair();
let (address_5, _): (_, AccountKeyPair) = get_key_pair();
let (address_6, _): (_, AccountKeyPair) = get_key_pair();

assert_ne!(address_4, address_5);
assert_ne!(address_4, address_6);
assert_ne!(address_5, address_6);

for address in [address_4, address_5, address_6].iter() {
let gas_objects_after = context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len();
assert_eq!(gas_objects_after, 0);
}

for address in [address_4, address_5, address_6].iter() {
let faucet_result = IotaClientCommands::Faucet {
address: Some(KeyIdentity::Address(*address)),
url: Some("http://127.0.0.1:5003/v1/gas".to_string()),
}
.execute(&mut context)
.await?;

if let IotaClientCommandResult::NoOutput = faucet_result {
} else {
unreachable!("Invalid response");
};
}

// we need to wait a minimum of 10 seconds for gathering the batch + some time
// for transaction to be sequenced
sleep(Duration::from_secs(15)).await;

for address in [address_4, address_5, address_6].iter() {
let gas_objects_after = context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len();
assert_eq!(gas_objects_after, 1);
}

Ok(())
}

#[sim_test]
async fn test_faucet_batch_concurrent_requests() -> Result<(), anyhow::Error> {
let test_cluster = TestClusterBuilder::new()
.with_fullnode_rpc_port(9000)
.build()
.await;

let context = test_cluster.wallet;

let tmp = tempfile::tempdir().unwrap();
let prom_registry = prometheus::Registry::new();
let config = iota_faucet::FaucetConfig {
batch_enabled: true,
..Default::default()
};

let prometheus_registry = prometheus::Registry::new();
let app_state = std::sync::Arc::new(iota_faucet::AppState {
faucet: iota_faucet::SimpleFaucet::new(
context,
&prometheus_registry,
&tmp.path().join("faucet.wal"),
config.clone(),
)
.await
.unwrap(),
config,
});
tokio::spawn(async move { iota_faucet::start_faucet(app_state, 10, &prom_registry).await });

// Wait for the faucet to be up
sleep(Duration::from_secs(1)).await;

let wallet_config = test_cluster.swarm.dir().join(IOTA_CLIENT_CONFIG);
let context = WalletContext::new(&wallet_config, None, None)?; // Use immutable context

// Generate multiple addresses
let addresses: Vec<_> = (0..6)
.map(|_| get_key_pair::<AccountKeyPair>().0)
.collect::<Vec<IotaAddress>>();

// Ensure all addresses have zero gas objects initially
for address in &addresses {
assert_eq!(
context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len(),
0
);
}

// First batch: send faucet requests concurrently for all addresses
let first_batch_results: Vec<_> = futures::future::join_all(addresses.iter().map(|address| {
let wallet_config = wallet_config.clone();
async move {
let mut context = WalletContext::new(&wallet_config, None, None)?; // Use mutable context (for faucet requests)
IotaClientCommands::Faucet {
address: Some(KeyIdentity::Address(*address)),
url: Some("http://127.0.0.1:5003/v1/gas".to_string()),
}
.execute(&mut context)
.await
}
}))
.await;

// Ensure all results are `NoOutput` indicating requests were batched
for result in first_batch_results {
assert!(matches!(result, Ok(IotaClientCommandResult::NoOutput)));
}

// Wait for the first batch to complete
sleep(Duration::from_secs(15)).await;

// Validate gas objects after the first batch
for address in &addresses {
assert_eq!(
context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len(),
1
);
}

// Second batch: send faucet requests again for all addresses
let second_batch_results: Vec<_> = futures::future::join_all(addresses.iter().map(|address| {
let wallet_config = wallet_config.clone();
async move {
let mut context = WalletContext::new(&wallet_config, None, None)?; // Use mutable context
IotaClientCommands::Faucet {
address: Some(KeyIdentity::Address(*address)),
url: Some("http://127.0.0.1:5003/v1/gas".to_string()),
}
.execute(&mut context)
.await
}
}))
.await;

// Ensure all results are `NoOutput` for the second batch
for result in second_batch_results {
assert!(matches!(result, Ok(IotaClientCommandResult::NoOutput)));
}

// Wait for the second batch to complete
sleep(Duration::from_secs(15)).await;

// Validate gas objects after the second batch
for address in &addresses {
assert_eq!(
context
.get_gas_objects_owned_by_address(*address, None)
.await
.unwrap()
.len(),
2
);
}

Ok(())
}

Expand Down
Loading