diff --git a/Cargo.toml b/Cargo.toml index d9c042d..65513ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cccp" -version = "0.4.0" +version = "0.6.0" edition = "2021" build = "build.rs" @@ -18,9 +18,9 @@ dirs = "5.0" rpassword = "7.3" indicatif = "0.17" prost = "0.12" -prost-build = "0.12" bytesize = "1.3.0" kanal = "0.1.0-pre8" +blake3 = "1.5.0" [build-dependencies] prost-build = "0.12.3" diff --git a/src/items.proto b/src/items.proto index 17a8b7d..97ffcd1 100644 --- a/src/items.proto +++ b/src/items.proto @@ -9,38 +9,53 @@ message Message { StartIndex start_index = 3; Start start = 4; End end = 5; - Done done = 6; + Failure failure = 6; + Done done = 7; } } +// the sender provides these details to the receiver message Manifest { repeated string directories = 1; map files = 2; // map for file details } message FileDetail { - string file_path = 1; - uint64 file_size = 2; + string path = 1; // file path relative to the destination directory + uint64 size = 2; // file size in bytes + optional bytes signature = 3; // blake3 hash of file } +// map of transfers and their confirmed indexes message Confirmations { map indexes = 1; } +// the confirmed indexes message ConfirmationIndexes { repeated uint64 inner = 1; } +// the receiver tells the sender which index it wants to start at message StartIndex { uint64 index = 1; } +// signals the receiver that the sender wants to start a transfer message Start { uint32 id = 1; } +// signals the the sender that the receiver has finished receiving the transfer message End { uint32 id = 1; } +// signals the sender that the receiver has failed to receive the transfer +message Failure { + uint32 id = 1; + uint32 reason = 2; +} + +// signals the receiver that the sender won't start new transfers message Done {} \ No newline at end of file diff --git a/src/items.rs b/src/items.rs new file mode 100644 index 0000000..2367d24 --- /dev/null +++ b/src/items.rs @@ -0,0 +1,27 @@ +include!(concat!(env!("OUT_DIR"), "/cccp.items.rs")); + +impl Message { + pub(crate) fn start(id: u32) -> Self { + Self { + message: Some(message::Message::Start(Start { id })), + } + } + + pub(crate) fn end(id: u32) -> Self { + Self { + message: Some(message::Message::End(End { id })), + } + } + + pub(crate) fn failure(id: u32, reason: u32) -> Self { + Self { + message: Some(message::Message::Failure(Failure { id, reason })), + } + } + + pub(crate) fn done() -> Self { + Self { + message: Some(message::Message::Done(Done {})), + } + } +} diff --git a/src/main.rs b/src/main.rs index 0af6bed..b434d0e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ use std::error; use std::fmt::{Display, Formatter}; use std::net::{IpAddr, SocketAddr}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::process::{ExitCode, Termination}; use std::str::FromStr; use std::sync::atomic::AtomicUsize; @@ -12,23 +12,26 @@ use std::sync::Arc; use std::time::Duration; use async_ssh2_tokio::{AuthMethod, Client, ServerCheckMethod}; +use blake3::{Hash, Hasher}; use bytesize::ByteSize; use clap::Parser; use futures::stream::iter; use futures::{StreamExt, TryStreamExt}; use indicatif::{ProgressBar, ProgressStyle}; -use kanal::{AsyncReceiver, ReceiveError, SendError}; +use kanal::{ReceiveError, SendError}; use log::{debug, error, info, warn, LevelFilter}; use prost::Message; use regex::Regex; use rpassword::prompt_password; use simple_logging::{log_to_file, log_to_stderr}; use tokio::fs::File; -use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket}; +use tokio::sync::AcquireError; use tokio::time::{interval, sleep}; use tokio::{io, select}; +mod items; mod receiver; mod sender; @@ -49,10 +52,6 @@ const PACKET_SIZE: usize = 8 + ID_SIZE + INDEX_SIZE + TRANSFER_BUFFER_SIZE; // how long to wait for a job to be confirmed before requeuing it const REQUEUE_INTERVAL: Duration = Duration::from_millis(1_000); -pub mod items { - include!(concat!(env!("OUT_DIR"), "/cccp.items.rs")); -} - #[derive(Debug)] struct Error { kind: ErrorKind, @@ -66,7 +65,10 @@ enum ErrorKind { Join(tokio::task::JoinError), Send(SendError), Receive(ReceiveError), + Acquire(AcquireError), MissingQueue, + MaxRetries, + Failure(u32), } impl From for Error { @@ -117,6 +119,14 @@ impl From for Error { } } +impl From for Error { + fn from(error: AcquireError) -> Self { + Self { + kind: ErrorKind::Acquire(error), + } + } +} + impl Termination for Error { fn report(self) -> ExitCode { ExitCode::from(match self.kind { @@ -129,7 +139,8 @@ impl Termination for Error { ErrorKind::Join(_) => 5, ErrorKind::Send(_) => 6, ErrorKind::Receive(_) => 7, - ErrorKind::MissingQueue => 7, + ErrorKind::Acquire(_) => 8, + _ => 9, }) } } @@ -140,6 +151,18 @@ impl Error { kind: ErrorKind::MissingQueue, } } + + fn max_retries() -> Self { + Self { + kind: ErrorKind::MaxRetries, + } + } + + fn failure(reason: u32) -> Self { + Self { + kind: ErrorKind::Failure(reason), + } + } } #[derive(Parser, Clone, Debug)] @@ -186,7 +209,7 @@ struct Options { #[clap( short, long = "bind-address", - help = "manually specify the address to listen on" + help = "manually specify the bind address" )] bind_address: Option, @@ -206,6 +229,13 @@ struct Options { )] max: usize, + #[clap( + short, + long = "verify", + help = "verify integrity of files using blake3" + )] + verify: bool, + #[clap(help = "where to get the data from")] source: FileLocation, @@ -680,14 +710,22 @@ async fn read_message(reader: &mu Ok(message) } -/// send messages from a channel to a writer -async fn message_sender( - mut writer: W, - receiver: AsyncReceiver, -) -> Result<()> { - while let Ok(message) = receiver.recv().await { - write_message(&mut writer, &message).await?; +async fn hash_file>(path: P) -> io::Result { + let file = File::open(path).await?; + let mut reader = BufReader::with_capacity(READ_BUFFER_SIZE, file); + let mut buffer = [0; 2048]; + + let mut hasher = Hasher::new(); + + loop { + let read = reader.read(&mut buffer).await?; + + if read != 0 { + hasher.update(&buffer[..read]); + } else { + break; + } } - Ok(()) + Ok(hasher.finalize()) } diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs index cc95a7e..477d3f2 100644 --- a/src/receiver/mod.rs +++ b/src/receiver/mod.rs @@ -10,6 +10,7 @@ use std::time::Duration; use log::{debug, error, info, warn}; use tokio::fs::{create_dir_all, metadata}; +use tokio::io::AsyncWrite; use tokio::net::{TcpStream, UdpSocket}; use tokio::select; use tokio::sync::Mutex; @@ -19,7 +20,7 @@ use tokio::time::{interval, timeout}; use crate::items::{message, ConfirmationIndexes, Confirmations, Manifest, Message, StartIndex}; use crate::receiver::writer::{writer, SplitQueue}; use crate::{ - read_message, socket_factory, write_message, Options, Result, TransferStats, ID_SIZE, + read_message, socket_factory, write_message, Error, Options, Result, TransferStats, ID_SIZE, INDEX_SIZE, MAX_RETRIES, RECEIVE_TIMEOUT, TRANSFER_BUFFER_SIZE, }; @@ -59,9 +60,7 @@ pub(crate) async fn main( // set the total data to be received for details in manifest.files.values() { - stats - .total_data - .fetch_add(details.file_size as usize, Relaxed); + stats.total_data.fetch_add(details.size as usize, Relaxed); } let sockets = socket_factory( @@ -79,7 +78,7 @@ pub(crate) async fn main( // `message_sender` can now be used to send messages to the sender let (message_sender, message_receiver) = kanal::unbounded_async(); - tokio::spawn(crate::message_sender(rts_stream, message_receiver)); + tokio::spawn(send_messages(rts_stream, message_receiver)); let confirmation_handle = tokio::spawn(send_confirmations( message_sender.clone(), @@ -104,18 +103,20 @@ pub(crate) async fn main( let receiver_future = async { for handle in handles { - _ = handle.await; + handle.await??; } + + Ok(()) }; select! { result = confirmation_handle => result?, result = controller_handle => result?, - _ = receiver_future => { warn!("receiver(s) exited"); Ok(()) }, + result = receiver_future => result, } } -async fn receiver(queue: WriterQueue, socket: UdpSocket) { +async fn receiver(queue: WriterQueue, socket: UdpSocket) -> Result<()> { let mut buf = [0; ID_SIZE + INDEX_SIZE + TRANSFER_BUFFER_SIZE]; // buffer for receiving data let mut retries = 0; // counter to keep track of retries @@ -129,12 +130,18 @@ async fn receiver(queue: WriterQueue, socket: UdpSocket) { u64::from_be_bytes(buf[ID_SIZE..INDEX_SIZE + ID_SIZE].try_into().unwrap()); let data = buf[INDEX_SIZE + ID_SIZE..].try_into().unwrap(); - queue.send(Job { data, index }, id).await; + queue.send(Job { data, index }, id).await?; } Ok(Ok(_)) => warn!("0 byte read?"), // this should never happen Ok(Err(_)) | Err(_) => retries += 1, // catch errors and timeouts } } + + if retries == MAX_RETRIES { + Err(Error::max_retries()) + } else { + Ok(()) + } } async fn controller( @@ -158,7 +165,7 @@ async fn controller( writer_queue.push_queue(message.id).await; // create a queue for the writer let file_path = if file_path.is_dir() { - file_path.join(&details.file_path) + file_path.join(&details.path) } else { file_path.clone() }; @@ -188,9 +195,10 @@ async fn controller( write_message(&mut str_stream, &StartIndex { index: start_index }).await?; let file = writer::FileDetails { - file_size: details.file_size, + size: details.size, partial_path, path: file_path, + signature: details.signature, }; tokio::spawn({ @@ -216,8 +224,6 @@ async fn controller( } } }); - - debug!("started file {:?}", details); } Some(message::Message::Done(_)) => { debug!("received done message"); @@ -294,3 +300,15 @@ async fn send_confirmations( _ = future => Ok(()) } } + +/// send messages from a channel to a writer +async fn send_messages( + mut writer: W, + receiver: AsyncReceiver, +) -> Result<()> { + while let Ok(message) = receiver.recv().await { + write_message(&mut writer, &message).await?; + } + + Ok(()) +} diff --git a/src/receiver/writer.rs b/src/receiver/writer.rs index 18e2bc2..f4e1f1e 100644 --- a/src/receiver/writer.rs +++ b/src/receiver/writer.rs @@ -5,45 +5,49 @@ use std::path::PathBuf; use kanal::{AsyncReceiver, AsyncSender}; use log::{debug, info}; -use tokio::fs::{rename, OpenOptions}; +use tokio::fs::{remove_file, rename, OpenOptions}; use tokio::io::{self, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter}; -use tokio::sync::RwLock; +use tokio::sync::Mutex; -use crate::items::{message, End, Message}; +use crate::items::Message; use crate::receiver::{Job, WriterQueue}; -use crate::{Error, Result, TRANSFER_BUFFER_SIZE, WRITE_BUFFER_SIZE}; +use crate::{hash_file, Error, Result, TRANSFER_BUFFER_SIZE, WRITE_BUFFER_SIZE}; #[derive(Default)] pub(crate) struct SplitQueue { - senders: RwLock>>, - receivers: RwLock>>, + senders: Mutex>>, + receivers: Mutex>>, } impl SplitQueue { pub(crate) async fn push_queue(&self, id: u32) { let (sender, receiver) = kanal::bounded_async(1_000); - self.receivers.write().await.insert(id, receiver); - self.senders.write().await.insert(id, sender); + self.receivers.lock().await.insert(id, receiver); + self.senders.lock().await.insert(id, sender); } pub(crate) async fn pop_queue(&self, id: &u32) { - self.receivers.write().await.remove(id); - self.senders.write().await.remove(id); + self.receivers.lock().await.remove(id); + self.senders.lock().await.remove(id); } pub(crate) async fn get_receiver(&self, id: &u32) -> Option> { - let receivers = self.receivers.read().await; + let receivers = self.receivers.lock().await; receivers.get(id).cloned() } - // TODO if the the receiver fills, the sender will block & block push_queue - pub(crate) async fn send(&self, job: Job, id: u32) { - let senders = self.senders.read().await; + pub(crate) async fn send(&self, job: Job, id: u32) -> Result<()> { + let sender_option = { + let senders = self.senders.lock().await; + senders.get(&id).cloned() + }; - if let Some(sender) = senders.get(&id) { - sender.send(job).await.unwrap(); + if let Some(sender) = sender_option { + sender.send(job).await?; } + + Ok(()) } } @@ -51,7 +55,8 @@ impl SplitQueue { pub(crate) struct FileDetails { pub(crate) path: PathBuf, pub(crate) partial_path: PathBuf, - pub(crate) file_size: u64, + pub(crate) size: u64, + pub(crate) signature: Option>, } impl FileDetails { @@ -90,7 +95,7 @@ pub(crate) async fn writer( .await .ok_or(Error::missing_queue())?; - while position != details.file_size { + while position != details.size { let job = receiver.recv().await?; match job.index.cmp(&position) { @@ -104,23 +109,38 @@ pub(crate) async fn writer( } // if the chunk is at the current position, write it Ordering::Equal => { - write_data(&mut writer, &job.data, &mut position, details.file_size).await?; + write_data(&mut writer, &job.data, &mut position, details.size).await?; confirmation_sender.send((id, job.index)).await?; } } // write all concurrent chunks from the cache while let Some(job) = cache.remove(&position) { - write_data(&mut writer, &job.data, &mut position, details.file_size).await?; + write_data(&mut writer, &job.data, &mut position, details.size).await?; } } info!("writer wrote all expected bytes"); writer.flush().await?; // flush the writer + + // verify the signature if provided by the sender + if let Some(ref remote_signature) = details.signature { + let local_hash = hash_file(&details.partial_path).await?; // hash the file + + if local_hash.as_bytes() != &remote_signature[..] { + message_sender.send(Message::failure(id, 0)).await?; // notify the sender + remove_file(&details.partial_path).await?; // remove the partial file + writer_queue.pop_queue(&id).await; // remove the queue + return Err(Error::failure(0)); + } else { + info!("{:?} passed signature verification", details.path) + } + } + details.rename().await?; // rename the file writer_queue.pop_queue(&id).await; // remove the queue - send_end_message(&message_sender, id).await?; + message_sender.send(Message::end(id)).await?; // send the end message Ok(()) } @@ -139,12 +159,3 @@ async fn write_data( *position += len; // advance the position writer.write_all(&buffer[..len as usize]).await // write the data } - -async fn send_end_message(sender: &AsyncSender, id: u32) -> Result<()> { - let end_message = Message { - message: Some(message::Message::End(End { id })), - }; - - sender.send(end_message).await?; - Ok(()) -} diff --git a/src/sender/mod.rs b/src/sender/mod.rs index 8aa52de..48c0256 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -14,13 +14,11 @@ use tokio::select; use tokio::sync::{Mutex, RwLock, Semaphore}; use tokio::time::{interval, Instant}; -use crate::items::{ - message, Confirmations, Done, End, FileDetail, Manifest, Message, Start, StartIndex, -}; +use crate::items::{message, Confirmations, FileDetail, Manifest, Message, StartIndex}; use crate::sender::reader::reader; use crate::{ - read_message, socket_factory, write_message, Options, Result, TransferStats, ID_SIZE, - INDEX_SIZE, MAX_RETRIES, REQUEUE_INTERVAL, TRANSFER_BUFFER_SIZE, + hash_file, read_message, socket_factory, write_message, Error, Options, Result, TransferStats, + ID_SIZE, INDEX_SIZE, MAX_RETRIES, REQUEUE_INTERVAL, TRANSFER_BUFFER_SIZE, }; mod reader; @@ -43,53 +41,12 @@ pub(crate) async fn main( ) -> Result<()> { info!("sending {} -> {}", options.source, options.destination); - // collect the files and directories to send - let mut files = Vec::new(); - let mut dirs = Vec::new(); - files_and_dirs(&options.source.file_path, &mut files, &mut dirs)?; - - let mut file_map: HashMap = HashMap::with_capacity(files.len()); - - for (index, mut file) in files.into_iter().enumerate() { - let file_size = tokio::fs::metadata(&file).await?.len(); - stats.total_data.fetch_add(file_size as usize, Relaxed); - - if file == options.source.file_path { - file = PathBuf::from(file.iter().last().unwrap()) - } else { - file = file - .strip_prefix(&options.source.file_path) - .unwrap() - .to_path_buf(); - } - - let file_path = file.to_string_lossy().replace('\\', "/"); - - file_map.insert( - index as u32, - FileDetail { - file_path, - file_size, - }, - ); - } - - let directories = dirs - .into_iter() - .map(|dir| { - if let Ok(file_path) = dir.strip_prefix(&options.source.file_path) { - file_path.to_string_lossy().to_string() - } else { - dir.to_string_lossy().to_string() - } - }) - .map(|dir| dir.replace('\\', "/")) - .collect(); - - let manifest = Manifest { - directories, - files: file_map, - }; + let manifest = build_manifest( + options.source.file_path.clone(), + options.verify, + &stats.total_data, + ) + .await?; debug!("sending manifest: {:?}", manifest); write_message(&mut str_stream, &manifest).await?; @@ -111,13 +68,20 @@ pub(crate) async fn main( // a semaphore to control the send rate let send = Arc::new(Semaphore::new(0)); - + // a semaphore to control the readers let read = Arc::new(Semaphore::new(1_000)); + // just confirmation messages let (confirmation_sender, confirmation_receiver) = kanal::unbounded_async(); - let (end_sender, end_receiver) = kanal::unbounded_async(); - - tokio::spawn(split_receiver(rts_stream, confirmation_sender, end_sender)); + // end and failure messages + let (controller_sender, controller_receiver) = kanal::unbounded_async(); + + // receive messages from the receiver into two channels based on message type + let receiver_handle = tokio::spawn(split_receiver( + rts_stream, + confirmation_sender, + controller_sender, + )); let confirmation_handle = tokio::spawn(receive_confirmations( confirmation_receiver, @@ -127,9 +91,7 @@ pub(crate) async fn main( read.clone(), )); - let rate = options.pps(); - let semaphore = send.clone(); - tokio::spawn(add_permits_at_rate(semaphore, rate)); + tokio::spawn(add_permits_at_rate(send.clone(), options.pps())); let controller_handle = tokio::spawn(controller( str_stream, @@ -138,7 +100,7 @@ pub(crate) async fn main( read, stats.confirmed_data, options.source.file_path, - end_receiver, + controller_receiver, options.max, )); @@ -157,14 +119,18 @@ pub(crate) async fn main( let sender_future = async { for handle in handles { - _ = handle.await; + handle.await??; // propagate errors } + + Ok(()) }; + // propagate the first error select! { result = confirmation_handle => result?, result = controller_handle => result?, - _ = sender_future => { warn!("senders exited"); Ok(()) }, + result = sender_future => result, + result = receiver_handle => result?, } } @@ -174,17 +140,17 @@ async fn sender( socket: UdpSocket, cache: JobCache, send: Arc, -) { +) -> Result<()> { let mut retries = 0; while retries < MAX_RETRIES { - let permit = send.acquire().await.unwrap(); // acquire a permit - let mut job = job_receiver.recv().await.unwrap(); // get a job from the queue + let permit = send.acquire().await?; // acquire a permit + let mut job = job_receiver.recv().await?; // get a job from the queue // send the job data to the socket if let Err(error) = socket.send(&job.data).await { error!("failed to send data: {}", error); - job_sender.send(job).await.unwrap(); // put the job back in the queue + job_sender.send(job).await?; // put the job back in the queue retries += 1; } else { // cache the job @@ -195,6 +161,12 @@ async fn sender( permit.forget(); } + + if retries == MAX_RETRIES { + Err(Error::max_retries()) + } else { + Ok(()) + } } #[allow(clippy::too_many_arguments)] @@ -205,7 +177,7 @@ async fn controller( read: Arc, confirmed_data: Arc, base_path: PathBuf, - end_receiver: AsyncReceiver, + controller_receiver: AsyncReceiver, max: usize, ) -> Result<()> { let mut id = 0; @@ -213,26 +185,19 @@ async fn controller( loop { while active < max { - match files.files.remove(&id) { + match files.files.get(&id) { None => break, Some(file_details) => { - let message = Message { - message: Some(message::Message::Start(Start { id })), - }; - write_message(&mut control_stream, &message).await?; - - let file_path = base_path.join(&file_details.file_path); - - let start_index: StartIndex = read_message(&mut control_stream).await?; - confirmed_data.fetch_add(start_index.index as usize, Relaxed); - - tokio::spawn(reader( - file_path, - job_sender.clone(), - read.clone(), - start_index.index, + start_file_transfer( + &mut control_stream, id, - )); + file_details, + &base_path, + &job_sender, + &read, + &confirmed_data, + ) + .await?; id += 1; active += 1; @@ -240,10 +205,35 @@ async fn controller( } } - debug!("waiting for a file to end"); - let end = end_receiver.recv().await.unwrap(); - debug!("received end message: {:?} | active {}", end, active); - active -= 1; + debug!("waiting for a message"); + + match controller_receiver.recv().await?.message { + Some(message::Message::End(end)) => { + debug!("received end message {} | active {}", end.id, active); + + files.files.remove(&end.id); + active -= 1; + } + Some(message::Message::Failure(failure)) => { + debug!("received failure message {:?}", failure); + + if let Some(file_details) = files.files.get(&failure.id) { + start_file_transfer( + &mut control_stream, + failure.id, + file_details, + &base_path, + &job_sender, + &read, + &confirmed_data, + ) + .await?; + } else { + warn!("received failure message for unknown file {}", failure.id); + } + } + _ => unreachable!(), // only end and failure messages are sent to this receiver + } if files.files.is_empty() && active == 0 { break; @@ -251,11 +241,34 @@ async fn controller( } debug!("all files completed, sending done message"); + write_message(&mut control_stream, &Message::done()).await?; - let message = Message { - message: Some(message::Message::Done(Done {})), - }; - write_message(&mut control_stream, &message).await?; + Ok(()) +} + +async fn start_file_transfer( + mut control_stream: &mut TcpStream, + id: u32, + file_details: &FileDetail, + base_path: &PathBuf, + job_sender: &AsyncSender, + read: &Arc, + confirmed_data: &Arc, +) -> Result<()> { + write_message(&mut control_stream, &Message::start(id)).await?; + + let file_path = base_path.join(&file_details.path); + + let start_index: StartIndex = read_message(&mut control_stream).await?; + confirmed_data.fetch_add(start_index.index as usize, Relaxed); + + tokio::spawn(reader( + file_path, + job_sender.clone(), + read.clone(), + start_index.index, + id, + )); Ok(()) } @@ -378,11 +391,11 @@ fn files_and_dirs( Ok(()) } -/// split the message stream into `Confirmation` and `End` messages +/// split the message stream into `Confirmation` and `End + Failure` messages async fn split_receiver( mut reader: R, confirmation_sender: AsyncSender, - end_sender: AsyncSender, + controller_sender: AsyncSender, ) -> Result<()> { loop { let message: Message = read_message(&mut reader).await?; @@ -391,10 +404,72 @@ async fn split_receiver( Some(message::Message::Confirmations(confirmations)) => { confirmation_sender.send(confirmations).await? } - Some(message::Message::End(end)) => end_sender.send(end).await?, + Some(message::Message::End(_)) => controller_sender.send(message).await?, + Some(message::Message::Failure(_)) => controller_sender.send(message).await?, _ => { error!("received {:?}", message); } } } } + +async fn build_manifest( + source: PathBuf, + verify: bool, + total_data: &Arc, +) -> Result { + // collect the files and directories to send + let mut files = Vec::new(); + let mut dirs = Vec::new(); + files_and_dirs(&source, &mut files, &mut dirs)?; + + let mut file_map: HashMap = HashMap::with_capacity(files.len()); + + for (index, mut file) in files.into_iter().enumerate() { + let size = tokio::fs::metadata(&file).await?.len(); + total_data.fetch_add(size as usize, Relaxed); + + let signature = if verify { + let hash = hash_file(&file).await?; + Some(hash.as_bytes().to_vec()) + } else { + None + }; + + if file == source { + file = PathBuf::from(file.iter().last().unwrap()) + } else { + file = file.strip_prefix(&source).unwrap().to_path_buf(); + } + + let path = file.to_string_lossy().replace('\\', "/"); + + file_map.insert( + index as u32, + FileDetail { + path, + size, + signature, + }, + ); + } + + let directories = dirs + .into_iter() + .map(|dir| { + if let Ok(file_path) = dir.strip_prefix(&source) { + file_path.to_string_lossy().to_string() + } else { + dir.to_string_lossy().to_string() + } + }) + .map(|dir| dir.replace('\\', "/")) + .collect(); + + let manifest = Manifest { + directories, + files: file_map, + }; + + Ok(manifest) +}