Skip to content

Commit

Permalink
fix batch mpmc queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Coldwings committed Jun 17, 2024
1 parent 255a806 commit 7d191a0
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 55 deletions.
88 changes: 40 additions & 48 deletions common/lockfree_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,65 +286,59 @@ class LockfreeBatchMPMCRingQueue : public LockfreeRingQueueBase<T, N> {
using Base::empty;
using Base::full;

size_t push_batch(const T* x, size_t n) {
size_t push_batch(const T *x, size_t n) {
size_t rh, wt;
wt = tail.load(std::memory_order_relaxed);
for (;;) {
rh = head.load(std::memory_order_relaxed);
auto rn = std::min(n, Base::capacity - (wt - rh));
if (rn == 0) return 0;
if (tail.compare_exchange_strong(wt, wt + rn,
std::memory_order_acq_rel)) {
auto first_idx = idx(wt);
auto part_length = Base::capacity - first_idx;
if (likely(part_length >= rn)) {
memcpy(&slots[first_idx], x, sizeof(T) * rn);
} else {
if (likely(part_length))
memcpy(&slots[first_idx], x, sizeof(T) * (part_length));
memcpy(&slots[0], x + part_length,
sizeof(T) * (rn - part_length));
}
auto wh = wt;
while (!write_head.compare_exchange_weak(
wh, wt + rn, std::memory_order_acq_rel)) {
ThreadPause::pause();
wh = wt;
}
return rn;
rh = head.load(std::memory_order_acquire);
auto wn = std::min(n, Base::capacity - (wt - rh));
if (wn == 0)
return 0;
if (!tail.compare_exchange_strong(wt, wt + wn, std::memory_order_acq_rel))
continue;
auto first_idx = idx(wt);
auto part_length = Base::capacity - first_idx;
if (likely(part_length >= wn)) {
memcpy(&slots[first_idx], x, sizeof(T) * wn);
} else {
if (likely(part_length))
memcpy(&slots[first_idx], x, sizeof(T) * (part_length));
memcpy(&slots[0], x + part_length, sizeof(T) * (wn - part_length));
}
auto wh = wt;
while (!write_head.compare_exchange_strong(wh, wt + wn, std::memory_order_acq_rel))
wh = wt;
return wn;
}
}

bool push(const T& x) { return push_batch(&x, 1) == 1; }
bool push(const T &x) {
return push_batch(&x, 1) == 1;
}

size_t pop_batch(T* x, size_t n) {
size_t pop_batch(T *x, size_t n) {
size_t rt, wh;
rt = read_tail.load(std::memory_order_relaxed);
for (;;) {
wh = write_head.load(std::memory_order_relaxed);
wh = write_head.load(std::memory_order_acquire);
auto rn = std::min(n, wh - rt);
if (rn == 0) return 0;
if (read_tail.compare_exchange_strong(rt, rt + rn,
std::memory_order_acq_rel)) {
auto first_idx = idx(rt);
auto part_length = Base::capacity - first_idx;
if (likely(part_length >= rn)) {
memcpy(x, &slots[first_idx], sizeof(T) * rn);
} else {
if (likely(part_length))
memcpy(x, &slots[first_idx], sizeof(T) * (part_length));
memcpy(x + part_length, &slots[0],
sizeof(T) * (rn - part_length));
}
auto rh = rt;
while (!head.compare_exchange_weak(rh, rt + rn,
std::memory_order_acq_rel)) {
ThreadPause::pause();
rh = rt;
}
return rn;
if (rn == 0)
return 0;
if (!read_tail.compare_exchange_strong(rt, rt + rn, std::memory_order_acq_rel))
continue;
auto first_idx = idx(rt);
auto part_length = Base::capacity - first_idx;
if (likely(part_length >= rn)) {
memcpy(x, &slots[first_idx], sizeof(T) * rn);
} else {
if (likely(part_length))
memcpy(x, &slots[first_idx], sizeof(T) * (part_length));
memcpy(x + part_length, &slots[0], sizeof(T) * (rn - part_length));
}
auto rh = rt;
while (!head.compare_exchange_strong(rh, rt + rn, std::memory_order_acq_rel))
rh = rt;
return rn;
}
}

Expand Down Expand Up @@ -444,7 +438,6 @@ class LockfreeSPSCRingQueue : public LockfreeRingQueueBase<T, N> {
n, Base::capacity - (t - head.load(std::memory_order_acquire)));
if (n == 0) return 0;
auto first_idx = idx(t);
auto last_idx = idx(t + n - 1);
auto part_length = Base::capacity - first_idx;
if (likely(part_length >= n)) {
memcpy(&slots[first_idx], x, sizeof(T) * n);
Expand All @@ -462,7 +455,6 @@ class LockfreeSPSCRingQueue : public LockfreeRingQueueBase<T, N> {
n = std::min(n, tail.load(std::memory_order_acquire) - h);
if (n == 0) return 0;
auto first_idx = idx(h);
auto last_idx = idx(h + n - 1); (void)last_idx;
auto part_length = Base::capacity - first_idx;
if (likely(part_length >= n)) {
memcpy(x, &slots[first_idx], sizeof(T) * n);
Expand Down
22 changes: 15 additions & 7 deletions common/test/test_lockfree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,20 +182,28 @@ int test_queue_batch(const char *name, QType &queue) {
for (size_t i = 0; i < sender_num; i++) {
senders.emplace_back([i, &queue] {
photon::set_cpu_affinity(i);
std::chrono::nanoseconds wspent{std::chrono::nanoseconds(0)};
std::vector<int> vec;
vec.resize(items_num / sender_num);
for (size_t x = 0; x < items_num / sender_num; x++) {
vec[x] = x;
}
size_t size;
std::chrono::nanoseconds wspent{std::chrono::nanoseconds(0)};
for (size_t x = 0; x < items_num / sender_num;) {
auto tm = std::chrono::high_resolution_clock::now();
LSType::lock(wlock);
while (!queue.push(x)) {
while (!(size = queue.push_batch(&vec[x], std::min(32UL, vec.size() - x)))) {
LSType::unlock(wlock);
CPUPause::pause();
LSType::lock(wlock);
}
LSType::unlock(wlock);
wspent += std::chrono::high_resolution_clock::now() - tm;
sc[x]++;
scnt[i]++;
// ThreadPause::pause();
wspent += (std::chrono::high_resolution_clock::now() - tm) / size;
for (auto y = x; y < x + size; y++) {
sc[y] ++;
scnt[i] ++;
}
x += size;
}
LOG_DEBUG("` sender done, ` ns per action", i,
wspent.count() / (items_num / sender_num));
Expand All @@ -204,7 +212,7 @@ int test_queue_batch(const char *name, QType &queue) {
for (auto &x : senders) x.join();
for (auto &x : receivers) x.join();
auto end = std::chrono::steady_clock::now();
LOG_DEBUG("` ` p ` c, ` items, Spent ` us\n", name, sender_num, receiver_num, items_num,
LOG_DEBUG("` ` p ` c, ` items, Spent ` us", name, sender_num, receiver_num, items_num,
std::chrono::duration_cast<std::chrono::microseconds>(end - begin).count());
for (size_t i = 0; i < items_num / sender_num; i++) {
if (sc[i] != rc[i] || sc[i] != sender_num) {
Expand Down

0 comments on commit 7d191a0

Please sign in to comment.