Skip to content

Commit

Permalink
Support customizing UMPSCQueue segment size in SerialExecutor
Browse files Browse the repository at this point in the history
Summary:
Some SerialExecutor use cases only enqueue a small number of items
into the queue at any point in time, or for the lifetime of the SerialExecutor.
For use cases which create SerialExecutors per request, for example, the
default queue segment size forces a 2^8=256 element segment by default. With
128 byte elements (for hw destructive interference reasons) this works out to
32kb of memory per segment. This is on the excessive end when maybe O(10) tasks
will be enqueued.

Reviewed By: Gownta

Differential Revision: D48412352

fbshipit-source-id: 4f9a317137d01d3360ef0d7101af02b03048f12d
  • Loading branch information
Eric Hao authored and facebook-github-bot committed Sep 18, 2023
1 parent 63d3968 commit 1216522
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 27 deletions.
3 changes: 3 additions & 0 deletions folly/SingletonThreadLocal.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,12 @@ class SingletonThreadLocal {
}
};

FOLLY_PUSH_WARNING
FOLLY_CLANG_DISABLE_WARNING("-Wglobal-constructors")
template <typename T, typename Tag, typename Make, typename TLTag>
detail::UniqueInstance SingletonThreadLocal<T, Tag, Make, TLTag>::unique{
tag<SingletonThreadLocal>, tag<T, Tag>, tag<Make, TLTag>};
FOLLY_POP_WARNING

} // namespace folly

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,48 @@
* limitations under the License.
*/

#include <folly/executors/SerialExecutor.h>

#include <glog/logging.h>

#include <folly/ExceptionString.h>

namespace folly {

SerialExecutor::SerialExecutor(KeepAlive<Executor> parent)
template <int LgQueueSegmentSize>
SerialExecutorExt<LgQueueSegmentSize>::SerialExecutorExt(
KeepAlive<Executor> parent)
: parent_(std::move(parent)) {}

SerialExecutor::~SerialExecutor() {
template <int LgQueueSegmentSize>
SerialExecutorExt<LgQueueSegmentSize>::~SerialExecutorExt() {
DCHECK(!keepAliveCounter_);
}

Executor::KeepAlive<SerialExecutor> SerialExecutor::create(
KeepAlive<Executor> parent) {
return makeKeepAlive<SerialExecutor>(new SerialExecutor(std::move(parent)));
template <int LgQueueSegmentSize>
Executor::KeepAlive<SerialExecutorExt<LgQueueSegmentSize>>
SerialExecutorExt<LgQueueSegmentSize>::create(KeepAlive<Executor> parent) {
return makeKeepAlive<SerialExecutorExt<LgQueueSegmentSize>>(
new SerialExecutorExt<LgQueueSegmentSize>(std::move(parent)));
}

SerialExecutor::UniquePtr SerialExecutor::createUnique(
template <int LgQueueSegmentSize>
typename SerialExecutorExt<LgQueueSegmentSize>::UniquePtr
SerialExecutorExt<LgQueueSegmentSize>::createUnique(
std::shared_ptr<Executor> parent) {
auto executor = new SerialExecutor(getKeepAliveToken(parent.get()));
auto executor = new SerialExecutorExt<LgQueueSegmentSize>(
getKeepAliveToken(parent.get()));
return {executor, Deleter{std::move(parent)}};
}

bool SerialExecutor::keepAliveAcquire() noexcept {
template <int LgQueueSegmentSize>
bool SerialExecutorExt<LgQueueSegmentSize>::keepAliveAcquire() noexcept {
auto keepAliveCounter =
keepAliveCounter_.fetch_add(1, std::memory_order_relaxed);
DCHECK(keepAliveCounter > 0);
return true;
}

void SerialExecutor::keepAliveRelease() noexcept {
template <int LgQueueSegmentSize>
void SerialExecutorExt<LgQueueSegmentSize>::keepAliveRelease() noexcept {
auto keepAliveCounter =
keepAliveCounter_.fetch_sub(1, std::memory_order_acq_rel);
DCHECK(keepAliveCounter > 0);
Expand All @@ -56,29 +64,34 @@ void SerialExecutor::keepAliveRelease() noexcept {
}
}

void SerialExecutor::add(Func func) {
template <int LgQueueSegmentSize>
void SerialExecutorExt<LgQueueSegmentSize>::add(Func func) {
if (scheduleTask(std::move(func))) {
parent_->add(
[keepAlive = getKeepAliveToken(this)] { keepAlive->worker(); });
}
}

void SerialExecutor::addWithPriority(Func func, int8_t priority) {
template <int LgQueueSegmentSize>
void SerialExecutorExt<LgQueueSegmentSize>::addWithPriority(
Func func, int8_t priority) {
if (scheduleTask(std::move(func))) {
parent_->addWithPriority(
[keepAlive = getKeepAliveToken(this)] { keepAlive->worker(); },
priority);
}
}

bool SerialExecutor::scheduleTask(Func&& func) {
template <int LgQueueSegmentSize>
bool SerialExecutorExt<LgQueueSegmentSize>::scheduleTask(Func&& func) {
queue_.enqueue(Task{std::move(func), RequestContext::saveContext()});
// If this thread is the first to mark the queue as non-empty, schedule the
// worker.
return scheduled_.fetch_add(1, std::memory_order_acq_rel) == 0;
}

void SerialExecutor::worker() {
template <int LgQueueSegmentSize>
void SerialExecutorExt<LgQueueSegmentSize>::worker() {
std::size_t queueSize = scheduled_.load(std::memory_order_acquire);
DCHECK_NE(queueSize, 0);

Expand Down
31 changes: 19 additions & 12 deletions folly/executors/SerialExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,34 @@ namespace folly {
* parent executor is executing tasks.
*/

class SerialExecutor : public SerializedExecutor {
template <int LgQueueSegmentSize = 8>
class SerialExecutorExt : public SerializedExecutor {
public:
SerialExecutor(SerialExecutor const&) = delete;
SerialExecutor& operator=(SerialExecutor const&) = delete;
SerialExecutor(SerialExecutor&&) = delete;
SerialExecutor& operator=(SerialExecutor&&) = delete;
SerialExecutorExt(SerialExecutorExt const&) = delete;
SerialExecutorExt& operator=(SerialExecutorExt const&) = delete;
SerialExecutorExt(SerialExecutorExt&&) = delete;
SerialExecutorExt& operator=(SerialExecutorExt&&) = delete;

static KeepAlive<SerialExecutor> create(
static KeepAlive<SerialExecutorExt> create(
KeepAlive<Executor> parent = getGlobalCPUExecutor());

class Deleter {
public:
Deleter() {}

void operator()(SerialExecutor* executor) { executor->keepAliveRelease(); }
void operator()(SerialExecutorExt* executor) {
executor->keepAliveRelease();
}

private:
friend class SerialExecutor;
friend class SerialExecutorExt;
explicit Deleter(std::shared_ptr<Executor> parent)
: parent_(std::move(parent)) {}

std::shared_ptr<Executor> parent_;
};

using UniquePtr = std::unique_ptr<SerialExecutor, Deleter>;
using UniquePtr = std::unique_ptr<SerialExecutorExt, Deleter>;
[[deprecated("Replaced by create")]] static UniquePtr createUnique(
std::shared_ptr<Executor> parent);

Expand Down Expand Up @@ -104,8 +107,8 @@ class SerialExecutor : public SerializedExecutor {
std::shared_ptr<RequestContext> ctx;
};

explicit SerialExecutor(KeepAlive<Executor> parent);
~SerialExecutor() override;
explicit SerialExecutorExt(KeepAlive<Executor> parent);
~SerialExecutorExt() override;

bool keepAliveAcquire() noexcept override;

Expand All @@ -118,9 +121,13 @@ class SerialExecutor : public SerializedExecutor {
std::atomic<std::size_t> scheduled_{0};
// The consumer should only dequeue when the queue is non-empty, so we don't
// need blocking.
folly::UMPSCQueue<Task, /* MayBlock */ false> queue_;
folly::UMPSCQueue<Task, /* MayBlock */ false, LgQueueSegmentSize> queue_;

std::atomic<ssize_t> keepAliveCounter_{1};
};

using SerialExecutor = SerialExecutorExt<>;

} // namespace folly

#include <folly/executors/SerialExecutor-inl.h>

0 comments on commit 1216522

Please sign in to comment.