
Don't cache head/tail index in Consumer/Producer
mgeier committed Dec 6, 2021
1 parent 4f15f30 commit b0fbd44
Showing 2 changed files with 26 additions and 40 deletions.
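The commit carries no description, so here is the gist as reconstructed from the diff below: each handle previously kept two index copies, with its own index (the producer's `tail`, the consumer's `head`) held permanently in sync with the shared atomic. After this change, each handle keeps only a possibly stale cached copy of the *other* side's index and loads its own index straight from the shared `RingBuffer`. A minimal sketch of the resulting layout, with stub types and elided fields:

use std::cell::Cell;
use std::marker::PhantomData;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

// Stub of the shared state, for illustration only.
pub struct RingBuffer<T> {
    head: AtomicUsize, // written only by the Consumer
    tail: AtomicUsize, // written only by the Producer
    // ... slot storage, capacity etc. elided ...
    _marker: PhantomData<T>,
}

pub struct Producer<T> {
    buffer: Arc<RingBuffer<T>>,
    // Possibly stale copy of `buffer.head`, refreshed on demand.
    cached_head: Cell<usize>,
    // The former `tail: Cell<usize>` (always in sync with `buffer.tail`)
    // is gone: only this thread ever writes `buffer.tail`, so it can
    // simply load the atomic whenever it needs its own index.
}

pub struct Consumer<T> {
    buffer: Arc<RingBuffer<T>>,
    // Possibly stale copy of `buffer.tail`, refreshed on demand.
    cached_tail: Cell<usize>,
}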
22 changes: 10 additions & 12 deletions src/chunks.rs
@@ -237,13 +237,13 @@ impl<T> Producer<T> {
     /// For a safe alternative that provides mutable slices of [`Default`]-initialized slots,
     /// see [`Producer::write_chunk()`].
     pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
-        let tail = self.tail.get();
+        let tail = self.buffer.tail.load(Ordering::Acquire);
 
         // Check if the queue has *possibly* not enough slots.
-        if self.buffer.capacity - self.buffer.distance(self.head.get(), tail) < n {
+        if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
             // Refresh the head ...
             let head = self.buffer.head.load(Ordering::Acquire);
-            self.head.set(head);
+            self.cached_head.set(head);
 
             // ... and check if there *really* are not enough slots.
             let slots = self.buffer.capacity - self.buffer.distance(head, tail);
@@ -286,13 +286,13 @@ impl<T> Consumer<T> {
     ///
     /// See the documentation of the [`chunks`](crate::chunks#examples) module.
     pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
-        let head = self.head.get();
+        let head = self.buffer.head.load(Ordering::Acquire);
 
         // Check if the queue has *possibly* not enough slots.
-        if self.buffer.distance(head, self.tail.get()) < n {
+        if self.buffer.distance(head, self.cached_tail.get()) < n {
             // Refresh the tail ...
             let tail = self.buffer.tail.load(Ordering::Acquire);
-            self.tail.set(tail);
+            self.cached_tail.set(tail);
 
             // ... and check if there *really* are not enough slots.
             let slots = self.buffer.distance(head, tail);
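Both hunks preserve the crate's two-phase availability check: test against the cheap, locally cached copy of the other side's index first, and only when that fails pay for an `Acquire` load of the atomic the other thread updates, refreshing the cache on the way. A distilled sketch of the consumer-side check; the free-function shape and names are hypothetical:

use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Returns whether at least `n` slots can be read. `distance` stands in
/// for `RingBuffer::distance()`, which counts the filled slots between
/// `head` and `tail`.
fn has_n_filled_slots(
    head: &AtomicUsize,        // written only by the consumer
    tail: &AtomicUsize,        // written only by the producer
    cached_tail: &Cell<usize>, // consumer-local cache of `tail`
    distance: impl Fn(usize, usize) -> usize,
    n: usize,
) -> bool {
    let head = head.load(Ordering::Acquire);
    // Fast path: the cached tail may be stale, but staleness can only
    // *underestimate* the filled slots (the producer only ever advances
    // the tail), so a success here is definitive.
    if distance(head, cached_tail.get()) >= n {
        return true;
    }
    // Slow path: one Acquire load of the producer-owned atomic,
    // refreshing the cache for later calls.
    let t = tail.load(Ordering::Acquire);
    cached_tail.set(t);
    distance(head, t) >= n
}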
@@ -465,9 +465,9 @@ impl<T> WriteChunkUninit<'_, T> {
     }
 
     unsafe fn commit_unchecked(self, n: usize) -> usize {
-        let tail = self.producer.buffer.increment(self.producer.tail.get(), n);
+        let tail = self.producer.buffer.tail.load(Ordering::Acquire);
+        let tail = self.producer.buffer.increment(tail, n);
         self.producer.buffer.tail.store(tail, Ordering::Release);
-        self.producer.tail.set(tail);
         n
     }
 
@@ -654,7 +654,7 @@ impl<T> ReadChunk<'_, T> {
     }
 
     unsafe fn commit_unchecked(self, n: usize) -> usize {
-        let head = self.consumer.head.get();
+        let head = self.consumer.buffer.head.load(Ordering::Acquire);
         // Safety: head has not yet been incremented
         let first_ptr = self.consumer.buffer.slot_ptr(head);
         let first_len = self.first_len.min(n);
@@ -668,7 +668,6 @@ impl<T> ReadChunk<'_, T> {
         }
         let head = self.consumer.buffer.increment(head, n);
         self.consumer.buffer.head.store(head, Ordering::Release);
-        self.consumer.head.set(head);
         n
     }
 
@@ -722,9 +721,8 @@ impl<'a, T> Drop for ReadChunkIntoIter<'a, T> {
         let consumer = &self.chunk.consumer;
         let head = consumer
             .buffer
-            .increment(consumer.head.get(), self.iterated);
+            .increment(consumer.buffer.head.load(Ordering::Acquire), self.iterated);
         consumer.buffer.head.store(head, Ordering::Release);
-        consumer.head.set(head);
     }
 }
 
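For orientation, a sketch of how the chunk API modified above is used from the outside. The method names (`write_chunk`, `as_mut_slices`, `commit_all`) follow the crate's `chunks` module documentation referenced in the doc comments; treat the exact signatures as assumptions for this particular revision:

use rtrb::RingBuffer;

fn main() {
    let (mut producer, mut consumer) = RingBuffer::<u8>::new(4);

    // Write three Default-initialized slots in one go.
    if let Ok(mut chunk) = producer.write_chunk(3) {
        // The chunk may wrap around the ring, hence two slices.
        let (first, second) = chunk.as_mut_slices();
        first.fill(42);
        second.fill(42);
        chunk.commit_all(); // publish the slots to the consumer
    }

    // Read them back; dropping the iterator commits the iterated slots,
    // as implemented by `ReadChunkIntoIter::drop()` above.
    if let Ok(chunk) = consumer.read_chunk(3) {
        assert!(chunk.into_iter().all(|x| x == 42));
    }
}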
44 changes: 16 additions & 28 deletions src/lib.rs
@@ -126,13 +126,11 @@ impl<T> RingBuffer<T> {
         });
         let p = Producer {
             buffer: buffer.clone(),
-            head: Cell::new(0),
-            tail: Cell::new(0),
+            cached_head: Cell::new(0),
         };
         let c = Consumer {
             buffer,
-            head: Cell::new(0),
-            tail: Cell::new(0),
+            cached_tail: Cell::new(0),
         };
         (p, c)
     }
@@ -281,12 +279,7 @@ pub struct Producer<T> {
     /// A copy of `buffer.head` for quick access.
     ///
     /// This value can be stale and sometimes needs to be resynchronized with `buffer.head`.
-    head: Cell<usize>,
-
-    /// A copy of `buffer.tail` for quick access.
-    ///
-    /// This value is always in sync with `buffer.tail`.
-    tail: Cell<usize>,
+    cached_head: Cell<usize>,
 }
 
 unsafe impl<T: Send> Send for Producer<T> {}
@@ -318,7 +311,6 @@ impl<T> Producer<T> {
             }
             let tail = self.buffer.increment1(tail);
             self.buffer.tail.store(tail, Ordering::Release);
-            self.tail.set(tail);
             Ok(())
         } else {
             Err(PushError::Full(value))
@@ -345,8 +337,9 @@ impl<T> Producer<T> {
     /// ```
     pub fn slots(&self) -> usize {
         let head = self.buffer.head.load(Ordering::Acquire);
-        self.head.set(head);
-        self.buffer.capacity - self.buffer.distance(head, self.tail.get())
+        let tail = self.buffer.tail.load(Ordering::Acquire);
+        self.cached_head.set(head);
+        self.buffer.capacity - self.buffer.distance(head, tail)
     }
 
     /// Returns `true` if there are currently no slots available for writing.
@@ -441,13 +434,13 @@ impl<T> Producer<T> {
     /// This is a strict subset of the functionality implemented in `write_chunk_uninit()`.
     /// For performance, this special case is implemented separately.
     fn next_tail(&self) -> Option<usize> {
-        let tail = self.tail.get();
+        let tail = self.buffer.tail.load(Ordering::Acquire);
 
         // Check if the queue is *possibly* full.
-        if self.buffer.distance(self.head.get(), tail) == self.buffer.capacity {
+        if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
             // Refresh the head ...
             let head = self.buffer.head.load(Ordering::Acquire);
-            self.head.set(head);
+            self.cached_head.set(head);
 
             // ... and check if it's *really* full.
             if self.buffer.distance(head, tail) == self.buffer.capacity {
@@ -483,15 +476,10 @@ pub struct Consumer<T> {
     /// A reference to the ring buffer.
     buffer: Arc<RingBuffer<T>>,
 
-    /// A copy of `buffer.head` for quick access.
-    ///
-    /// This value is always in sync with `buffer.head`.
-    head: Cell<usize>,
-
     /// A copy of `buffer.tail` for quick access.
     ///
     /// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`.
-    tail: Cell<usize>,
+    cached_tail: Cell<usize>,
 }
 
 unsafe impl<T: Send> Send for Consumer<T> {}
@@ -531,7 +519,6 @@ impl<T> Consumer<T> {
             let value = unsafe { self.buffer.slot_ptr(head).read() };
             let head = self.buffer.increment1(head);
             self.buffer.head.store(head, Ordering::Release);
-            self.head.set(head);
             Ok(value)
         } else {
             Err(PopError::Empty)
@@ -583,9 +570,10 @@ impl<T> Consumer<T> {
     /// assert_eq!(c.slots(), 0);
     /// ```
     pub fn slots(&self) -> usize {
+        let head = self.buffer.head.load(Ordering::Acquire);
         let tail = self.buffer.tail.load(Ordering::Acquire);
-        self.tail.set(tail);
-        self.buffer.distance(self.head.get(), tail)
+        self.cached_tail.set(tail);
+        self.buffer.distance(head, tail)
     }
 
     /// Returns `true` if there are currently no slots available for reading.
@@ -679,13 +667,13 @@ impl<T> Consumer<T> {
     /// This is a strict subset of the functionality implemented in `read_chunk()`.
     /// For performance, this special case is implemented separately.
    fn next_head(&self) -> Option<usize> {
-        let head = self.head.get();
+        let head = self.buffer.head.load(Ordering::Acquire);
 
         // Check if the queue is *possibly* empty.
-        if head == self.tail.get() {
+        if head == self.cached_tail.get() {
             // Refresh the tail ...
             let tail = self.buffer.tail.load(Ordering::Acquire);
-            self.tail.set(tail);
+            self.cached_tail.set(tail);
 
             // ... and check if it's *really* empty.
             if head == tail {
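Finally, the single-element fast path implemented by `next_tail()`/`next_head()` above, as a round-trip sketch; `new`, `push`, `pop` and `slots` all appear in this diff, and the concrete assertions are illustrative:

use rtrb::RingBuffer;

fn main() {
    let (mut producer, mut consumer) = RingBuffer::new(2);
    assert_eq!(producer.slots(), 2);

    producer.push(10).unwrap();
    producer.push(20).unwrap();
    // Queue full: push() hands the value back as PushError::Full.
    assert!(producer.push(30).is_err());

    assert_eq!(consumer.pop().unwrap(), 10);
    assert_eq!(consumer.pop().unwrap(), 20);
    // Queue empty: pop() returns PopError::Empty.
    assert!(consumer.pop().is_err());
}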
