Skip to content

Commit

Permalink
Limit retries on dispense (#63)
Browse files Browse the repository at this point in the history
* limit retries and add better logging

* make in progress explicit

* Minimize chance of miss behaviour

---------

Co-authored-by: xgreenx <xgreenx9999@gmail.com>
  • Loading branch information
MujkicA and xgreenx authored Mar 16, 2024
1 parent ee618bc commit 7c261fa
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 67 deletions.
62 changes: 27 additions & 35 deletions src/dispense_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,30 @@
use std::{
cmp::Ordering,
collections::{BinaryHeap, HashMap, HashSet},
};
use std::collections::BTreeMap;
use std::collections::{HashMap, HashSet};
use std::time::{SystemTime, UNIX_EPOCH};

use fuel_types::Address;

#[derive(Debug, Eq, PartialEq)]
pub struct Entry {
address: Address,
timestamp: u64,
}

impl Ord for Entry {
fn cmp(&self, other: &Self) -> Ordering {
self.timestamp.cmp(&other.timestamp)
}
}

impl PartialOrd for Entry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

pub trait Clock: std::fmt::Debug + Send + Sync {
fn now(&self) -> u64;
}

#[derive(Debug)]
pub struct TokioTime {}
pub struct StdTime {}

impl Clock for TokioTime {
impl Clock for StdTime {
fn now(&self) -> u64 {
tokio::time::Instant::now().elapsed().as_secs()
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
since_the_epoch.as_secs()
}
}

#[derive(Debug)]
pub struct DispenseTracker {
tracked: HashMap<Address, u64>,
queue: BinaryHeap<Entry>,
queue: BTreeMap<u64, Vec<Address>>,
in_progress: HashSet<Address>,
clock: Box<dyn Clock>,
}
Expand All @@ -48,9 +33,9 @@ impl Default for DispenseTracker {
fn default() -> Self {
Self {
tracked: HashMap::default(),
queue: BinaryHeap::default(),
queue: Default::default(),
in_progress: HashSet::default(),
clock: Box::new(TokioTime {}),
clock: Box::new(StdTime {}),
}
}
}
Expand All @@ -59,7 +44,7 @@ impl DispenseTracker {
pub fn new(clock: impl Clock + 'static) -> Self {
Self {
tracked: HashMap::new(),
queue: BinaryHeap::new(),
queue: Default::default(),
in_progress: HashSet::new(),
clock: Box::new(clock),
}
Expand All @@ -70,7 +55,7 @@ impl DispenseTracker {

let timestamp = self.clock.now();
self.tracked.insert(address, timestamp);
self.queue.push(Entry { address, timestamp });
self.queue.entry(timestamp).or_default().push(address);
}

pub fn mark_in_progress(&mut self, address: Address) {
Expand All @@ -84,17 +69,24 @@ impl DispenseTracker {
pub fn evict_expired_entries(&mut self, eviction_duration: u64) {
let now = self.clock.now();

while let Some(oldest_entry) = self.queue.peek() {
if now - oldest_entry.timestamp > eviction_duration {
let removed_entry = self.queue.pop().unwrap();
self.tracked.remove(&removed_entry.address);
while let Some(oldest_entry) = self.queue.first_entry() {
if now - oldest_entry.key() > eviction_duration {
let (_, addresses) = oldest_entry.remove_entry();

for address in addresses {
self.tracked.remove(&address);
}
} else {
break;
}
}
}

pub fn has_tracked(&self, address: &Address) -> bool {
self.tracked.get(address).is_some() || self.in_progress.contains(address)
self.tracked.get(address).is_some()
}

pub fn is_in_progress(&self, address: &Address) -> bool {
self.in_progress.contains(address)
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ mod dispense_tracker;
mod recaptcha;
mod routes;

pub use dispense_tracker::{Clock, TokioTime};
pub use dispense_tracker::{Clock, StdTime};
pub use routes::THE_BIGGEST_AMOUNT;

#[derive(Debug)]
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use fuel_faucet::{config::Config, start_server, TokioTime};
use fuel_faucet::{config::Config, start_server, StdTime};
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() {
let config = Config::default();
init_logger(&config);
let clock = TokioTime {};
let clock = StdTime {};
let (_, task) = start_server(config, clock).await;
let _ = task.await.unwrap();
}
Expand Down
10 changes: 10 additions & 0 deletions src/models.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::{self, Display, Formatter};

use reqwest::StatusCode;
use serde::{Deserialize, Serialize};

Expand All @@ -24,3 +26,11 @@ pub struct DispenseError {
pub status: StatusCode,
pub error: String,
}

impl Display for DispenseError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{self:?}")
}
}

impl std::error::Error for DispenseError {}
83 changes: 54 additions & 29 deletions src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ fn check_and_mark_dispense_limit(
));
}

if tracker.is_in_progress(&address) {
return Err(error(
"Account is already in the process of receiving assets".to_string(),
StatusCode::TOO_MANY_REQUESTS,
));
}

tracker.mark_in_progress(address);
Ok(())
}
Expand Down Expand Up @@ -238,27 +245,37 @@ pub async fn dispense_tokens(
}

check_and_mark_dispense_limit(&dispense_tracker, address, config.dispense_limit_interval)?;
let cleanup = || {

struct CleanUpper<Fn>(Fn)
where
Fn: FnMut();

impl<Fn> Drop for CleanUpper<Fn>
where
Fn: FnMut(),
{
fn drop(&mut self) {
self.0();
}
}

// We want to remove the address from `in_progress` regardless of the outcome of the transaction.
let _cleanup = CleanUpper(|| {
dispense_tracker
.lock()
.unwrap()
.remove_in_progress(&address);
};
});

let provider = wallet.provider().expect("client provider");
let mut tx_id;

loop {
let mut tx_id = None;
for _ in 0..5 {
let mut guard = state.lock().await;
let coin_output = if let Some(previous_coin_output) = &guard.last_output {
*previous_coin_output
} else {
get_coin_output(&wallet, config.dispense_amount)
.await
.map_err(|e| {
cleanup();
e
})?
get_coin_output(&wallet, config.dispense_amount).await?
};

let coin_type = CoinType::Coin(Coin {
Expand Down Expand Up @@ -296,55 +313,63 @@ pub async fn dispense_tokens(
.fee_checked_from_tx(&network_config.network_info.consensus_parameters)
.expect("Should be able to calculate fee");

tx_id = script.id(network_config.network_info.consensus_parameters.chain_id);
let id = script.id(network_config.network_info.consensus_parameters.chain_id);
let result = tokio::time::timeout(
Duration::from_secs(config.timeout),
provider.send_transaction(script),
)
.await
.map(|r| {
.map_err(|_| {
error(
format!("Timeout while submitting transaction for address: {address:X}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
})
.and_then(|r| {
r.map_err(|e| {
error(
format!("Failed to submit transaction: {e}"),
format!(
"Failed to submit transaction for address: {address:X} with error: {}",
e
),
StatusCode::INTERNAL_SERVER_ERROR,
)
})
})
.map_err(|e| {
error(
format!("Timeout while submitting transaction: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
});

match result {
Ok(Ok(_)) => {
Ok(_) => {
guard.last_output = Some(CoinOutput {
utxo_id: UtxoId::new(tx_id, 1),
utxo_id: UtxoId::new(id, 1),
owner: coin_output.owner,
amount: coin_output.amount - total_fee.min_fee() - config.dispense_amount,
});
tx_id = Some(id);
break;
}
_ => {
Err(e) => {
tracing::warn!("{}", e);
guard.last_output = None;
}
};
}

submit_tx_with_timeout(&client, &tx_id, config.timeout)
.await
.map_err(|e| {
cleanup();
e
})?;
let Some(tx_id) = tx_id else {
return Err(error(
"Failed to submit transaction".to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
));
};

submit_tx_with_timeout(&client, &tx_id, config.timeout).await?;

info!(
"dispensed {} tokens to {:#x}",
config.dispense_amount, &address
);

dispense_tracker.lock().unwrap().track(address);
let mut tracker = dispense_tracker.lock().unwrap();
tracker.track(address);

Ok(DispenseResponse {
status: "Success".to_string(),
Expand Down

0 comments on commit 7c261fa

Please sign in to comment.