diff --git a/src/main.rs b/src/main.rs index 3f8ba13..a67fd61 100644 --- a/src/main.rs +++ b/src/main.rs @@ -731,6 +731,7 @@ async fn write_message Result<()> { async fn controller( mut str_stream: TcpStream, - mut files: HashMap, + files: HashMap, writer_queue: WriterQueue, confirmation_sender: AsyncSender<(u32, u64)>, confirmed_data: Arc, @@ -227,7 +227,13 @@ async fn controller( Some(message::Message::Start(message)) => { debug!("received start message: {:?}", message); - let details = files.remove(&message.id).unwrap(); + let details = match files.get(&message.id) { + Some(details) => details, + None => { + error!("received start message for unknown id {}", message.id); + continue; + } + }; let start_index = if details.partial_path.exists() { info!("partial file exists, resuming transfer"); @@ -254,6 +260,7 @@ async fn controller( let writer_queue = writer_queue.clone(); let confirmation_sender = confirmation_sender.clone(); let message_sender = message_sender.clone(); + let details = details.clone(); async move { let result = writer::( diff --git a/src/receiver/writer.rs b/src/receiver/writer.rs index 4443f9f..23fc6ad 100644 --- a/src/receiver/writer.rs +++ b/src/receiver/writer.rs @@ -56,6 +56,7 @@ impl SplitQueue { } /// stores file details for writer +#[derive(Clone)] pub(crate) struct FileDetails { pub(crate) path: PathBuf, pub(crate) partial_path: PathBuf, diff --git a/src/sender/mod.rs b/src/sender/mod.rs index 6c0008e..789e24a 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -212,29 +212,27 @@ async fn controller( mut cipher: Box, ) -> Result<()> { let mut id = 0; - let mut active = 0; + let mut active: HashMap = HashMap::with_capacity(max); loop { - if !files.is_empty() { - while active < max { - match files.get(&id) { - None => id += 1, - Some(details) => { - start_file_transfer( - &mut control_stream, - id, - details, - &base_path, - &job_sender, - &read, - &confirmed_data, - &mut cipher, - ) - .await?; - - active += 1; - id += 1 - } + while active.len() < max && !files.is_empty() { + match files.remove(&id) { + None => id += 1, + Some(details) => { + start_file_transfer( + &mut control_stream, + id, + &details, + &base_path, + &job_sender, + &read, + &confirmed_data, + &mut cipher, + ) + .await?; + + active.insert(id, details); + id += 1 } } } @@ -243,14 +241,15 @@ async fn controller( match controller_receiver.recv().await?.message { Some(message::Message::End(end)) => { - debug!("received end message {} | active {}", end.id, active); - - files.remove(&end.id); - active -= 1; + if active.remove(&end.id).is_none() { + warn!("received end message for unknown file {}", end.id); + } else { + debug!("received end message {} | active {}", end.id, active.len()); + } } Some(message::Message::Failure(failure)) => { if failure.reason == 0 { - if let Some(details) = files.get(&failure.id) { + if let Some(details) = active.get(&failure.id) { warn!( "transfer {} failed signature verification, retrying...", failure.id @@ -277,7 +276,7 @@ async fn controller( } } else { warn!( - "received failure message {} for unknown file {}", + "received unknown failure message {} for {}", failure.reason, failure.id ); } @@ -285,7 +284,7 @@ async fn controller( _ => unreachable!(), // only end and failure messages are sent to this receiver } - if files.is_empty() && active == 0 { + if files.is_empty() && active.is_empty() { break; } } @@ -312,14 +311,27 @@ async fn start_file_transfer( let start_index: StartIndex = read_message(&mut control_stream, cipher).await?; confirmed_data.fetch_add(start_index.index as usize, Relaxed); - tokio::spawn(reader( - base_path.join(&details.path), - job_sender.clone(), - read.clone(), - start_index.index, - id, - details.crypto.as_ref().map(make_cipher), - )); + tokio::spawn({ + let job_sender = job_sender.clone(); + let read = read.clone(); + let details = details.clone(); + let base_path = base_path.to_path_buf(); + + async move { + let result = reader( + base_path.join(&details.path), + job_sender, + read, + start_index.index, + id, + details.crypto.as_ref().map(make_cipher), + ).await; + + if let Err(error) = result { + error!("reader failed: {:?}", error); + } + } + }); Ok(()) }