Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
Separate WaitGroup (#572)
Browse files Browse the repository at this point in the history
* Separate WaitGroup

* Update core/utils/wait_group.hpp

Co-authored-by: Andrei Lobov <andrei.lobov@arangodb.com>

* Exact count

* fix lints

---------

Co-authored-by: Andrei Lobov <andrei.lobov@arangodb.com>
  • Loading branch information
MBkkt and Dronplane committed Nov 17, 2023
1 parent c743aed commit e4f0e81
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 30 deletions.
32 changes: 2 additions & 30 deletions core/index/index_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "utils/object_pool.hpp"
#include "utils/string.hpp"
#include "utils/thread_utils.hpp"
#include "utils/wait_group.hpp"

#include <absl/container/flat_hash_map.h>

Expand Down Expand Up @@ -835,36 +836,7 @@ class IndexWriter : private util::noncopyable {
std::deque<PendingSegmentContext> pending_segments_;
// entries from 'pending_segments_' that are available for reuse
Freelist pending_freelist_;
// TODO(MBkkt) Considered to replace with YACLib in ArangoDB 3.11+ or ...
struct WaitGroup {
std::mutex& Mutex() noexcept { return m_; }

void Add() noexcept { counter_.fetch_add(1, std::memory_order_relaxed); }

void Done() noexcept {
if (counter_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
std::lock_guard lock{m_};
cv_.notify_one();
}
}

void Wait(std::unique_lock<std::mutex>& lock) noexcept {
IRS_ASSERT(lock.mutex() == &m_);
IRS_ASSERT(lock.owns_lock());
if (counter_.fetch_sub(1, std::memory_order_acq_rel) != 1) {
do {
cv_.wait(lock);
// relaxed probably enough
} while (counter_.load(std::memory_order_acquire) != 0);
}
counter_.store(1, std::memory_order_relaxed);
}

private:
std::atomic_size_t counter_{1};
std::mutex m_;
std::condition_variable cv_;
} pending_;
WaitGroup pending_;

// set of segments to be removed from the index upon commit
ConsolidatingSegments segment_mask_;
Expand Down
76 changes: 76 additions & 0 deletions core/utils/wait_group.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 by EMC Corporation, All Rights Reserved
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is EMC Corporation
///
/// @author Valery Mironov
////////////////////////////////////////////////////////////////////////////////

#pragma once

#include <atomic>
#include <condition_variable>
#include <mutex>

namespace irs {

// TODO(MBkkt) Considered to replace with YACLib
struct WaitGroup {
explicit WaitGroup(size_t counter = 0) noexcept : counter_{2 * counter + 1} {}

void Add(size_t counter = 1) noexcept {
counter_.fetch_add(2 * counter, std::memory_order_relaxed);
}

void Done(size_t counter = 1) noexcept {
if (counter_.fetch_sub(2 * counter, std::memory_order_acq_rel) ==
2 * counter) {
std::lock_guard lock{m_};
cv_.notify_one();
}
}

// Multiple parallel Wait not supported, if needed check YACLib
void Wait(size_t counter = 0) noexcept {
if (counter_.fetch_sub(1, std::memory_order_acq_rel) != 1) {
std::unique_lock lock{m_};
while (counter_.load(std::memory_order_acquire) != 0) {
cv_.wait(lock);
}
}
// We can put acquire here and remove above, but is it worth?
Reset(counter);
}

// It shouldn't used for synchronization
size_t Count() const noexcept {
return counter_.load(std::memory_order_relaxed) / 2;
}

void Reset(size_t counter) noexcept {
counter_.store(2 * counter + 1, std::memory_order_relaxed);
}

std::mutex& Mutex() noexcept { return m_; }

private:
std::atomic_size_t counter_;
std::condition_variable cv_;
std::mutex m_;
};

} // namespace irs

0 comments on commit e4f0e81

Please sign in to comment.