Skip to content

Commit

Permalink
fixed bugs, stabilized the free space feature on unix, improved error…
Browse files Browse the repository at this point in the history
… handling, reworked sender controller, and did a bit of refactoring
  • Loading branch information
chanderlud committed Dec 21, 2023
1 parent 733f828 commit 2be6466
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 141 deletions.
15 changes: 7 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cccp"
version = "0.6.0"
version = "0.7.0"
edition = "2021"
build = "build.rs"
repository = "https://github.com/chanderlud/cccp"
Expand All @@ -10,7 +10,7 @@ authors = ["Chander Luderman <me@chanchan.dev>"]

[dependencies]
clap = { version = "4.4", features = ["derive"] }
tokio = { version = "1.34", default-features = false, features = ["macros", "fs", "io-util"] }
tokio = { version = "1.35", default-features = false, features = ["macros", "fs", "io-util"] }
futures = "0.3"
log = { version = "0.4", features = ["std"] }
async-ssh2-tokio = "0.8"
Expand All @@ -20,7 +20,7 @@ dirs = "5.0"
rpassword = "7.3"
indicatif = "0.17"
prost = "0.12"
bytesize = "1.3.0"
bytesize = "1.3"
kanal = "0.1.0-pre8"
blake3 = "1.5"
chacha20 = "0.9"
Expand All @@ -29,17 +29,16 @@ hex = "0.4"
ctr = "0.9"
aes = "0.8"
itertools = "0.12"
libc = "0.2.151"

[target.'cfg(unix)'.dependencies]
libc = "0.2.151"
nix = { version = "0.27", features = ["fs"] }

[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.52.0", features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
widestring = "1.0.2"
windows-sys = { version = "0.52", features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
widestring = "1.0"

[build-dependencies]
prost-build = "0.12.3"
prost-build = "0.12"

[profile.release]
opt-level = 3
Expand Down
26 changes: 22 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ pub(crate) enum ErrorKind {
#[cfg(windows)]
ContainsNull(widestring::error::ContainsNul<u16>),
#[cfg(unix)]
Nul(std::ffi::NulError),
Nix(nix::Error),
StripPrefix(std::path::StripPrefixError),
MissingQueue,
MaxRetries,
#[cfg(windows)]
StatusError,
Failure(u32),
EmptyPath,
}

impl From<io::Error> for Error {
Expand Down Expand Up @@ -104,10 +107,18 @@ impl From<widestring::error::ContainsNul<u16>> for Error {
}

#[cfg(unix)]
impl From<std::ffi::NulError> for Error {
fn from(error: std::ffi::NulError) -> Self {
impl From<nix::Error> for Error {
fn from(error: nix::Error) -> Self {
Self {
kind: ErrorKind::Nix(error),
}
}
}

impl From<std::path::StripPrefixError> for Error {
fn from(error: std::path::StripPrefixError) -> Self {
Self {
kind: ErrorKind::Nul(error),
kind: ErrorKind::StripPrefix(error),
}
}
}
Expand Down Expand Up @@ -149,6 +160,13 @@ impl Error {
}
}

pub(crate) fn empty_path() -> Self {
Self {
kind: ErrorKind::EmptyPath,
}
}

#[cfg(windows)]
pub(crate) fn status_error() -> Self {
Self {
kind: ErrorKind::StatusError,
Expand Down
47 changes: 25 additions & 22 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl Options {
};

format!(
"cccp --mode {} -s {} -e {} -t {} -l {} -r \"{}\"{} --control-crypto {}{} \"{}\" \"{}\"",
"cccp --mode {} -s {} -e {} -t {} -l {} -r \"{}\"{} --control-crypto {}{}{} \"{}\" \"{}\"",
mode,
self.start_port,
self.end_port,
Expand All @@ -205,6 +205,7 @@ impl Options {
stream_crypto,
self.control_crypto,
if self.overwrite { " -o" } else { "" },
if self.verify { " -v" } else { "" },
self.source,
self.destination
)
Expand Down Expand Up @@ -480,7 +481,7 @@ async fn main() -> Result<()> {
let sender = options.source.is_local();
let stats = TransferStats::default();

match options.mode {
let result = match options.mode {
Mode::Local => {
let command = options.format_command(sender);

Expand Down Expand Up @@ -586,9 +587,9 @@ async fn main() -> Result<()> {
};

select! {
_ = command_future => {},
_ = display_handle => {},
result = main_future => result?
_ = command_future => Ok(()),
_ = display_handle => Ok(()),
result = main_future => result
}
}
Mode::Remote(sender) => {
Expand All @@ -603,15 +604,19 @@ async fn main() -> Result<()> {
let remote_addr = remote_addr.ip();

if sender {
sender::main(options, stats, rts_stream, str_stream, remote_addr).await?;
sender::main(options, stats, rts_stream, str_stream, remote_addr).await
} else {
receiver::main(options, stats, rts_stream, str_stream, remote_addr).await?;
};
receiver::main(options, stats, rts_stream, str_stream, remote_addr).await
}
}
};

if let Err(error) = &result {
error!("{:?}", error);
}

info!("exiting");
Ok(())
result
}

/// opens the sockets that will be used to send data
Expand Down Expand Up @@ -719,16 +724,15 @@ async fn write_message<W: AsyncWrite + Unpin, M: Message, C: StreamCipherExt + ?
message: &M,
cipher: &mut Box<C>,
) -> Result<()> {
let len = message.encoded_len();
writer.write_u32(len as u32).await?;
let len = message.encoded_len(); // get the length of the message
writer.write_u32(len as u32).await?; // write the length of the message

let mut buffer = Vec::with_capacity(len);
message.encode(&mut buffer).unwrap();
cipher.apply_keystream(&mut buffer[..]);
let mut buffer = Vec::with_capacity(len); // create a buffer to write the message into
message.encode(&mut buffer).unwrap(); // encode the message into the buffer (infallible)
cipher.apply_keystream(&mut buffer[..]); // encrypt the message

writer.write_all(&buffer).await?;
writer.write_all(&buffer).await?; // write the message to the writer

debug!("sent message: {:?}", message);
Ok(())
}

Expand All @@ -741,14 +745,13 @@ async fn read_message<
reader: &mut R,
cipher: &mut Box<C>,
) -> Result<M> {
let len = reader.read_u32().await? as usize;
let len = reader.read_u32().await? as usize; // read the length of the message

let mut buffer = vec![0; len];
reader.read_exact(&mut buffer).await?;
cipher.apply_keystream(&mut buffer[..]);
let mut buffer = vec![0; len]; // create a buffer to read the message into
reader.read_exact(&mut buffer).await?; // read the message into the buffer
cipher.apply_keystream(&mut buffer[..]); // decrypt the message

let message = M::decode(&buffer[..])?;
debug!("received message: {:?}", message);
let message = M::decode(&buffer[..])?; // decode the message

Ok(message)
}
Expand Down
72 changes: 41 additions & 31 deletions src/receiver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use futures::stream::iter;
use std::collections::HashMap;
use std::env::current_dir;
use std::mem;
use std::net::IpAddr;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::Duration;

use futures::stream::iter;
use futures::StreamExt;
use kanal::{AsyncReceiver, AsyncSender};
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -51,7 +52,11 @@ pub(crate) async fn main(

let manifest: Manifest = read_message(&mut str_stream, &mut str_cipher).await?;
let is_dir = manifest.files.len() > 1; // if multiple files are being received, the destination should be a directory
debug!("received manifest | files={} dirs={}", manifest.files.len(), manifest.directories.len());
debug!(
"received manifest | files={} dirs={}",
manifest.files.len(),
manifest.directories.len()
);

let completed = Arc::new(Mutex::new(Vec::new()));

Expand Down Expand Up @@ -112,9 +117,12 @@ pub(crate) async fn main(
.await;

let completed = completed.lock().await.clone();
debug!("processed files | files={} completed={}", files.len(), completed.len());
debug!(
"processed files | files={} completed={}",
files.len(),
completed.len()
);

debug!("trying to get free space...");
let free_space = free_space(&options.destination.file_path)?;
debug!("free space: {}", free_space);

Expand Down Expand Up @@ -198,9 +206,9 @@ pub(crate) async fn main(
};

select! {
result = confirmation_handle => result?,
result = controller_handle => result?,
result = receiver_future => result,
result = confirmation_handle => { debug!("confirmation sender exited: {:?}", result); result? },
result = controller_handle => { debug!("controller exited: {:?}", result); result? },
result = receiver_future => { debug!("receivers exited: {:?}", result); result },
}
}

Expand Down Expand Up @@ -284,16 +292,11 @@ async fn controller<C: StreamCipherExt + ?Sized>(
Some(message::Message::Done(_)) => {
debug!("received done message");
message_sender.close();
break;
}
_ => {
error!("received {:?}", message);
break;
break Ok(());
}
_ => unreachable!("controller received unexpected message: {:?}", message),
}
}

Ok(())
}

async fn send_confirmations(
Expand Down Expand Up @@ -345,7 +348,7 @@ async fn send_confirmations(

// propagate errors from the sender thread while executing the future
select! {
result = sender_handle => result?,
result = sender_handle => { debug!("confirmation sender exited: {:?}", result); result? },
_ = future => Ok(())
}
}
Expand All @@ -363,32 +366,23 @@ async fn send_messages<W: AsyncWrite + Unpin, M: prost::Message, C: StreamCipher
Ok(())
}

// TODO this is buggy af
#[cfg(unix)]
fn free_space(path: &Path) -> Result<u64> {
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;

let dir = CString::new(path.as_os_str().as_bytes())?;
use nix::sys::statvfs::statvfs;

unsafe {
let mut buf: mem::MaybeUninit<libc::statvfs> = mem::MaybeUninit::uninit();
let result = libc::statvfs(dir.as_ptr(), buf.as_mut_ptr());
let path = format_path(path)?;
debug!("getting free space for {:?}", path);
let stat = statvfs(&path)?;

if result == 0 {
let stat = buf.assume_init();
Ok(stat.f_frsize as u64 * stat.f_bavail as u64)
} else {
Err(Error::status_error())
}
}
Ok(stat.blocks_available() as u64 * stat.fragment_size())
}

#[cfg(windows)]
fn free_space(path: &Path) -> Result<u64> {
use widestring::U16CString;
use windows_sys::Win32::Storage::FileSystem;

let path = format_path(path)?;
let path = U16CString::from_os_str(path)?;

let mut free_bytes = 0_u64;
Expand All @@ -409,3 +403,19 @@ fn free_space(path: &Path) -> Result<u64> {
Ok(free_bytes)
}
}

/// returns the absolute path of the first existing parent directory
fn format_path(path: &Path) -> Result<PathBuf> {
let mut path = path.to_path_buf();

if !path.is_absolute() {
let working_dir = current_dir()?;
path = working_dir.join(path);
}

while !path.exists() {
path = path.parent().ok_or(Error::empty_path())?.to_path_buf();
}

Ok(path)
}
1 change: 1 addition & 0 deletions src/receiver/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ pub(crate) async fn writer(
}

details.rename().await?; // rename the file
debug!("sending end message for {}", details.id);
message_sender.send(Message::end(details.id)).await?; // send the end message

Ok(())
Expand Down
Loading

0 comments on commit 2be6466

Please sign in to comment.