From d2f31988b042bd90fd40c9155032081f8e7b3b06 Mon Sep 17 00:00:00 2001 From: chanderlud Date: Wed, 3 Jan 2024 13:44:02 -0800 Subject: [PATCH] code cleanup & finalized remote -> remote transfers --- src/error.rs | 8 --- src/main.rs | 130 +++++++++++++++++++++----------------------- src/options.rs | 7 ++- src/receiver/mod.rs | 7 +-- src/sender/mod.rs | 7 +-- 5 files changed, 72 insertions(+), 87 deletions(-) diff --git a/src/error.rs b/src/error.rs index 7898558..bc9b56d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -37,7 +37,6 @@ pub(crate) enum ErrorKind { EmptyPath, InvalidExtension, UnexpectedMessage(Box), - Stop, } impl From for Error { @@ -183,7 +182,6 @@ impl std::fmt::Display for Error { ErrorKind::UnexpectedMessage(ref message) => { write!(f, "Unexpected message {:?}", message) } - ErrorKind::Stop => write!(f, "Stop"), } } } @@ -225,12 +223,6 @@ impl Error { } } - pub(crate) fn stop() -> Self { - Self { - kind: ErrorKind::Stop, - } - } - #[cfg(windows)] pub(crate) fn status_error() -> Self { Self { diff --git a/src/main.rs b/src/main.rs index 679cd7d..91784a9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,10 +26,10 @@ use simple_logging::{log_to_file, log_to_stderr}; use tokio::fs::File; use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::{TcpListener, TcpStream, UdpSocket}; +use tokio::signal::ctrl_c; use tokio::sync::Notify; use tokio::time::{interval, sleep, Instant, Interval}; use tokio::{io, select}; -use tokio::signal::ctrl_c; use crate::cipher::CipherStream; use crate::error::Error; @@ -116,10 +116,10 @@ async fn main() -> Result<()> { let mut options = Options::parse(); let mut command = Options::command(); - let cancel_signal = Arc::new(Notify::new()); + let signal = Arc::new(Notify::new()); tokio::spawn({ - let cancel_signal = cancel_signal.clone(); + let cancel_signal = signal.clone(); async move { ctrl_c().await.expect("failed to listen for ctrl-c"); @@ -128,14 +128,6 @@ async fn main() -> Result<()> { } }); - std::thread::spawn({ - let cancel_signal = cancel_signal.clone(); - - move || { - wait_for_stop(cancel_signal); - } - }); - match options.mode { Mode::Local => { if let Some(path) = &options.log_file { @@ -234,14 +226,13 @@ async fn main() -> Result<()> { sender.not().then_some(stats.sent_packets.clone()), None, None, - cancel_signal.clone(), + signal.clone(), )); // receiver -> sender stream - let rts_stream = connect_stream(remote_ip, options.start_port, &mut options).await?; + let rts = connect_stream(remote_ip, options.start_port, &mut options).await?; // sender -> receiver stream - let str_stream = - connect_stream(remote_ip, options.start_port + 1, &mut options).await?; + let str = connect_stream(remote_ip, options.start_port, &mut options).await?; let stats_handle = tokio::spawn({ let stats = stats.clone(); @@ -250,15 +241,7 @@ async fn main() -> Result<()> { local_stats_printer(stats, interval) }); - let main_future = run_main( - sender, - options, - stats.clone(), - rts_stream, - str_stream, - remote_ip, - cancel_signal - ); + let main_future = run_main(sender, options, stats.clone(), rts, str, remote_ip, signal); let command_future = async { match command_handle.await?? { @@ -281,24 +264,32 @@ async fn main() -> Result<()> { stats_handle.await?; } Mode::Remote(sender) => { - let (rts_stream, str_stream, remote_addr) = match options.stream_setup_mode { + // remote clients must listen for STOP on stdin + std::thread::spawn({ + let cancel_signal = signal.clone(); + + move || { + wait_for_stop(cancel_signal); + } + }); + + let (rts, str, remote_addr) = match options.stream_setup_mode { + // remote clients usually are in listen mode SetupMode::Listen => { - let (rts_stream, remote_addr) = - listen_stream(options.start_port, &mut options).await?; - let (str_stream, _) = - listen_stream(options.start_port + 1, &mut options).await?; + let (rts, addr) = listen_stream(options.start_port, &mut options).await?; + let (str, _) = listen_stream(options.start_port, &mut options).await?; - (rts_stream, str_stream, remote_addr) + (rts, str, addr) } + // remote clients only use connect mode for remote -> remote transfers where the source is always in connect mode SetupMode::Connect => { // unwrap is safe because host must be specified for remote IoSpec let addr = options.destination.host.unwrap().ip(); - let rts_stream = connect_stream(addr, options.start_port, &mut options).await?; - let str_stream = - connect_stream(addr, options.start_port + 1, &mut options).await?; + let rts = connect_stream(addr, options.start_port, &mut options).await?; + let str = connect_stream(addr, options.start_port, &mut options).await?; - (rts_stream, str_stream, addr) + (rts, str, addr) } }; @@ -308,10 +299,10 @@ async fn main() -> Result<()> { sender, options, stats.clone(), - rts_stream, - str_stream, + rts, + str, remote_addr, - cancel_signal + signal, ) .await?; @@ -323,26 +314,24 @@ async fn main() -> Result<()> { let sender_addr = options.source.host.unwrap(); let receiver_addr = options.destination.host.unwrap(); - let sender_client = connect_client(sender_addr, &options.source.username).await?; - let receiver_client = - connect_client(receiver_addr, &options.destination.username).await?; - let sender_handle = tokio::spawn(command_runner( - sender_client, + connect_client(sender_addr, &options.source.username).await?, options.format_command(false, SetupMode::Connect), // sender is inverted somewhat confusingly + // use the sender's stats Some(stats.sent_packets.clone()), Some(stats.confirmed_packets.clone()), Some(stats.total_data.clone()), - cancel_signal.clone(), + signal.clone(), )); let receiver_handle = tokio::spawn(command_runner( - receiver_client, + connect_client(receiver_addr, &options.destination.username).await?, options.format_command(true, SetupMode::Listen), + // ignore the receiver's stats None, None, None, - cancel_signal.clone(), + signal.clone(), )); let stats_handle = tokio::spawn({ @@ -356,9 +345,9 @@ async fn main() -> Result<()> { let receiver_status = receiver_handle.await??; if sender_status != Some(0) { - error!("sender command failed with status {:?}", sender_status); + error!("sender command returned status {:?}", sender_status); } else if receiver_status != Some(0) { - error!("receiver command failed with status {:?}", receiver_status); + error!("receiver command returned status {:?}", receiver_status); } stats.complete.store(true, Relaxed); @@ -376,15 +365,15 @@ async fn run_main( sender: bool, options: Options, stats: TransferStats, - rts_stream: CipherStream, - str_stream: CipherStream, + rts: CipherStream, + str: CipherStream, remote_addr: IpAddr, - cancel_signal: Arc, + signal: Arc, ) -> Result<()> { if sender { - sender::main(options, stats, rts_stream, str_stream, remote_addr, cancel_signal).await + sender::main(options, stats, rts, str, remote_addr, signal).await } else { - receiver::main(options, stats, rts_stream, str_stream, remote_addr, cancel_signal).await + receiver::main(options, stats, rts, str, remote_addr, signal).await } } @@ -534,16 +523,15 @@ async fn command_runner( debug!("executing command: {}", command); let mut channel = client.get_channel().await?; - let mut status: Option = None; channel.exec(true, command).await?; loop { select! { _ = cancel_signal.notified() => { - debug!("cancel signal received"); + // the remote client listens for STOP in it's stdin + // this is more reliable & cross platform than sending a signal channel.data(&b"STOP\n"[..]).await?; - debug!("sent STOP message"); - break; + break Ok(None); } message = channel.wait() => { if let Some(message) = message { @@ -566,20 +554,24 @@ async fn command_runner( } } ChannelMsg::ExtendedData { ref data, ext: 1 } => { - error!("remote stderr: {}", String::from_utf8_lossy(data)) + let error = String::from_utf8_lossy(data); + + if error.contains("not recognized as an internal or external command") { + break Err(io::Error::new( + io::ErrorKind::NotFound, + "cccp is not installed on the remote host", + ).into()) + } } - ChannelMsg::ExitStatus { exit_status } => status = Some(exit_status), + ChannelMsg::ExitStatus { exit_status } => break Ok(Some(exit_status)), _ => {} } } else { - break + break Ok(None) } } } } - - debug!("command runner finished with status {:?}", status); - Ok(status) } /// connects to a listening remote client @@ -599,6 +591,7 @@ async fn connect_stream( let stream = CipherStream::new(tcp_stream, &options.control_crypto)?; options.control_crypto.next_iv(); + options.start_port += 1; Ok(stream) } @@ -608,9 +601,11 @@ async fn listen_stream( options: &mut Options, ) -> Result<(CipherStream, IpAddr)> { let listener = TcpListener::bind(("0.0.0.0", port)).await?; - let (stream, remote_addr) = listener.accept().await?; - let stream = CipherStream::new(stream, &options.control_crypto)?; + let (tcp_stream, remote_addr) = listener.accept().await?; + + let stream = CipherStream::new(tcp_stream, &options.control_crypto)?; options.control_crypto.next_iv(); + options.start_port += 1; Ok((stream, remote_addr.ip())) } @@ -635,16 +630,17 @@ async fn hash_file>(path: P) -> io::Result { Ok(hasher.finalize()) } +/// watches for stdin to receive a STOP message fn wait_for_stop(signal: Arc) { let stdin = std::io::stdin(); let reader = std::io::BufReader::new(stdin); - let mut lines = reader.lines(); + let lines = reader.lines(); - while let Some(Ok(line)) = lines.next() { + for line in lines.map_while(std::result::Result::ok) { if line.contains("STOP") { debug!("received STOP message"); signal.notify_waiters(); break; } } -} \ No newline at end of file +} diff --git a/src/options.rs b/src/options.rs index 2f96c34..7f93bff 100644 --- a/src/options.rs +++ b/src/options.rs @@ -17,7 +17,6 @@ use crate::cipher::random_bytes; use crate::items::{Cipher, Crypto}; use crate::PACKET_SIZE; -// TODO add a help item for firewall stuff const HELP_HEADING: &str = "\x1B[1m\x1B[4mAbout\x1B[0m cccp is a fast, secure, and reliable file transfer utility @@ -33,7 +32,11 @@ const HELP_HEADING: &str = "\x1B[1m\x1B[4mAbout\x1B[0m - CHAHA20 - AES128 - AES192 - - AES256"; + - AES256 + +\x1B[1m\x1B[4mFirewall\x1B[0m\ + - The first two ports are used for TCP streams which carry control messages + - The remaining ports are UDP sockets which carry data"; #[derive(Parser)] #[clap(version, about = HELP_HEADING)] diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs index 0f82d18..3f2e096 100644 --- a/src/receiver/mod.rs +++ b/src/receiver/mod.rs @@ -178,7 +178,7 @@ pub(crate) async fn main( } let sockets = socket_factory( - options.start_port + 2, // the first two ports are used for control messages and confirmations + options.start_port, options.end_port, remote_addr, options.threads, @@ -228,10 +228,7 @@ pub(crate) async fn main( result = controller_handle => { debug!("controller exited: {:?}", result); result? }, result = receiver_future => { debug!("receivers exited: {:?}", result); result }, result = message_sender_handle => { debug!("message sender exited: {:?}", result); result? }, - _ = cancel_signal.notified() => { - debug!("stop signal received"); - Err(Error::stop()) - } + _ = cancel_signal.notified() => { debug!("stop signal received"); Ok(()) } } } diff --git a/src/sender/mod.rs b/src/sender/mod.rs index 0f19d95..b930f67 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -85,7 +85,7 @@ pub(crate) async fn main( } let sockets = socket_factory( - options.start_port + 2, // the first two ports are used for control messages and confirmations + options.start_port, options.end_port, remote_addr, options.threads, @@ -164,10 +164,7 @@ pub(crate) async fn main( result = controller_handle => { debug!("controller exited: {:?}", result); result? }, result = sender_future => { debug!("senders exited: {:?}", result); result }, result = receiver_handle => { debug!("message receiver exited: {:?}", result); result? }, - _ = cancel_signal.notified() => { - debug!("stop signal received"); - Err(Error::stop()) - } + _ = cancel_signal.notified() => { debug!("stop signal received"); Ok(()) } } }