From ce70b50b4f49379b6dd58eed08b0c2abea7d7c99 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Sun, 5 Nov 2023 23:35:54 +0800 Subject: [PATCH] [refactor](sink) move where clase on mv to row distribution For better performance and elasticity, we move memtable from loadchannel to sink, VTabletSinkV2 is introduced, then there are VTabletWriter and VTabletSinkV2 distributing rows to tablets. where clauses on mvs are executed in VTabletWriter, while VTabletSinkV2 needs it too. So common code is moved to row distribution. Actually, we can layer code by rows' data flow, then the code is much more understood and maintainable. ScanNode -> Sink/Writer (RowDistribution -> IndexChannel / DeltaWriter) --- be/src/vec/sink/vrow_distribution.cpp | 312 +++++++++++----------- be/src/vec/sink/vrow_distribution.h | 67 +++-- be/src/vec/sink/vtablet_sink_v2.cpp | 61 ++--- be/src/vec/sink/vtablet_sink_v2.h | 14 +- be/src/vec/sink/writer/vtablet_writer.cpp | 105 ++++---- be/src/vec/sink/writer/vtablet_writer.h | 28 +- 6 files changed, 282 insertions(+), 305 deletions(-) diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index 53784f0bd9bd037..fc0ed903dcf3af8 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -15,14 +15,18 @@ // specific language governing permissions and limitations // under the License. +#include "vec/sink/vrow_distribution.h" #include #include + #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "util/thrift_rpc_helper.h" -#include "vec/sink/vrow_distribution.h" +#include "vec/columns/column_const.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_vector.h" #include "vec/sink/writer/vtablet_writer.h" namespace doris::vectorized { @@ -33,12 +37,11 @@ VRowDistribution::_get_partition_function() { } void VRowDistribution::_save_missing_values(vectorized::ColumnPtr col, - vectorized::DataTypePtr value_type, - std::vector filter) { + vectorized::DataTypePtr value_type) { _partitions_need_create.clear(); std::set deduper; // de-duplication - for (auto row : filter) { + for (auto row : _missing_map) { deduper.emplace(value_type->to_string(*col, row)); } for (auto& value : deduper) { @@ -78,171 +81,149 @@ Status VRowDistribution::_automatic_create_partition() { return status; } -// Generate channel payload for sinking data to differenct node channel -// Payload = std::pair, std::vector>; -// first = row_id, second = vector -void VRowDistribution::_generate_rows_distribution_payload( - ChannelDistributionPayload& channel_to_payload, - const std::vector& partitions, - const std::vector& tablet_indexes, const std::vector& skip, - size_t row_cnt) { - for (int row_idx = 0; row_idx < row_cnt; row_idx++) { - if (skip[row_idx]) { +void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_idx, + std::vector& tablet_ids) { + tablet_ids.reserve(block->rows()); + for (int row_idx = 0; row_idx < block->rows(); row_idx++) { + if (_skip[row_idx]) { continue; } - const auto& partition = partitions[row_idx]; - const auto& tablet_index = tablet_indexes[row_idx]; - - for (int index_num = 0; index_num < partition->indexes.size(); - ++index_num) { // partition->indexes = [index, tablets...] - - auto tablet_id = partition->indexes[index_num].tablets[tablet_index]; - LOG(WARNING) << "got tablet it " << tablet_id << " at row " << row_idx; - auto it = (*_channels)[index_num]->_channels_by_tablet.find( - tablet_id); // (tablet_id, VNodeChannel) where this tablet locate - - DCHECK(it != (*_channels)[index_num]->_channels_by_tablet.end()) - << "unknown tablet, tablet_id=" << tablet_index; - - std::vector>& tablet_locations = it->second; - std::unordered_map& payloads_this_index = - channel_to_payload[index_num]; // payloads of this index in every node - - for (const auto& locate_node : tablet_locations) { - auto payload_it = - payloads_this_index.find(locate_node.get()); // - if (payload_it == payloads_this_index.end()) { - auto [tmp_it, _] = payloads_this_index.emplace( - locate_node.get(), - Payload {std::make_unique(), - std::vector()}); - payload_it = tmp_it; - payload_it->second.first->reserve(row_cnt); - payload_it->second.second.reserve(row_cnt); - } - payload_it->second.first->push_back(row_idx); - payload_it->second.second.push_back(tablet_id); - } - _number_output_rows++; - } + auto& partition = _partitions[row_idx]; + auto& tablet_index = _tablet_indexes[row_idx]; + auto& index = partition->indexes[index_idx]; + + auto tablet_id = index.tablets[tablet_index]; + tablet_ids[row_idx] = tablet_id; } } -Status VRowDistribution::_single_partition_generate(vectorized::Block* block, - ChannelDistributionPayload& channel_to_payload, - bool has_filtered_rows) { - auto num_rows = block->rows(); - // only need to calculate one value for single partition. - std::vector partitions {1, nullptr}; - std::vector skip; - skip.resize(1); - std::vector tablet_indexes; - tablet_indexes.resize(1); - bool stop_processing = false; - - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, 1, partitions, tablet_indexes, - stop_processing, skip)); - - const auto* partition = partitions[0]; - const auto& tablet_index = tablet_indexes[0]; - - if (partition == nullptr) { - return Status::OK(); +void VRowDistribution::_filter_block_by_skip(vectorized::Block* block, + RowPartTabletTuple& row_part_tablet_tuple) { + auto& row_ids = std::get<0>(row_part_tablet_tuple); + auto& partition_ids = std::get<1>(row_part_tablet_tuple); + auto& tablet_ids = std::get<2>(row_part_tablet_tuple); + + for (size_t i = 0; i < block->rows(); i++) { + if (!_skip[i]) { + row_ids.emplace_back(i); + partition_ids.emplace_back(_partitions[i]->id); + tablet_ids.emplace_back(_tablet_ids[i]); + } } - for (int j = 0; j < partition->indexes.size(); ++j) { - auto tid = partition->indexes[j].tablets[tablet_index]; - auto it = (*_channels)[j]->_channels_by_tablet.find(tid); - DCHECK(it != (*_channels)[j]->_channels_by_tablet.end()) - << "unknown tablet, tablet_id=" << tablet_index; - int64_t row_cnt = 0; - for (const auto& channel : it->second) { - if (!channel_to_payload[j].contains(channel.get())) { - channel_to_payload[j].insert( - {channel.get(), Payload {std::make_unique(), - std::vector()}}); +} + +Status VRowDistribution::_filter_block_by_skip_and_where_clause( + vectorized::Block* block, const vectorized::VExprContextSPtr& where_clause, + RowPartTabletTuple& row_part_tablet_tuple) { + // TODO + //SCOPED_RAW_TIMER(&_stat.where_clause_ns); + int result_index = -1; + size_t column_number = block->columns(); + RETURN_IF_ERROR(where_clause->execute(block, &result_index)); + + auto filter_column = block->get_by_position(result_index).column; + + auto& row_ids = std::get<0>(row_part_tablet_tuple); + auto& partition_ids = std::get<1>(row_part_tablet_tuple); + auto& tablet_ids = std::get<2>(row_part_tablet_tuple); + if (auto* nullable_column = + vectorized::check_and_get_column(*filter_column)) { + for (size_t i = 0; i < block->rows(); i++) { + if (nullable_column->get_bool_inline(i) && !_skip[i]) { + row_ids.emplace_back(i); + partition_ids.emplace_back(_partitions[i]->id); + tablet_ids.emplace_back(_tablet_ids[i]); } - auto& selector = channel_to_payload[j][channel.get()].first; - auto& tablet_ids = channel_to_payload[j][channel.get()].second; - for (int32_t i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { - continue; - } - selector->push_back(i); + } + } else if (auto* const_column = + vectorized::check_and_get_column(*filter_column)) { + bool ret = const_column->get_bool(0); + if (!ret) { + return Status::OK(); + } + // should we optimize? + _filter_block_by_skip(block, row_part_tablet_tuple); + } else { + auto& filter = assert_cast(*filter_column).get_data(); + for (size_t i = 0; i < block->rows(); i++) { + if (filter[i] != 0 && !_skip[i]) { + row_ids.emplace_back(i); + partition_ids.emplace_back(_partitions[i]->id); + tablet_ids.emplace_back(_tablet_ids[i]); } - tablet_ids.resize(selector->size(), tid); - row_cnt = selector->size(); } - _number_output_rows += row_cnt; + } + + for (size_t i = block->columns() - 1; i >= column_number; i--) { + block->erase(i); + } + return Status::OK(); +} + +Status VRowDistribution::_filter_block(vectorized::Block* block, + std::vector& row_part_tablet_tuples) { + for (int i = 0; i < _schema->indexes().size(); i++) { + _get_tablet_ids(block, i, _tablet_ids); + auto& where_clause = _schema->indexes()[i]->where_clause; + if (where_clause != nullptr) { + RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause, + row_part_tablet_tuples[i])); + } else { + _filter_block_by_skip(block, row_part_tablet_tuples[i]); + } } return Status::OK(); } Status VRowDistribution::_generate_rows_distribution_for_non_auto_parititon( - std::shared_ptr& block, bool has_filtered_rows, - ChannelDistributionPayload& channel_to_payload) { + vectorized::Block* block, bool has_filtered_rows, + std::vector& row_part_tablet_tuples) { auto num_rows = block->rows(); - std::vector partitions {num_rows, nullptr}; - std::vector skip; - skip.resize(num_rows); - std::vector tablet_indexes; - tablet_indexes.resize(num_rows); bool stop_processing = false; - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, - tablet_indexes, stop_processing, skip)); - - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - skip[i] = skip[i] || _block_convertor->filter_map()[i]; - } - } - _generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes, skip, - num_rows); - return Status::OK(); + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip)); + + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; + } + } + RETURN_IF_ERROR(_filter_block(block, row_part_tablet_tuples)); + return Status::OK(); } Status VRowDistribution::_generate_rows_distribution_for_auto_parititon( - std::shared_ptr& block, bool has_filtered_rows, - ChannelDistributionPayload& channel_to_payload) { + vectorized::Block* block, bool has_filtered_rows, + std::vector& row_part_tablet_tuples) { auto num_rows = block->rows(); std::vector partition_keys = _vpartition->get_partition_keys(); //TODO: use loop to create missing_vals for multi column. - CHECK(partition_keys.size() == 1) - << "now support only 1 partition column for auto partitions."; + CHECK(partition_keys.size() == 1) << "now support only 1 partition column for auto partitions."; auto partition_col = block->get_by_position(partition_keys[0]); - - std::vector missing_map; // indice of missing values in partition_col - missing_map.reserve(partition_col.column->size()); - - // try to find tablet and save missing value - std::vector partitions {num_rows, nullptr}; - std::vector skip; - skip.resize(num_rows); - std::vector tablet_indexes; - tablet_indexes.resize(num_rows); - + _missing_map.clear(); + _missing_map.reserve(partition_col.column->size()); bool stop_processing = false; //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time. - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, - tablet_indexes, stop_processing, skip, - &missing_map)); + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip, + &_missing_map)); - if (missing_map.empty()) { + if (_missing_map.empty()) { // we don't calculate it distribution when have missing values if (has_filtered_rows) { for (int i = 0; i < num_rows; i++) { - skip[i] = skip[i] || _block_convertor->filter_map()[i]; + _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; } } - _generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes, - skip, num_rows); + RETURN_IF_ERROR(_filter_block(block, row_part_tablet_tuples)); } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create // if there's projection of partition calc, we need to calc it first. auto [part_ctx, part_func] = _get_partition_function(); int result_idx = -1; if (_vpartition->is_projection_partition()) { // calc the start value of missing partition ranges. - RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &result_idx)); + RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block, &result_idx)); VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(); // change the column to compare to transformed. _vpartition->set_transformed_slots({(uint16_t)result_idx}); @@ -255,56 +236,71 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_parititon( if (const auto* nullable = check_and_get_column(*range_left_col)) { range_left_col = nullable->get_nested_column_ptr(); - return_type = - assert_cast(return_type.get()) - ->get_nested_type(); + return_type = assert_cast(return_type.get()) + ->get_nested_type(); } // calc the end value and save them. - _save_missing_values(range_left_col, return_type, missing_map); + _save_missing_values(range_left_col, return_type); // then call FE to create it. then FragmentExecutor will redo the load. RETURN_IF_ERROR(_automatic_create_partition()); // In the next round, we will _generate_rows_distribution_payload again to get right payload of new tablet LOG(INFO) << "Auto created partition. Send block again."; return Status::NeedSendAgain(""); - } // creating done + } // creating done return Status::OK(); } -Status VRowDistribution::generate_rows_distribution(vectorized::Block& input_block, - std::shared_ptr& block, - int64_t& filtered_rows, bool& has_filtered_rows, - ChannelDistributionPayload& channel_to_payload) { - auto rows = input_block.rows(); +void VRowDistribution::_reset_row_part_tablet_tuples( + std::vector& row_part_tablet_tuples, int64_t rows) { + for (auto& row_part_tablet_tuple : row_part_tablet_tuples) { + auto& row_ids = std::get<0>(row_part_tablet_tuple); + auto& partition_ids = std::get<1>(row_part_tablet_tuple); + auto& tablet_ids = std::get<2>(row_part_tablet_tuple); + + row_ids.clear(); + partition_ids.clear(); + tablet_ids.clear(); + row_ids.reserve(rows); + partition_ids.reserve(rows); + tablet_ids.reserve(rows); + } +} + +Status VRowDistribution::generate_rows_distribution( + vectorized::Block& input_block, std::shared_ptr& block, + int64_t& filtered_rows, bool& has_filtered_rows, + std::vector& row_part_tablet_tuples) { + + auto input_rows = input_block.rows(); + _reset_row_part_tablet_tuples(row_part_tablet_tuples, input_rows); int64_t prev_filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows(); RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( - _state, &input_block, block, *_vec_output_expr_ctxs, rows, has_filtered_rows)); + _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows)); - channel_to_payload.resize(_channels->size()); _tablet_finder->clear_for_new_batch(); _row_distribution_watch.start(); auto num_rows = block->rows(); _tablet_finder->filter_bitmap().Reset(num_rows); - size_t partition_num = _vpartition->get_partitions().size(); - if (!_vpartition->is_auto_partition() && partition_num == 1 && - _tablet_finder->is_find_tablet_every_sink()) { - RETURN_IF_ERROR(_single_partition_generate(block.get(), channel_to_payload, - has_filtered_rows)); - } else { - if (_vpartition->is_auto_partition()) { - RETURN_IF_ERROR(_generate_rows_distribution_for_auto_parititon(block, has_filtered_rows, - channel_to_payload)); - } else { // not auto partition - RETURN_IF_ERROR(_generate_rows_distribution_for_non_auto_parititon(block, has_filtered_rows, - channel_to_payload)); - } + + //reuse vars for find_tablets + _partitions.assign(num_rows, nullptr); + _skip.assign(num_rows, false); + _tablet_indexes.assign(num_rows, 0); + + if (_vpartition->is_auto_partition()) { + RETURN_IF_ERROR(_generate_rows_distribution_for_auto_parititon( + block.get(), has_filtered_rows, row_part_tablet_tuples)); + } else { // not auto partition + RETURN_IF_ERROR(_generate_rows_distribution_for_non_auto_parititon( + block.get(), has_filtered_rows, row_part_tablet_tuples)); } _row_distribution_watch.stop(); - filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() - prev_filtered_rows; + filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() - + prev_filtered_rows; return Status::OK(); } } // namespace doris::vectorized - diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index 69a2d3908a8eb57..1843b4f07e3bb3a 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -38,13 +38,15 @@ namespace doris::vectorized { class IndexChannel; class VNodeChannel; -using Payload = std::pair, std::vector>; +// +using RowPartTabletTuple = + std::tuple, std::vector, std::vector>; typedef Status (*OnPartitionsCreated)(void*, TCreatePartitionResult*); class VRowDistributionContext { public: - RuntimeState* state = nullptr; // not owned, set when open + RuntimeState* state = nullptr; std::vector>* channels; OlapTableBlockConvertor* block_convertor = nullptr; OlapTabletFinder* tablet_finder = nullptr; @@ -56,14 +58,15 @@ class VRowDistributionContext { const VExprContextSPtrs* vec_output_expr_ctxs; OnPartitionsCreated on_partitions_created; void* caller; + std::shared_ptr schema; }; class VRowDistribution { public: - VRowDistribution() { - } + VRowDistribution() {} + virtual ~VRowDistribution() {} - void init(VRowDistributionContext *ctx) { + void init(VRowDistributionContext* ctx) { _state = ctx->state; _channels = ctx->channels; _block_convertor = ctx->block_convertor; @@ -76,10 +79,9 @@ class VRowDistribution { _vec_output_expr_ctxs = ctx->vec_output_expr_ctxs; _on_partitions_created = ctx->on_partitions_created; _caller = ctx->caller; + _schema = ctx->schema; } - using ChannelDistributionPayload = std::vector>; - // auto partition // mv where clause // v1 needs index->node->row_ids - tabletids @@ -87,36 +89,41 @@ class VRowDistribution { Status generate_rows_distribution(vectorized::Block& input_block, std::shared_ptr& block, int64_t& filtered_rows, bool& has_filtered_rows, - ChannelDistributionPayload& channel_to_payload); - + std::vector& row_part_tablet_tuples); + private: std::pair _get_partition_function(); - void _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type, - std::vector filter); + + void _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type); // create partitions when need for auto-partition table using #_partitions_need_create. Status _automatic_create_partition(); - Status _single_partition_generate(vectorized::Block* block, - ChannelDistributionPayload& channel_to_payload, - bool has_filtered_rows); + void _get_tablet_ids(vectorized::Block* block, int32_t index_idx, + std::vector& tablet_ids); + + void _filter_block_by_skip(vectorized::Block* block, RowPartTabletTuple& row_part_tablet_tuple); + + Status _filter_block_by_skip_and_where_clause(vectorized::Block* block, + const vectorized::VExprContextSPtr& where_clause, + RowPartTabletTuple& row_part_tablet_tuple); + + Status _filter_block(vectorized::Block* block, + std::vector& row_part_tablet_tuples); Status _generate_rows_distribution_for_auto_parititon( - std::shared_ptr& block, bool has_filtered_rows, - ChannelDistributionPayload& channel_to_payload); + vectorized::Block* block, bool has_filtered_rows, + std::vector& row_part_tablet_tuples); Status _generate_rows_distribution_for_non_auto_parititon( - std::shared_ptr& block, bool has_filtered_rows, - ChannelDistributionPayload& channel_to_payload); + vectorized::Block* block, bool has_filtered_rows, + std::vector& row_part_tablet_tuples); - void _generate_rows_distribution_payload( - ChannelDistributionPayload& channel_to_payload, - const std::vector& partitions, - const std::vector& tablet_indexes, const std::vector& skip, - size_t row_cnt); + void _reset_row_part_tablet_tuples(std::vector& row_part_tablet_tuples, + int64_t rows); private: - RuntimeState* _state = nullptr; // not owned, set when open + RuntimeState* _state = nullptr; // support only one partition column now std::vector> _partitions_need_create; @@ -131,10 +138,18 @@ class VRowDistribution { ObjectPool* _pool; OlapTableLocationParam* _location = nullptr; // std::function _on_partition_created; - int64_t _number_output_rows = 0; + // int64_t _number_output_rows = 0; const VExprContextSPtrs* _vec_output_expr_ctxs; OnPartitionsCreated _on_partitions_created = nullptr; - void *_caller; + void* _caller; + std::shared_ptr _schema; + + // reuse for find_tablet. + std::vector _partitions; + std::vector _skip; + std::vector _tablet_indexes; + std::vector _tablet_ids; + std::vector _missing_map; // indice of missing values in partition_col }; } // namespace doris::vectorized diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index f38785f53f469a8..cc89f5cd9934ade 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -222,29 +222,24 @@ void VOlapTableSinkV2::_build_tablet_node_mapping() { } void VOlapTableSinkV2::_generate_rows_for_tablet( - RowsForTablet& rows_for_tablet, const std::vector& partitions, - const std::vector& tablet_indexes, const std::vector& skip, - size_t row_cnt) { - for (int row_idx = 0; row_idx < row_cnt; row_idx++) { - if (skip[row_idx]) { - continue; - } - - auto& partition = partitions[row_idx]; - auto& tablet_index = tablet_indexes[row_idx]; - - for (const auto& index : partition->indexes) { - auto tablet_id = index.tablets[tablet_index]; + std::vector& row_part_tablet_tuples, RowsForTablet& rows_for_tablet) { + for (int index_idx = 0; index_idx < row_part_tablet_tuples.size(); index_idx++) { + auto& row_ids = std::get<0>(row_part_tablet_tuples[index_idx]); + auto& partition_ids = std::get<1>(row_part_tablet_tuples[index_idx]); + auto& tablet_ids = std::get<2>(row_part_tablet_tuples[index_idx]); + + for (int i = 0; i < row_ids.size(); i++) { + auto& tablet_id = tablet_ids[i]; auto it = rows_for_tablet.find(tablet_id); if (it == rows_for_tablet.end()) { Rows rows; - rows.partition_id = partition->id; - rows.index_id = index.index_id; - rows.row_idxes.reserve(row_cnt); + rows.partition_id = partition_ids[i]; + rows.index_id = _schema->indexes()[index_idx]->index_id; + rows.row_idxes.reserve(row_ids.size()); auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows}); it = tmp_it; } - it->second.row_idxes.push_back(row_idx); + it->second.row_idxes.push_back(row_ids[i]); _number_output_rows++; } } @@ -284,37 +279,19 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc DorisMetrics::instance()->load_rows->increment(input_rows); DorisMetrics::instance()->load_bytes->increment(input_bytes); - std::shared_ptr block; bool has_filtered_rows = false; - RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( - state, input_block, block, _output_vexpr_ctxs, input_rows, has_filtered_rows)); - - // clear and release the references of columns - input_block->clear(); + int64_t filtered_rows = 0; SCOPED_RAW_TIMER(&_send_data_ns); // This is just for passing compilation. - bool stop_processing = false; - RowsForTablet rows_for_tablet; - _tablet_finder->clear_for_new_batch(); _row_distribution_watch.start(); - const auto num_rows = input_rows; - const auto* __restrict filter_map = _block_convertor->filter_map(); - //reuse vars - _partitions.assign(num_rows, nullptr); - _skip.assign(num_rows, false); - _tablet_indexes.assign(num_rows, 0); - - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, _partitions, - _tablet_indexes, stop_processing, _skip)); - - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - _skip[i] = _skip[i] || filter_map[i]; - } - } - _generate_rows_for_tablet(rows_for_tablet, _partitions, _tablet_indexes, _skip, num_rows); + std::shared_ptr block; + std::vector row_part_tablet_tuples; + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( + *input_block, block, filtered_rows, has_filtered_rows, row_part_tablet_tuples)); + RowsForTablet rows_for_tablet; + _generate_rows_for_tablet(row_part_tablet_tuples, rows_for_tablet); _row_distribution_watch.stop(); diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index 42103fa03b1079f..ec6ecf81fdd2a88 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -64,6 +64,7 @@ #include "vec/core/block.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" +#include "vec/sink/vrow_distribution.h" namespace doris { class DeltaWriterV2; @@ -137,10 +138,8 @@ class VOlapTableSinkV2 final : public DataSink { void _build_tablet_node_mapping(); - void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet, - const std::vector& partitions, - const std::vector& tablet_indexes, - const std::vector& skip, size_t row_cnt); + void _generate_rows_for_tablet(std::vector& row_part_tablet_tuples, + RowsForTablet& rows_for_tablet); Status _write_memtable(std::shared_ptr block, int64_t tablet_id, const Rows& rows, const Streams& streams); @@ -186,11 +185,6 @@ class VOlapTableSinkV2 final : public DataSink { int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; - // reuse for find_tablet - std::vector _partitions; - std::vector _skip; - std::vector _tablet_indexes; - MonotonicStopWatch _row_distribution_watch; RuntimeProfile::Counter* _input_rows_counter = nullptr; @@ -230,6 +224,8 @@ class VOlapTableSinkV2 final : public DataSink { bthread::Mutex _tablet_success_map_mutex; bthread::Mutex _tablet_failure_map_mutex; + VRowDistribution _row_distribution; + friend class StreamSinkHandler; }; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index ab18be8e8bdd717..9868686ba2294b8 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -489,52 +489,6 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, _cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); } - std::unique_ptr temp_payload = nullptr; - if (_index_channel != nullptr && _index_channel->get_where_clause() != nullptr) { - SCOPED_RAW_TIMER(&_stat.where_clause_ns); - temp_payload.reset(new Payload( - std::unique_ptr(new vectorized::IColumn::Selector()), - std::vector())); - int result_index = -1; - size_t column_number = block->columns(); - RETURN_IF_ERROR(_index_channel->get_where_clause()->execute(block, &result_index)); - - auto& row_ids = *payload->first; - auto& tablets_ids = payload->second; - - auto filter_column = block->get_by_position(result_index).column; - - if (auto* nullable_column = - vectorized::check_and_get_column(*filter_column)) { - for (size_t i = 0; i < payload->second.size(); i++) { - if (nullable_column->get_bool_inline(row_ids[i])) { - temp_payload->first->emplace_back(row_ids[i]); - temp_payload->second.emplace_back(tablets_ids[i]); - } - } - payload = temp_payload.get(); - } else if (auto* const_column = vectorized::check_and_get_column( - *filter_column)) { - bool ret = const_column->get_bool(0); - if (!ret) { - return Status::OK(); - } - } else { - auto& filter = assert_cast(*filter_column).get_data(); - for (size_t i = 0; i < payload->second.size(); i++) { - if (filter[row_ids[i]] != 0) { - temp_payload->first->emplace_back(row_ids[i]); - temp_payload->second.emplace_back(tablets_ids[i]); - } - } - payload = temp_payload.get(); - } - - for (size_t i = block->columns() - 1; i >= column_number; i--) { - block->erase(i); - } - } - SCOPED_RAW_TIMER(&_stat.append_node_channel_ns); if (is_append) { // Do not split the data of the block by tablets but append it to a single delta writer. @@ -1128,7 +1082,8 @@ void VTabletWriter::_init_row_distribution() { ctx.location = _location; ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs; ctx.on_partitions_created = &vectorized::on_partitions_created; - ctx.caller = (void *)this; + ctx.caller = (void*)this; + ctx.schema = _schema; _row_distribution.init(&ctx); } @@ -1588,6 +1543,46 @@ Status VTabletWriter::close(Status exec_status) { return _close_status; } +void VTabletWriter::_generate_one_index_channel_payload( + RowPartTabletTuple& row_part_tablet_tuple, int32_t index_idx, + ChannelDistributionPayload& channel_payload) { + auto& row_ids = std::get<0>(row_part_tablet_tuple); + auto& tablet_ids = std::get<2>(row_part_tablet_tuple); + + size_t row_cnt = row_ids.size(); + + for (int i = 0; i < row_ids.size(); i++) { + // (tablet_id, VNodeChannel) where this tablet locate + auto it = _channels[index_idx]->_channels_by_tablet.find(tablet_ids[i]); + DCHECK(it != _channels[index_idx]->_channels_by_tablet.end()) + << "unknown tablet, tablet_id=" << tablet_ids[i]; + + std::vector>& tablet_locations = it->second; + for (const auto& locate_node : tablet_locations) { + auto payload_it = channel_payload.find(locate_node.get()); // + if (payload_it == channel_payload.end()) { + auto [tmp_it, _] = channel_payload.emplace( + locate_node.get(), + Payload {std::make_unique(), + std::vector()}); + payload_it = tmp_it; + payload_it->second.first->reserve(row_cnt); + payload_it->second.second.reserve(row_cnt); + } + payload_it->second.first->push_back(row_ids[i]); + payload_it->second.second.push_back(tablet_ids[i]); + } + } +} + +void VTabletWriter::_generate_index_channels_payloads( + std::vector& row_part_tablet_tuples, + ChannelDistributionPayloadVec& payload) { + for (int i = 0; i < _schema->indexes().size(); i++) { + _generate_one_index_channel_payload(row_part_tablet_tuples[i], i, payload[i]); + } +} + Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); @@ -1603,12 +1598,16 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { } SCOPED_TIMER(_profile->total_time_counter()); - ChannelDistributionPayload channel_to_payload; std::shared_ptr block; bool has_filtered_rows = false; int64_t filtered_rows = 0; - RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(input_block, block, filtered_rows, - has_filtered_rows, channel_to_payload)); + + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( + input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_tuples)); + + ChannelDistributionPayloadVec channel_to_payload; + + _generate_index_channels_payloads(_row_part_tablet_tuples, channel_to_payload); _number_input_rows += rows; // update incrementally so that FE can get the progress. @@ -1640,10 +1639,8 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { } if (_group_commit) { - _group_commit_block(&input_block, block->rows(), - _block_convertor->num_filtered_rows() + - _tablet_finder->num_filtered_rows() - filtered_rows, - _state, block.get(), _block_convertor.get(), _tablet_finder.get()); + _group_commit_block(&input_block, block->rows(), filtered_rows, _state, block.get(), + _block_convertor.get(), _tablet_finder.get()); } // TODO: Before load, we need to projection unuseful column // auto slots = _schema->tuple_desc()->slots(); diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index fd421db6f1a57b9..8302b2bab0ad10c 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -205,9 +205,6 @@ class ReusableClosure final : public google::protobuf::Closure { class IndexChannel; class VTabletWriter; -// pair -using Payload = std::pair, std::vector>; - class VNodeChannelStat { public: VNodeChannelStat& operator+=(const VNodeChannelStat& stat) { @@ -222,6 +219,9 @@ class VNodeChannelStat { int64_t append_node_channel_ns = 0; }; +// pair +using Payload = std::pair, std::vector>; + // every NodeChannel keeps a data transmission channel with one BE. for multiple times open, it has a dozen of requests and corresponding closures. class VNodeChannel { public: @@ -554,25 +554,19 @@ class VTabletWriter final : public AsyncResultWriter { friend class VNodeChannel; friend class IndexChannel; - using ChannelDistributionPayload = std::vector>; + using ChannelDistributionPayload = std::unordered_map; + using ChannelDistributionPayloadVec = std::vector>; void _init_row_distribution(); Status _init(RuntimeState* state, RuntimeProfile* profile); - // payload for every row - void _generate_row_distribution_payload(ChannelDistributionPayload& channel_to_payload, - const std::vector& partitions, - const std::vector& tablet_indexes, - const std::vector& skip, size_t row_cnt); + void _generate_one_index_channel_payload(RowPartTabletTuple& row_part_tablet_tuple, + int32_t index_idx, + ChannelDistributionPayload& channel_payload); - Status _generate_rows_distribution(vectorized::Block& input_block, - std::shared_ptr& block, - int64_t& filterd_rows, bool& has_filtered_rows, - ChannelDistributionPayload& channel_to_payload); - Status _single_partition_generate(RuntimeState* state, vectorized::Block* block, - ChannelDistributionPayload& channel_to_payload, - size_t num_rows, bool has_filtered_rows); + void _generate_index_channels_payloads(std::vector& row_part_tablet_tuples, + ChannelDistributionPayloadVec& payload); Status _cancel_channel_and_check_intolerable_failure(Status status, const std::string& err_msg, const std::shared_ptr ich, @@ -701,6 +695,8 @@ class VTabletWriter final : public AsyncResultWriter { std::shared_ptr _wal_writer = nullptr; VRowDistribution _row_distribution; + // reuse to avoid frequent memory allocation and release. + std::vector _row_part_tablet_tuples; int64_t _tb_id; int64_t _db_id;