Skip to content

Commit

Permalink
added extra debugging and works on controller stability
Browse files Browse the repository at this point in the history
  • Loading branch information
chanderlud committed Dec 15, 2023
1 parent 5d6ee1f commit cf67ab3
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 38 deletions.
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ async fn write_message<W: AsyncWrite + Unpin, M: Message, C: StreamCipherExt + ?

writer.write_all(&buffer).await?;

debug!("sent message: {:?}", message);
Ok(())
}

Expand All @@ -750,6 +751,7 @@ async fn read_message<
cipher.apply_keystream(&mut buffer[..]);

let message = M::decode(&buffer[..])?;
debug!("received message: {:?}", message);

Ok(message)
}
Expand Down
11 changes: 9 additions & 2 deletions src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ async fn receiver(queue: WriterQueue, socket: UdpSocket) -> Result<()> {

async fn controller<C: StreamCipherExt + ?Sized>(
mut str_stream: TcpStream,
mut files: HashMap<u32, FileDetails>,
files: HashMap<u32, FileDetails>,
writer_queue: WriterQueue,
confirmation_sender: AsyncSender<(u32, u64)>,
confirmed_data: Arc<AtomicUsize>,
Expand All @@ -227,7 +227,13 @@ async fn controller<C: StreamCipherExt + ?Sized>(
Some(message::Message::Start(message)) => {
debug!("received start message: {:?}", message);

let details = files.remove(&message.id).unwrap();
let details = match files.get(&message.id) {
Some(details) => details,
None => {
error!("received start message for unknown id {}", message.id);
continue;
}
};

let start_index = if details.partial_path.exists() {
info!("partial file exists, resuming transfer");
Expand All @@ -254,6 +260,7 @@ async fn controller<C: StreamCipherExt + ?Sized>(
let writer_queue = writer_queue.clone();
let confirmation_sender = confirmation_sender.clone();
let message_sender = message_sender.clone();
let details = details.clone();

async move {
let result = writer::<C>(
Expand Down
1 change: 1 addition & 0 deletions src/receiver/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl SplitQueue {
}

/// stores file details for writer
#[derive(Clone)]
pub(crate) struct FileDetails {
pub(crate) path: PathBuf,
pub(crate) partial_path: PathBuf,
Expand Down
84 changes: 48 additions & 36 deletions src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,29 +212,27 @@ async fn controller<C: StreamCipherExt + ?Sized>(
mut cipher: Box<C>,
) -> Result<()> {
let mut id = 0;
let mut active = 0;
let mut active: HashMap<u32, FileDetail> = HashMap::with_capacity(max);

loop {
if !files.is_empty() {
while active < max {
match files.get(&id) {
None => id += 1,
Some(details) => {
start_file_transfer(
&mut control_stream,
id,
details,
&base_path,
&job_sender,
&read,
&confirmed_data,
&mut cipher,
)
.await?;

active += 1;
id += 1
}
while active.len() < max && !files.is_empty() {
match files.remove(&id) {
None => id += 1,
Some(details) => {
start_file_transfer(
&mut control_stream,
id,
&details,
&base_path,
&job_sender,
&read,
&confirmed_data,
&mut cipher,
)
.await?;

active.insert(id, details);
id += 1
}
}
}
Expand All @@ -243,14 +241,15 @@ async fn controller<C: StreamCipherExt + ?Sized>(

match controller_receiver.recv().await?.message {
Some(message::Message::End(end)) => {
debug!("received end message {} | active {}", end.id, active);

files.remove(&end.id);
active -= 1;
if active.remove(&end.id).is_none() {
warn!("received end message for unknown file {}", end.id);
} else {
debug!("received end message {} | active {}", end.id, active.len());
}
}
Some(message::Message::Failure(failure)) => {
if failure.reason == 0 {
if let Some(details) = files.get(&failure.id) {
if let Some(details) = active.get(&failure.id) {
warn!(
"transfer {} failed signature verification, retrying...",
failure.id
Expand All @@ -277,15 +276,15 @@ async fn controller<C: StreamCipherExt + ?Sized>(
}
} else {
warn!(
"received failure message {} for unknown file {}",
"received unknown failure message {} for {}",
failure.reason, failure.id
);
}
}
_ => unreachable!(), // only end and failure messages are sent to this receiver
}

if files.is_empty() && active == 0 {
if files.is_empty() && active.is_empty() {
break;
}
}
Expand All @@ -312,14 +311,27 @@ async fn start_file_transfer<C: StreamCipherExt + ?Sized>(
let start_index: StartIndex = read_message(&mut control_stream, cipher).await?;
confirmed_data.fetch_add(start_index.index as usize, Relaxed);

tokio::spawn(reader(
base_path.join(&details.path),
job_sender.clone(),
read.clone(),
start_index.index,
id,
details.crypto.as_ref().map(make_cipher),
));
tokio::spawn({
let job_sender = job_sender.clone();
let read = read.clone();
let details = details.clone();
let base_path = base_path.to_path_buf();

async move {
let result = reader(
base_path.join(&details.path),
job_sender,
read,
start_index.index,
id,
details.crypto.as_ref().map(make_cipher),
).await;

if let Err(error) = result {
error!("reader failed: {:?}", error);
}
}
});

Ok(())
}
Expand Down

0 comments on commit cf67ab3

Please sign in to comment.