Skip to content

Commit

Permalink
bug fixes, improved error handling, stabilized some new options, and …
Browse files Browse the repository at this point in the history
…a lot of refactoring
  • Loading branch information
chanderlud committed Dec 24, 2023
1 parent 82a2a7e commit f04a051
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cccp"
version = "0.7.0"
version = "0.8.0"
edition = "2021"
build = "build.rs"
repository = "https://github.com/chanderlud/cccp"
Expand Down
29 changes: 29 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::array::TryFromSliceError;
use std::fmt::Formatter;
use std::process::{ExitCode, Termination};

use kanal::{ReceiveError, SendError};
Expand Down Expand Up @@ -151,6 +152,34 @@ impl Termination for Error {
}
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self.kind {
ErrorKind::Io(ref error) => write!(f, "IO error: {}", error),
ErrorKind::AddrParse(ref error) => write!(f, "Address parse error: {}", error),
ErrorKind::Decode(ref error) => write!(f, "Decode error: {}", error),
ErrorKind::Join(ref error) => write!(f, "Join error: {}", error),
ErrorKind::Send(ref error) => write!(f, "Send error: {}", error),
ErrorKind::Receive(ref error) => write!(f, "Receive error: {}", error),
ErrorKind::Acquire(ref error) => write!(f, "Acquire error: {}", error),
ErrorKind::TryFromSlice(ref error) => write!(f, "TryFromSlice error: {}", error),
#[cfg(windows)]
ErrorKind::ContainsNull(ref error) => write!(f, "ContainsNull error: {}", error),
#[cfg(unix)]
ErrorKind::Nix(ref error) => write!(f, "Nix error: {}", error),
ErrorKind::StripPrefix(ref error) => write!(f, "StripPrefix error: {}", error),
ErrorKind::Ssh(ref error) => write!(f, "SSH error: {}", error),
ErrorKind::MissingQueue => write!(f, "Missing queue"),
ErrorKind::MaxRetries => write!(f, "Max retries"),
#[cfg(windows)]
ErrorKind::StatusError => write!(f, "Status error"),
ErrorKind::Failure(ref reason) => write!(f, "Failure: {}", reason),
ErrorKind::EmptyPath => write!(f, "Empty path"),
ErrorKind::InvalidExtension => write!(f, "Invalid extension"),
}
}
}

impl Error {
pub(crate) fn missing_queue() -> Self {
Self {
Expand Down
1 change: 1 addition & 0 deletions src/items.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ message End {
message Failure {
uint32 id = 1;
uint32 reason = 2;
optional string description = 3;
}

// signals the receiver that the sender won't start new transfers
Expand Down
8 changes: 6 additions & 2 deletions src/items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ impl Message {
}
}

pub(crate) fn failure(id: u32, reason: u32) -> Self {
pub(crate) fn failure(id: u32, reason: u32, description: Option<String>) -> Self {
Self {
message: Some(message::Message::Failure(Failure { id, reason })),
message: Some(message::Message::Failure(Failure {
id,
reason,
description,
})),
}
}

Expand Down
56 changes: 32 additions & 24 deletions src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ pub(crate) struct Options {
pub(crate) start_port: u16,

/// The last port to use
#[clap(short, long, default_value_t = 50099)]
#[clap(short, long, default_value_t = 50009)]
pub(crate) end_port: u16,

/// The number of threads to use
#[clap(short, long, default_value_t = 98)]
#[clap(short, long, default_value_t = 8)]
pub(crate) threads: usize,

/// The log level [debug, info, warn, error]
Expand Down Expand Up @@ -87,29 +87,37 @@ pub(crate) struct Options {

impl Options {
pub(crate) fn format_command(&self, sender: bool) -> String {
let mode = if sender { "rr" } else { "rs" };
let mut arguments = vec![
String::from("cccp"),
format!("--mode {}", if sender { "rr" } else { "rs" }),
format!("-s {}", self.start_port),
format!("-e {}", self.end_port),
format!("-t {}", self.threads),
format!("-l {}", self.log_level),
format!("-r \"{}\"", self.rate),
format!("--control-crypto {}", self.control_crypto),
];

if let Some(ref crypto) = self.stream_crypto {
arguments.push(format!(" --stream-crypto {}", crypto))
}

let stream_crypto = if let Some(ref crypto) = self.stream_crypto {
format!(" --stream-crypto {}", crypto)
} else {
String::new()
};

format!(
"cccp --mode {} -s {} -e {} -t {} -l {} -r \"{}\"{} --control-crypto {}{}{} \"{}\" \"{}\"",
mode,
self.start_port,
self.end_port,
self.threads,
self.log_level,
self.rate,
stream_crypto,
self.control_crypto,
if self.overwrite { " -o" } else { "" },
if self.verify { " -v" } else { "" },
self.source,
self.destination
)
if self.overwrite {
arguments.push(String::from("-o"))
}

if self.verify {
arguments.push(String::from("-v"))
}

if self.recursive {
arguments.push(String::from("-R"))
}

arguments.push(format!("\"{}\"", self.source));
arguments.push(format!("\"{}\"", self.destination));

arguments.join(" ")
}

pub(crate) fn pps(&self) -> u64 {
Expand Down
43 changes: 30 additions & 13 deletions src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures::stream::iter;
use futures::{StreamExt, TryStreamExt};
use kanal::{AsyncReceiver, AsyncSender};
use log::{debug, error, info, warn};
use tokio::fs::{create_dir_all, metadata};
use tokio::fs::{create_dir, metadata};
use tokio::io::AsyncWrite;
use tokio::net::{TcpStream, UdpSocket};
use tokio::select;
Expand Down Expand Up @@ -62,12 +62,14 @@ pub(crate) async fn main(

let filtered_files = manifest.files.into_iter().filter_map(|(id, details)| {
// formats the path to the file locally
let path = if is_dir {
let path = if is_dir || options.destination.file_path.is_dir() {
options.destination.file_path.join(&details.path)
} else {
options.destination.file_path.clone()
};

debug!("formatted local path {:?} for {:?}", path, details.path);

if path.exists() && !options.overwrite {
completed.push(id);
None
Expand All @@ -89,7 +91,7 @@ pub(crate) async fn main(
.to_owned()
+ ".partial"
} else {
".partial".to_string()
"partial".to_string()
};

let partial_path = path.with_extension(partial_extension);
Expand Down Expand Up @@ -132,7 +134,12 @@ pub(crate) async fn main(
stats.total_data.load(Relaxed)
);

write_message(&mut str_stream, &Message::failure(0, 1), &mut str_cipher).await?;
write_message(
&mut str_stream,
&Message::failure(0, 1, None),
&mut str_cipher,
)
.await?;

return Err(Error::failure(1));
}
Expand All @@ -148,15 +155,20 @@ pub(crate) async fn main(

// if the destination is a directory, create it
if is_dir {
debug!("creating directory {:?}", options.destination.file_path);
create_dir_all(&options.destination.file_path).await?;
}
if !options.destination.file_path.exists() {
debug!("creating directory {:?}", options.destination.file_path);
create_dir(&options.destination.file_path).await?;
}

// create the local directories needed to write the files
for dir in &manifest.directories {
let local_dir = options.destination.file_path.join(dir);

// create the local directories needed to write the files
for dir in &manifest.directories {
let local_dir = options.destination.file_path.join(dir);
debug!("creating directory {:?}", local_dir);
create_dir_all(local_dir).await?;
if !local_dir.exists() {
debug!("creating directory {:?}", local_dir);
create_dir(local_dir).await?;
}
}
}

let sockets = socket_factory(
Expand Down Expand Up @@ -248,6 +260,7 @@ async fn controller<C: StreamCipherExt + ?Sized>(
mut str_cipher: Box<C>,
) -> Result<()> {
loop {
debug!("waiting for message");
let message: Message = read_message(&mut str_stream, &mut str_cipher).await?;

match message.message {
Expand Down Expand Up @@ -284,9 +297,13 @@ async fn controller<C: StreamCipherExt + ?Sized>(
let details = details.clone();

async move {
let result = writer(details, queue, confirmation, message).await;
let result = writer(&details, queue, confirmation, &message).await;

if let Err(error) = result {
message
.send(Message::failure(details.id, 2, Some(error.to_string())))
.await
.unwrap();
error!("writer failed: {:?}", error);
}
}
Expand Down
18 changes: 8 additions & 10 deletions src/receiver/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,13 @@ impl SplitQueue {
receivers.get(id).cloned()
}

// TODO benchmark this
pub(crate) async fn send(&self, job: Job, id: u32) -> Result<()> {
let sender_option = {
let sender = {
let senders = self.senders.lock().await;
senders.get(&id).cloned()
senders.get(&id).ok_or(Error::missing_queue())?.clone()
};

if let Some(sender) = sender_option {
sender.send(job).await?;
}

sender.send(job).await?;
Ok(())
}
}
Expand All @@ -75,10 +71,10 @@ impl FileDetails {
}

pub(crate) async fn writer(
details: FileDetails,
details: &FileDetails,
writer_queue: WriterQueue,
confirmation_sender: AsyncSender<(u32, u64)>,
message_sender: AsyncSender<Message>,
message_sender: &AsyncSender<Message>,
) -> Result<()> {
let file = OpenOptions::new()
.write(true)
Expand Down Expand Up @@ -158,7 +154,9 @@ pub(crate) async fn writer(
let local_hash = hash_file(&details.partial_path).await?; // hash the file

if local_hash.as_bytes() != &remote_signature[..] {
message_sender.send(Message::failure(details.id, 0)).await?; // notify the sender
message_sender
.send(Message::failure(details.id, 0, None))
.await?; // notify the sender
remove_file(&details.partial_path).await?; // remove the partial file
return Err(Error::failure(0));
} else {
Expand Down
28 changes: 22 additions & 6 deletions src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub(crate) async fn main(
stats.total_data.fetch_sub(details.size as usize, Relaxed);
}
}

if manifest.files.is_empty() {
info!("all files completed");
write_message(&mut str_stream, &Message::done(), &mut str_cipher).await?;
return Ok(());
}
}
Some(message::Message::Failure(failure)) => {
error!("received failure message {}", failure.reason);
Expand Down Expand Up @@ -273,6 +279,19 @@ async fn controller<C: StreamCipherExt + ?Sized>(
);
}
}
Some(message::Message::Failure(failure)) if failure.reason == 2 => {
if active.remove(&failure.id).is_some() {
error!(
"remote writer failed {} [TRANSFER WILL NOT BE RETRIED]",
failure.description.unwrap()
);
} else {
warn!(
"received writer failure message {} for unknown file {}",
failure.reason, failure.id
);
}
}
Some(message::Message::Failure(failure)) => {
warn!(
"received unknown failure message {} for file {}",
Expand Down Expand Up @@ -546,13 +565,10 @@ fn files_and_dirs(
let path = entry.path();

if path.is_dir() {
dirs.push(path.clone());

if dirs.len() > 1 && !recursive {
continue;
if recursive {
dirs.push(path.clone());
files_and_dirs(&path, files, dirs, recursive)?;
}

files_and_dirs(&path, files, dirs, recursive)?;
} else {
files.push(path);
}
Expand Down

0 comments on commit f04a051

Please sign in to comment.