From de30e8d698e12f046aa3839ca80008d1c6e17423 Mon Sep 17 00:00:00 2001 From: chanderlud Date: Sat, 9 Dec 2023 23:34:58 -0800 Subject: [PATCH] working on the writer queue --- src/main.rs | 22 +++++++++++++++++++++- src/receiver/writer.rs | 27 ++++++++++++--------------- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/main.rs b/src/main.rs index 33464c4..0af6bed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ use clap::Parser; use futures::stream::iter; use futures::{StreamExt, TryStreamExt}; use indicatif::{ProgressBar, ProgressStyle}; -use kanal::{AsyncReceiver, SendError}; +use kanal::{AsyncReceiver, ReceiveError, SendError}; use log::{debug, error, info, warn, LevelFilter}; use prost::Message; use regex::Regex; @@ -65,6 +65,8 @@ enum ErrorKind { Decode(prost::DecodeError), Join(tokio::task::JoinError), Send(SendError), + Receive(ReceiveError), + MissingQueue, } impl From for Error { @@ -107,6 +109,14 @@ impl From for Error { } } +impl From for Error { + fn from(error: ReceiveError) -> Self { + Self { + kind: ErrorKind::Receive(error), + } + } +} + impl Termination for Error { fn report(self) -> ExitCode { ExitCode::from(match self.kind { @@ -118,10 +128,20 @@ impl Termination for Error { ErrorKind::Decode(_) => 4, ErrorKind::Join(_) => 5, ErrorKind::Send(_) => 6, + ErrorKind::Receive(_) => 7, + ErrorKind::MissingQueue => 7, }) } } +impl Error { + fn missing_queue() -> Self { + Self { + kind: ErrorKind::MissingQueue, + } + } +} + #[derive(Parser, Clone, Debug)] struct Options { #[clap( diff --git a/src/receiver/writer.rs b/src/receiver/writer.rs index 2711798..18e2bc2 100644 --- a/src/receiver/writer.rs +++ b/src/receiver/writer.rs @@ -11,7 +11,7 @@ use tokio::sync::RwLock; use crate::items::{message, End, Message}; use crate::receiver::{Job, WriterQueue}; -use crate::{Result, TRANSFER_BUFFER_SIZE, WRITE_BUFFER_SIZE}; +use crate::{Error, Result, TRANSFER_BUFFER_SIZE, WRITE_BUFFER_SIZE}; #[derive(Default)] pub(crate) struct SplitQueue { @@ -32,6 +32,12 @@ impl SplitQueue { self.senders.write().await.remove(id); } + pub(crate) async fn get_receiver(&self, id: &u32) -> Option> { + let receivers = self.receivers.read().await; + receivers.get(id).cloned() + } + + // TODO if the the receiver fills, the sender will block & block push_queue pub(crate) async fn send(&self, job: Job, id: u32) { let senders = self.senders.read().await; @@ -39,19 +45,6 @@ impl SplitQueue { sender.send(job).await.unwrap(); } } - - pub(crate) async fn recv(&self, id: &u32) -> Option { - let receiver = { - let inner = self.receivers.read().await; - inner.get(id).cloned() - }; - - if let Some(receiver) = receiver { - receiver.recv().await.ok() - } else { - None - } - } } /// stores file details for writer @@ -92,9 +85,13 @@ pub(crate) async fn writer( ); let mut cache: HashMap = HashMap::new(); + let receiver = writer_queue + .get_receiver(&id) + .await + .ok_or(Error::missing_queue())?; while position != details.file_size { - let job = writer_queue.recv(&id).await.unwrap(); + let job = receiver.recv().await?; match job.index.cmp(&position) { // if the chunk is behind the current position, it was already written