Skip to content

Commit

Permalink
refactoring, file integrity option added, fixed a potential issue in …
Browse files Browse the repository at this point in the history
…the writer
  • Loading branch information
chanderlud committed Dec 12, 2023
1 parent de30e8d commit 16bb5cd
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 160 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cccp"
version = "0.4.0"
version = "0.6.0"
edition = "2021"
build = "build.rs"

Expand All @@ -18,9 +18,9 @@ dirs = "5.0"
rpassword = "7.3"
indicatif = "0.17"
prost = "0.12"
prost-build = "0.12"
bytesize = "1.3.0"
kanal = "0.1.0-pre8"
blake3 = "1.5.0"

[build-dependencies]
prost-build = "0.12.3"
Expand Down
21 changes: 18 additions & 3 deletions src/items.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,53 @@ message Message {
StartIndex start_index = 3;
Start start = 4;
End end = 5;
Done done = 6;
Failure failure = 6;
Done done = 7;
}
}

// the sender provides these details to the receiver
message Manifest {
repeated string directories = 1;
map<uint32, FileDetail> files = 2; // map for file details
}

message FileDetail {
string file_path = 1;
uint64 file_size = 2;
string path = 1; // file path relative to the destination directory
uint64 size = 2; // file size in bytes
optional bytes signature = 3; // blake3 hash of file
}

// map of transfers and their confirmed indexes
message Confirmations {
map<uint32, ConfirmationIndexes> indexes = 1;
}

// the confirmed indexes
message ConfirmationIndexes {
repeated uint64 inner = 1;
}

// the receiver tells the sender which index it wants to start at
message StartIndex {
uint64 index = 1;
}

// signals the receiver that the sender wants to start a transfer
message Start {
uint32 id = 1;
}

// signals the the sender that the receiver has finished receiving the transfer
message End {
uint32 id = 1;
}

// signals the sender that the receiver has failed to receive the transfer
message Failure {
uint32 id = 1;
uint32 reason = 2;
}

// signals the receiver that the sender won't start new transfers
message Done {}
27 changes: 27 additions & 0 deletions src/items.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
include!(concat!(env!("OUT_DIR"), "/cccp.items.rs"));

impl Message {
pub(crate) fn start(id: u32) -> Self {
Self {
message: Some(message::Message::Start(Start { id })),
}
}

pub(crate) fn end(id: u32) -> Self {
Self {
message: Some(message::Message::End(End { id })),
}
}

pub(crate) fn failure(id: u32, reason: u32) -> Self {
Self {
message: Some(message::Message::Failure(Failure { id, reason })),
}
}

pub(crate) fn done() -> Self {
Self {
message: Some(message::Message::Done(Done {})),
}
}
}
72 changes: 55 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::error;
use std::fmt::{Display, Formatter};
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::{ExitCode, Termination};
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
Expand All @@ -12,23 +12,26 @@ use std::sync::Arc;
use std::time::Duration;

use async_ssh2_tokio::{AuthMethod, Client, ServerCheckMethod};
use blake3::{Hash, Hasher};
use bytesize::ByteSize;
use clap::Parser;
use futures::stream::iter;
use futures::{StreamExt, TryStreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use kanal::{AsyncReceiver, ReceiveError, SendError};
use kanal::{ReceiveError, SendError};
use log::{debug, error, info, warn, LevelFilter};
use prost::Message;
use regex::Regex;
use rpassword::prompt_password;
use simple_logging::{log_to_file, log_to_stderr};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket};
use tokio::sync::AcquireError;
use tokio::time::{interval, sleep};
use tokio::{io, select};

mod items;
mod receiver;
mod sender;

Expand All @@ -49,10 +52,6 @@ const PACKET_SIZE: usize = 8 + ID_SIZE + INDEX_SIZE + TRANSFER_BUFFER_SIZE;
// how long to wait for a job to be confirmed before requeuing it
const REQUEUE_INTERVAL: Duration = Duration::from_millis(1_000);

pub mod items {
include!(concat!(env!("OUT_DIR"), "/cccp.items.rs"));
}

#[derive(Debug)]
struct Error {
kind: ErrorKind,
Expand All @@ -66,7 +65,10 @@ enum ErrorKind {
Join(tokio::task::JoinError),
Send(SendError),
Receive(ReceiveError),
Acquire(AcquireError),
MissingQueue,
MaxRetries,
Failure(u32),
}

impl From<io::Error> for Error {
Expand Down Expand Up @@ -117,6 +119,14 @@ impl From<ReceiveError> for Error {
}
}

impl From<AcquireError> for Error {
fn from(error: AcquireError) -> Self {
Self {
kind: ErrorKind::Acquire(error),
}
}
}

impl Termination for Error {
fn report(self) -> ExitCode {
ExitCode::from(match self.kind {
Expand All @@ -129,7 +139,8 @@ impl Termination for Error {
ErrorKind::Join(_) => 5,
ErrorKind::Send(_) => 6,
ErrorKind::Receive(_) => 7,
ErrorKind::MissingQueue => 7,
ErrorKind::Acquire(_) => 8,
_ => 9,
})
}
}
Expand All @@ -140,6 +151,18 @@ impl Error {
kind: ErrorKind::MissingQueue,
}
}

fn max_retries() -> Self {
Self {
kind: ErrorKind::MaxRetries,
}
}

fn failure(reason: u32) -> Self {
Self {
kind: ErrorKind::Failure(reason),
}
}
}

#[derive(Parser, Clone, Debug)]
Expand Down Expand Up @@ -186,7 +209,7 @@ struct Options {
#[clap(
short,
long = "bind-address",
help = "manually specify the address to listen on"
help = "manually specify the bind address"
)]
bind_address: Option<IpAddr>,

Expand All @@ -206,6 +229,13 @@ struct Options {
)]
max: usize,

#[clap(
short,
long = "verify",
help = "verify integrity of files using blake3"
)]
verify: bool,

#[clap(help = "where to get the data from")]
source: FileLocation,

Expand Down Expand Up @@ -680,14 +710,22 @@ async fn read_message<R: AsyncReadExt + Unpin, M: Message + Default>(reader: &mu
Ok(message)
}

/// send messages from a channel to a writer
async fn message_sender<W: AsyncWrite + Unpin, M: Message>(
mut writer: W,
receiver: AsyncReceiver<M>,
) -> Result<()> {
while let Ok(message) = receiver.recv().await {
write_message(&mut writer, &message).await?;
async fn hash_file<P: AsRef<Path>>(path: P) -> io::Result<Hash> {
let file = File::open(path).await?;
let mut reader = BufReader::with_capacity(READ_BUFFER_SIZE, file);
let mut buffer = [0; 2048];

let mut hasher = Hasher::new();

loop {
let read = reader.read(&mut buffer).await?;

if read != 0 {
hasher.update(&buffer[..read]);
} else {
break;
}
}

Ok(())
Ok(hasher.finalize())
}
44 changes: 31 additions & 13 deletions src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::time::Duration;

use log::{debug, error, info, warn};
use tokio::fs::{create_dir_all, metadata};
use tokio::io::AsyncWrite;
use tokio::net::{TcpStream, UdpSocket};
use tokio::select;
use tokio::sync::Mutex;
Expand All @@ -19,7 +20,7 @@ use tokio::time::{interval, timeout};
use crate::items::{message, ConfirmationIndexes, Confirmations, Manifest, Message, StartIndex};
use crate::receiver::writer::{writer, SplitQueue};
use crate::{
read_message, socket_factory, write_message, Options, Result, TransferStats, ID_SIZE,
read_message, socket_factory, write_message, Error, Options, Result, TransferStats, ID_SIZE,
INDEX_SIZE, MAX_RETRIES, RECEIVE_TIMEOUT, TRANSFER_BUFFER_SIZE,
};

Expand Down Expand Up @@ -59,9 +60,7 @@ pub(crate) async fn main(

// set the total data to be received
for details in manifest.files.values() {
stats
.total_data
.fetch_add(details.file_size as usize, Relaxed);
stats.total_data.fetch_add(details.size as usize, Relaxed);
}

let sockets = socket_factory(
Expand All @@ -79,7 +78,7 @@ pub(crate) async fn main(

// `message_sender` can now be used to send messages to the sender
let (message_sender, message_receiver) = kanal::unbounded_async();
tokio::spawn(crate::message_sender(rts_stream, message_receiver));
tokio::spawn(send_messages(rts_stream, message_receiver));

let confirmation_handle = tokio::spawn(send_confirmations(
message_sender.clone(),
Expand All @@ -104,18 +103,20 @@ pub(crate) async fn main(

let receiver_future = async {
for handle in handles {
_ = handle.await;
handle.await??;
}

Ok(())
};

select! {
result = confirmation_handle => result?,
result = controller_handle => result?,
_ = receiver_future => { warn!("receiver(s) exited"); Ok(()) },
result = receiver_future => result,
}
}

async fn receiver(queue: WriterQueue, socket: UdpSocket) {
async fn receiver(queue: WriterQueue, socket: UdpSocket) -> Result<()> {
let mut buf = [0; ID_SIZE + INDEX_SIZE + TRANSFER_BUFFER_SIZE]; // buffer for receiving data
let mut retries = 0; // counter to keep track of retries

Expand All @@ -129,12 +130,18 @@ async fn receiver(queue: WriterQueue, socket: UdpSocket) {
u64::from_be_bytes(buf[ID_SIZE..INDEX_SIZE + ID_SIZE].try_into().unwrap());
let data = buf[INDEX_SIZE + ID_SIZE..].try_into().unwrap();

queue.send(Job { data, index }, id).await;
queue.send(Job { data, index }, id).await?;
}
Ok(Ok(_)) => warn!("0 byte read?"), // this should never happen
Ok(Err(_)) | Err(_) => retries += 1, // catch errors and timeouts
}
}

if retries == MAX_RETRIES {
Err(Error::max_retries())
} else {
Ok(())
}
}

async fn controller(
Expand All @@ -158,7 +165,7 @@ async fn controller(
writer_queue.push_queue(message.id).await; // create a queue for the writer

let file_path = if file_path.is_dir() {
file_path.join(&details.file_path)
file_path.join(&details.path)
} else {
file_path.clone()
};
Expand Down Expand Up @@ -188,9 +195,10 @@ async fn controller(
write_message(&mut str_stream, &StartIndex { index: start_index }).await?;

let file = writer::FileDetails {
file_size: details.file_size,
size: details.size,
partial_path,
path: file_path,
signature: details.signature,
};

tokio::spawn({
Expand All @@ -216,8 +224,6 @@ async fn controller(
}
}
});

debug!("started file {:?}", details);
}
Some(message::Message::Done(_)) => {
debug!("received done message");
Expand Down Expand Up @@ -294,3 +300,15 @@ async fn send_confirmations(
_ = future => Ok(())
}
}

/// send messages from a channel to a writer
async fn send_messages<W: AsyncWrite + Unpin, M: prost::Message>(
mut writer: W,
receiver: AsyncReceiver<M>,
) -> Result<()> {
while let Ok(message) = receiver.recv().await {
write_message(&mut writer, &message).await?;
}

Ok(())
}
Loading

0 comments on commit 16bb5cd

Please sign in to comment.