diff --git a/src/main.rs b/src/main.rs index e45c506..7552a9e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,13 +53,9 @@ const WRITE_BUFFER_SIZE: usize = TRANSFER_BUFFER_SIZE * 100; const TRANSFER_BUFFER_SIZE: usize = 1024; const INDEX_SIZE: usize = std::mem::size_of::(); const ID_SIZE: usize = std::mem::size_of::(); -const MAX_RETRIES: usize = 10; // UDP header + ID + INDEX + DATA const PACKET_SIZE: usize = 8 + ID_SIZE + INDEX_SIZE + TRANSFER_BUFFER_SIZE; -// how long to wait for a job to be confirmed before requeuing it -const REQUEUE_INTERVAL: Duration = Duration::from_millis(1_000); - #[derive(Clone)] struct TransferStats { confirmed_packets: Arc, diff --git a/src/options.rs b/src/options.rs index 2e536d7..ce10242 100644 --- a/src/options.rs +++ b/src/options.rs @@ -78,7 +78,7 @@ pub(crate) struct Options { pub(crate) control_crypto: Crypto, /// Encrypt the data stream - #[clap(short = 'S', long, default_value = "NONE")] + #[clap(short = 'S', long, default_value = "CHACHA20")] pub(crate) stream_crypto: Crypto, /// Receive timeout in MS @@ -89,6 +89,14 @@ pub(crate) struct Options { #[clap(short, long, default_value_t = 1_000)] pub(crate) job_limit: usize, + /// Requeue interval in MS + #[clap(short = 'i', long, default_value_t = 1_000)] + pub(crate) requeue_interval: u64, + + /// Maximum number of send/receive retries + #[clap(short = 'M', long, default_value_t = 10)] + pub(crate) max_retries: usize, + /// Command to execute cccp #[clap(short = 'E', long, default_value = "cccp")] command: String, @@ -147,6 +155,8 @@ impl Options { format!("-j {}", self.job_limit), format!("-c {}", self.control_crypto), format!("-S {}", self.stream_crypto), + format!("-i {}", self.requeue_interval), + format!("-M {}", self.max_retries), format!("\"{}\"", self.source), format!("\"{}\"", self.destination), ]; diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs index 3f2e096..867f747 100644 --- a/src/receiver/mod.rs +++ b/src/receiver/mod.rs @@ -23,7 +23,7 @@ use crate::error::Error; use crate::items::{message, ConfirmationIndexes, Message, StartIndex}; use crate::receiver::writer::{writer, FileDetails, SplitQueue}; use crate::{ - socket_factory, CipherStream, Options, Result, TransferStats, ID_SIZE, INDEX_SIZE, MAX_RETRIES, + socket_factory, CipherStream, Options, Result, TransferStats, ID_SIZE, INDEX_SIZE, TRANSFER_BUFFER_SIZE, }; @@ -212,7 +212,14 @@ pub(crate) async fn main( let handles: Vec<_> = sockets .into_iter() - .map(|socket| tokio::spawn(receiver(writer_queue.clone(), socket, receive_timeout))) + .map(|socket| { + tokio::spawn(receiver( + writer_queue.clone(), + socket, + receive_timeout, + options.max_retries, + )) + }) .collect(); let receiver_future = async { @@ -232,11 +239,16 @@ pub(crate) async fn main( } } -async fn receiver(queue: WriterQueue, socket: UdpSocket, receive_timeout: Duration) -> Result<()> { +async fn receiver( + queue: WriterQueue, + socket: UdpSocket, + receive_timeout: Duration, + max_retries: usize, +) -> Result<()> { let mut buf = [0; ID_SIZE + INDEX_SIZE + TRANSFER_BUFFER_SIZE]; // buffer for receiving data let mut retries = 0; // counter to keep track of retries - while retries < MAX_RETRIES { + while retries < max_retries { match timeout(receive_timeout, socket.recv(&mut buf)).await { Ok(Ok(read)) if read > 0 => { retries = 0; // reset retries @@ -263,7 +275,7 @@ async fn receiver(queue: WriterQueue, socket: UdpSocket, receive_timeout: Durati } } - if retries == MAX_RETRIES { + if retries == max_retries { Err(Error::max_retries()) } else { Ok(()) diff --git a/src/sender/mod.rs b/src/sender/mod.rs index 2e36d18..9c7f211 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -22,7 +22,7 @@ use crate::items::{message, Confirmations, FileDetail, Manifest, Message, StartI use crate::sender::reader::reader; use crate::{ hash_file, socket_factory, CipherStream, Options, Result, TransferStats, ID_SIZE, INDEX_SIZE, - MAX_RETRIES, REQUEUE_INTERVAL, TRANSFER_BUFFER_SIZE, + TRANSFER_BUFFER_SIZE, }; mod reader; @@ -122,6 +122,7 @@ pub(crate) async fn main( job_sender.clone(), stats.confirmed_packets.clone(), read.clone(), + Duration::from_millis(options.requeue_interval), )); tokio::spawn(add_permits_at_rate(send.clone(), options.pps())); @@ -136,6 +137,7 @@ pub(crate) async fn main( cache.clone(), send.clone(), stats.sent_packets.clone(), + options.max_retries, )) }) .collect(); @@ -175,10 +177,11 @@ async fn sender( cache: JobCache, send: Arc, sent: Arc, + max_retries: usize, ) -> Result<()> { let mut retries = 0; - while retries < MAX_RETRIES { + while retries < max_retries { let permit = send.acquire().await?; // acquire a permit let mut job = job_receiver.recv().await?; // get a job from the queue @@ -198,7 +201,7 @@ async fn sender( permit.forget(); } - if retries == MAX_RETRIES { + if retries == max_retries { Err(Error::max_retries()) } else { Ok(()) @@ -350,6 +353,7 @@ async fn receive_confirmations( job_sender: AsyncSender, confirmed_packets: Arc, read: Arc, + requeue_interval: Duration, ) -> Result<()> { // this solves a problem where a confirmation is received after a job has already been requeued let lost_confirmations: Arc>> = Default::default(); @@ -373,7 +377,7 @@ async fn receive_confirmations( .await .iter() .filter(|(_, unconfirmed)| { - unconfirmed.cached_at.unwrap().elapsed() > REQUEUE_INTERVAL + unconfirmed.cached_at.unwrap().elapsed() > requeue_interval }) .map(|(key, _)| *key) .collect();