From 11e7ce358e2520279f5e45658450374367cb3c8a Mon Sep 17 00:00:00 2001 From: chanderlud Date: Tue, 28 Nov 2023 18:11:25 -0800 Subject: [PATCH] potentially fixes a bug where the receiver is still writing when it exits --- src/main.rs | 3 ++- src/receiver/mod.rs | 43 ++++++++++++++++++++++++++++-------------- src/receiver/writer.rs | 3 ++- src/sender/mod.rs | 9 ++++----- 4 files changed, 37 insertions(+), 21 deletions(-) diff --git a/src/main.rs b/src/main.rs index 209e7f8..9c02a02 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,3 @@ -use std::{error, process}; use std::fmt::{Display, Formatter}; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; @@ -7,6 +6,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; use std::time::Duration; +use std::{error, process}; use async_ssh2_tokio::{AuthMethod, Client, ServerCheckMethod}; use clap::Parser; @@ -317,6 +317,7 @@ async fn main() { let mut options = Options::parse(); // TODO choose a better log file location + // local client should log in execution location simple_logging::log_to_file("cccp.log", options.log_level).expect("failed to log"); if options.start_port > options.end_port { diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs index f1aeaff..7014c33 100644 --- a/src/receiver/mod.rs +++ b/src/receiver/mod.rs @@ -6,12 +6,12 @@ use std::time::Duration; use log::{debug, error, info, warn}; use tokio::fs::rename; -use tokio::{io, select}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio::sync::Mutex; use tokio::task::JoinHandle; -use tokio::time::{interval, timeout}; +use tokio::time::{interval, sleep, timeout}; +use tokio::{io, select}; use crate::receiver::metadata::Metadata; use crate::receiver::writer::writer; @@ -23,6 +23,8 @@ use crate::{ mod metadata; mod writer; +type WriterQueue = Arc>>; + pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<()> { if options.destination.file_path.is_dir() { info!("destination is a folder, reformatting path with target file"); @@ -62,7 +64,7 @@ pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<( info!("opened sockets"); - let writer_queue: Queue> = Default::default(); + let writer_queue: WriterQueue = Arc::new(deadqueue::limited::Queue::new(100)); let confirmation_queue: Queue = Default::default(); let writer_handle = tokio::spawn(writer( @@ -73,12 +75,23 @@ pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<( meta_data, )); - let confirmation_handle = tokio::spawn( - send_confirmations( - control_stream, - confirmation_queue, - stats.confirmed_data) - ); + let confirmation_handle = tokio::spawn({ + let writer_queue = writer_queue.clone(); + + async move { + let result = send_confirmations( + control_stream, + confirmation_queue, + stats.confirmed_data, + ).await; + + while !writer_queue.is_empty() { + sleep(Duration::from_secs(1)).await; + } + + result + } + }); let handles: Vec<_> = sockets .into_iter() @@ -87,10 +100,12 @@ pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<( let receiver_future = async { for handle in handles { - handle.await?; + _ = handle.await; } - Ok::<(), io::Error>(()) + while !writer_queue.is_empty() { + sleep(Duration::from_secs(1)).await; + } }; select! { @@ -105,13 +120,13 @@ pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<( ) .await?; }, - receiver_future = receiver_future => error!("receiver(s) failed {:?}", receiver_future), + _ = receiver_future => info!("receiver(s) exited"), } Ok(()) } -pub(crate) async fn receiver(queue: Queue>, socket: UdpSocket) { +pub(crate) async fn receiver(queue: WriterQueue, socket: UdpSocket) { let mut buf = [0; INDEX_SIZE + TRANSFER_BUFFER_SIZE]; let mut retries = 0; @@ -122,7 +137,7 @@ pub(crate) async fn receiver(queue: Queue>, socket: UdpSocket) { retries = 0; if read > 0 { - queue.push(buf[..read].to_vec()); + queue.push(buf[..read].to_vec()).await; } else { warn!("0 byte read?"); } diff --git a/src/receiver/writer.rs b/src/receiver/writer.rs index 772ddb6..c3ad4f3 100644 --- a/src/receiver/writer.rs +++ b/src/receiver/writer.rs @@ -8,11 +8,12 @@ use tokio::fs::OpenOptions; use tokio::io::{self, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; use crate::receiver::metadata::Metadata; +use crate::receiver::WriterQueue; use crate::{Queue, INDEX_SIZE, TRANSFER_BUFFER_SIZE}; pub(crate) async fn writer( path: PathBuf, - writer_queue: Queue>, + writer_queue: WriterQueue, file_size: u64, confirmation_queue: Queue, mut metadata: Metadata, diff --git a/src/sender/mod.rs b/src/sender/mod.rs index e0db29b..4840d35 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -186,12 +186,11 @@ async fn receive_confirmations( let mut cache = cache.write().await; // requeue and remove entries - for key in keys_to_remove { - if let Some(mut unconfirmed) = cache.remove(&key) { - if lost_confirmations.contains(&key) { + for index in keys_to_remove { + if let Some(mut unconfirmed) = cache.remove(&index) { + if lost_confirmations.contains(&index) { // the job is not requeued because it was confirmed while outside the cache - debug!("found lost confirmation for {}", key); - lost_confirmations.remove(&key); + lost_confirmations.remove(&index); } else { unconfirmed.cached_at = None; queue.push(unconfirmed);