From f04a05102104be9c6323d60be49fe6aa50984d0a Mon Sep 17 00:00:00 2001 From: chanderlud Date: Sun, 24 Dec 2023 09:57:27 +0000 Subject: [PATCH] bug fixes, improved error handling, stabilized some new options, and a lot of refactoring --- Cargo.toml | 2 +- src/error.rs | 29 ++++++++++++++++++++++ src/items.proto | 1 + src/items.rs | 8 ++++-- src/options.rs | 56 ++++++++++++++++++++++++------------------ src/receiver/mod.rs | 43 ++++++++++++++++++++++---------- src/receiver/writer.rs | 18 ++++++-------- src/sender/mod.rs | 28 ++++++++++++++++----- 8 files changed, 129 insertions(+), 56 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 92bc1d3..5bc70d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cccp" -version = "0.7.0" +version = "0.8.0" edition = "2021" build = "build.rs" repository = "https://github.com/chanderlud/cccp" diff --git a/src/error.rs b/src/error.rs index b6b2f22..a36fb4d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,5 @@ use std::array::TryFromSliceError; +use std::fmt::Formatter; use std::process::{ExitCode, Termination}; use kanal::{ReceiveError, SendError}; @@ -151,6 +152,34 @@ impl Termination for Error { } } +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self.kind { + ErrorKind::Io(ref error) => write!(f, "IO error: {}", error), + ErrorKind::AddrParse(ref error) => write!(f, "Address parse error: {}", error), + ErrorKind::Decode(ref error) => write!(f, "Decode error: {}", error), + ErrorKind::Join(ref error) => write!(f, "Join error: {}", error), + ErrorKind::Send(ref error) => write!(f, "Send error: {}", error), + ErrorKind::Receive(ref error) => write!(f, "Receive error: {}", error), + ErrorKind::Acquire(ref error) => write!(f, "Acquire error: {}", error), + ErrorKind::TryFromSlice(ref error) => write!(f, "TryFromSlice error: {}", error), + #[cfg(windows)] + ErrorKind::ContainsNull(ref error) => write!(f, "ContainsNull error: {}", error), + #[cfg(unix)] + ErrorKind::Nix(ref error) => write!(f, "Nix error: {}", error), + ErrorKind::StripPrefix(ref error) => write!(f, "StripPrefix error: {}", error), + ErrorKind::Ssh(ref error) => write!(f, "SSH error: {}", error), + ErrorKind::MissingQueue => write!(f, "Missing queue"), + ErrorKind::MaxRetries => write!(f, "Max retries"), + #[cfg(windows)] + ErrorKind::StatusError => write!(f, "Status error"), + ErrorKind::Failure(ref reason) => write!(f, "Failure: {}", reason), + ErrorKind::EmptyPath => write!(f, "Empty path"), + ErrorKind::InvalidExtension => write!(f, "Invalid extension"), + } + } +} + impl Error { pub(crate) fn missing_queue() -> Self { Self { diff --git a/src/items.proto b/src/items.proto index ce4ca4e..be77be8 100644 --- a/src/items.proto +++ b/src/items.proto @@ -74,6 +74,7 @@ message End { message Failure { uint32 id = 1; uint32 reason = 2; + optional string description = 3; } // signals the receiver that the sender won't start new transfers diff --git a/src/items.rs b/src/items.rs index 2840235..ebeaf85 100644 --- a/src/items.rs +++ b/src/items.rs @@ -28,9 +28,13 @@ impl Message { } } - pub(crate) fn failure(id: u32, reason: u32) -> Self { + pub(crate) fn failure(id: u32, reason: u32, description: Option) -> Self { Self { - message: Some(message::Message::Failure(Failure { id, reason })), + message: Some(message::Message::Failure(Failure { + id, + reason, + description, + })), } } diff --git a/src/options.rs b/src/options.rs index 2149511..c1f4bc6 100644 --- a/src/options.rs +++ b/src/options.rs @@ -33,11 +33,11 @@ pub(crate) struct Options { pub(crate) start_port: u16, /// The last port to use - #[clap(short, long, default_value_t = 50099)] + #[clap(short, long, default_value_t = 50009)] pub(crate) end_port: u16, /// The number of threads to use - #[clap(short, long, default_value_t = 98)] + #[clap(short, long, default_value_t = 8)] pub(crate) threads: usize, /// The log level [debug, info, warn, error] @@ -87,29 +87,37 @@ pub(crate) struct Options { impl Options { pub(crate) fn format_command(&self, sender: bool) -> String { - let mode = if sender { "rr" } else { "rs" }; + let mut arguments = vec![ + String::from("cccp"), + format!("--mode {}", if sender { "rr" } else { "rs" }), + format!("-s {}", self.start_port), + format!("-e {}", self.end_port), + format!("-t {}", self.threads), + format!("-l {}", self.log_level), + format!("-r \"{}\"", self.rate), + format!("--control-crypto {}", self.control_crypto), + ]; + + if let Some(ref crypto) = self.stream_crypto { + arguments.push(format!(" --stream-crypto {}", crypto)) + } - let stream_crypto = if let Some(ref crypto) = self.stream_crypto { - format!(" --stream-crypto {}", crypto) - } else { - String::new() - }; - - format!( - "cccp --mode {} -s {} -e {} -t {} -l {} -r \"{}\"{} --control-crypto {}{}{} \"{}\" \"{}\"", - mode, - self.start_port, - self.end_port, - self.threads, - self.log_level, - self.rate, - stream_crypto, - self.control_crypto, - if self.overwrite { " -o" } else { "" }, - if self.verify { " -v" } else { "" }, - self.source, - self.destination - ) + if self.overwrite { + arguments.push(String::from("-o")) + } + + if self.verify { + arguments.push(String::from("-v")) + } + + if self.recursive { + arguments.push(String::from("-R")) + } + + arguments.push(format!("\"{}\"", self.source)); + arguments.push(format!("\"{}\"", self.destination)); + + arguments.join(" ") } pub(crate) fn pps(&self) -> u64 { diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs index e35575b..eab36fa 100644 --- a/src/receiver/mod.rs +++ b/src/receiver/mod.rs @@ -12,7 +12,7 @@ use futures::stream::iter; use futures::{StreamExt, TryStreamExt}; use kanal::{AsyncReceiver, AsyncSender}; use log::{debug, error, info, warn}; -use tokio::fs::{create_dir_all, metadata}; +use tokio::fs::{create_dir, metadata}; use tokio::io::AsyncWrite; use tokio::net::{TcpStream, UdpSocket}; use tokio::select; @@ -62,12 +62,14 @@ pub(crate) async fn main( let filtered_files = manifest.files.into_iter().filter_map(|(id, details)| { // formats the path to the file locally - let path = if is_dir { + let path = if is_dir || options.destination.file_path.is_dir() { options.destination.file_path.join(&details.path) } else { options.destination.file_path.clone() }; + debug!("formatted local path {:?} for {:?}", path, details.path); + if path.exists() && !options.overwrite { completed.push(id); None @@ -89,7 +91,7 @@ pub(crate) async fn main( .to_owned() + ".partial" } else { - ".partial".to_string() + "partial".to_string() }; let partial_path = path.with_extension(partial_extension); @@ -132,7 +134,12 @@ pub(crate) async fn main( stats.total_data.load(Relaxed) ); - write_message(&mut str_stream, &Message::failure(0, 1), &mut str_cipher).await?; + write_message( + &mut str_stream, + &Message::failure(0, 1, None), + &mut str_cipher, + ) + .await?; return Err(Error::failure(1)); } @@ -148,15 +155,20 @@ pub(crate) async fn main( // if the destination is a directory, create it if is_dir { - debug!("creating directory {:?}", options.destination.file_path); - create_dir_all(&options.destination.file_path).await?; - } + if !options.destination.file_path.exists() { + debug!("creating directory {:?}", options.destination.file_path); + create_dir(&options.destination.file_path).await?; + } + + // create the local directories needed to write the files + for dir in &manifest.directories { + let local_dir = options.destination.file_path.join(dir); - // create the local directories needed to write the files - for dir in &manifest.directories { - let local_dir = options.destination.file_path.join(dir); - debug!("creating directory {:?}", local_dir); - create_dir_all(local_dir).await?; + if !local_dir.exists() { + debug!("creating directory {:?}", local_dir); + create_dir(local_dir).await?; + } + } } let sockets = socket_factory( @@ -248,6 +260,7 @@ async fn controller( mut str_cipher: Box, ) -> Result<()> { loop { + debug!("waiting for message"); let message: Message = read_message(&mut str_stream, &mut str_cipher).await?; match message.message { @@ -284,9 +297,13 @@ async fn controller( let details = details.clone(); async move { - let result = writer(details, queue, confirmation, message).await; + let result = writer(&details, queue, confirmation, &message).await; if let Err(error) = result { + message + .send(Message::failure(details.id, 2, Some(error.to_string()))) + .await + .unwrap(); error!("writer failed: {:?}", error); } } diff --git a/src/receiver/writer.rs b/src/receiver/writer.rs index 3ba6097..4d28777 100644 --- a/src/receiver/writer.rs +++ b/src/receiver/writer.rs @@ -40,17 +40,13 @@ impl SplitQueue { receivers.get(id).cloned() } - // TODO benchmark this pub(crate) async fn send(&self, job: Job, id: u32) -> Result<()> { - let sender_option = { + let sender = { let senders = self.senders.lock().await; - senders.get(&id).cloned() + senders.get(&id).ok_or(Error::missing_queue())?.clone() }; - if let Some(sender) = sender_option { - sender.send(job).await?; - } - + sender.send(job).await?; Ok(()) } } @@ -75,10 +71,10 @@ impl FileDetails { } pub(crate) async fn writer( - details: FileDetails, + details: &FileDetails, writer_queue: WriterQueue, confirmation_sender: AsyncSender<(u32, u64)>, - message_sender: AsyncSender, + message_sender: &AsyncSender, ) -> Result<()> { let file = OpenOptions::new() .write(true) @@ -158,7 +154,9 @@ pub(crate) async fn writer( let local_hash = hash_file(&details.partial_path).await?; // hash the file if local_hash.as_bytes() != &remote_signature[..] { - message_sender.send(Message::failure(details.id, 0)).await?; // notify the sender + message_sender + .send(Message::failure(details.id, 0, None)) + .await?; // notify the sender remove_file(&details.partial_path).await?; // remove the partial file return Err(Error::failure(0)); } else { diff --git a/src/sender/mod.rs b/src/sender/mod.rs index 553e722..99c0949 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -70,6 +70,12 @@ pub(crate) async fn main( stats.total_data.fetch_sub(details.size as usize, Relaxed); } } + + if manifest.files.is_empty() { + info!("all files completed"); + write_message(&mut str_stream, &Message::done(), &mut str_cipher).await?; + return Ok(()); + } } Some(message::Message::Failure(failure)) => { error!("received failure message {}", failure.reason); @@ -273,6 +279,19 @@ async fn controller( ); } } + Some(message::Message::Failure(failure)) if failure.reason == 2 => { + if active.remove(&failure.id).is_some() { + error!( + "remote writer failed {} [TRANSFER WILL NOT BE RETRIED]", + failure.description.unwrap() + ); + } else { + warn!( + "received writer failure message {} for unknown file {}", + failure.reason, failure.id + ); + } + } Some(message::Message::Failure(failure)) => { warn!( "received unknown failure message {} for file {}", @@ -546,13 +565,10 @@ fn files_and_dirs( let path = entry.path(); if path.is_dir() { - dirs.push(path.clone()); - - if dirs.len() > 1 && !recursive { - continue; + if recursive { + dirs.push(path.clone()); + files_and_dirs(&path, files, dirs, recursive)?; } - - files_and_dirs(&path, files, dirs, recursive)?; } else { files.push(path); }