Skip to content

Commit

Permalink
potentially fixes a bug where the receiver is still writing when it e…
Browse files Browse the repository at this point in the history
…xits
  • Loading branch information
chanderlud committed Nov 29, 2023
1 parent f58880a commit 11e7ce3
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 21 deletions.
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::{error, process};
use std::fmt::{Display, Formatter};
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
Expand All @@ -7,6 +6,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::Duration;
use std::{error, process};

use async_ssh2_tokio::{AuthMethod, Client, ServerCheckMethod};
use clap::Parser;
Expand Down Expand Up @@ -317,6 +317,7 @@ async fn main() {
let mut options = Options::parse();

// TODO choose a better log file location
// local client should log in execution location
simple_logging::log_to_file("cccp.log", options.log_level).expect("failed to log");

if options.start_port > options.end_port {
Expand Down
43 changes: 29 additions & 14 deletions src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::time::Duration;

use log::{debug, error, info, warn};
use tokio::fs::rename;
use tokio::{io, select};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time::{interval, timeout};
use tokio::time::{interval, sleep, timeout};
use tokio::{io, select};

use crate::receiver::metadata::Metadata;
use crate::receiver::writer::writer;
Expand All @@ -23,6 +23,8 @@ use crate::{
mod metadata;
mod writer;

type WriterQueue = Arc<deadqueue::limited::Queue<Vec<u8>>>;

pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<()> {
if options.destination.file_path.is_dir() {
info!("destination is a folder, reformatting path with target file");
Expand Down Expand Up @@ -62,7 +64,7 @@ pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<(

info!("opened sockets");

let writer_queue: Queue<Vec<u8>> = Default::default();
let writer_queue: WriterQueue = Arc::new(deadqueue::limited::Queue::new(100));
let confirmation_queue: Queue<u64> = Default::default();

let writer_handle = tokio::spawn(writer(
Expand All @@ -73,12 +75,23 @@ pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<(
meta_data,
));

let confirmation_handle = tokio::spawn(
send_confirmations(
control_stream,
confirmation_queue,
stats.confirmed_data)
);
let confirmation_handle = tokio::spawn({
let writer_queue = writer_queue.clone();

async move {
let result = send_confirmations(
control_stream,
confirmation_queue,
stats.confirmed_data,
).await;

while !writer_queue.is_empty() {
sleep(Duration::from_secs(1)).await;
}

result
}
});

let handles: Vec<_> = sockets
.into_iter()
Expand All @@ -87,10 +100,12 @@ pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<(

let receiver_future = async {
for handle in handles {
handle.await?;
_ = handle.await;
}

Ok::<(), io::Error>(())
while !writer_queue.is_empty() {
sleep(Duration::from_secs(1)).await;
}
};

select! {
Expand All @@ -105,13 +120,13 @@ pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<(
)
.await?;
},
receiver_future = receiver_future => error!("receiver(s) failed {:?}", receiver_future),
_ = receiver_future => info!("receiver(s) exited"),
}

Ok(())
}

pub(crate) async fn receiver(queue: Queue<Vec<u8>>, socket: UdpSocket) {
pub(crate) async fn receiver(queue: WriterQueue, socket: UdpSocket) {
let mut buf = [0; INDEX_SIZE + TRANSFER_BUFFER_SIZE];
let mut retries = 0;

Expand All @@ -122,7 +137,7 @@ pub(crate) async fn receiver(queue: Queue<Vec<u8>>, socket: UdpSocket) {
retries = 0;

if read > 0 {
queue.push(buf[..read].to_vec());
queue.push(buf[..read].to_vec()).await;
} else {
warn!("0 byte read?");
}
Expand Down
3 changes: 2 additions & 1 deletion src/receiver/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ use tokio::fs::OpenOptions;
use tokio::io::{self, AsyncSeekExt, AsyncWrite, AsyncWriteExt};

use crate::receiver::metadata::Metadata;
use crate::receiver::WriterQueue;
use crate::{Queue, INDEX_SIZE, TRANSFER_BUFFER_SIZE};

pub(crate) async fn writer(
path: PathBuf,
writer_queue: Queue<Vec<u8>>,
writer_queue: WriterQueue,
file_size: u64,
confirmation_queue: Queue<u64>,
mut metadata: Metadata,
Expand Down
9 changes: 4 additions & 5 deletions src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,11 @@ async fn receive_confirmations(
let mut cache = cache.write().await;

// requeue and remove entries
for key in keys_to_remove {
if let Some(mut unconfirmed) = cache.remove(&key) {
if lost_confirmations.contains(&key) {
for index in keys_to_remove {
if let Some(mut unconfirmed) = cache.remove(&index) {
if lost_confirmations.contains(&index) {
// the job is not requeued because it was confirmed while outside the cache
debug!("found lost confirmation for {}", key);
lost_confirmations.remove(&key);
lost_confirmations.remove(&index);
} else {
unconfirmed.cached_at = None;
queue.push(unconfirmed);
Expand Down

0 comments on commit 11e7ce3

Please sign in to comment.