Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
Make proxy filter thread safe for different segments (#574)
Browse files Browse the repository at this point in the history
  • Loading branch information
MBkkt committed Nov 8, 2023
1 parent b2fe116 commit e62c027
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 44 deletions.
5 changes: 1 addition & 4 deletions core/search/filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,7 @@ class filter {

score_t boost() const noexcept { return boost_; }

filter& boost(score_t boost) noexcept {
boost_ = boost;
return *this;
}
void boost(score_t boost) noexcept { boost_ = boost; }

virtual type_info::type_id type() const noexcept = 0;

Expand Down
59 changes: 34 additions & 25 deletions core/search/proxy_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
#include <bit>

#include "cost.hpp"
#include "score.hpp"
#include "utils/bitset.hpp"

#include <absl/synchronization/mutex.h>

namespace irs {

// Bitset expecting doc iterator to be able only to move forward.
Expand Down Expand Up @@ -178,65 +179,73 @@ class lazy_filter_bitset_iterator : public doc_iterator,

struct proxy_query_cache {
proxy_query_cache(IResourceManager& memory, filter::ptr&& ptr) noexcept
: readers_{Alloc{memory}}, real_filter_(std::move(ptr)) {}
: real_filter_{std::move(ptr)}, readers_{Alloc{memory}} {}

using Alloc = ManagedTypedAllocator<
std::pair<const SubReader* const, std::unique_ptr<lazy_filter_bitset>>>;

filter::ptr real_filter_;
filter::prepared::ptr real_filter_prepared_;
absl::Mutex readers_lock_;
absl::flat_hash_map<
const SubReader*, std::unique_ptr<lazy_filter_bitset>,
absl::container_internal::hash_default_hash<const SubReader*>,
absl::container_internal::hash_default_eq<const SubReader*>, Alloc>
readers_;
filter::prepared::ptr prepared_real_filter_;
filter::ptr real_filter_;
};

class proxy_query : public filter::prepared {
public:
explicit proxy_query(proxy_filter::cache_ptr cache) : cache_(cache) {
IRS_ASSERT(cache_->prepared_real_filter_);
explicit proxy_query(proxy_filter::cache_ptr cache) : cache_{cache} {
IRS_ASSERT(cache_->real_filter_prepared_);
}

doc_iterator::ptr execute(const ExecutionContext& ctx) const final {
// first try to find segment in cache.
[[maybe_unused]] auto& [_, cached] =
*cache_->readers_.emplace(&ctx.segment, nullptr).first;

if (!cached) {
cached = std::make_unique<lazy_filter_bitset>(
ctx, *cache_->prepared_real_filter_);
auto* cache_bitset = [&]() -> lazy_filter_bitset* {
absl::ReaderMutexLock lock{&cache_->readers_lock_};
auto it = cache_->readers_.find(&ctx.segment);
if (it != cache_->readers_.end()) {
return it->second.get();
}
return nullptr;
}();
if (!cache_bitset) {
auto bitset = std::make_unique<lazy_filter_bitset>(
ctx, *cache_->real_filter_prepared_);
cache_bitset = bitset.get();
absl::WriterMutexLock lock{&cache_->readers_lock_};
IRS_ASSERT(!cache_->readers_.contains(&ctx.segment));
cache_->readers_.emplace(&ctx.segment, std::move(bitset));
}

IRS_ASSERT(cached);
return memory::make_tracked<lazy_filter_bitset_iterator>(ctx.memory,
*cached);
*cache_bitset);
}

void visit(const SubReader&, PreparedStateVisitor&, score_t) const final {
// No terms to visit
}

private:
mutable proxy_filter::cache_ptr cache_;
proxy_filter::cache_ptr cache_;
};

filter::prepared::ptr proxy_filter::prepare(const PrepareContext& ctx) const {
if (!cache_ || !cache_->real_filter_ || !ctx.scorers.empty()) {
// Currently we do not support caching scores.
// Proxy filter should not be used with scorers!
IRS_ASSERT(false);
// Currently we do not support caching scores.
// Proxy filter should not be used with scorers!
IRS_ASSERT(ctx.scorers.empty());
if (!cache_ || !ctx.scorers.empty()) {
return filter::prepared::empty();
}
if (!cache_->prepared_real_filter_) {
cache_->prepared_real_filter_ = cache_->real_filter_->prepare(ctx);
if (!cache_->real_filter_prepared_) {
cache_->real_filter_prepared_ = cache_->real_filter_->prepare(ctx);
cache_->real_filter_.reset();
}
return memory::make_tracked<proxy_query>(ctx.memory, cache_);
}

filter& proxy_filter::cache_filter(IResourceManager& memory,
filter::ptr&& ptr) {
cache_ = std::make_shared<proxy_query_cache>(memory, std::move(ptr));
filter::ptr&& real) {
cache_ = std::make_shared<proxy_query_cache>(memory, std::move(real));
IRS_ASSERT(cache_->real_filter_);
return *cache_->real_filter_;
}
Expand Down
21 changes: 10 additions & 11 deletions core/search/proxy_filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,17 @@ class proxy_filter final : public filter {

filter::prepared::ptr prepare(const PrepareContext& ctx) const final;

template<typename T, typename... Args>
std::pair<T&, cache_ptr> set_filter(IResourceManager& memory,
Args&&... args) {
static_assert(std::is_base_of_v<filter, T>);
auto& ptr =
cache_filter(memory, std::make_unique<T>(std::forward<Args>(args)...));
return {static_cast<T&>(ptr), cache_};
template<typename Impl, typename Base = Impl, typename... Args>
std::pair<Base&, cache_ptr> set_filter(IResourceManager& memory,
Args&&... args) {
static_assert(std::is_base_of_v<filter, Base>);
static_assert(std::is_base_of_v<Base, Impl>);
auto& real =
cache_filter(memory, std::make_unique<Impl>(std::forward<Args>(args)...));
return {DownCast<Base>(real), cache_};
}

proxy_filter& set_cache(cache_ptr cache) noexcept {
cache_ = std::move(cache);
return *this;
}
void set_cache(cache_ptr cache) noexcept { cache_ = std::move(cache); }

irs::type_info::type_id type() const noexcept final {
return irs::type<proxy_filter>::id();
Expand All @@ -67,4 +65,5 @@ class proxy_filter final : public filter {

mutable cache_ptr cache_;
};

} // namespace irs
7 changes: 3 additions & 4 deletions tests/search/proxy_filter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,20 @@ class doclist_test_filter : public filter {

filter::prepared::ptr prepare(const PrepareContext& ctx) const final {
++prepares_;
return memory::make_tracked<doclist_test_query>(ctx.memory, documents_,
return memory::make_tracked<doclist_test_query>(ctx.memory, *documents_,
ctx.boost);
}

// intentional copy here to simplify multiple runs of same expected
void set_expected(const std::vector<doc_id_t>& documents) {
documents_ = documents;
documents_ = &documents;
}

irs::type_info::type_id type() const noexcept final {
return irs::type<doclist_test_filter>::id();
}

private:
std::vector<doc_id_t> documents_;
const std::vector<doc_id_t>* documents_;
static size_t prepares_;
};

Expand Down

0 comments on commit e62c027

Please sign in to comment.