Skip to content

Commit

Permalink
Manual pr 0.6 0.7 (alibaba#509)
Browse files Browse the repository at this point in the history
* FIX: lockfree MPMC queue should not fail to pop/push when queue is not empty/full (alibaba#504)

* FIX: lockfree MPMC queue should not fail to pop/push when queue is not empty/full

Signed-off-by: Coldwings <coldwings@me.com>

* Old CI image not able to access repo, change to preset image

Signed-off-by: Coldwings <coldwings@me.com>
  • Loading branch information
lihuiba committed Jun 14, 2024
1 parent fa7a438 commit 56e2da5
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 26 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.linux.x86-64.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
runs-on: ubuntu-latest
container:
image: ghcr.io/coldwings/photon-ut-base:latest
options: --cpus 4
options: --cpus 4 --privileged
steps:
- uses: szenius/set-timezone@v1.2
with:
Expand Down Expand Up @@ -86,7 +86,7 @@ jobs:
runs-on: ubuntu-latest
container:
image: ghcr.io/coldwings/photon-ut-base:latest
options: --cpus 4
options: --cpus 4 --privileged
steps:
- uses: szenius/set-timezone@v1.2
with:
Expand Down Expand Up @@ -127,7 +127,7 @@ jobs:
runs-on: ubuntu-latest
container:
image: ghcr.io/coldwings/photon-ut-base:latest
options: --cpus 4
options: --cpus 4 --privileged
steps:
- uses: szenius/set-timezone@v1.2
with:
Expand Down Expand Up @@ -168,7 +168,7 @@ jobs:
runs-on: ubuntu-latest
container:
image: ghcr.io/coldwings/photon-ut-base:latest
options: --cpus 4
options: --cpus 4 --privileged
steps:
- uses: szenius/set-timezone@v1.2
with:
Expand Down Expand Up @@ -209,7 +209,7 @@ jobs:
runs-on: ubuntu-latest
container:
image: ghcr.io/coldwings/photon-ut-base:latest
options: --cpus 4
options: --cpus 4 --privileged
steps:
- uses: szenius/set-timezone@v1.2
with:
Expand Down
28 changes: 8 additions & 20 deletions common/lockfree_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class LockfreeMPMCRingQueue : public LockfreeRingQueueBase<T, N> {
using Base::empty;
using Base::full;

bool push_weak(const T& x) {
bool push(const T& x) {
auto t = tail.load(std::memory_order_acquire);
for (;;) {
auto& slot = slots[idx(t)];
Expand All @@ -194,15 +194,16 @@ class LockfreeMPMCRingQueue : public LockfreeRingQueueBase<T, N> {
}
} else {
auto const prevTail = t;
auto h = head.load(std::memory_order_acquire);
t = tail.load(std::memory_order_acquire);
if (t == prevTail) {
if (t == prevTail && Base::check_full(h, t)) {
return false;
}
}
}
}

bool pop_weak(T& x) {
bool pop(T& x) {
auto h = head.load(std::memory_order_acquire);
for (;;) {
auto& slot = slots[idx(h)];
Expand All @@ -215,28 +216,15 @@ class LockfreeMPMCRingQueue : public LockfreeRingQueueBase<T, N> {
}
} else {
auto const prevHead = h;
auto t = tail.load(std::memory_order_acquire);
h = head.load(std::memory_order_acquire);
if (h == prevHead) {
if (h == prevHead && Base::check_empty(h, t)) {
return false;
}
}
}
}

bool push(const T& x) {
do {
if (push_weak(x)) return true;
} while (!full());
return false;
}

bool pop(T& x) {
do {
if (pop_weak(x)) return true;
} while (!empty());
return false;
}

template <typename Pause = ThreadPause>
void send(const T& x) {
static_assert(std::is_base_of<PauseBase, Pause>::value,
Expand Down Expand Up @@ -538,8 +526,8 @@ namespace common {
* and load balancing.
* Watch out that `recv` should run in photon environment (because it has to)
* use photon semaphore to be notified that new item has sended. `send` could
* running in photon or std::thread environment (needs to set template `Pause` as
* `ThreadPause`).
* running in photon or std::thread environment (needs to set template `Pause`
* as `ThreadPause`).
*
* @tparam QueueType shoulde be one of LockfreeMPMCRingQueue,
* LockfreeBatchMPMCRingQueue, or LockfreeSPSCRingQueue, with their own template
Expand Down
2 changes: 1 addition & 1 deletion examples/sync-primitive/sync-primitive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ int main() {
while (true) {
func_1us();
Message* m;
bool ok = ring.pop_weak(m);
bool ok = ring.pop(m);
if (!ok)
continue;
m->start = std::chrono::steady_clock::now();
Expand Down

0 comments on commit 56e2da5

Please sign in to comment.