For more than a decade, software has been moving away from running faster by simply executing more instructions per second to executing multiple instructions at once. Terms like "Multithreading", "Concurrency", and "Parallelism" are expected to be in the repertoire of every serious software developer.
Because of race conditions, concurrent programs have to be synchronized in some way. This is classically done with mutexes and other synchronization primitives. That approach, however, can become tedious and error-prone as a code base and its level of concurrency grow.
Inspired by Go, Copper aims to bring an easier mechanism for synchronization to C++. By providing message queues as communication data structures, the need for manual synchronization via mutexes vanishes or at least becomes much smaller.
Compared to a naive "synchronized queue" implementation, Copper offers the following features, which are often partially or fully missing:
- Support for entirely unbuffered channels.
- Support for void channels.
- Support for select statements.
- No polling.
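For comparison, the kind of naive "synchronized queue" referred to above might look like the following minimal sketch in standard C++. The class name `naive_sync_queue` and its members are hypothetical and are not part of Copper; the sketch only illustrates the baseline that the feature list is measured against.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

// Hypothetical baseline: a std::queue guarded by a single mutex.
template <typename T>
class naive_sync_queue {
public:
    // Never blocks: the queue is always buffered and grows without bound.
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(value));
        }
        not_empty_.notify_one();
    }

    // Blocks until an item is available, then returns it.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop();
        return value;
    }

    // Non-blocking variant: returns std::nullopt if the queue is empty.
    std::optional<T> try_pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) {
            return std::nullopt;
        }
        T value = std::move(items_.front());
        items_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::queue<T> items_;
};
```

With such a queue, a push never waits for a consumer, `naive_sync_queue<void>` does not compile because `std::queue<void>` is ill-formed, and waiting on several queues at once is only possible by calling `try_pop` on each of them in a loop, which is polling.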
Copper is designed as an easy drop-in library that can be used in any kind of application that has access to the C++ standard library (std). Some frameworks that offer similar functionality to Copper are linked below.
Consider the following small project, which is representative of structures often found in concurrent applications:
A broadcasting system between threads. There is a single "broker" thread which takes care of the management. Other threads can register themselves as taking part in the broadcast and can also unregister again later. A thread that is registered can send messages to the broker, which are then broadcast to all registered listeners.
Furthermore, if no message has arrived in the last 5 seconds, the broker broadcasts a "ping" just to tell all listeners that the broker is still alive. And finally, some "admin" of the broadcasting system can at any time send a signal to shut down the broker and unregister all listener threads.
Feel free to think about how this could best be implemented using common synchronization objects like lock-free queues, atomics, and mutexes. You could start a thread pool as part of the broker and delegate each incoming request (register, unregister, or broadcast) to a new worker thread. Or you could have a single synchronized queue that holds something like std::variant<register_op, unregister_op, broadcast_op>, whose entries the broker thread processes one by one. Some form of polling would also commonly be used to check for the "shutdown" signal every once in a while.
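To make the single-queue alternative concrete, here is a rough sketch in standard C++ without Copper. All names (`polling_broker`, `submit`, `request_shutdown`, the 20 ms poll interval, and the format of the delivery log) are assumptions made for illustration, and the 5-second ping is left out for brevity.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical command types; the names follow the text above.
struct register_op   { int id; };
struct unregister_op { int id; };
struct broadcast_op  { int id; std::string message; };
using command = std::variant<register_op, unregister_op, broadcast_op>;

class polling_broker {
public:
    // Called by any thread to enqueue a command for the broker.
    void submit(command c) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(c));
        }
        wake_.notify_one();
    }

    // Called by the "admin"; the broker only notices on its next poll.
    void request_shutdown() { shutdown_.store(true); }

    // Number of commands handled so far.
    int processed() const { return processed_.load(); }

    // Only safe to call after the broker thread has been joined.
    const std::vector<std::string>& delivered() const { return delivered_; }

    // Body of the broker thread.
    void run() {
        using namespace std::chrono_literals;
        while (!shutdown_.load()) {
            std::unique_lock<std::mutex> lock(mutex_);
            // Wake up at least every 20 ms so the shutdown flag gets polled.
            if (!wake_.wait_for(lock, 20ms, [this] { return !queue_.empty(); })) {
                continue;
            }
            command c = std::move(queue_.front());
            queue_.pop_front();
            lock.unlock();
            std::visit([this](const auto& op) { this->handle(op); }, c);
            processed_.fetch_add(1);
        }
    }

private:
    void handle(const register_op& op) { listeners_.insert(op.id); }
    void handle(const unregister_op& op) { listeners_.erase(op.id); }
    void handle(const broadcast_op& op) {
        // Stand-in for real delivery: log one line per registered listener.
        for (int listener : listeners_) {
            delivered_.push_back(std::to_string(listener) + ": " + op.message);
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_;
    std::deque<command> queue_;
    std::atomic<bool> shutdown_{false};
    std::atomic<int> processed_{0};
    std::set<int> listeners_;             // touched by the broker thread only
    std::vector<std::string> delivered_;  // touched by the broker thread only
};

// Usage: submits five commands, waits until they are handled, then shuts down.
std::vector<std::string> run_polling_broker_demo() {
    polling_broker broker;
    std::thread broker_thread([&broker] { broker.run(); });
    broker.submit(register_op{1});
    broker.submit(register_op{2});
    broker.submit(broadcast_op{1, "hello"});
    broker.submit(unregister_op{2});
    broker.submit(broadcast_op{1, "bye"});
    while (broker.processed() < 5) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    broker.request_shutdown();
    broker_thread.join();
    return broker.delivered();
}
```

Even in this reduced form, the design needs a mutex, a condition variable, an atomic flag, and a poll interval that trades shutdown latency against wasted wake-ups.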
Copper simplifies the architecture greatly. The "broker" would contain four different channels:
- copper::buffered_channel<int> register_commands, which threads push their ID into when registering to listen to the broadcast.
- copper::buffered_channel<int> unregister_command, which threads push their ID into to unregister.
- copper::buffered_channel<std::pair<int, std::string>> broadcast_command, which threads push their ID and their message into to broadcast it.
- copper::buffered_channel<void> shutdown_command, which the admin can use to shut down the broker. This works as a content-less signal mechanism due to the void type, as the only relevant information is whether the shutdown event has been triggered, without any additional data.
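The idea of a content-less signal can be pictured with a small standard C++ sketch. The class `signal_queue` below is a hypothetical stand-in and not how Copper implements void channels; it only shows that such a channel needs to remember how many signals are pending, not any payload.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical stand-in for a buffered channel of void:
// it stores only the number of signals that have not been consumed yet.
class signal_queue {
public:
    // Records one signal and wakes a waiting consumer, if any.
    void push() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++pending_;
        }
        signalled_.notify_one();
    }

    // Blocks until at least one signal is pending, then consumes it.
    void pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        signalled_.wait(lock, [this] { return pending_ > 0; });
        --pending_;
    }

    // Consumes a signal if one is pending; returns whether it did.
    bool try_pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (pending_ == 0) {
            return false;
        }
        --pending_;
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable signalled_;
    std::size_t pending_ = 0;
};
```

A consumer of such a signal receives no value at all; a successful pop is the entire message.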
The broker can now process all incoming requests from other threads synchronously without any polling or other unfortunate constructs.
    using namespace std::chrono_literals; // for the 5s literal

    auto last_broadcast = ...; // current time
    while (!shutdown) {
        // Wait for the first available command, but no longer than the ping deadline.
        const auto op_status = copper::try_select_until(
            last_broadcast + 5s,
            register_commands >> /* handle register */,
            unregister_command >> /* handle unregister */,
            broadcast_command >> /* handle broadcast */,
            shutdown_command >> /* handle shutdown */
        );
        if (op_status == copper::channel_op_status::unavailable) {
            // broadcast a ping
        }
    }
Since no data is shared between concurrent threads, except for the channel objects used to communicate, there is no need to add any additional mutexes or other guards around variables like last_broadcast or shutdown.
try_select_until guarantees fairness among its operations, meaning that every command is certain to be executed at some point. A flood of broadcast commands could therefore cause a huge delay on a shutdown, so one might want to introduce a priority ranking on the different channels by calling multiple select statements.
    using namespace std::chrono_literals; // for the 5s literal

    auto last_broadcast = ...; // current time
    while (!shutdown) {
        // shutdown has the highest priority.
        if (shutdown_command.try_pop()) {
            shutdown = true;
            break;
        }
        // If no shutdown was waiting, prioritize register and unregister over broadcast.
        (void) copper::loop_try_select(
            register_commands >> /* handle register */,
            unregister_command >> /* handle unregister */,
            shutdown_command >> /* handle shutdown */
        );
        // Finally, do a select on all operations at once.
        const auto op_status = copper::try_select_until(
            last_broadcast + 5s,
            register_commands >> /* handle register */,
            unregister_command >> /* handle unregister */,
            broadcast_command >> /* handle broadcast */,
            shutdown_command >> /* handle shutdown */
        );
        if (op_status == copper::channel_op_status::unavailable) {
            // broadcast a ping
        }
    }
While buffered_channels, possibly with a limited buffer size, are a good choice for most use cases, unbuffered_channels can be used to enforce a closer link between the communicating threads. As a push operation can only be executed if there is a pop operation pending at the same time (and vice versa), each of these operations marks a point of synchronization between the threads.
For example, imagine a thread that fetches some frequently updated data from the web and pushes it into a channel to be consumed by another thread. Using an unbuffered channel in this case would, first, prevent the fetching thread from loading data that might never be used, and second, ensure that data arriving at the consumer thread is always as recent as possible.
    copper::unbuffered_channel<std::string> chan;

    void fetch_thread() {
        const auto f = [] { return /* fetch data from web */; };
        // f is only called once a consumer is ready; stop when the channel is closed.
        while (chan.push_func(f) != copper::channel_op_status::closed);
    }

    void consumer_thread() {
        for (;;) {
            const auto next_data = chan.pop();
            if (!next_data) {
                break;
            }
            some_computation(next_data.value());
        }
    }
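The rendezvous behaviour described above can be approximated in standard C++ to see why nothing is fetched ahead of demand. This is only a sketch under stated assumptions: `handoff_slot`, `handoff_result`, and `run_handoff_demo` are hypothetical names, closing is not modelled, and unlike Copper's push_func this version returns no status.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <utility>

// Hypothetical single-slot hand-off; not Copper's implementation.
template <typename T>
class handoff_slot {
public:
    // Blocks until a consumer is waiting in pop(), and only then calls make()
    // to produce the value.
    template <typename F>
    void push_func(F&& make) {
        std::unique_lock<std::mutex> lock(mutex_);
        changed_.wait(lock, [this] {
            return waiting_consumers_ > 0 && !slot_.has_value();
        });
        // make() runs under the lock to keep the sketch short.
        slot_.emplace(std::forward<F>(make)());
        --waiting_consumers_;  // the value is reserved for one waiting consumer
        changed_.notify_all();
    }

    // Announces a waiting consumer, then blocks until a value is handed over.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ++waiting_consumers_;
        changed_.notify_all();
        changed_.wait(lock, [this] { return slot_.has_value(); });
        T value = std::move(*slot_);
        slot_.reset();
        changed_.notify_all();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable changed_;
    std::optional<T> slot_;
    int waiting_consumers_ = 0;
};

struct handoff_result {
    int fetches_before_pop;
    int fetches_after_pop;
    std::string data;
};

// Usage: the producer starts first, but its fetch only runs once pop() is called.
handoff_result run_handoff_demo() {
    handoff_slot<std::string> slot;
    std::atomic<int> fetches{0};
    std::thread producer([&] {
        slot.push_func([&] {
            ++fetches;  // stands in for an expensive web request
            return std::string("fresh data");
        });
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    const int before = fetches.load();  // no consumer has asked for data yet
    std::string data = slot.pop();
    producer.join();
    return {before, fetches.load(), std::move(data)};
}
```

In the demo, the fetch counter is still zero while no consumer is waiting and becomes one only after pop() has been called, which mirrors the two benefits named above: no wasted fetches, and data that is produced at the moment it is requested.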
Another advantage of unbuffered channels can be the way data is passed from one thread to another. Whereas buffered channels almost always use their internal buffer as intermediate storage before a message can be consumed, communication over an unbuffered channel can happen with as little as a single move construction. This gives unbuffered channels an edge when dealing with large messages that cannot be moved cheaply. More concrete data can be found in docs/benchmark.adoc.