Improve heavy load performance in RingChannel
Coldwings committed Feb 2, 2024
1 parent fe8f2f7 commit 17c759a
Showing 3 changed files with 39 additions and 19 deletions.
30 changes: 17 additions & 13 deletions common/lockfree_queue.h
@@ -548,8 +548,6 @@ class RingChannel : public QueueType {
protected:
photon::semaphore queue_sem;
std::atomic<uint64_t> idler{0};
uint64_t m_busy_yield_turn;
uint64_t m_busy_yield_timeout;

using T = decltype(std::declval<QueueType>().recv());

@@ -568,30 +566,36 @@ class RingChannel : public QueueType {
* parameter DEFAULT_BUSY_YIELD_TIMEOUT. RingChannel will busy-yield for up
* to `busy_yield_timeout` usecs.
*/
RingChannel(uint64_t busy_yield_turn = 64,
uint64_t busy_yield_timeout = 1024)
: m_busy_yield_turn(busy_yield_turn),
m_busy_yield_timeout(busy_yield_timeout) {}
RingChannel(uint64_t busy_yield_turn = -1UL,
uint64_t busy_yield_timeout = 1024) {}

template <typename Pause = ThreadPause>
void send(const T& x) {
while (!push(x)) {
if (!full()) Pause::pause();
Pause::pause();
}
queue_sem.signal(idler.load(std::memory_order_acquire));
if (idler.load(std::memory_order_acquire))
queue_sem.signal(1);
}
T recv() {
T recv(uint64_t max_yield_turn = -1UL, uint64_t max_yield_usec = 1024) {
T x;
Timeout yield_timeout(m_busy_yield_timeout);
int yield_turn = m_busy_yield_turn;
if (pop(x)) return x;
// yield once if pop failed, so photon::now will be updated
photon::thread_yield();
idler.fetch_add(1, std::memory_order_acq_rel);
DEFER(idler.fetch_sub(1, std::memory_order_acq_rel));
Timeout yield_timeout(max_yield_usec);
uint64_t yield_turn = max_yield_turn;
while (!pop(x)) {
if (yield_turn > 0 && photon::now < yield_timeout.expiration()) {
if (yield_turn > 0 && !yield_timeout.expired()) {
yield_turn--;
photon::thread_yield();
} else {
queue_sem.wait(1);
// wait for 100ms
queue_sem.wait(1, 100UL * 1000);
// reset yield counters and go back to busy-wait
yield_turn = max_yield_turn;
yield_timeout.timeout(max_yield_usec);
}
}
return x;
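The core of the `lockfree_queue.h` change is a handshake between `send()` and `recv()`: a consumer registers itself in the `idler` counter, busy-yields for at most `max_yield_turn` turns within `max_yield_usec`, and only then parks on the semaphore in 100 ms slices, while a producer posts the semaphore only when `idler` is non-zero. Below is a minimal standalone sketch of that handshake in plain C++20, with a mutex-guarded deque standing in for the lock-free ring and `std::counting_semaphore` standing in for `photon::semaphore`; the class and member names are illustrative, not Photon API.

// Standalone sketch of the send()/recv() handshake above, using standard
// C++20 primitives instead of Photon's coroutine API. A mutex-guarded deque
// stands in for the lock-free ring; only the idler/semaphore handshake and
// the bounded busy-yield are the point.
#include <atomic>
#include <chrono>
#include <deque>
#include <mutex>
#include <semaphore>
#include <thread>

template <typename T>
class IdleAwareChannel {
    std::deque<T> q;                    // stand-in for the lock-free ring
    std::mutex mtx;
    std::counting_semaphore<> sem{0};   // plays the role of queue_sem
    std::atomic<uint64_t> idler{0};     // consumers that may be parked

    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lk(mtx);
        if (q.empty()) return false;
        out = std::move(q.front());
        q.pop_front();
        return true;
    }

public:
    void send(T x) {
        {
            std::lock_guard<std::mutex> lk(mtx);
            q.push_back(std::move(x));
        }
        // Only pay for a wakeup when somebody may actually be sleeping.
        if (idler.load(std::memory_order_acquire)) sem.release(1);
    }

    T recv(uint64_t max_yield_turn = 256,
           std::chrono::microseconds max_yield_usec = std::chrono::microseconds(1024)) {
        T x;
        if (try_pop(x)) return x;
        idler.fetch_add(1, std::memory_order_acq_rel);
        uint64_t yield_turn = max_yield_turn;
        auto deadline = std::chrono::steady_clock::now() + max_yield_usec;
        while (!try_pop(x)) {
            if (yield_turn > 0 && std::chrono::steady_clock::now() < deadline) {
                --yield_turn;
                std::this_thread::yield();  // cheap busy-yield phase
            } else {
                // park for up to 100 ms, then return to the busy-yield phase
                sem.try_acquire_for(std::chrono::milliseconds(100));
                yield_turn = max_yield_turn;
                deadline = std::chrono::steady_clock::now() + max_yield_usec;
            }
        }
        idler.fetch_sub(1, std::memory_order_acq_rel);
        return x;
    }
};

Posting the semaphore only when a consumer is actually parked keeps the hot producer path free of wakeups, and the 100 ms cap on the wait means a parked consumer still re-checks the ring periodically even if a signal is missed.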
4 changes: 4 additions & 0 deletions thread/test/CMakeLists.txt
@@ -4,6 +4,10 @@ add_executable(perf_usleepdefer_semaphore perf_usleepdefer_semaphore.cpp)
target_link_libraries(perf_usleepdefer_semaphore PRIVATE photon_shared)
add_test(NAME perf_usleepdefer_semaphore COMMAND $<TARGET_FILE:perf_usleepdefer_semaphore>)

add_executable(perf_workpool perf_workpool.cpp)
target_link_libraries(perf_workpool PRIVATE photon_shared)
add_test(NAME perf_workpool COMMAND $<TARGET_FILE:perf_workpool>)

add_executable(test-thread test.cpp x.cpp)
target_link_libraries(test-thread PRIVATE photon_static)
add_test(NAME test-thread COMMAND $<TARGET_FILE:test-thread>)
24 changes: 18 additions & 6 deletions thread/workerpool.cpp
@@ -44,6 +44,7 @@ class WorkPool::impl {
LockfreeMPMCRingQueue<Delegate<void>, RING_SIZE>>
ring;
int mode;
std::atomic<uint64_t> running_tasks{0};

impl(size_t vcpu_num, int ev_engine, int io_engine, int mode)
: queue_sem(0), ready_vcpu(0), mode(mode) {
@@ -102,6 +103,11 @@ class WorkPool::impl {
exit_cv.notify_all();
}

struct TaskLB {
Delegate<void> task;
WorkPool::impl *wp;
};

void main_loop() {
add_vcpu();
DEFER(remove_vcpu());
@@ -110,25 +116,31 @@ class WorkPool::impl {
DEFER(if (pool) delete_thread_pool(pool));
ready_vcpu.signal(1);
for (;;) {
auto task = ring.recv();
auto task = ring.recv(running_tasks.load() ? 0 : 256);
if (!task) break;
running_tasks.fetch_add(1, std::memory_order_acq_rel);
TaskLB tasklb{task, this};
if (mode < 0) {
task();
delegate_helper(&tasklb);
} else if (mode == 0) {
auto th = photon::thread_create(
&WorkPool::impl::delegate_helper, &task);
&WorkPool::impl::delegate_helper, &tasklb);
photon::thread_yield_to(th);
} else {
auto th = pool->thread_create(&WorkPool::impl::delegate_helper,
&task);
&tasklb);
photon::thread_yield_to(th);
}
}
while (running_tasks.load(std::memory_order_acquire))
photon::thread_yield();
}

static void *delegate_helper(void *arg) {
auto task = *(Delegate<void> *)arg;
task();
// must copy to keep tasklb alive
TaskLB tasklb = *(TaskLB*)arg;
tasklb.task();
tasklb.wp->running_tasks.fetch_sub(1, std::memory_order_acq_rel);
return nullptr;
}

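The `workerpool.cpp` side pairs with this: each dispatched delegate is wrapped in a `TaskLB` carrying a back-pointer to the pool, so `delegate_helper()` can decrement `running_tasks` when the task finishes, and `main_loop()` uses that counter both to pick the `recv()` strategy (`ring.recv(running_tasks.load() ? 0 : 256)` parks immediately while work is in flight and spins up to 256 turns when idle) and to drain in-flight tasks before tearing down. A rough standalone illustration of the same accounting, using `std::thread` and `std::function` instead of Photon threads; all names here are illustrative:

// Sketch of the in-flight accounting added to WorkPool::impl, in plain C++
// (std::thread instead of Photon threads). The task bundle is copied by value
// before it runs, mirroring how delegate_helper() copies *(TaskLB*)arg, and a
// shared counter is bumped around execution so the dispatcher knows whether
// anything is still running.
#include <atomic>
#include <functional>
#include <thread>
#include <vector>

struct TaskBundle {                  // mirrors TaskLB: the task plus its owner
    std::function<void()> task;
    std::atomic<uint64_t>* running;  // decremented when the task finishes
};

void run_detached(TaskBundle lb) {   // taken by value, like copying *(TaskLB*)arg
    std::thread([lb]() {
        lb.task();
        lb.running->fetch_sub(1, std::memory_order_acq_rel);
    }).detach();
}

int main() {
    std::atomic<uint64_t> running_tasks{0};
    std::vector<std::function<void()>> work(8, [] { /* task body */ });

    for (auto& t : work) {
        running_tasks.fetch_add(1, std::memory_order_acq_rel);
        // This counter is what main_loop() feeds to recv():
        // ring.recv(running_tasks.load() ? 0 : 256) parks immediately while
        // tasks are in flight and spins up to 256 turns when the pool is idle.
        run_detached(TaskBundle{t, &running_tasks});
    }

    // Same drain the patched main_loop() performs before tearing down.
    while (running_tasks.load(std::memory_order_acquire))
        std::this_thread::yield();
}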
