From e62c02786aafd43bb10504764c5de1e148e78a69 Mon Sep 17 00:00:00 2001 From: Valery Mironov <32071355+MBkkt@users.noreply.github.com> Date: Wed, 8 Nov 2023 11:14:28 +0100 Subject: [PATCH] Make proxy filter thread safe for different segments (#574) --- core/search/filter.hpp | 5 +-- core/search/proxy_filter.cpp | 59 +++++++++++++++++------------- core/search/proxy_filter.hpp | 21 +++++------ tests/search/proxy_filter_test.cpp | 7 ++-- 4 files changed, 48 insertions(+), 44 deletions(-) diff --git a/core/search/filter.hpp b/core/search/filter.hpp index 64e8c75b9..43d7cc61d 100644 --- a/core/search/filter.hpp +++ b/core/search/filter.hpp @@ -98,10 +98,7 @@ class filter { score_t boost() const noexcept { return boost_; } - filter& boost(score_t boost) noexcept { - boost_ = boost; - return *this; - } + void boost(score_t boost) noexcept { boost_ = boost; } virtual type_info::type_id type() const noexcept = 0; diff --git a/core/search/proxy_filter.cpp b/core/search/proxy_filter.cpp index eb095a51a..fc7cb8e72 100644 --- a/core/search/proxy_filter.cpp +++ b/core/search/proxy_filter.cpp @@ -25,9 +25,10 @@ #include #include "cost.hpp" -#include "score.hpp" #include "utils/bitset.hpp" +#include + namespace irs { // Bitset expecting doc iterator to be able only to move forward. @@ -178,39 +179,46 @@ class lazy_filter_bitset_iterator : public doc_iterator, struct proxy_query_cache { proxy_query_cache(IResourceManager& memory, filter::ptr&& ptr) noexcept - : readers_{Alloc{memory}}, real_filter_(std::move(ptr)) {} + : real_filter_{std::move(ptr)}, readers_{Alloc{memory}} {} using Alloc = ManagedTypedAllocator< std::pair>>; + filter::ptr real_filter_; + filter::prepared::ptr real_filter_prepared_; + absl::Mutex readers_lock_; absl::flat_hash_map< const SubReader*, std::unique_ptr, absl::container_internal::hash_default_hash, absl::container_internal::hash_default_eq, Alloc> readers_; - filter::prepared::ptr prepared_real_filter_; - filter::ptr real_filter_; }; class proxy_query : public filter::prepared { public: - explicit proxy_query(proxy_filter::cache_ptr cache) : cache_(cache) { - IRS_ASSERT(cache_->prepared_real_filter_); + explicit proxy_query(proxy_filter::cache_ptr cache) : cache_{cache} { + IRS_ASSERT(cache_->real_filter_prepared_); } doc_iterator::ptr execute(const ExecutionContext& ctx) const final { - // first try to find segment in cache. - [[maybe_unused]] auto& [_, cached] = - *cache_->readers_.emplace(&ctx.segment, nullptr).first; - - if (!cached) { - cached = std::make_unique( - ctx, *cache_->prepared_real_filter_); + auto* cache_bitset = [&]() -> lazy_filter_bitset* { + absl::ReaderMutexLock lock{&cache_->readers_lock_}; + auto it = cache_->readers_.find(&ctx.segment); + if (it != cache_->readers_.end()) { + return it->second.get(); + } + return nullptr; + }(); + if (!cache_bitset) { + auto bitset = std::make_unique( + ctx, *cache_->real_filter_prepared_); + cache_bitset = bitset.get(); + absl::WriterMutexLock lock{&cache_->readers_lock_}; + IRS_ASSERT(!cache_->readers_.contains(&ctx.segment)); + cache_->readers_.emplace(&ctx.segment, std::move(bitset)); } - - IRS_ASSERT(cached); return memory::make_tracked(ctx.memory, - *cached); + *cache_bitset); } void visit(const SubReader&, PreparedStateVisitor&, score_t) const final { @@ -218,25 +226,26 @@ class proxy_query : public filter::prepared { } private: - mutable proxy_filter::cache_ptr cache_; + proxy_filter::cache_ptr cache_; }; filter::prepared::ptr proxy_filter::prepare(const PrepareContext& ctx) const { - if (!cache_ || !cache_->real_filter_ || !ctx.scorers.empty()) { - // Currently we do not support caching scores. - // Proxy filter should not be used with scorers! - IRS_ASSERT(false); + // Currently we do not support caching scores. + // Proxy filter should not be used with scorers! + IRS_ASSERT(ctx.scorers.empty()); + if (!cache_ || !ctx.scorers.empty()) { return filter::prepared::empty(); } - if (!cache_->prepared_real_filter_) { - cache_->prepared_real_filter_ = cache_->real_filter_->prepare(ctx); + if (!cache_->real_filter_prepared_) { + cache_->real_filter_prepared_ = cache_->real_filter_->prepare(ctx); + cache_->real_filter_.reset(); } return memory::make_tracked(ctx.memory, cache_); } filter& proxy_filter::cache_filter(IResourceManager& memory, - filter::ptr&& ptr) { - cache_ = std::make_shared(memory, std::move(ptr)); + filter::ptr&& real) { + cache_ = std::make_shared(memory, std::move(real)); IRS_ASSERT(cache_->real_filter_); return *cache_->real_filter_; } diff --git a/core/search/proxy_filter.hpp b/core/search/proxy_filter.hpp index 6629864d9..850761adb 100644 --- a/core/search/proxy_filter.hpp +++ b/core/search/proxy_filter.hpp @@ -44,19 +44,17 @@ class proxy_filter final : public filter { filter::prepared::ptr prepare(const PrepareContext& ctx) const final; - template - std::pair set_filter(IResourceManager& memory, - Args&&... args) { - static_assert(std::is_base_of_v); - auto& ptr = - cache_filter(memory, std::make_unique(std::forward(args)...)); - return {static_cast(ptr), cache_}; + template + std::pair set_filter(IResourceManager& memory, + Args&&... args) { + static_assert(std::is_base_of_v); + static_assert(std::is_base_of_v); + auto& real = + cache_filter(memory, std::make_unique(std::forward(args)...)); + return {DownCast(real), cache_}; } - proxy_filter& set_cache(cache_ptr cache) noexcept { - cache_ = std::move(cache); - return *this; - } + void set_cache(cache_ptr cache) noexcept { cache_ = std::move(cache); } irs::type_info::type_id type() const noexcept final { return irs::type::id(); @@ -67,4 +65,5 @@ class proxy_filter final : public filter { mutable cache_ptr cache_; }; + } // namespace irs diff --git a/tests/search/proxy_filter_test.cpp b/tests/search/proxy_filter_test.cpp index a699607f9..cbf224a73 100644 --- a/tests/search/proxy_filter_test.cpp +++ b/tests/search/proxy_filter_test.cpp @@ -125,13 +125,12 @@ class doclist_test_filter : public filter { filter::prepared::ptr prepare(const PrepareContext& ctx) const final { ++prepares_; - return memory::make_tracked(ctx.memory, documents_, + return memory::make_tracked(ctx.memory, *documents_, ctx.boost); } - // intentional copy here to simplify multiple runs of same expected void set_expected(const std::vector& documents) { - documents_ = documents; + documents_ = &documents; } irs::type_info::type_id type() const noexcept final { @@ -139,7 +138,7 @@ class doclist_test_filter : public filter { } private: - std::vector documents_; + const std::vector* documents_; static size_t prepares_; };