Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce concurrent circular buffer #29

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

jeehoonkang
Copy link
Contributor

Rendered
Implementation

A benchmark result:
Graphs

@schets
Copy link
Member

schets commented Mar 14, 2018

This looks great!

Some comments:

  • If applications care about contention, they'll probably want to use try_recv. I'm not sure if that's worth the api bloat. Also, try_recv will probably be faster when contained inside a pinned scope, maybe that's worth benchmarking for consideration?
  • At the cost of queue memory, I believe it's possible to reduce the number of cache misses to read an object to 1 (or more if required to read the object) by not sharing read/write indices but instead marking each slot with a sequence number. I think crossbeam-channel does this optimization. One could protect against queue-resizing with seqlock-style protection on the queue pointer.

I think I have a branch of SeqQueue modified to be spmc, and it showed significant performance improvements if I recall correctly. I'll try and find it. This has much less allocator churn, but does have a latency spike and makes returning memory trickier if the queue size spikes. Maybe some compromise between the two could be found - something which only extends to a new linked elements if the large queue is full?

@schets
Copy link
Member

schets commented Mar 14, 2018

Also, I think that there should be a leaky mode the queue can be constructed with which removes crossbeam protection from the buffer at the cost of leaking it.

I suspect there are many applications which would make this tradeoff, that might only have a few queues of this style which don't grow too large over the course of things.

@ghost
Copy link

ghost commented Mar 14, 2018

Great work, happy to see the area of concurrent queues being explored further! I don't have any objections to adding this to Crossbeam, but there are several questions we should try to resolve first.

Differences from Deque

The text mentions that CircBuf is the result of simplifying Deque and making it faster. But do we have any idea how much faster CircBuf is? Any concrete numbers?

Naming

Not a huge fan of the CircBuf name. I'd wager most people associate the term "circular buffer" more commonly with bounded buffers, while one of the key properties of CircBuf is that it grows and shrinks. That said, I don't have any better suggestions. :)

Maybe something like fn unbounded<T>() -> (spmc::unbounded::Sender<T>, spmc::unbounded::Receiver<T>) would read better? I don't know...

Return type in try_recv

We've had this discussion before, but I still find this unidiomatic:

fn try_recv(&self) -> Result<Option<T>, RecvError>

This is hardly readable without taking a peek at the documentation. Consider:

match rx.try_recv() {
    Ok(Some(v)) => return Some(v),
    Ok(None) => thread::yield_now(),
    Err(RecvError) => return None,
}

Why not the following instead:

match rx.try_recv() {
    TryRecv::Data(v) => return Some(v),
    TryRecv::Retry => thread::yield_now(),
    TryRecv::Empty => return None,
}

Much clearer, isn't it?

Now we're missing all the useful combinators on None and Result, but implementing methods on TryRecv like .is_data(), .into_inner(), etc. shouldn't be too hard.

As a somewhat related example, Future::poll took a slightly different approach by returning Result<Async<Self::Item>, Self::Error>. Note that Async is essentially equivalent to Option, yet they chose to make a special type in order to make the result indicate its meaning more clearly.

Blocking recv operation

The blocking recv operation basically just calls try_recv in a loop and inserts thread::yield_now() in between consecutive iterations. While that method of waiting isn't too bad, we should probably do better.

Blocking operations are typically implemented like this:

  1. Call try_recv several times in a loop.
  2. Repeat the first step, but call atomic::spin_loop_hint() between iterations.
  3. Do another round of retries, but call thread::yield_now() instead.
  4. Store the current Thread somewhere so that a future send operation knows how to wake us up. Call try_recv. If it failed, park the current thread with thread::park().

But I'm not sure if it's even worth bothering with blocking methods. I really doubt Tokio would ever want to call .recv().

Crate organization

Let's discuss where this crates fits in the big picture. Can we try answering the following questions?

  1. What does CircBuf offer and who should use it?
  2. What it doesn't offer and who shouldn't use it?

Here's a try for crossbeam-deque: it provides a concurrent deque for implementing work-stealing schedulers. It's intended to be a fairly low-level data structure, which is why it provides the steal method that may spuriously fail. Not the most user-friendly interface, but someone implementing a fast scheduler will appreciate the fine-grained control. It is not designed for casual use. The deque cannot be closed like a channel and doesn't offer blocking push, recv, or steal methods. If you need blocking, you'll probably want to implement it yourself anyway.

For comparison, here's how I think of crossbeam-channel: it is intended to be the go-to channel for casual use. The interface is simple and easy. The underlying concurrent queue implementations are not very interesting. Rather, the selection mechanism and built-in blocking methods are where it shines - that's the biggest part of the implementation. The channels intentionally don't offer fine-grained control over performance, even though they could (at the cost of uglier interface). These channels are definitely not for users seeking the highest possible performance.

Back to queues... I like thinking of concurrent queues in terms of the following matrix:

bounded unbounded
SPSC simple circular buffer CircBuf
SPMC ???(1) CircBuf
MPSC ???(2) MPSC queue by DV
MPMC MPMC queue by DV MsQueue or SegQueue

We already have some implementations ready to fill this matrix (e.g. MsQueue and SegQueue). The bounded MPMC variant can be easily filled by ripping out the array-based queue from crossbeam-channel. We can have unbounded MPSC queues by stripping down std::sync::mpsc. And so on.

I like CircBuf because it's pretty versatile and fills two slots in the matrix, maybe even three. It might also be the right choice for the bounded SPMC (1) variant. And the bounded MPSC (2) variant should probably be just a slightly tweaked version of the bounded MPMC variant.

Maybe we should create crossbeam-queue with modules named spsc, spmc, mpsc, and mpmc, where each module contains a bounded and unbounded variant? What do you think? The purpose of this crate would be to provide mostly low-level queues to be used as building blocks in schedulers, real-time audio, actor frameworks, and so on. The module structure would make it easy to choose the right variant for your use case. Just food for thought...

@jeehoonkang
Copy link
Contributor Author

@schets @stjepang Thanks for your valuable inputs! Because of the bad SPMC performance results, I'd like to close this PR. I "guess" LCRQ or Yang-Mellor-Crummey queue are faster than circbuf for SPMC and even SPSC scenarios, I guess. I'll explore that direction.

  • I'll remove recv(). As @schets suggested, pinning and then looping over try_recv() was as good as recv(). @stjepang, I didn't know there's atomic::spin_loop_hint().. I was much waiting for such a function!

  • I think I understand the SegQueue's optimization @schets mentioned. Indeed, I mistakenly omitted to plot the result for SPMC channels, and it turned out that SegQueue performs ~30% better than circular buffer. (I'm attaching a benchmark result.) Probably that's because SegQueue's pop()doesn't access tail at fast path, while circular buffer'stry_recv()` accesses both head and tail.

  • On the "leaky" mode: I agree it's useful for some use cases. Maybe we can configure the other data structures as leaky as well..?

  • Comparison to crossbeam-deque: The plot I'm attaching is based on the following measurements:

unbounded_seq             Rust deque        0.208 sec
unbounded_spmc            Rust deque        1.573 sec
unbounded_spsc            Rust deque        0.326 sec
unbounded_seq             Rust circbuf      0.162 sec
unbounded_spmc            Rust circbuf      1.494 sec
unbounded_spsc            Rust circbuf      0.214 sec
  • On the naming: I agree circbuf is a bad name. Anyway I'd like to close this PR.. :) Also, I came to agree with @stjepang on the return type of try_recv().

  • I agree with @stjepang's plan on for crossbeam-queue. Instead of organizing crates with implementations (e.g. circbuf, ms-queue, ...), we can organize the submodules of a single crate with usages (e.g. spsc, mpsc, ...). I proposed to introduce crossbeam-circbuf following the precedence of crossbeam-skiplist, but for queues and channels, probably the one-crate-that-fits-all would be better.

plot

@jeehoonkang
Copy link
Contributor Author

status report: I've changed my mind, and I'm retrying to merge this RFC. As discussed above, hopefully it can fill several entries in the concurrent queue implementation matrix. I revised the concurrent-circbuf crate as @stjepang and @schets said (except for introducing bounded and unbounded submodules).

But the performance is still bad. I optimized the circular buffer as @schets said, but unfortunately, I couldn't get any speedup. It's still ~40% slower than the segmented queue for SPMC scenarios. I'll put some more energy to find out why it's slow.

@schets
Copy link
Member

schets commented Mar 20, 2018

@jeehoonkang it looks like tx and rx share a cache line, which they shouldn't have to. Also, the writer should cache the last-seen reader index so it doesn't have to hit the other cache line all the time

@jeehoonkang
Copy link
Contributor Author

@schets Thanks for your comments! After applying the optimizations you mentioned, I finally managed to get a good performance result :)

@stjepang I'm thinking of providing crossbeam_circbuf::{bounded,unbounded}::{spsc,spmc} channels in this crate. The crate shows decent performance for these 4 scenarios, though a more thorough performance analysis is needed.

A remaining question is whether (1) we publish crossbeam-circbuf as proposed in this RFC, or (2) we prepare for crossbeam-queue. Do you have any opinion? Note that I think the concurrent-circbuf crate is currently quite publishable except for documentation, which I'm working on. Also, for the crossbeam-queue crate, I think we need to: (1) define the interface of the 8 flavors of the queues; (2) rebase crossbeam-channel on crossbeam-queue. Of course we also need to implement bounded/unbounded MPSC/MPMC queues, as suggested in the above comment.

plot

@ghost
Copy link

ghost commented Apr 3, 2018

@jeehoonkang

A remaining question is whether (1) we publish crossbeam-circbuf as proposed in this RFC, or (2) we prepare for crossbeam-queue. Do you have any opinion? Note that I think the concurrent-circbuf crate is currently quite publishable except for documentation, which I'm working on.

I'd prefer going with crossbeam-queue because we'll probably have a bunch of queue variants in the future anyway, so why not organize them all in a single crate? You've already written several concurrent queues. :)

Also, for the crossbeam-queue crate, I think we need to: (1) define the interface of the 8 flavors of the queues;

While I like categorizing queues in terms of spsc/spmc/mpsc/mpmc and bounded/unbounded, there are also other possible strategies. For example, Java has ArrayBlockingQueue, ConcurrentLinkedQueue, LinkedBlockingQueue, and so on. I'm not completely sold on any particular interface design strategy. :) In any case, we can think about that later...

(2) rebase crossbeam-channel on crossbeam-queue. Of course we also need to implement bounded/unbounded MPSC/MPMC queues, as suggested in the above comment.

Do you mean using crossbeam-queue as a dependency of crossbeam-channel? If so, that'd be great in theory, but I'm afraid channels need to employ a few special low-level tricks (like marking a bit in the tail index to close the channel) so we won't be able to use "vanilla" queues from crossbeam-queue.

@jeehoonkang
Copy link
Contributor Author

@stjepang yeah, probably we're discussing what should be the API of crossbeam-queue. I also don't have a concrete design, but here are my two cents.

We already had a discussion on how to organize crates in the context of crossbeam-skiplist. I agree with @Vtec234 and @stjepang on creating a crate for each data structure implementation. We can have crossbeam-circbuf, crossbeam-segment-queue, etc.

Then how about crossbeam-queue? After all, do we need it? I don't think so, because crossbeam-channel already plays the role of crossbeam-queue: actually, queue and channel are almost synonym to each other. We can refactor crossbeam-channel to act as crossbeam-queue by providing a trait (say Queue) that all queue implementations should implement (including a way to signal the end-of-queue), and making each implementation crate implement Queue. Then crossbeam-channel can implement Channel<Q: Queue> (or ChannelInner<Q: Queue>).

I can think of two remaining questions. The first question is, what'll the queue/channel trait looks like? Should we have 8 traits for bounded/unbounded, multi/single-producer, multi/single-consumer queues? (Or more for blocking/nonblocking variants?) Can we refactor them in a nice way? I wonder if we can get inspiration from C++'s policy-based design.

The second question is, how to effectively manage queue implementations in terms of development cost and user friendliness, if there are too many? Though this problem can be mitigated by merging all crates into a monorepo and writing a clear guideline on choosing queue implementations.

@ghost
Copy link

ghost commented Apr 25, 2018

Ok, looks like we have a few unresolved questions and potential solutions, but are feeling hesitant to commit to any of them.

How about we put this queue into crossbeam-utils for the time being and wait until we figure out what to do about it? I like thinking of crossbeam-utils as a catch-all crate for miscellaneous stuff and for stuff we're still feeling unsure about or want to experiment with (i.e. a "nursery" like https://github.com/rust-lang-nursery).

This is why I've decided to add AtomicCell to crossbeam-utils in crossbeam-rs/crossbeam-utils#13 (at least for now) instead of creating a new crate crossbeam-atomic, as suggested by @Vtec234 in #28 (review).

@ghost
Copy link

ghost commented Jun 17, 2018

@jeehoonkang I have a new, more pragmatic suggestion on how to move forward here. :)

Besides schedulers, it is not yet clear what is the ideal use case for CircBuf. Another unresolved question is whether we need all the possible variants of the data structure (bounded/unbounded and spsc/spmc), or are they here just for the sake of completeness. But schedulers are the perfect use case for it, so I think we should put an effort into making it useful in that context first.

Tokio is currently building a fast and fair scheduler. Currently, it uses crossbeam-deque as the data structure for work stealing. Even though it's primarily designed as a LIFO data structure, it also allows consuming items in FIFO order using the Deque::steal method. It's interesting to note that FIFO ordering is sometimes beneficial in Rayon, too (the so-called breadth-first mode). In Tokio, however, FIFO ordering is a requirement because its scheduler must be fair.

Circular buffers would generally be faster in fair schedulers, but I don't think we need a brand new name for the data structure nor a dedicated crate for it. Instead, we can simply add an option to crossbeam-deque for pushing items in FIFO order.

The idea is to have two internal flavors of crossbeam-deque, in the same manner as we have multiple flavors for std::sync::mpsc and crossbeam-channel.

More concretely, this is what I have in mind:

enum Flavor<T> {
    // Chase-Lev deque (the current `crossbeam-deque`).
    Lifo(LifoQueue<T>),

    // Circular buffer (`CircBuf`).
    Fifo(FifoQueue<T>),
}

pub struct Deque<T>(Arc<Flavor<T>>);
pub struct Stealer<T>(Arc<Flavor<T>>);

impl<T> Deque<T> {
    // This is just a suggestion; we may name these functions differently.
    pub fn new_lifo() -> Deque<T>;
    pub fn new_fifo() -> Deque<T>;
}

Choosing between LIFO and FIFO here only affects the behavior of the Deque::pop method - are items pushed and popped from the same or the opposite ends of the deque?

A big advantage of having the option of both LIFO and FIFO ordering within the same interface is simplicity. For example, it means Rayon can set up its job queue like this:

let d = if breadth_first {
    Deque::new_fifo()
} else {
    Deque::new_lifo()
};

let s = d.stealer();

This approach has worked out very well in crossbeam-channel, where we have two channel constructors (bounded(cap) and unbounded()) with completely different underlying channel implementations, and yet behind the same interface.

Java's ForkJoinPool uses a similar method, where a flag in the constructor chooses between LIFO and FIFO mode (see here and here).

If you like this idea, I can merge concurrent-circbuf into crossbeam-deque and submit a PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

2 participants