folder support WIP
chanderlud committed Dec 5, 2023
1 parent 63efb50 commit 8673831
Showing 4 changed files with 136 additions and 31 deletions.
57 changes: 57 additions & 0 deletions scratch.txt
@@ -0,0 +1,57 @@
how to get multi file support working

1. sender needs to be able to decide which files to send (rough walk sketch below)
- /a/path/to/a/directory -> every file and subdirectory, recursively
- /a/path/to/some/files/* -> every file directly in the directory
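a rough sketch of the recursive case (std fs for brevity, the real thing would probably use tokio::fs; collect_files is a made up name):

use std::path::{Path, PathBuf};
use std::{fs, io};

// hypothetical helper: recursively collect every file under root
// the /* case would just take the files from one read_dir pass instead
fn collect_files(root: &Path, files: &mut Vec<PathBuf>) -> io::Result<()> {
    for entry in fs::read_dir(root)? {
        let path = entry?.path();

        if path.is_dir() {
            collect_files(&path, files)?; // recurse into subdirectories
        } else {
            files.push(path);
        }
    }

    Ok(())
}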

2. sender needs to assign each file an id + tell the receiver what directories need to be created
- probably gotta bite the bullet and use a protobuf here
- directories should probably be constructed by the receiver?
- the receiver needs to have a map of file ids to destination file paths

struct Message {
    inner: MessageType,
}

enum MessageType {
    Files {
        directories: Vec<String>,
        files: HashMap<u32, (String, u64)>, // id -> (file path, file size)
    },
    Confirmations {
        indexes: HashMap<u32, Vec<u64>>, // id -> confirmed chunk indexes
    },
    StartIndex {
        id: u32,
        index: u64, // where the receiver wants the sender to resume
    },
    Start {
        id: u32,
    },
    End {
        id: u32, // sent by the receiver when the writer exits so the sender can clean up
    },
}
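receiver side sketch for the Files message, assuming the types above; create_dir_all is real tokio api, handle_files is a made up name:

use std::collections::HashMap;
use std::io;
use std::path::PathBuf;
use tokio::fs;

// hypothetical: create the directories, then build the id -> destination path map
async fn handle_files(
    destination: PathBuf,
    directories: Vec<String>,
    files: HashMap<u32, (String, u64)>,
) -> io::Result<HashMap<u32, PathBuf>> {
    for directory in directories {
        fs::create_dir_all(destination.join(directory)).await?;
    }

    Ok(files
        .into_iter()
        .map(|(id, (path, _size))| (id, destination.join(path)))
        .collect())
}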

3. the reader needs some changes
- i guess multiple readers will be needed. the ids will need to be added to the packets too (layout sketch below)
- one queue can be used
- each reader will need its own read semaphore, and the control server will need to keep track of them all (cry)
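the packet could just grow an id prefix in front of the index, something like (parse_packet is made up, the constants mirror the ones in main.rs):

// hypothetical layout: [ id: u32 | index: u64 | data ]
const ID_SIZE: usize = std::mem::size_of::<u32>();
const INDEX_SIZE: usize = std::mem::size_of::<u64>();

fn parse_packet(buf: &[u8]) -> (u32, u64, &[u8]) {
    let id = u32::from_be_bytes(buf[..ID_SIZE].try_into().unwrap());
    let index = u64::from_be_bytes(buf[ID_SIZE..ID_SIZE + INDEX_SIZE].try_into().unwrap());

    (id, index, &buf[ID_SIZE + INDEX_SIZE..])
}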

4. the receiver needs to spawn writers and create queues for each id it receives + create the directories
- should be pretty easy

5. the control stream is gonna be a bit more complicated now
- should probably communicate in protobufs (framing sketch below)
- needs to control the sender and receiver exiting. if the stream breaks it can exit both ends
- we really don't need to care about the reader or writer's results/completion unless they are Err
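if the control stream does talk protobuf, a u32 length prefix is probably enough framing. a sketch assuming prost for the encoding (write_message/read_message are made up names):

use prost::Message;
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

// hypothetical framing: a u32 length prefix, then the encoded message
async fn write_message<M: Message, W: AsyncWrite + Unpin>(
    stream: &mut W,
    message: &M,
) -> io::Result<()> {
    let bytes = message.encode_to_vec();
    stream.write_u32(bytes.len() as u32).await?;
    stream.write_all(&bytes).await
}

async fn read_message<M: Message + Default, R: AsyncRead + Unpin>(
    stream: &mut R,
) -> io::Result<M> {
    let length = stream.read_u32().await? as usize;
    let mut buffer = vec![0; length];
    stream.read_exact(&mut buffer).await?;

    M::decode(&buffer[..]).map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
}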


things to think about
- probably need a system to limit the number of concurrent files
  this will include sending the ids to the receiver as they are started
  so writers and queues are only created when needed
  readers and send queues should be considered too i guess
- i guess every time a new file gets started there needs to be a little handshake, like
  sender --[Start]--> receiver --[StartIndex]--> sender
  and then it can start working (sketch below)
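the sender side of that handshake might look like this, reusing the message sketch and framing helpers from above (all made up names, and it pretends Message is a real prost type):

use tokio::io;
use tokio::net::TcpStream;

// hypothetical: ask the receiver where to start (or resume) file `id`
async fn negotiate_start(stream: &mut TcpStream, id: u32) -> io::Result<u64> {
    // sender --[Start]--> receiver
    write_message(stream, &Message { inner: MessageType::Start { id } }).await?;

    // receiver --[StartIndex]--> sender
    let response: Message = read_message(stream).await?;

    match response.inner {
        MessageType::StartIndex { index, .. } => Ok(index),
        _ => Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected message")),
    }
}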
2 changes: 1 addition & 1 deletion src/main.rs
@@ -29,8 +29,8 @@ use tokio::{io, select};
mod receiver;
mod sender;

// type aliases used throughout
type UnlimitedQueue<T> = Arc<deadqueue::unlimited::Queue<T>>;
type LimitedQueue<T> = Arc<deadqueue::limited::Queue<T>>;
type Result<T> = std::result::Result<T, Error>;

const READ_BUFFER_SIZE: usize = 10_000_000;
52 changes: 29 additions & 23 deletions src/receiver/mod.rs
@@ -1,29 +1,29 @@
use std::collections::HashMap;
use std::mem;
use std::net::IpAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::Duration;

use deadqueue::limited::Queue;
use log::{debug, error, info, warn};
use tokio::fs::{metadata, rename};
use tokio::fs::metadata;
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpStream, UdpSocket};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time::{interval, timeout};
use tokio::{io, select};

use crate::receiver::writer::writer;
use crate::receiver::writer::{writer, SplitQueue};
use crate::{
socket_factory, LimitedQueue, Options, Result, TransferStats, UnlimitedQueue, INDEX_SIZE,
MAX_RETRIES, RECEIVE_TIMEOUT, TRANSFER_BUFFER_SIZE,
socket_factory, Options, Result, TransferStats, UnlimitedQueue, INDEX_SIZE, MAX_RETRIES,
RECEIVE_TIMEOUT, TRANSFER_BUFFER_SIZE,
};

mod writer;

type WriterQueue = LimitedQueue<Job>;
type WriterQueue = Arc<SplitQueue>;

#[derive(Clone)]
struct Job {
@@ -83,15 +83,17 @@ pub(crate) async fn main(

info!("opened sockets");

let writer_queue: WriterQueue = Arc::new(Queue::new(1_000));
let confirmation_queue: UnlimitedQueue<u64> = Default::default();
let writer_queue: WriterQueue = Default::default();
let confirmation_queue: UnlimitedQueue<(u32, u64)> = Default::default();

let writer_handle = tokio::spawn(writer(
options.destination.file_path,
partial_path.clone(),
writer_queue.clone(),
file_size,
confirmation_queue.clone(),
start_index,
0,
));

let confirmation_handle = tokio::spawn(send_confirmations(
@@ -115,13 +117,6 @@ pub(crate) async fn main(
result = confirmation_handle => error!("confirmation sender failed {:?}", result),
result = writer_handle => {
info!("writer finished with result {:?}", result);

// rename the partial file to the original file
rename(
&partial_path,
&options.destination.file_path,
)
.await?;
},
_ = receiver_future => info!("receiver(s) exited"),
}
@@ -137,9 +132,11 @@ async fn receiver(queue: WriterQueue, socket: UdpSocket) {
match timeout(RECEIVE_TIMEOUT, socket.recv(&mut buf)).await {
Ok(Ok(read)) if read > 0 => {
retries = 0; // reset retries
let data = buf[INDEX_SIZE..].try_into().unwrap();
// TODO let id = ...
let id = 0;
let index = u64::from_be_bytes(buf[..INDEX_SIZE].try_into().unwrap());
queue.push(Job { data, index }).await;
let data = buf[INDEX_SIZE..].try_into().unwrap();
queue.push(Job { data, index }, id).await;
}
Ok(Ok(_)) => warn!("0 byte read?"), // this should never happen
Ok(Err(_)) | Err(_) => retries += 1, // catch errors and timeouts
@@ -149,10 +146,10 @@

async fn send_confirmations(
mut control_stream: TcpStream,
queue: UnlimitedQueue<u64>,
queue: UnlimitedQueue<(u32, u64)>,
confirmed_data: Arc<AtomicUsize>,
) -> io::Result<()> {
let data: Arc<Mutex<Vec<u64>>> = Default::default();
let data: Arc<Mutex<Vec<(u32, u64)>>> = Default::default();

let sender_handle: JoinHandle<io::Result<()>> = tokio::spawn({
let data = data.clone();
@@ -170,19 +167,28 @@
}

// take the data out of the mutex
let indexes = mem::take(&mut *data);
let confirmations = mem::take(&mut *data);
drop(data); // release the lock on data

write_indexes(&mut control_stream, &indexes).await?;
let map: HashMap<u32, Vec<u64>> = HashMap::new();

// group the confirmations by id
let map = confirmations.into_iter().fold(map, |mut map, (id, index)| {
map.entry(id).or_default().push(index);
map
});

// TODO send confirmations
// write_indexes(&mut control_stream, &indexes).await?;
}
}
});

let future = async {
loop {
let index = queue.pop().await; // wait for a confirmation
let confirmation = queue.pop().await; // wait for a confirmation
confirmed_data.fetch_add(TRANSFER_BUFFER_SIZE, Relaxed); // increment the confirmed data counter
data.lock().await.push(index); // push the index to the data vector
data.lock().await.push(confirmation); // push the index to the data vector
}
};

56 changes: 49 additions & 7 deletions src/receiver/writer.rs
@@ -1,26 +1,67 @@
use deadqueue::limited::Queue;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::io::SeekFrom;
use std::path::PathBuf;

use log::{debug, info};
use tokio::fs::OpenOptions;
use tokio::fs::{rename, OpenOptions};
use tokio::io::{self, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter};
use tokio::sync::RwLock;

use crate::receiver::{Job, WriterQueue};
use crate::{UnlimitedQueue, TRANSFER_BUFFER_SIZE, WRITE_BUFFER_SIZE};

#[derive(Default)]
pub(crate) struct SplitQueue {
inner: RwLock<HashMap<u32, Queue<Job>>>,
}

impl SplitQueue {
pub(crate) async fn push_queue(&self, id: u32) {
let mut inner = self.inner.write().await;

inner.insert(id, Queue::new(1_000));
}

pub(crate) async fn pop_queue(&self, id: &u32) {
let mut inner = self.inner.write().await;

inner.remove(id);
}

pub(crate) async fn push(&self, job: Job, id: u32) {
let inner = self.inner.read().await;

if let Some(queue) = inner.get(&id) {
queue.push(job).await;
}
}

pub(crate) async fn pop(&self, id: &u32) -> Option<Job> {
let inner = self.inner.read().await;

if let Some(queue) = inner.get(id) {
Some(queue.pop().await)
} else {
None
}
}
}

pub(crate) async fn writer(
path: PathBuf,
partial_path: PathBuf,
writer_queue: WriterQueue,
file_size: u64,
confirmation_queue: UnlimitedQueue<u64>,
confirmation_queue: UnlimitedQueue<(u32, u64)>,
mut position: u64,
id: u32,
) -> io::Result<()> {
let file = OpenOptions::new()
.write(true)
.create(true)
.open(path)
.open(&partial_path)
.await?;

let mut writer = BufWriter::with_capacity(WRITE_BUFFER_SIZE, file);
@@ -31,21 +72,21 @@ pub(crate) async fn writer(
let mut cache: BTreeMap<u64, Job> = BTreeMap::new();

while position != file_size {
let job = writer_queue.pop().await;
let job = writer_queue.pop(&id).await.unwrap();

match job.index.cmp(&position) {
// if the chunk is behind the current position, it was already written
Ordering::Less => continue,
// if the chunk is ahead of the current position, save it for later
Ordering::Greater => {
confirmation_queue.push(job.index);
confirmation_queue.push((id, job.index));
cache.insert(job.index, job);
continue;
}
// if the chunk is at the current position, write it
Ordering::Equal => {
write_data(&mut writer, &job.data, &mut position, file_size).await?;
confirmation_queue.push(job.index);
confirmation_queue.push((id, job.index));
}
}

@@ -57,6 +98,7 @@

info!("writer wrote all expected bytes");
writer.flush().await?;
rename(&partial_path, path).await?;

Ok(())
}
