Skip to content

Commit

Permalink
couple of small changes
Browse files Browse the repository at this point in the history
  • Loading branch information
chanderlud committed Nov 30, 2023
1 parent 27f49c4 commit 14a2fdd
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 19 deletions.
4 changes: 3 additions & 1 deletion src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub(crate) async fn main(mut options: Options, stats: TransferStats) -> Result<(

info!("opened sockets");

let writer_queue: WriterQueue = Arc::new(deadqueue::limited::Queue::new(100));
let writer_queue: WriterQueue = Arc::new(deadqueue::limited::Queue::new(1_000));
let confirmation_queue: Queue<u64> = Default::default();

let writer_handle = tokio::spawn(writer(
Expand Down Expand Up @@ -172,6 +172,8 @@ async fn send_confirmations(
}

let indexes = mem::take(&mut *data);
drop(data);

send_indexes(&mut control_stream, &indexes).await?;
}
}
Expand Down
32 changes: 14 additions & 18 deletions src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ async fn receive_confirmations(
tokio::spawn({
let cache = cache.clone();
let lost_confirmations = lost_confirmations.clone();
let confirmed_data = confirmed_data.clone();
let read = read.clone();

let mut interval = interval(Duration::from_millis(100));

async move {
Expand All @@ -189,6 +192,9 @@ async fn receive_confirmations(
if lost_confirmations.contains(&index) {
// the job is not requeued because it was confirmed while outside the cache
lost_confirmations.remove(&index);

read.add_permits(1);
confirmed_data.fetch_add(TRANSFER_BUFFER_SIZE, Relaxed);
} else {
unconfirmed.cached_at = None;
queue.push(unconfirmed);
Expand All @@ -200,13 +206,7 @@ async fn receive_confirmations(
});

loop {
control_stream.flush().await?;

let confirmed_indexes = receive_indexes(&mut control_stream).await?;
let length = confirmed_indexes.len();

confirmed_data.fetch_add(length * TRANSFER_BUFFER_SIZE, Relaxed);
read.add_permits(length); // add a permit to the reader for each confirmed index

let mut lost_confirmations = lost_confirmations.lock().await;
let mut cache = cache.write().await;
Expand All @@ -216,6 +216,9 @@ async fn receive_confirmations(
if cache.remove(&index).is_none() {
// if the index is not in the cache, it was already requeued
lost_confirmations.insert(index);
} else {
read.add_permits(1); // add a permit to the reader
confirmed_data.fetch_add(TRANSFER_BUFFER_SIZE, Relaxed);
}
}
}
Expand All @@ -238,19 +241,12 @@ async fn add_permits_at_rate(semaphore: Arc<Semaphore>, rate: u64) {

async fn receive_indexes(control_stream: &mut TcpStream) -> io::Result<Vec<u64>> {
let length = control_stream.read_u64().await? as usize; // read the length of the array
let mut indexes = Vec::with_capacity(length); // create a vector with the capacity of the array

if length == 0 {
return Ok(Vec::new());
for _ in 0..length {
let index = control_stream.read_u64().await?; // read the u64 value
indexes.push(index);
}

let mut buffer = vec![0; length * 8]; // 8 bytes per u64 value

// read u64 values
control_stream.read_exact(&mut buffer).await?;

// convert the buffer into an array of u64 values
Ok(buffer
.chunks(8)
.map(|chunk| u64::from_be_bytes(chunk.try_into().unwrap()))
.collect())
Ok(indexes)
}

0 comments on commit 14a2fdd

Please sign in to comment.