diff --git a/Cargo.toml b/Cargo.toml index 9fd2be0..6369b49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cccp" -version = "0.6.0" +version = "0.7.0" edition = "2021" build = "build.rs" repository = "https://github.com/chanderlud/cccp" @@ -10,7 +10,7 @@ authors = ["Chander Luderman "] [dependencies] clap = { version = "4.4", features = ["derive"] } -tokio = { version = "1.34", default-features = false, features = ["macros", "fs", "io-util"] } +tokio = { version = "1.35", default-features = false, features = ["macros", "fs", "io-util"] } futures = "0.3" log = { version = "0.4", features = ["std"] } async-ssh2-tokio = "0.8" @@ -20,7 +20,7 @@ dirs = "5.0" rpassword = "7.3" indicatif = "0.17" prost = "0.12" -bytesize = "1.3.0" +bytesize = "1.3" kanal = "0.1.0-pre8" blake3 = "1.5" chacha20 = "0.9" @@ -29,17 +29,16 @@ hex = "0.4" ctr = "0.9" aes = "0.8" itertools = "0.12" -libc = "0.2.151" [target.'cfg(unix)'.dependencies] -libc = "0.2.151" +nix = { version = "0.27", features = ["fs"] } [target.'cfg(windows)'.dependencies] -windows-sys = { version = "0.52.0", features = ["Win32_Storage_FileSystem", "Win32_Foundation"] } -widestring = "1.0.2" +windows-sys = { version = "0.52", features = ["Win32_Storage_FileSystem", "Win32_Foundation"] } +widestring = "1.0" [build-dependencies] -prost-build = "0.12.3" +prost-build = "0.12" [profile.release] opt-level = 3 diff --git a/src/error.rs b/src/error.rs index e3cb9f9..f52cd23 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,11 +23,14 @@ pub(crate) enum ErrorKind { #[cfg(windows)] ContainsNull(widestring::error::ContainsNul), #[cfg(unix)] - Nul(std::ffi::NulError), + Nix(nix::Error), + StripPrefix(std::path::StripPrefixError), MissingQueue, MaxRetries, + #[cfg(windows)] StatusError, Failure(u32), + EmptyPath, } impl From for Error { @@ -104,10 +107,18 @@ impl From> for Error { } #[cfg(unix)] -impl From for Error { - fn from(error: std::ffi::NulError) -> Self { +impl From for Error { + fn from(error: nix::Error) -> Self { + Self { + kind: ErrorKind::Nix(error), + } + } +} + +impl From for Error { + fn from(error: std::path::StripPrefixError) -> Self { Self { - kind: ErrorKind::Nul(error), + kind: ErrorKind::StripPrefix(error), } } } @@ -149,6 +160,13 @@ impl Error { } } + pub(crate) fn empty_path() -> Self { + Self { + kind: ErrorKind::EmptyPath, + } + } + + #[cfg(windows)] pub(crate) fn status_error() -> Self { Self { kind: ErrorKind::StatusError, diff --git a/src/main.rs b/src/main.rs index 259ff17..89e5905 100644 --- a/src/main.rs +++ b/src/main.rs @@ -195,7 +195,7 @@ impl Options { }; format!( - "cccp --mode {} -s {} -e {} -t {} -l {} -r \"{}\"{} --control-crypto {}{} \"{}\" \"{}\"", + "cccp --mode {} -s {} -e {} -t {} -l {} -r \"{}\"{} --control-crypto {}{}{} \"{}\" \"{}\"", mode, self.start_port, self.end_port, @@ -205,6 +205,7 @@ impl Options { stream_crypto, self.control_crypto, if self.overwrite { " -o" } else { "" }, + if self.verify { " -v" } else { "" }, self.source, self.destination ) @@ -480,7 +481,7 @@ async fn main() -> Result<()> { let sender = options.source.is_local(); let stats = TransferStats::default(); - match options.mode { + let result = match options.mode { Mode::Local => { let command = options.format_command(sender); @@ -586,9 +587,9 @@ async fn main() -> Result<()> { }; select! { - _ = command_future => {}, - _ = display_handle => {}, - result = main_future => result? + _ = command_future => Ok(()), + _ = display_handle => Ok(()), + result = main_future => result } } Mode::Remote(sender) => { @@ -603,15 +604,19 @@ async fn main() -> Result<()> { let remote_addr = remote_addr.ip(); if sender { - sender::main(options, stats, rts_stream, str_stream, remote_addr).await?; + sender::main(options, stats, rts_stream, str_stream, remote_addr).await } else { - receiver::main(options, stats, rts_stream, str_stream, remote_addr).await?; - }; + receiver::main(options, stats, rts_stream, str_stream, remote_addr).await + } } + }; + + if let Err(error) = &result { + error!("{:?}", error); } info!("exiting"); - Ok(()) + result } /// opens the sockets that will be used to send data @@ -719,16 +724,15 @@ async fn write_message, ) -> Result<()> { - let len = message.encoded_len(); - writer.write_u32(len as u32).await?; + let len = message.encoded_len(); // get the length of the message + writer.write_u32(len as u32).await?; // write the length of the message - let mut buffer = Vec::with_capacity(len); - message.encode(&mut buffer).unwrap(); - cipher.apply_keystream(&mut buffer[..]); + let mut buffer = Vec::with_capacity(len); // create a buffer to write the message into + message.encode(&mut buffer).unwrap(); // encode the message into the buffer (infallible) + cipher.apply_keystream(&mut buffer[..]); // encrypt the message - writer.write_all(&buffer).await?; + writer.write_all(&buffer).await?; // write the message to the writer - debug!("sent message: {:?}", message); Ok(()) } @@ -741,14 +745,13 @@ async fn read_message< reader: &mut R, cipher: &mut Box, ) -> Result { - let len = reader.read_u32().await? as usize; + let len = reader.read_u32().await? as usize; // read the length of the message - let mut buffer = vec![0; len]; - reader.read_exact(&mut buffer).await?; - cipher.apply_keystream(&mut buffer[..]); + let mut buffer = vec![0; len]; // create a buffer to read the message into + reader.read_exact(&mut buffer).await?; // read the message into the buffer + cipher.apply_keystream(&mut buffer[..]); // decrypt the message - let message = M::decode(&buffer[..])?; - debug!("received message: {:?}", message); + let message = M::decode(&buffer[..])?; // decode the message Ok(message) } diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs index 418db3d..3886e9d 100644 --- a/src/receiver/mod.rs +++ b/src/receiver/mod.rs @@ -1,13 +1,14 @@ -use futures::stream::iter; use std::collections::HashMap; +use std::env::current_dir; use std::mem; use std::net::IpAddr; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; use std::time::Duration; +use futures::stream::iter; use futures::StreamExt; use kanal::{AsyncReceiver, AsyncSender}; use log::{debug, error, info, warn}; @@ -51,7 +52,11 @@ pub(crate) async fn main( let manifest: Manifest = read_message(&mut str_stream, &mut str_cipher).await?; let is_dir = manifest.files.len() > 1; // if multiple files are being received, the destination should be a directory - debug!("received manifest | files={} dirs={}", manifest.files.len(), manifest.directories.len()); + debug!( + "received manifest | files={} dirs={}", + manifest.files.len(), + manifest.directories.len() + ); let completed = Arc::new(Mutex::new(Vec::new())); @@ -112,9 +117,12 @@ pub(crate) async fn main( .await; let completed = completed.lock().await.clone(); - debug!("processed files | files={} completed={}", files.len(), completed.len()); + debug!( + "processed files | files={} completed={}", + files.len(), + completed.len() + ); - debug!("trying to get free space..."); let free_space = free_space(&options.destination.file_path)?; debug!("free space: {}", free_space); @@ -198,9 +206,9 @@ pub(crate) async fn main( }; select! { - result = confirmation_handle => result?, - result = controller_handle => result?, - result = receiver_future => result, + result = confirmation_handle => { debug!("confirmation sender exited: {:?}", result); result? }, + result = controller_handle => { debug!("controller exited: {:?}", result); result? }, + result = receiver_future => { debug!("receivers exited: {:?}", result); result }, } } @@ -284,16 +292,11 @@ async fn controller( Some(message::Message::Done(_)) => { debug!("received done message"); message_sender.close(); - break; - } - _ => { - error!("received {:?}", message); - break; + break Ok(()); } + _ => unreachable!("controller received unexpected message: {:?}", message), } } - - Ok(()) } async fn send_confirmations( @@ -345,7 +348,7 @@ async fn send_confirmations( // propagate errors from the sender thread while executing the future select! { - result = sender_handle => result?, + result = sender_handle => { debug!("confirmation sender exited: {:?}", result); result? }, _ = future => Ok(()) } } @@ -363,25 +366,15 @@ async fn send_messages Result { - use std::ffi::CString; - use std::os::unix::ffi::OsStrExt; - - let dir = CString::new(path.as_os_str().as_bytes())?; + use nix::sys::statvfs::statvfs; - unsafe { - let mut buf: mem::MaybeUninit = mem::MaybeUninit::uninit(); - let result = libc::statvfs(dir.as_ptr(), buf.as_mut_ptr()); + let path = format_path(path)?; + debug!("getting free space for {:?}", path); + let stat = statvfs(&path)?; - if result == 0 { - let stat = buf.assume_init(); - Ok(stat.f_frsize as u64 * stat.f_bavail as u64) - } else { - Err(Error::status_error()) - } - } + Ok(stat.blocks_available() as u64 * stat.fragment_size()) } #[cfg(windows)] @@ -389,6 +382,7 @@ fn free_space(path: &Path) -> Result { use widestring::U16CString; use windows_sys::Win32::Storage::FileSystem; + let path = format_path(path)?; let path = U16CString::from_os_str(path)?; let mut free_bytes = 0_u64; @@ -409,3 +403,19 @@ fn free_space(path: &Path) -> Result { Ok(free_bytes) } } + +/// returns the absolute path of the first existing parent directory +fn format_path(path: &Path) -> Result { + let mut path = path.to_path_buf(); + + if !path.is_absolute() { + let working_dir = current_dir()?; + path = working_dir.join(path); + } + + while !path.exists() { + path = path.parent().ok_or(Error::empty_path())?.to_path_buf(); + } + + Ok(path) +} diff --git a/src/receiver/writer.rs b/src/receiver/writer.rs index 293ba49..3ba6097 100644 --- a/src/receiver/writer.rs +++ b/src/receiver/writer.rs @@ -167,6 +167,7 @@ pub(crate) async fn writer( } details.rename().await?; // rename the file + debug!("sending end message for {}", details.id); message_sender.send(Message::end(details.id)).await?; // send the end message Ok(()) diff --git a/src/sender/mod.rs b/src/sender/mod.rs index 25f762e..0c7e37c 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::net::IpAddr; use std::path::{Path, PathBuf}; @@ -7,6 +8,8 @@ use std::sync::Arc; use std::time::Duration; use aes::cipher::crypto_common::rand_core::OsRng; +use futures::stream::iter; +use futures::{StreamExt, TryStreamExt}; use kanal::{AsyncReceiver, AsyncSender}; use log::{debug, error, info, warn}; use rand::RngCore; @@ -49,14 +52,18 @@ pub(crate) async fn main( let rts_cipher = make_cipher(&options.control_crypto); let mut manifest = build_manifest( - options.source.file_path.clone(), + &options.source.file_path, options.verify, &stats.total_data, &options.stream_crypto, ) .await?; - debug!("sending manifest | files={} dirs={}", manifest.files.len(), manifest.directories.len()); + debug!( + "sending manifest | files={} dirs={}", + manifest.files.len(), + manifest.directories.len() + ); write_message(&mut str_stream, &manifest, &mut str_cipher).await?; let message: Message = read_message(&mut str_stream, &mut str_cipher).await?; @@ -75,7 +82,7 @@ pub(crate) async fn main( error!("received failure message {}", failure.reason); return Err(Error::failure(failure.reason)); } - _ => unreachable!(), + _ => unreachable!("received unexpected message: {:?}", message), } let sockets = socket_factory( @@ -156,10 +163,10 @@ pub(crate) async fn main( // propagate the first error select! { - result = confirmation_handle => result?, - result = controller_handle => result?, - result = sender_future => result, - result = receiver_handle => result?, + result = confirmation_handle => { debug!("confirmation receiver exited: {:?}", result); result? }, + result = controller_handle => { debug!("controller exited: {:?}", result); result? }, + result = sender_future => { debug!("senders exited: {:?}", result); result }, + result = receiver_handle => { debug!("message receiver exited: {:?}", result); result? }, } } @@ -198,7 +205,6 @@ async fn sender( } } -// TODO there is something wrong with the controller, seems to be starting the same file multiple times or something #[allow(clippy::too_many_arguments)] async fn controller( mut control_stream: TcpStream, @@ -247,40 +253,39 @@ async fn controller( debug!("received end message {} | active {}", end.id, active.len()); } } - Some(message::Message::Failure(failure)) => { - if failure.reason == 0 { - if let Some(details) = active.get(&failure.id) { - warn!( - "transfer {} failed signature verification, retrying...", - failure.id - ); - - confirmed_data.fetch_sub(details.size as usize, Relaxed); - - start_file_transfer( - &mut control_stream, - failure.id, - details, - &base_path, - &job_sender, - &read, - &confirmed_data, - &mut cipher, - ) - .await?; - } else { - warn!( - "received failure message {} for unknown file {}", - failure.reason, failure.id - ); - } + Some(message::Message::Failure(failure)) if failure.reason == 0 => { + if let Some(details) = active.get(&failure.id) { + warn!( + "transfer {} failed signature verification, retrying...", + failure.id + ); + + confirmed_data.fetch_sub(details.size as usize, Relaxed); + + start_file_transfer( + &mut control_stream, + failure.id, + details, + &base_path, + &job_sender, + &read, + &confirmed_data, + &mut cipher, + ) + .await?; } else { warn!( - "received unknown failure message {} for {}", + "received failure message {} for unknown file {}", failure.reason, failure.id ); } } + Some(message::Message::Failure(failure)) => { + warn!( + "received unknown failure message {} for file {}", + failure.reason, failure.id + ); + } _ => unreachable!(), // only end and failure messages are sent to this receiver } @@ -317,9 +322,15 @@ async fn start_file_transfer( let details = details.clone(); let base_path = base_path.to_path_buf(); + let path = if base_path.is_dir() { + base_path.join(&details.path) + } else { + base_path + }; + async move { let result = reader( - base_path.join(&details.path), + path, job_sender, read, start_index.index, @@ -479,7 +490,7 @@ async fn split_receiver( } async fn build_manifest( - source: PathBuf, + source: &PathBuf, verify: bool, total_data: &Arc, crypto: &Option, @@ -487,58 +498,58 @@ async fn build_manifest( // collect the files and directories to send let mut files = Vec::new(); let mut dirs = Vec::new(); - files_and_dirs(&source, &mut files, &mut dirs)?; - let mut file_map: HashMap = HashMap::with_capacity(files.len()); + files_and_dirs(source, &mut files, &mut dirs)?; + let files_len = files.len(); + debug!("found {} files & {} dirs", files_len, dirs.len()); - // TODO add concurrency - for (index, mut file) in files.into_iter().enumerate() { - let size = tokio::fs::metadata(&file).await?.len(); - total_data.fetch_add(size as usize, Relaxed); + let file_map: HashMap = iter(files.into_iter().enumerate()) + .map(|(index, mut file)| async move { + let size = tokio::fs::metadata(&file).await?.len(); + total_data.fetch_add(size as usize, Relaxed); - let signature = if verify { - let hash = hash_file(&file).await?; - Some(hash.as_bytes().to_vec()) - } else { - None - }; - - if file == source { - file = PathBuf::from(file.iter().last().unwrap()) - } else { - file = file.strip_prefix(&source).unwrap().to_path_buf(); - } + let signature = if verify { + let hash = hash_file(&file).await?; + Some(hash.as_bytes().to_vec()) + } else { + None + }; - // TODO windows only - let path = file.to_string_lossy().replace('\\', "/"); + if &file == source { + file = PathBuf::from(file.iter().last().ok_or(Error::empty_path())?); + } else { + file = file.strip_prefix(source)?.to_path_buf(); + } - let mut crypto = crypto.clone(); + let mut crypto = crypto.clone(); - if let Some(ref mut crypto) = crypto { - OsRng.fill_bytes(&mut crypto.iv); - } + if let Some(ref mut crypto) = crypto { + OsRng.fill_bytes(&mut crypto.iv); + } - file_map.insert( - index as u32, - FileDetail { - path, - size, - signature, - crypto, - }, - ); - } + Ok::<(u32, FileDetail), Error>(( + index as u32, + FileDetail { + path: format_dir(file.to_string_lossy()), + size, + signature, + crypto, + }, + )) + }) + .buffer_unordered(10) + .try_collect() + .await?; let directories = dirs .into_iter() .map(|dir| { - if let Ok(file_path) = dir.strip_prefix(&source) { - file_path.to_string_lossy().to_string() + if let Ok(file_path) = dir.strip_prefix(source) { + format_dir(file_path.to_string_lossy()) } else { - dir.to_string_lossy().to_string() + format_dir(dir.to_string_lossy()) } }) - .map(|dir| dir.replace('\\', "/")) // TODO windows only .collect(); let manifest = Manifest { @@ -548,3 +559,15 @@ async fn build_manifest( Ok(manifest) } + +#[cfg(windows)] +#[inline(always)] +fn format_dir(dir: Cow<'_, str>) -> String { + dir.replace('\\', "/") // replace the windows path separator with the unix one +} + +#[cfg(not(windows))] +#[inline(always)] +fn format_dir(dir: Cow<'_, str>) -> String { + dir.to_string() +}