Skip to content

Commit

Permalink
code cleanup & finalized remote -> remote transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
chanderlud committed Jan 3, 2024
1 parent 43aff4f commit d2f3198
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 87 deletions.
8 changes: 0 additions & 8 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub(crate) enum ErrorKind {
EmptyPath,
InvalidExtension,
UnexpectedMessage(Box<dyn Message>),
Stop,
}

impl From<io::Error> for Error {
Expand Down Expand Up @@ -183,7 +182,6 @@ impl std::fmt::Display for Error {
ErrorKind::UnexpectedMessage(ref message) => {
write!(f, "Unexpected message {:?}", message)
}
ErrorKind::Stop => write!(f, "Stop"),
}
}
}
Expand Down Expand Up @@ -225,12 +223,6 @@ impl Error {
}
}

pub(crate) fn stop() -> Self {
Self {
kind: ErrorKind::Stop,
}
}

#[cfg(windows)]
pub(crate) fn status_error() -> Self {
Self {
Expand Down
130 changes: 63 additions & 67 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use simple_logging::{log_to_file, log_to_stderr};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::signal::ctrl_c;
use tokio::sync::Notify;
use tokio::time::{interval, sleep, Instant, Interval};
use tokio::{io, select};
use tokio::signal::ctrl_c;

use crate::cipher::CipherStream;
use crate::error::Error;
Expand Down Expand Up @@ -116,10 +116,10 @@ async fn main() -> Result<()> {
let mut options = Options::parse();
let mut command = Options::command();

let cancel_signal = Arc::new(Notify::new());
let signal = Arc::new(Notify::new());

tokio::spawn({
let cancel_signal = cancel_signal.clone();
let cancel_signal = signal.clone();

async move {
ctrl_c().await.expect("failed to listen for ctrl-c");
Expand All @@ -128,14 +128,6 @@ async fn main() -> Result<()> {
}
});

std::thread::spawn({
let cancel_signal = cancel_signal.clone();

move || {
wait_for_stop(cancel_signal);
}
});

match options.mode {
Mode::Local => {
if let Some(path) = &options.log_file {
Expand Down Expand Up @@ -234,14 +226,13 @@ async fn main() -> Result<()> {
sender.not().then_some(stats.sent_packets.clone()),
None,
None,
cancel_signal.clone(),
signal.clone(),
));

// receiver -> sender stream
let rts_stream = connect_stream(remote_ip, options.start_port, &mut options).await?;
let rts = connect_stream(remote_ip, options.start_port, &mut options).await?;
// sender -> receiver stream
let str_stream =
connect_stream(remote_ip, options.start_port + 1, &mut options).await?;
let str = connect_stream(remote_ip, options.start_port, &mut options).await?;

let stats_handle = tokio::spawn({
let stats = stats.clone();
Expand All @@ -250,15 +241,7 @@ async fn main() -> Result<()> {
local_stats_printer(stats, interval)
});

let main_future = run_main(
sender,
options,
stats.clone(),
rts_stream,
str_stream,
remote_ip,
cancel_signal
);
let main_future = run_main(sender, options, stats.clone(), rts, str, remote_ip, signal);

let command_future = async {
match command_handle.await?? {
Expand All @@ -281,24 +264,32 @@ async fn main() -> Result<()> {
stats_handle.await?;
}
Mode::Remote(sender) => {
let (rts_stream, str_stream, remote_addr) = match options.stream_setup_mode {
// remote clients must listen for STOP on stdin
std::thread::spawn({
let cancel_signal = signal.clone();

move || {
wait_for_stop(cancel_signal);
}
});

let (rts, str, remote_addr) = match options.stream_setup_mode {
// remote clients usually are in listen mode
SetupMode::Listen => {
let (rts_stream, remote_addr) =
listen_stream(options.start_port, &mut options).await?;
let (str_stream, _) =
listen_stream(options.start_port + 1, &mut options).await?;
let (rts, addr) = listen_stream(options.start_port, &mut options).await?;
let (str, _) = listen_stream(options.start_port, &mut options).await?;

(rts_stream, str_stream, remote_addr)
(rts, str, addr)
}
// remote clients only use connect mode for remote -> remote transfers where the source is always in connect mode
SetupMode::Connect => {
// unwrap is safe because host must be specified for remote IoSpec
let addr = options.destination.host.unwrap().ip();

let rts_stream = connect_stream(addr, options.start_port, &mut options).await?;
let str_stream =
connect_stream(addr, options.start_port + 1, &mut options).await?;
let rts = connect_stream(addr, options.start_port, &mut options).await?;
let str = connect_stream(addr, options.start_port, &mut options).await?;

(rts_stream, str_stream, addr)
(rts, str, addr)
}
};

Expand All @@ -308,10 +299,10 @@ async fn main() -> Result<()> {
sender,
options,
stats.clone(),
rts_stream,
str_stream,
rts,
str,
remote_addr,
cancel_signal
signal,
)
.await?;

Expand All @@ -323,26 +314,24 @@ async fn main() -> Result<()> {
let sender_addr = options.source.host.unwrap();
let receiver_addr = options.destination.host.unwrap();

let sender_client = connect_client(sender_addr, &options.source.username).await?;
let receiver_client =
connect_client(receiver_addr, &options.destination.username).await?;

let sender_handle = tokio::spawn(command_runner(
sender_client,
connect_client(sender_addr, &options.source.username).await?,
options.format_command(false, SetupMode::Connect), // sender is inverted somewhat confusingly
// use the sender's stats
Some(stats.sent_packets.clone()),
Some(stats.confirmed_packets.clone()),
Some(stats.total_data.clone()),
cancel_signal.clone(),
signal.clone(),
));

let receiver_handle = tokio::spawn(command_runner(
receiver_client,
connect_client(receiver_addr, &options.destination.username).await?,
options.format_command(true, SetupMode::Listen),
// ignore the receiver's stats
None,
None,
None,
cancel_signal.clone(),
signal.clone(),
));

let stats_handle = tokio::spawn({
Expand All @@ -356,9 +345,9 @@ async fn main() -> Result<()> {
let receiver_status = receiver_handle.await??;

if sender_status != Some(0) {
error!("sender command failed with status {:?}", sender_status);
error!("sender command returned status {:?}", sender_status);
} else if receiver_status != Some(0) {
error!("receiver command failed with status {:?}", receiver_status);
error!("receiver command returned status {:?}", receiver_status);
}

stats.complete.store(true, Relaxed);
Expand All @@ -376,15 +365,15 @@ async fn run_main(
sender: bool,
options: Options,
stats: TransferStats,
rts_stream: CipherStream<TcpStream>,
str_stream: CipherStream<TcpStream>,
rts: CipherStream<TcpStream>,
str: CipherStream<TcpStream>,
remote_addr: IpAddr,
cancel_signal: Arc<Notify>,
signal: Arc<Notify>,
) -> Result<()> {
if sender {
sender::main(options, stats, rts_stream, str_stream, remote_addr, cancel_signal).await
sender::main(options, stats, rts, str, remote_addr, signal).await
} else {
receiver::main(options, stats, rts_stream, str_stream, remote_addr, cancel_signal).await
receiver::main(options, stats, rts, str, remote_addr, signal).await
}
}

Expand Down Expand Up @@ -534,16 +523,15 @@ async fn command_runner(
debug!("executing command: {}", command);

let mut channel = client.get_channel().await?;
let mut status: Option<u32> = None;
channel.exec(true, command).await?;

loop {
select! {
_ = cancel_signal.notified() => {
debug!("cancel signal received");
// the remote client listens for STOP in it's stdin
// this is more reliable & cross platform than sending a signal
channel.data(&b"STOP\n"[..]).await?;
debug!("sent STOP message");
break;
break Ok(None);
}
message = channel.wait() => {
if let Some(message) = message {
Expand All @@ -566,20 +554,24 @@ async fn command_runner(
}
}
ChannelMsg::ExtendedData { ref data, ext: 1 } => {
error!("remote stderr: {}", String::from_utf8_lossy(data))
let error = String::from_utf8_lossy(data);

if error.contains("not recognized as an internal or external command") {
break Err(io::Error::new(
io::ErrorKind::NotFound,
"cccp is not installed on the remote host",
).into())
}
}
ChannelMsg::ExitStatus { exit_status } => status = Some(exit_status),
ChannelMsg::ExitStatus { exit_status } => break Ok(Some(exit_status)),
_ => {}
}
} else {
break
break Ok(None)
}
}
}
}

debug!("command runner finished with status {:?}", status);
Ok(status)
}

/// connects to a listening remote client
Expand All @@ -599,6 +591,7 @@ async fn connect_stream(

let stream = CipherStream::new(tcp_stream, &options.control_crypto)?;
options.control_crypto.next_iv();
options.start_port += 1;
Ok(stream)
}

Expand All @@ -608,9 +601,11 @@ async fn listen_stream(
options: &mut Options,
) -> Result<(CipherStream<TcpStream>, IpAddr)> {
let listener = TcpListener::bind(("0.0.0.0", port)).await?;
let (stream, remote_addr) = listener.accept().await?;
let stream = CipherStream::new(stream, &options.control_crypto)?;
let (tcp_stream, remote_addr) = listener.accept().await?;

let stream = CipherStream::new(tcp_stream, &options.control_crypto)?;
options.control_crypto.next_iv();
options.start_port += 1;
Ok((stream, remote_addr.ip()))
}

Expand All @@ -635,16 +630,17 @@ async fn hash_file<P: AsRef<Path>>(path: P) -> io::Result<Hash> {
Ok(hasher.finalize())
}

/// watches for stdin to receive a STOP message
fn wait_for_stop(signal: Arc<Notify>) {
let stdin = std::io::stdin();
let reader = std::io::BufReader::new(stdin);
let mut lines = reader.lines();
let lines = reader.lines();

while let Some(Ok(line)) = lines.next() {
for line in lines.map_while(std::result::Result::ok) {
if line.contains("STOP") {
debug!("received STOP message");
signal.notify_waiters();
break;
}
}
}
}
7 changes: 5 additions & 2 deletions src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use crate::cipher::random_bytes;
use crate::items::{Cipher, Crypto};
use crate::PACKET_SIZE;

// TODO add a help item for firewall stuff
const HELP_HEADING: &str = "\x1B[1m\x1B[4mAbout\x1B[0m
cccp is a fast, secure, and reliable file transfer utility
Expand All @@ -33,7 +32,11 @@ const HELP_HEADING: &str = "\x1B[1m\x1B[4mAbout\x1B[0m
- CHAHA20
- AES128
- AES192
- AES256";
- AES256
\x1B[1m\x1B[4mFirewall\x1B[0m\
- The first two ports are used for TCP streams which carry control messages
- The remaining ports are UDP sockets which carry data";

#[derive(Parser)]
#[clap(version, about = HELP_HEADING)]
Expand Down
7 changes: 2 additions & 5 deletions src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ pub(crate) async fn main(
}

let sockets = socket_factory(
options.start_port + 2, // the first two ports are used for control messages and confirmations
options.start_port,
options.end_port,
remote_addr,
options.threads,
Expand Down Expand Up @@ -228,10 +228,7 @@ pub(crate) async fn main(
result = controller_handle => { debug!("controller exited: {:?}", result); result? },
result = receiver_future => { debug!("receivers exited: {:?}", result); result },
result = message_sender_handle => { debug!("message sender exited: {:?}", result); result? },
_ = cancel_signal.notified() => {
debug!("stop signal received");
Err(Error::stop())
}
_ = cancel_signal.notified() => { debug!("stop signal received"); Ok(()) }
}
}

Expand Down
7 changes: 2 additions & 5 deletions src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub(crate) async fn main(
}

let sockets = socket_factory(
options.start_port + 2, // the first two ports are used for control messages and confirmations
options.start_port,
options.end_port,
remote_addr,
options.threads,
Expand Down Expand Up @@ -164,10 +164,7 @@ pub(crate) async fn main(
result = controller_handle => { debug!("controller exited: {:?}", result); result? },
result = sender_future => { debug!("senders exited: {:?}", result); result },
result = receiver_handle => { debug!("message receiver exited: {:?}", result); result? },
_ = cancel_signal.notified() => {
debug!("stop signal received");
Err(Error::stop())
}
_ = cancel_signal.notified() => { debug!("stop signal received"); Ok(()) }
}
}

Expand Down

0 comments on commit d2f3198

Please sign in to comment.