Skip to content

Commit

Permalink
concurrent file processing with improved error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
chanderlud committed Dec 22, 2023
1 parent 1ff4b47 commit 7c8093f
Showing 1 changed file with 68 additions and 54 deletions.
122 changes: 68 additions & 54 deletions src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;
use std::time::Duration;

use futures::stream::iter;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use kanal::{AsyncReceiver, AsyncSender};
use log::{debug, error, info, warn};
use tokio::fs::{create_dir_all, metadata};
Expand Down Expand Up @@ -58,65 +58,64 @@ pub(crate) async fn main(
manifest.directories.len()
);

let completed = Arc::new(Mutex::new(Vec::new()));

let files: HashMap<u32, FileDetails> = iter(manifest.files.into_iter())
.filter_map(|(id, details)| {
// formats the path to the file locally
let path = if is_dir {
options.destination.file_path.join(&details.path)
} else {
options.destination.file_path.clone()
};
let mut completed = Vec::new();

let filtered_files = manifest.files.into_iter().filter_map(|(id, details)| {
// formats the path to the file locally
let path = if is_dir {
options.destination.file_path.join(&details.path)
} else {
options.destination.file_path.clone()
};

if path.exists() && !options.overwrite {
completed.push(id);
None
} else {
Some((id, details, path))
}
});

let files: HashMap<u32, FileDetails> = iter(filtered_files)
.map(|(id, details, path)| {
let total_data = stats.total_data.clone();
let completed = completed.clone();

async move {
if path.exists() && !options.overwrite {
completed.lock().await.push(id);
None
// append partial extension to the existing extension, if there is one
let partial_extension = if let Some(extension) = path.extension() {
extension
.to_str()
.ok_or(Error::invalid_extension())?
.to_owned()
+ ".partial"
} else {
// append partial extension to the existing extension, if there is one
let partial_extension = if let Some(extension) = path.extension() {
extension.to_str()?.to_owned() + ".partial"
} else {
".partial".to_string()
};

let partial_path = path.with_extension(partial_extension);

let start_index = if partial_path.exists() {
let metadata = metadata(&partial_path).await.ok()?;
// the file is written sequentially, so we can calculate the start index by rounding down to the nearest multiple of the transfer buffer size
let chunks = metadata.len().div_floor(TRANSFER_BUFFER_SIZE as u64);
chunks * TRANSFER_BUFFER_SIZE as u64
} else {
0
};

// increment the total data counter
total_data.fetch_add((details.size - start_index) as usize, Relaxed);

Some((
".partial".to_string()
};

let partial_path = path.with_extension(partial_extension);
let start_index = start_index(&partial_path).await?;

// increment the total data counter
total_data.fetch_add((details.size - start_index) as usize, Relaxed);

Ok::<(u32, FileDetails), Error>((
id,
FileDetails {
id,
FileDetails {
id,
path,
partial_path,
size: details.size,
start_index,
signature: details.signature,
crypto: details.crypto,
},
))
}
path,
partial_path,
size: details.size,
start_index,
signature: details.signature,
crypto: details.crypto,
},
))
}
})
.collect()
.await;
.buffer_unordered(options.threads)
.try_collect()
.await?;

let completed = completed.lock().await.clone();
debug!(
"processed files | files={} completed={}",
files.len(),
Expand Down Expand Up @@ -242,7 +241,7 @@ async fn receiver(queue: WriterQueue, socket: UdpSocket) -> Result<()> {

async fn controller<C: StreamCipherExt + ?Sized>(
mut str_stream: TcpStream,
files: HashMap<u32, FileDetails>,
mut files: HashMap<u32, FileDetails>,
writer_queue: WriterQueue,
confirmation_sender: AsyncSender<(u32, u64)>,
message_sender: AsyncSender<Message>,
Expand All @@ -255,15 +254,19 @@ async fn controller<C: StreamCipherExt + ?Sized>(
Some(message::Message::Start(message)) => {
debug!("received start message: {:?}", message);

let details = match files.get(&message.id) {
let details = match files.get_mut(&message.id) {
Some(details) => details,
None => {
error!("received start message for unknown id {}", message.id);
continue;
}
};

// TODO there is an edge case here where the transfer fails & this start_index is no longer valid
// this handles an edge case where a partially transferred file fails and needs to be retried from the start
if details.start_index > 0 && !details.partial_path.exists() {
details.start_index = 0;
}

// send the start index to the remote client
write_message(
&mut str_stream,
Expand Down Expand Up @@ -419,3 +422,14 @@ fn format_path(path: &Path) -> Result<PathBuf> {

Ok(path)
}

/// Computes the byte offset at which a partially transferred file should resume.
///
/// Returns `0` when nothing exists at `path`. Otherwise the current length of the
/// file is rounded down to the nearest multiple of `TRANSFER_BUFFER_SIZE`.
///
/// # Errors
///
/// Returns an error if the metadata lookup fails for any reason other than the
/// file not being found (for example, insufficient permissions).
async fn start_index(path: &Path) -> Result<u64> {
    // a single async metadata lookup replaces the blocking `Path::exists` check and
    // removes the window in which the file could vanish between the check and the read
    let length = match metadata(path).await {
        Ok(metadata) => metadata.len(),
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(0),
        Err(error) => return Err(error.into()),
    };

    // the file is written sequentially, so we can calculate the start index by rounding
    // down to the nearest multiple of the transfer buffer size; integer division on
    // unsigned values already floors
    let buffer_size = TRANSFER_BUFFER_SIZE as u64;
    Ok(length / buffer_size * buffer_size)
}

0 comments on commit 7c8093f

Please sign in to comment.