Skip to content

Commit

Permalink
added max retries and requeue interval to options, changed default st…
Browse files Browse the repository at this point in the history
…ream crypto to CHACHA8
  • Loading branch information
chanderlud committed Jan 5, 2024
1 parent 93d7314 commit 87093a4
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 14 deletions.
4 changes: 0 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,9 @@ const WRITE_BUFFER_SIZE: usize = TRANSFER_BUFFER_SIZE * 100;
const TRANSFER_BUFFER_SIZE: usize = 1024;
const INDEX_SIZE: usize = std::mem::size_of::<u64>();
const ID_SIZE: usize = std::mem::size_of::<u32>();
const MAX_RETRIES: usize = 10;
// UDP header + ID + INDEX + DATA
const PACKET_SIZE: usize = 8 + ID_SIZE + INDEX_SIZE + TRANSFER_BUFFER_SIZE;

// how long to wait for a job to be confirmed before requeuing it
const REQUEUE_INTERVAL: Duration = Duration::from_millis(1_000);

#[derive(Clone)]
struct TransferStats {
confirmed_packets: Arc<AtomicUsize>,
Expand Down
12 changes: 11 additions & 1 deletion src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub(crate) struct Options {
pub(crate) control_crypto: Crypto,

/// Encrypt the data stream
#[clap(short = 'S', long, default_value = "NONE")]
#[clap(short = 'S', long, default_value = "CHACHA20")]
pub(crate) stream_crypto: Crypto,

/// Receive timeout in MS
Expand All @@ -89,6 +89,14 @@ pub(crate) struct Options {
#[clap(short, long, default_value_t = 1_000)]
pub(crate) job_limit: usize,

/// Requeue interval in MS
#[clap(short = 'i', long, default_value_t = 1_000)]
pub(crate) requeue_interval: u64,

/// Maximum number of send/receive retries
#[clap(short = 'M', long, default_value_t = 10)]
pub(crate) max_retries: usize,

/// Command to execute cccp
#[clap(short = 'E', long, default_value = "cccp")]
command: String,
Expand Down Expand Up @@ -147,6 +155,8 @@ impl Options {
format!("-j {}", self.job_limit),
format!("-c {}", self.control_crypto),
format!("-S {}", self.stream_crypto),
format!("-i {}", self.requeue_interval),
format!("-M {}", self.max_retries),
format!("\"{}\"", self.source),
format!("\"{}\"", self.destination),
];
Expand Down
22 changes: 17 additions & 5 deletions src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::error::Error;
use crate::items::{message, ConfirmationIndexes, Message, StartIndex};
use crate::receiver::writer::{writer, FileDetails, SplitQueue};
use crate::{
socket_factory, CipherStream, Options, Result, TransferStats, ID_SIZE, INDEX_SIZE, MAX_RETRIES,
socket_factory, CipherStream, Options, Result, TransferStats, ID_SIZE, INDEX_SIZE,
TRANSFER_BUFFER_SIZE,
};

Expand Down Expand Up @@ -212,7 +212,14 @@ pub(crate) async fn main(

let handles: Vec<_> = sockets
.into_iter()
.map(|socket| tokio::spawn(receiver(writer_queue.clone(), socket, receive_timeout)))
.map(|socket| {
tokio::spawn(receiver(
writer_queue.clone(),
socket,
receive_timeout,
options.max_retries,
))
})
.collect();

let receiver_future = async {
Expand All @@ -232,11 +239,16 @@ pub(crate) async fn main(
}
}

async fn receiver(queue: WriterQueue, socket: UdpSocket, receive_timeout: Duration) -> Result<()> {
async fn receiver(
queue: WriterQueue,
socket: UdpSocket,
receive_timeout: Duration,
max_retries: usize,
) -> Result<()> {
let mut buf = [0; ID_SIZE + INDEX_SIZE + TRANSFER_BUFFER_SIZE]; // buffer for receiving data
let mut retries = 0; // counter to keep track of retries

while retries < MAX_RETRIES {
while retries < max_retries {
match timeout(receive_timeout, socket.recv(&mut buf)).await {
Ok(Ok(read)) if read > 0 => {
retries = 0; // reset retries
Expand All @@ -263,7 +275,7 @@ async fn receiver(queue: WriterQueue, socket: UdpSocket, receive_timeout: Durati
}
}

if retries == MAX_RETRIES {
if retries == max_retries {
Err(Error::max_retries())
} else {
Ok(())
Expand Down
12 changes: 8 additions & 4 deletions src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::items::{message, Confirmations, FileDetail, Manifest, Message, StartI
use crate::sender::reader::reader;
use crate::{
hash_file, socket_factory, CipherStream, Options, Result, TransferStats, ID_SIZE, INDEX_SIZE,
MAX_RETRIES, REQUEUE_INTERVAL, TRANSFER_BUFFER_SIZE,
TRANSFER_BUFFER_SIZE,
};

mod reader;
Expand Down Expand Up @@ -122,6 +122,7 @@ pub(crate) async fn main(
job_sender.clone(),
stats.confirmed_packets.clone(),
read.clone(),
Duration::from_millis(options.requeue_interval),
));

tokio::spawn(add_permits_at_rate(send.clone(), options.pps()));
Expand All @@ -136,6 +137,7 @@ pub(crate) async fn main(
cache.clone(),
send.clone(),
stats.sent_packets.clone(),
options.max_retries,
))
})
.collect();
Expand Down Expand Up @@ -175,10 +177,11 @@ async fn sender(
cache: JobCache,
send: Arc<Semaphore>,
sent: Arc<AtomicUsize>,
max_retries: usize,
) -> Result<()> {
let mut retries = 0;

while retries < MAX_RETRIES {
while retries < max_retries {
let permit = send.acquire().await?; // acquire a permit
let mut job = job_receiver.recv().await?; // get a job from the queue

Expand All @@ -198,7 +201,7 @@ async fn sender(
permit.forget();
}

if retries == MAX_RETRIES {
if retries == max_retries {
Err(Error::max_retries())
} else {
Ok(())
Expand Down Expand Up @@ -350,6 +353,7 @@ async fn receive_confirmations(
job_sender: AsyncSender<Job>,
confirmed_packets: Arc<AtomicUsize>,
read: Arc<Semaphore>,
requeue_interval: Duration,
) -> Result<()> {
// this solves a problem where a confirmation is received after a job has already been requeued
let lost_confirmations: Arc<Mutex<HashSet<(u32, u64)>>> = Default::default();
Expand All @@ -373,7 +377,7 @@ async fn receive_confirmations(
.await
.iter()
.filter(|(_, unconfirmed)| {
unconfirmed.cached_at.unwrap().elapsed() > REQUEUE_INTERVAL
unconfirmed.cached_at.unwrap().elapsed() > requeue_interval
})
.map(|(key, _)| *key)
.collect();
Expand Down

0 comments on commit 87093a4

Please sign in to comment.