From e4f0e81bcb985b4295709246b3b524bd06409fbf Mon Sep 17 00:00:00 2001 From: Valery Mironov <32071355+MBkkt@users.noreply.github.com> Date: Fri, 3 Nov 2023 21:22:10 +0100 Subject: [PATCH] Separate WaitGroup (#572) * Separate WaitGroup * Update core/utils/wait_group.hpp Co-authored-by: Andrei Lobov * Exact count * fix lints --------- Co-authored-by: Andrei Lobov --- core/index/index_writer.hpp | 32 +--------------- core/utils/wait_group.hpp | 76 +++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 30 deletions(-) create mode 100644 core/utils/wait_group.hpp diff --git a/core/index/index_writer.hpp b/core/index/index_writer.hpp index c3bcb251a..9645d8db5 100644 --- a/core/index/index_writer.hpp +++ b/core/index/index_writer.hpp @@ -45,6 +45,7 @@ #include "utils/object_pool.hpp" #include "utils/string.hpp" #include "utils/thread_utils.hpp" +#include "utils/wait_group.hpp" #include @@ -835,36 +836,7 @@ class IndexWriter : private util::noncopyable { std::deque pending_segments_; // entries from 'pending_segments_' that are available for reuse Freelist pending_freelist_; - // TODO(MBkkt) Considered to replace with YACLib in ArangoDB 3.11+ or ... - struct WaitGroup { - std::mutex& Mutex() noexcept { return m_; } - - void Add() noexcept { counter_.fetch_add(1, std::memory_order_relaxed); } - - void Done() noexcept { - if (counter_.fetch_sub(1, std::memory_order_acq_rel) == 1) { - std::lock_guard lock{m_}; - cv_.notify_one(); - } - } - - void Wait(std::unique_lock& lock) noexcept { - IRS_ASSERT(lock.mutex() == &m_); - IRS_ASSERT(lock.owns_lock()); - if (counter_.fetch_sub(1, std::memory_order_acq_rel) != 1) { - do { - cv_.wait(lock); - // relaxed probably enough - } while (counter_.load(std::memory_order_acquire) != 0); - } - counter_.store(1, std::memory_order_relaxed); - } - - private: - std::atomic_size_t counter_{1}; - std::mutex m_; - std::condition_variable cv_; - } pending_; + WaitGroup pending_; // set of segments to be removed from the index upon commit ConsolidatingSegments segment_mask_; diff --git a/core/utils/wait_group.hpp b/core/utils/wait_group.hpp new file mode 100644 index 000000000..b439da655 --- /dev/null +++ b/core/utils/wait_group.hpp @@ -0,0 +1,76 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 by EMC Corporation, All Rights Reserved +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is EMC Corporation +/// +/// @author Valery Mironov +//////////////////////////////////////////////////////////////////////////////// + +#pragma once + +#include +#include +#include + +namespace irs { + +// TODO(MBkkt) Considered to replace with YACLib +struct WaitGroup { + explicit WaitGroup(size_t counter = 0) noexcept : counter_{2 * counter + 1} {} + + void Add(size_t counter = 1) noexcept { + counter_.fetch_add(2 * counter, std::memory_order_relaxed); + } + + void Done(size_t counter = 1) noexcept { + if (counter_.fetch_sub(2 * counter, std::memory_order_acq_rel) == + 2 * counter) { + std::lock_guard lock{m_}; + cv_.notify_one(); + } + } + + // Multiple parallel Wait not supported, if needed check YACLib + void Wait(size_t counter = 0) noexcept { + if (counter_.fetch_sub(1, std::memory_order_acq_rel) != 1) { + std::unique_lock lock{m_}; + while (counter_.load(std::memory_order_acquire) != 0) { + cv_.wait(lock); + } + } + // We can put acquire here and remove above, but is it worth? + Reset(counter); + } + + // It shouldn't used for synchronization + size_t Count() const noexcept { + return counter_.load(std::memory_order_relaxed) / 2; + } + + void Reset(size_t counter) noexcept { + counter_.store(2 * counter + 1, std::memory_order_relaxed); + } + + std::mutex& Mutex() noexcept { return m_; } + + private: + std::atomic_size_t counter_; + std::condition_variable cv_; + std::mutex m_; +}; + +} // namespace irs