Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
chanderlud committed Nov 29, 2023
1 parent 714b198 commit 47b0286
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 42 deletions.
105 changes: 66 additions & 39 deletions src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use std::time::Duration;

use log::{debug, error, info, warn};
use tokio::fs::rename;
use tokio::io;
use tokio::{io, select};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::Mutex;
use tokio::time::{interval, sleep};
use tokio::task::JoinHandle;
use tokio::time::{interval, timeout};

use crate::receiver::metadata::Metadata;
use crate::receiver::writer::writer;
Expand Down Expand Up @@ -72,28 +73,40 @@ pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<(
meta_data,
));

tokio::spawn(async {
if let Err(error) =
send_confirmations(control_stream, confirmation_queue, stats.confirmed_data).await
{
error!("confirmation sender failed: {}", error);
}
});
let confirmation_handle = tokio::spawn(
send_confirmations(
control_stream,
confirmation_queue,
stats.confirmed_data)
);

let _: Vec<_> = sockets
let handles: Vec<_> = sockets
.into_iter()
.map(|socket| tokio::spawn(receiver(writer_queue.clone(), socket)))
.collect();

let writer_result = writer_handle.await;
info!("writer finished with result {:?}", writer_result);
let receiver_future = async {
for handle in handles {
handle.await?;
}

// rename the partial file to the original file
rename(
&options.destination.file_path.with_extension("partial"),
&options.destination.file_path,
)
.await?;
Ok::<(), io::Error>(())
};

select! {
result = confirmation_handle => error!("confirmation sender failed {:?}", result),
result = writer_handle => {
info!("writer finished with result {:?}", result);

// rename the partial file to the original file
rename(
&options.destination.file_path.with_extension("partial"),
&options.destination.file_path,
)
.await?;
},
receiver_future = receiver_future => error!("receiver(s) failed {:?}", receiver_future),
}

Ok(())
}
Expand All @@ -103,18 +116,29 @@ pub(crate) async fn receiver(queue: Queue<Vec<u8>>, socket: UdpSocket) {
let mut retries = 0;

loop {
match socket.recv(&mut buf).await {
Ok(read) => {
retries = 0;

if read > 0 {
queue.push(buf[..read].to_vec());
} else {
warn!("0 byte read?");
match timeout(Duration::from_secs(5), socket.recv(&mut buf)).await {
Ok(recv_result) => match recv_result {
Ok(read) => {
retries = 0;

if read > 0 {
queue.push(buf[..read].to_vec());
} else {
warn!("0 byte read?");
}
}
}
Err(error) => {
error!("failed to receive data {}", error);
Err(error) => {
error!("failed to receive data {}", error);

if retries < MAX_RETRIES {
retries += 1;
} else {
break;
}
}
},
Err(_timeout) => {
error!("recv timed out");

if retries < MAX_RETRIES {
retries += 1;
Expand All @@ -133,7 +157,7 @@ async fn send_confirmations(
) -> io::Result<()> {
let data: Arc<Mutex<Vec<u64>>> = Default::default();

tokio::spawn({
let sender_handle: JoinHandle<io::Result<()>> = tokio::spawn({
let data = data.clone();

async move {
Expand All @@ -149,19 +173,22 @@ async fn send_confirmations(
}

let indexes = mem::take(&mut *data);

if let Err(error) = send_indexes(&mut control_stream, &indexes).await {
error!("failed to send indexes: {}", error);
sleep(Duration::from_millis(10)).await;
}
send_indexes(&mut control_stream, &indexes).await?;
}
}
});

loop {
let index = queue.pop().await;
confirmed_data.fetch_add(TRANSFER_BUFFER_SIZE, Relaxed);
data.lock().await.push(index);
let future = async {
loop {
let index = queue.pop().await;
confirmed_data.fetch_add(TRANSFER_BUFFER_SIZE, Relaxed);
data.lock().await.push(index);
}
};

select! {
result = sender_handle => result?,
_ = future => Ok(())
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::Duration;

use log::{debug, error, info, warn};
use log::{debug, error, info};
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpStream, UdpSocket};
use tokio::select;
Expand Down Expand Up @@ -93,8 +93,10 @@ pub(crate) async fn main(options: Options, stats: TransferStats) -> Result<()> {

let sender_future = async {
for handle in handles {
let _ = handle.await;
handle.await?;
}

Ok::<(), io::Error>(())
};

let reader_future = async {
Expand All @@ -110,7 +112,7 @@ pub(crate) async fn main(options: Options, stats: TransferStats) -> Result<()> {

select! {
_ = reader_future => {},
_ = sender_future => { warn!("senders exited") },
result = sender_future => error!("sender(s) failed {:?}", result),
result = confirmation_handle => {
// the confirmation receiver never exits unless an error occurs
error!("confirmation receiver exited with result {:?}", result);
Expand Down

0 comments on commit 47b0286

Please sign in to comment.