Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
[APM-670] Memory accounting prepare and execute (#542)
Browse files Browse the repository at this point in the history
* Add ability to pass resource manager to prepare and execute

* Fix tests

* Use exception for Increase instead of bool

* Change name

* Fix boost compute

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* Update iresearch

* Update iresearch
  • Loading branch information
MBkkt authored Aug 15, 2023
1 parent 418856c commit 9a0acb8
Show file tree
Hide file tree
Showing 90 changed files with 2,883 additions and 1,787 deletions.
61 changes: 31 additions & 30 deletions core/formats/columnstore2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,21 +381,22 @@ class column_base : public column_reader, private util::noncopyable {

column_header& mutable_header() { return hdr_; }
void reset_stream(const index_input* stream) { stream_ = stream; }
bool allocate_buffered_memory(size_t size, size_t mappings) {
if (!resource_manager_cached_.Increase(size + mappings)) {
auto column_name = name();
if (irs::IsNull(column_name)) {
column_name = "<anonymous>";
}
IRS_LOG_WARN(
absl::StrCat("Failed to allocate memory for buffered column id ",
header().id, " name: ", column_name, " of size ", size));
return false;
}

bool allocate_buffered_memory(size_t size, size_t mappings) noexcept try {
resource_manager_cached_.Increase(size + mappings);
// should be only one alllocation
IRS_ASSERT(column_data_.empty());
column_data_.resize(size);
return true;
} catch (...) {
auto column_name = name();
if (irs::IsNull(column_name)) {
column_name = "<anonymous>";
}
IRS_LOG_WARN(
absl::StrCat("Failed to allocate memory for buffered column id ",
header().id, " name: ", column_name, " of size ", size));
return false;
}

size_t calculate_bitmap_size(size_t file_len,
Expand Down Expand Up @@ -619,9 +620,9 @@ struct mask_column : public column_base {
const index_input& data_in,
compression::decompressor::ptr&& /*inflater*/,
encryption::stream* cipher) {
return memory::make_tracked_managed<column_reader, mask_column>(
rm_r, std::move(name), rm_c, std::move(payload), std::move(hdr),
std::move(index), data_in, cipher);
return memory::make_tracked<mask_column>(rm_r, std::move(name), rm_c,
std::move(payload), std::move(hdr),
std::move(index), data_in, cipher);
}

mask_column(std::optional<std::string>&& name, IResourceManager& rm_c,
Expand Down Expand Up @@ -753,7 +754,7 @@ column_ptr dense_fixed_length_column::read(
compression::decompressor::ptr&& inflater, encryption::stream* cipher) {
const uint64_t len = index_in.read_long();
const uint64_t data = index_in.read_long();
return memory::make_tracked_managed<column_reader, dense_fixed_length_column>(
return memory::make_tracked<dense_fixed_length_column>(
rm_r, std::move(name), rm_c, std::move(payload), std::move(hdr),
std::move(index), data_in, std::move(inflater), cipher, data, len);
}
Expand Down Expand Up @@ -787,7 +788,7 @@ doc_iterator::ptr dense_fixed_length_column::iterator(ColumnHint hint) const {

class fixed_length_column : public column_base {
public:
using Blocks = std::vector<uint64_t, ManagedTypedAllocator<uint64_t>>;
using Blocks = ManagedVector<uint64_t>;

static column_ptr read(std::optional<std::string>&& name,
IResourceManager& rm_r, IResourceManager& rm_c,
Expand All @@ -798,7 +799,7 @@ class fixed_length_column : public column_base {
encryption::stream* cipher) {
const uint64_t len = index_in.read_long();
auto blocks = read_blocks_dense(hdr, index_in, rm_r);
return memory::make_tracked_managed<column_reader, fixed_length_column>(
return memory::make_tracked<fixed_length_column>(
rm_r, std::move(name), rm_c, std::move(payload), std::move(hdr),
std::move(index), data_in, std::move(inflater), cipher, std::move(blocks),
len);
Expand Down Expand Up @@ -997,18 +998,19 @@ class sparse_column : public column_base {
compression::decompressor::ptr&& inflater,
encryption::stream* cipher) {
auto blocks = read_blocks_sparse(hdr, index_in, rm_r);
return memory::make_tracked_managed<column_reader, sparse_column>(
return memory::make_tracked<sparse_column>(
rm_r, std::move(name), rm_c, std::move(payload), std::move(hdr),
std::move(index), data_in, std::move(inflater), cipher,
std::move(blocks));
}

sparse_column(
std::optional<std::string>&& name, IResourceManager& resource_manager,
bstring&& payload, column_header&& hdr, column_index&& index,
const index_input& data_in, compression::decompressor::ptr&& inflater,
encryption::stream* cipher,
std::vector<column_block, ManagedTypedAllocator<column_block>>&& blocks)
sparse_column(std::optional<std::string>&& name,
IResourceManager& resource_manager, bstring&& payload,
column_header&& hdr, column_index&& index,
const index_input& data_in,
compression::decompressor::ptr&& inflater,
encryption::stream* cipher,
ManagedVector<column_block>&& blocks)
: column_base{std::move(name), resource_manager, std::move(payload),
std::move(hdr), std::move(index), data_in,
cipher},
Expand Down Expand Up @@ -1053,9 +1055,9 @@ class sparse_column : public column_base {
}

private:
static std::vector<column_block, ManagedTypedAllocator<column_block>>
read_blocks_sparse(const column_header& hdr, index_input& in,
IResourceManager& resource_manager);
static ManagedVector<column_block> read_blocks_sparse(
const column_header& hdr, index_input& in,
IResourceManager& resource_manager);

template<typename ValueReader>
class payload_reader : private ValueReader {
Expand All @@ -1072,8 +1074,7 @@ class sparse_column : public column_base {

template<bool encrypted>
bool make_buffered_data(
column_header& hdr, index_input& in,
std::vector<column_block, ManagedTypedAllocator<column_block>>& blocks,
column_header& hdr, index_input& in, ManagedVector<column_block>& blocks,
std::vector<byte_type>& column_data,
std::span<memory::managed_ptr<column_reader>> next_sorted_columns,
remapped_bytes_view_input::mapping* mapping) {
Expand Down Expand Up @@ -1167,7 +1168,7 @@ class sparse_column : public column_base {
return true;
}

std::vector<column_block, ManagedTypedAllocator<column_block>> blocks_;
ManagedVector<column_block> blocks_;
compression::decompressor::ptr inflater_;
};

Expand Down
3 changes: 1 addition & 2 deletions core/formats/columnstore2.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ class column final : public irs::column_output {
irs::type_info compression_;
compression::compressor::ptr deflater_;
columnstore_writer::column_finalizer_f finalizer_;
std::vector<column_block, ManagedTypedAllocator<column_block>>
blocks_; // at most 65536 blocks
ManagedVector<column_block> blocks_; // at most 65536 blocks
memory_output data_;
memory_output docs_;
sparse_bitmap_writer docs_writer_{docs_.stream, ctx_.version};
Expand Down
11 changes: 5 additions & 6 deletions core/formats/formats.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ using DocumentMask =
absl::container_internal::hash_default_hash<doc_id_t>,
absl::container_internal::hash_default_eq<doc_id_t>,
ManagedTypedAllocator<doc_id_t>>;
using DocMap = std::vector<doc_id_t, ManagedTypedAllocator<doc_id_t>>;
using DocMap = ManagedVector<doc_id_t>;
using DocMapView = std::span<const doc_id_t>;
using callback_f = std::function<bool(doc_iterator&)>;

Expand All @@ -76,7 +76,7 @@ struct WanderatorOptions {
struct SegmentWriterOptions {
const ColumnInfoProvider& column_info;
const FeatureInfoProvider& feature_info;
const std::set<irs::type_info::type_id>& scorers_features;
const feature_set_t& scorers_features;
ScorersView scorers;
const Comparer* const comparator{};
IResourceManager& resource_manager{IResourceManager::kNoop};
Expand Down Expand Up @@ -124,9 +124,8 @@ struct postings_writer {
virtual ~postings_writer() = default;
// out - corresponding terms stream
virtual void prepare(index_output& out, const flush_state& state) = 0;
virtual void begin_field(
IndexFeatures index_features,
const std::map<irs::type_info::type_id, field_id>& features) = 0;
virtual void begin_field(IndexFeatures index_features,
const feature_map_t& features) = 0;
virtual state write(doc_iterator& docs) = 0;
virtual void begin_block() = 0;
virtual void encode(data_output& out, const term_meta& state) = 0;
Expand Down Expand Up @@ -485,7 +484,7 @@ struct flush_state {
const DocMap* docmap{};
const ColumnProvider* columns{};
// Accumulated segment features
const std::set<type_info::type_id>* features{};
const feature_set_t* features{};
// Segment name
const std::string_view name; // segment name
ScorersView scorers;
Expand Down
4 changes: 2 additions & 2 deletions core/formats/formats_burst_trie.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ const fst::FstReadOptions& fst_read_options() {
// mininum size of string weight we store in FST
[[maybe_unused]] constexpr const size_t MIN_WEIGHT_SIZE = 2;

using Blocks = std::vector<entry, ManagedTypedAllocator<entry>>;
using Blocks = ManagedVector<entry>;

void MergeBlocks(Blocks& blocks, OutputBuffer& buffer) {
IRS_ASSERT(!blocks.empty());
Expand Down Expand Up @@ -1028,7 +1028,7 @@ class field_writer final : public irs::field_writer {
encryption::stream::ptr index_out_cipher_;
index_output::ptr index_out_; // output stream for indexes
postings_writer::ptr pw_; // postings writer
std::vector<entry, ManagedTypedAllocator<entry>> stack_;
ManagedVector<entry> stack_;
fst_buffer* fst_buf_; // pimpl buffer used for building FST for fields
volatile_byte_ref last_term_; // last pushed term
std::vector<size_t> prefixes_;
Expand Down
2 changes: 1 addition & 1 deletion core/formats/skip_list.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class SkipWriter : util::noncopyable {
void Skip(doc_id_t count, Writer&& write);

protected:
std::vector<memory_output, ManagedTypedAllocator<memory_output>> levels_;
ManagedVector<memory_output> levels_;
size_t max_levels_;
doc_id_t skip_0_; // skip interval for 0 level
doc_id_t skip_n_; // skip interval for 1..n levels
Expand Down
3 changes: 1 addition & 2 deletions core/index/buffered_column.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ struct BufferedValue {

class BufferedColumn final : public column_output, private util::noncopyable {
public:
using BufferedValues =
std::vector<BufferedValue, ManagedTypedAllocator<BufferedValue>>;
using BufferedValues = ManagedVector<BufferedValue>;
using Buffer = irs::basic_string<irs::byte_type,
irs::ManagedTypedAllocator<irs::byte_type>>;
explicit BufferedColumn(const ColumnInfo& info, IResourceManager& rm)
Expand Down
4 changes: 0 additions & 4 deletions core/index/field_meta.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ struct field_stats {
uint32_t num_unique{};
};

using feature_map_t = std::map<type_info::type_id, field_id>;
using feature_set_t = std::set<type_info::type_id>;
using features_t = std::span<const type_info::type_id>;

// Represents field metadata
struct field_meta {
public:
Expand Down
6 changes: 6 additions & 0 deletions core/index/index_features.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#pragma once

#include <functional>
#include <map>
#include <set>
#include <span>

#include "index/column_info.hpp"
Expand Down Expand Up @@ -81,4 +83,8 @@ using FeatureInfoProvider =
std::function<std::pair<ColumnInfo, FeatureWriterFactory>(
type_info::type_id)>;

using feature_map_t = std::map<type_info::type_id, field_id>;
using feature_set_t = std::set<type_info::type_id>;
using features_t = std::span<const type_info::type_id>;

} // namespace irs
12 changes: 6 additions & 6 deletions core/index/index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ void RemoveFromExistingSegment(DocumentMask& deleted_docs,
return;
}

auto prepared = query.filter->prepare(reader);
auto prepared = query.filter->prepare({.index = reader});

if (IRS_UNLIKELY(!prepared)) {
return; // skip invalid prepared filters
}

auto itr = prepared->execute(reader);
auto itr = prepared->execute({.segment = reader});

if (IRS_UNLIKELY(!itr)) {
return; // skip invalid iterators
Expand All @@ -232,12 +232,12 @@ bool RemoveFromImportedSegment(DocumentMask& deleted_docs,
return false;
}

auto prepared = query.filter->prepare(reader);
auto prepared = query.filter->prepare({.index = reader});
if (IRS_UNLIKELY(!prepared)) {
return false; // skip invalid prepared filters
}

auto itr = prepared->execute(reader);
auto itr = prepared->execute({.segment = reader});
if (IRS_UNLIKELY(!itr)) {
return false; // skip invalid iterators
}
Expand Down Expand Up @@ -270,13 +270,13 @@ void FlushedSegmentContext::Remove(IndexWriter::QueryContext& query) {

auto& document_mask = flushed.document_mask;

auto prepared = query.filter->prepare(*reader);
auto prepared = query.filter->prepare({.index = *reader});

if (IRS_UNLIKELY(!prepared)) {
return; // Skip invalid prepared filters
}

auto itr = prepared->execute(*reader);
auto itr = prepared->execute({.segment = *reader});

if (IRS_UNLIKELY(!itr)) {
return; // Skip invalid iterators
Expand Down
8 changes: 3 additions & 5 deletions core/index/index_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -678,16 +678,14 @@ class IndexWriter : private util::noncopyable {
RefTrackingDirectory dir_;

// sequential list of pending modification
std::vector<QueryContext, ManagedTypedAllocator<QueryContext>> queries_;
ManagedVector<QueryContext> queries_;
// all of the previously flushed versions of this segment
std::vector<FlushedSegment, ManagedTypedAllocator<FlushedSegment>> flushed_;
ManagedVector<FlushedSegment> flushed_;
// update_contexts to use with 'flushed_'
// sequentially increasing through all offsets
// (sequential doc_id in 'flushed_' == offset + doc_limits::min(), size()
// == sum of all 'flushed_'.'docs_count')
std::vector<segment_writer::DocContext,
ManagedTypedAllocator<segment_writer::DocContext>>
flushed_docs_;
ManagedVector<segment_writer::DocContext> flushed_docs_;

// function to get new SegmentMeta from
segment_meta_generator_t meta_generator_;
Expand Down
2 changes: 1 addition & 1 deletion core/index/merge_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void AccumulateFeatures(feature_set_t& accum, const feature_map_t& features) {

// mapping of old doc_id to new doc_id (reader doc_ids are sequential 0 based)
// masked doc_ids have value of MASKED_DOC_ID
using doc_id_map_t = std::vector<doc_id_t, ManagedTypedAllocator<doc_id_t>>;
using doc_id_map_t = ManagedVector<doc_id_t>;

// document mapping function
using doc_map_f = std::function<doc_id_t(doc_id_t)>;
Expand Down
9 changes: 4 additions & 5 deletions core/index/merge_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ class MergeWriter : public util::noncopyable {
ReaderCtx(const SubReader& reader, IResourceManager& rm) noexcept
: ReaderCtx{&reader, rm} {}

const SubReader* reader; // segment reader
std::vector<doc_id_t, ManagedTypedAllocator<doc_id_t>>
doc_id_map; // FIXME use bitpacking vector
const SubReader* reader; // segment reader
ManagedVector<doc_id_t> doc_id_map; // FIXME use bitpacking vector
std::function<doc_id_t(doc_id_t)> doc_map; // mapping function
};

Expand Down Expand Up @@ -98,11 +97,11 @@ class MergeWriter : public util::noncopyable {
const FlushProgress& progress);

directory& dir_;
std::vector<ReaderCtx, ManagedTypedAllocator<ReaderCtx>> readers_;
ManagedVector<ReaderCtx> readers_;
const ColumnInfoProvider* column_info_{};
const FeatureInfoProvider* feature_info_{};
ScorersView scorers_;
const std::set<irs::type_info::type_id>* scorers_features_{};
const feature_set_t* scorers_features_{};
const Comparer* const comparator_{};
};

Expand Down
2 changes: 1 addition & 1 deletion core/index/segment_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ class segment_writer : public ColumnProvider, util::noncopyable {
cached_columns_; // pointers remain valid
absl::flat_hash_map<field_id, cached_column*> column_ids_;
sorted_column sort_;
std::vector<DocContext, ManagedTypedAllocator<DocContext>> docs_context_;
ManagedVector<DocContext> docs_context_;
// invalid/removed doc_ids (e.g. partially indexed due to indexing failure)
DocsMask docs_mask_;
fields_data fields_;
Expand Down
9 changes: 7 additions & 2 deletions core/resource_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@

#pragma once

#include <vector>

#include "shared.hpp"
#include "utils/managed_allocator.hpp"

namespace irs {

struct IResourceManager {
static IResourceManager kNoop;
#ifdef IRESEARCH_DEBUG
Expand All @@ -38,10 +41,9 @@ struct IResourceManager {
IResourceManager(const IResourceManager&) = delete;
IResourceManager operator=(const IResourceManager&) = delete;

virtual bool Increase([[maybe_unused]] size_t v) noexcept {
virtual void Increase([[maybe_unused]] size_t v) {
IRS_ASSERT(this != &kForbidden);
IRS_ASSERT(v != 0);
return true;
}

virtual void Decrease([[maybe_unused]] size_t v) noexcept {
Expand Down Expand Up @@ -82,4 +84,7 @@ struct ManagedTypedAllocator
using Base::Base;
};

template<typename T>
using ManagedVector = std::vector<T, ManagedTypedAllocator<T>>;

} // namespace irs
Loading

0 comments on commit 9a0acb8

Please sign in to comment.