Skip to content

Commit

Permalink
working on the writer queue
Browse files Browse the repository at this point in the history
  • Loading branch information
chanderlud committed Dec 10, 2023
1 parent eb9549f commit de30e8d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 16 deletions.
22 changes: 21 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use clap::Parser;
use futures::stream::iter;
use futures::{StreamExt, TryStreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use kanal::{AsyncReceiver, SendError};
use kanal::{AsyncReceiver, ReceiveError, SendError};
use log::{debug, error, info, warn, LevelFilter};
use prost::Message;
use regex::Regex;
Expand Down Expand Up @@ -65,6 +65,8 @@ enum ErrorKind {
Decode(prost::DecodeError),
Join(tokio::task::JoinError),
Send(SendError),
Receive(ReceiveError),
MissingQueue,
}

impl From<io::Error> for Error {
Expand Down Expand Up @@ -107,6 +109,14 @@ impl From<SendError> for Error {
}
}

impl From<ReceiveError> for Error {
fn from(error: ReceiveError) -> Self {
Self {
kind: ErrorKind::Receive(error),
}
}
}

impl Termination for Error {
fn report(self) -> ExitCode {
ExitCode::from(match self.kind {
Expand All @@ -118,10 +128,20 @@ impl Termination for Error {
ErrorKind::Decode(_) => 4,
ErrorKind::Join(_) => 5,
ErrorKind::Send(_) => 6,
ErrorKind::Receive(_) => 7,
ErrorKind::MissingQueue => 7,
})
}
}

impl Error {
fn missing_queue() -> Self {
Self {
kind: ErrorKind::MissingQueue,
}
}
}

#[derive(Parser, Clone, Debug)]
struct Options {
#[clap(
Expand Down
27 changes: 12 additions & 15 deletions src/receiver/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::RwLock;

use crate::items::{message, End, Message};
use crate::receiver::{Job, WriterQueue};
use crate::{Result, TRANSFER_BUFFER_SIZE, WRITE_BUFFER_SIZE};
use crate::{Error, Result, TRANSFER_BUFFER_SIZE, WRITE_BUFFER_SIZE};

#[derive(Default)]
pub(crate) struct SplitQueue {
Expand All @@ -32,26 +32,19 @@ impl SplitQueue {
self.senders.write().await.remove(id);
}

pub(crate) async fn get_receiver(&self, id: &u32) -> Option<AsyncReceiver<Job>> {
let receivers = self.receivers.read().await;
receivers.get(id).cloned()
}

// TODO if the the receiver fills, the sender will block & block push_queue
pub(crate) async fn send(&self, job: Job, id: u32) {
let senders = self.senders.read().await;

if let Some(sender) = senders.get(&id) {
sender.send(job).await.unwrap();
}
}

pub(crate) async fn recv(&self, id: &u32) -> Option<Job> {
let receiver = {
let inner = self.receivers.read().await;
inner.get(id).cloned()
};

if let Some(receiver) = receiver {
receiver.recv().await.ok()
} else {
None
}
}
}

/// stores file details for writer
Expand Down Expand Up @@ -92,9 +85,13 @@ pub(crate) async fn writer(
);

let mut cache: HashMap<u64, Job> = HashMap::new();
let receiver = writer_queue
.get_receiver(&id)
.await
.ok_or(Error::missing_queue())?;

while position != details.file_size {
let job = writer_queue.recv(&id).await.unwrap();
let job = receiver.recv().await?;

match job.index.cmp(&position) {
// if the chunk is behind the current position, it was already written
Expand Down

0 comments on commit de30e8d

Please sign in to comment.