From 933fec6f621ff61e1091ba4a758b8938cf4b083d Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Sun, 5 Nov 2023 17:09:03 +0800 Subject: [PATCH] abstract row distribution code from sink --- be/src/vec/sink/vrow_distribution.cpp | 290 ++++++++++++++++++++ be/src/vec/sink/vrow_distribution.h | 132 ++++++++++ be/src/vec/sink/writer/vtablet_writer.cpp | 307 +++------------------- be/src/vec/sink/writer/vtablet_writer.h | 13 +- 4 files changed, 475 insertions(+), 267 deletions(-) create mode 100644 be/src/vec/sink/vrow_distribution.cpp create mode 100644 be/src/vec/sink/vrow_distribution.h diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp new file mode 100644 index 000000000000000..f05beedda28b1da --- /dev/null +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include +#include +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/thrift_rpc_helper.h" +#include "vec/sink/vrow_distribution.h" +#include "vec/sink/writer/vtablet_writer.h" + +namespace doris::vectorized { + +std::pair +VRowDistribution::_get_partition_function() { + return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()}; +} + +void VRowDistribution::_save_missing_values(vectorized::ColumnPtr col, + vectorized::DataTypePtr value_type, + std::vector filter) { + _partitions_need_create.clear(); + std::set deduper; + // de-duplication + for (auto row : filter) { + deduper.emplace(value_type->to_string(*col, row)); + } + for (auto& value : deduper) { + TStringLiteral node; + node.value = value; + _partitions_need_create.emplace_back(std::vector {node}); // only 1 partition column now + } +} + +Status VRowDistribution::_automatic_create_partition() { + SCOPED_TIMER(_add_partition_request_timer); + TCreatePartitionRequest request; + TCreatePartitionResult result; + request.__set_txn_id(_txn_id); + request.__set_db_id(_vpartition->db_id()); + request.__set_table_id(_vpartition->table_id()); + request.__set_partitionValues(_partitions_need_create); + + VLOG(1) << "automatic partition rpc begin request " << request; + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + int time_out = _state->execution_timeout() * 1000; + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->createPartition(result, request); + }, + time_out)); + + Status status(Status::create(result.status)); + VLOG(1) << "automatic partition rpc end response " << result; + if (result.status.status_code == TStatusCode::OK) { + // add new created partitions + RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); + RETURN_IF_ERROR(_on_partitions_created(_caller, &result)); + } + + return status; +} + +// Generate channel payload for sinking data to differenct node channel +// Payload = std::pair, std::vector>; +// first = row_id, second = vector +void VRowDistribution::_generate_rows_distribution_payload( + ChannelDistributionPayload& channel_to_payload, + const std::vector& partitions, + const std::vector& tablet_indexes, const std::vector& skip, + size_t rows_cnt) { + for (int row_idx = 0; row_idx < row_cnt; row_idx++) { + if (skip[row_idx]) { + continue; + } + const auto& partition = partitions[row_idx]; + const auto& tablet_index = tablet_indexes[row_idx]; + + for (int index_num = 0; index_num < partition->indexes.size(); + ++index_num) { // partition->indexes = [index, tablets...] + + auto tablet_id = partition->indexes[index_num].tablets[tablet_index]; + LOG(WARNING) << "got tablet it " << tablet_id << " at row " << row_idx; + auto it = (*_channels)[index_num]->_channels_by_tablet.find( + tablet_id); // (tablet_id, VNodeChannel) where this tablet locate + + DCHECK(it != (*_channels)[index_num]->_channels_by_tablet.end()) + << "unknown tablet, tablet_id=" << tablet_index; + + std::vector>& tablet_locations = it->second; + std::unordered_map& payloads_this_index = + channel_to_payload[index_num]; // payloads of this index in every node + + for (const auto& locate_node : tablet_locations) { + auto payload_it = + payloads_this_index.find(locate_node.get()); // + if (payload_it == payloads_this_index.end()) { + auto [tmp_it, _] = payloads_this_index.emplace( + locate_node.get(), + Payload {std::make_unique(), + std::vector()}); + payload_it = tmp_it; + payload_it->second.first->reserve(row_cnt); + payload_it->second.second.reserve(row_cnt); + } + payload_it->second.first->push_back(row_idx); + payload_it->second.second.push_back(tablet_id); + } + _number_output_rows++; + } + } +} + +Status VRowDistribution::_single_partition_generate(vectorized::Block* block, + ChannelDistributionPayload& channel_to_payload, + size_t num_rows, bool has_filtered_rows) { + // only need to calculate one value for single partition. + std::vector partitions {1, nullptr}; + std::vector skip; + skip.resize(1); + std::vector tablet_indexes; + tablet_indexes.resize(1); + bool stop_processing = false; + + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, 1, partitions, tablet_indexes, + stop_processing, skip)); + + const auto* partition = partitions[0]; + const auto& tablet_index = tablet_indexes[0]; + + if (partition == nullptr) { + return Status::OK(); + } + for (int j = 0; j < partition->indexes.size(); ++j) { + auto tid = partition->indexes[j].tablets[tablet_index]; + auto it = (*_channels)[j]->_channels_by_tablet.find(tid); + DCHECK(it != (*_channels)[j]->_channels_by_tablet.end()) + << "unknown tablet, tablet_id=" << tablet_index; + int64_t row_cnt = 0; + for (const auto& channel : it->second) { + if (!channel_to_payload[j].contains(channel.get())) { + channel_to_payload[j].insert( + {channel.get(), Payload {std::make_unique(), + std::vector()}}); + } + auto& selector = channel_to_payload[j][channel.get()].first; + auto& tablet_ids = channel_to_payload[j][channel.get()].second; + for (int32_t i = 0; i < num_rows; ++i) { + if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { + continue; + } + selector->push_back(i); + } + tablet_ids.resize(selector->size(), tid); + row_cnt = selector->size(); + } + _number_output_rows += row_cnt; + } + return Status::OK(); +} + +Status VRowDistribution::generate_rows_distribution(vectorized::Block& input_block, + std::shared_ptr& block, + int64_t& filtered_rows, bool& has_filtered_rows, + ChannelDistributionPayload& channel_to_payload) { + auto rows = input_block.rows(); + + int64_t prev_filtered_rows = + _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows(); + RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( + _state, &input_block, block, *_vec_output_expr_ctxs, rows, has_filtered_rows)); + + // This is just for passing compilation. + bool stop_processing = false; + channel_to_payload.resize(_channels->size()); + _tablet_finder->clear_for_new_batch(); + _row_distribution_watch.start(); + auto num_rows = block->rows(); + _tablet_finder->filter_bitmap().Reset(num_rows); + size_t partition_num = _vpartition->get_partitions().size(); + if (!_vpartition->is_auto_partition() && partition_num == 1 && + _tablet_finder->is_find_tablet_every_sink()) { + RETURN_IF_ERROR(_single_partition_generate(block.get(), channel_to_payload, + num_rows, has_filtered_rows)); + } else { + // if there's projection of partition calc, we need to calc it first. + auto [part_ctx, part_func] = _get_partition_function(); + int result_idx = -1; + if (_vpartition->is_projection_partition()) { + // calc the start value of missing partition ranges. + RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &result_idx)); + VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(); + // change the column to compare to transformed. + _vpartition->set_transformed_slots({(uint16_t)result_idx}); + } + + if (_vpartition->is_auto_partition()) { + std::vector partition_keys = _vpartition->get_partition_keys(); + //TODO: use loop to create missing_vals for multi column. + CHECK(partition_keys.size() == 1) + << "now support only 1 partition column for auto partitions."; + auto partition_col = block->get_by_position(partition_keys[0]); + + std::vector missing_map; // indice of missing values in partition_col + missing_map.reserve(partition_col.column->size()); + + // try to find tablet and save missing value + std::vector partitions {num_rows, nullptr}; + std::vector skip; + skip.resize(num_rows); + std::vector tablet_indexes; + tablet_indexes.resize(num_rows); + + //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time. + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, + tablet_indexes, stop_processing, skip, + &missing_map)); + + if (missing_map.empty()) { + // we don't calculate it distribution when have missing values + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + skip[i] = skip[i] || _block_convertor->filter_map()[i]; + } + } + _generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes, + skip, num_rows); + } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create + auto return_type = part_func->data_type(); + + // expose the data column + vectorized::ColumnPtr range_left_col = block->get_by_position(result_idx).column; + if (const auto* nullable = + check_and_get_column(*range_left_col)) { + range_left_col = nullable->get_nested_column_ptr(); + return_type = + assert_cast(return_type.get()) + ->get_nested_type(); + } + // calc the end value and save them. + _save_missing_values(range_left_col, return_type, missing_map); + // then call FE to create it. then FragmentExecutor will redo the load. + RETURN_IF_ERROR(_automatic_create_partition()); + // In the next round, we will _generate_rows_distribution_payload again to get right payload of new tablet + LOG(INFO) << "Auto created partition. Send block again."; + return Status::NeedSendAgain(""); + } // creating done + } else { // not auto partition + std::vector partitions {num_rows, nullptr}; + std::vector skip; + skip.resize(num_rows); + std::vector tablet_indexes; + tablet_indexes.resize(num_rows); + + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, + tablet_indexes, stop_processing, skip)); + + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + skip[i] = skip[i] || _block_convertor->filter_map()[i]; + } + } + _generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes, skip, + num_rows); + } + } + _row_distribution_watch.stop(); + filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() - prev_filtered_rows; + return Status::OK(); +} + +} // namespace doris::vectorized + diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h new file mode 100644 index 000000000000000..5bb63e646007176 --- /dev/null +++ b/be/src/vec/sink/vrow_distribution.h @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +// IWYU pragma: no_include +#include +#include +#include + +#include "common/status.h" +#include "exec/tablet_info.h" +#include "runtime/types.h" +#include "util/runtime_profile.h" +#include "util/stopwatch.hpp" +#include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/sink/vtablet_block_convertor.h" +#include "vec/sink/vtablet_finder.h" + +namespace doris::vectorized { + +class IndexChannel; +class VNodeChannel; + +using Payload = std::pair, std::vector>; + +typedef Status (*OnPartitionsCreated)(void*, TCreatePartitionResult*); + +class VRowDistributionContext { +public: + RuntimeState* state = nullptr; // not owned, set when open + std::vector>* channels; + OlapTableBlockConvertor* block_convertor = nullptr; + OlapTabletFinder* tablet_finder = nullptr; + VOlapTablePartitionParam* vpartition = nullptr; + RuntimeProfile::Counter* add_partition_request_timer = nullptr; + int64_t txn_id = -1; + ObjectPool* pool; + OlapTableLocationParam* location; + const VExprContextSPtrs* vec_output_expr_ctxs; + OnPartitionsCreated on_partitions_created; + void* caller; +}; + +class VRowDistribution { +public: + VRowDistribution() { + } + + void init(VRowDistributionContext *ctx) { + _state = ctx->state; + _channels = ctx->channels; + _block_convertor = ctx->block_convertor; + _tablet_finder = ctx->tablet_finder; + _vpartition = ctx->vpartition; + _add_partition_request_timer = ctx->add_partition_request_timer; + _txn_id = ctx->txn_id; + _pool = ctx->pool; + _location = ctx->location; + _vec_output_expr_ctxs = ctx->vec_output_expr_ctxs; + _on_partitions_created = ctx->on_partitions_created; + _caller = ctx->caller; + } + + using ChannelDistributionPayload = std::vector>; + + // auto partition + // mv where clause + // v1 needs index->node->row_ids - tabletids + // v2 needs index,tablet->rowids + Status generate_rows_distribution(vectorized::Block& input_block, + std::shared_ptr& block, + int64_t& filtered_rows, bool& has_filtered_rows, + ChannelDistributionPayload& channel_to_payload); + +private: + std::pair _get_partition_function(); + void _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type, + std::vector filter); + + // create partitions when need for auto-partition table using #_partitions_need_create. + Status _automatic_create_partition(); + + Status _single_partition_generate(vectorized::Block* block, + ChannelDistributionPayload& channel_to_payload, + size_t num_rows, bool has_filtered_rows); + + void _generate_rows_distribution_payload( + ChannelDistributionPayload& channel_to_payload, + const std::vector& partitions, + const std::vector& tablet_indexes, const std::vector& skip, + size_t row_cnt); + +private: + RuntimeState* _state = nullptr; // not owned, set when open + + // support only one partition column now + std::vector> _partitions_need_create; + std::vector>* _channels; + + MonotonicStopWatch _row_distribution_watch; + OlapTableBlockConvertor* _block_convertor = nullptr; + OlapTabletFinder* _tablet_finder = nullptr; + VOlapTablePartitionParam* _vpartition = nullptr; + RuntimeProfile::Counter* _add_partition_request_timer = nullptr; + int64_t _txn_id = -1; + ObjectPool* _pool; + OlapTableLocationParam* _location = nullptr; + // std::function _on_partition_created; + int64_t _number_output_rows = 0; + const VExprContextSPtrs* _vec_output_expr_ctxs; + OnPartitionsCreated _on_partitions_created = nullptr; + void *_caller; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 1de1a146b99946b..ab18be8e8bdd717 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -152,6 +152,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vectorprepare(state, *_parent->_output_row_desc)); RETURN_IF_ERROR(_where_clause->open(state)); } + return Status::OK(); } @@ -1095,6 +1096,43 @@ Status VTabletWriter::open(doris::RuntimeState* state, doris::RuntimeProfile* pr return Status::OK(); } +Status VTabletWriter::on_partitions_created(TCreatePartitionResult* result) { + // add new tablet locations. it will use by address. so add to pool + auto* new_locations = _pool->add(new std::vector(result->tablets)); + _location->add_locations(*new_locations); + + // update new node info + _nodes_info->add_nodes(result->nodes); + + // incremental open node channel + RETURN_IF_ERROR(_incremental_open_node_channel(result->partitions)); + + return Status::OK(); +} + +Status on_partitions_created(void* writer, TCreatePartitionResult* result) { + return static_cast(writer)->on_partitions_created(result); +} + +void VTabletWriter::_init_row_distribution() { + VRowDistributionContext ctx; + + ctx.state = _state; + ctx.channels = &_channels; + ctx.block_convertor = _block_convertor.get(); + ctx.tablet_finder = _tablet_finder.get(); + ctx.vpartition = _vpartition; + ctx.add_partition_request_timer = _add_partition_request_timer; + ctx.txn_id = _txn_id; + ctx.pool = _pool; + ctx.location = _location; + ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs; + ctx.on_partitions_created = &vectorized::on_partitions_created; + ctx.caller = (void *)this; + + _row_distribution.init(&ctx); +} + Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { DCHECK(_t_sink.__isset.olap_table_sink); auto& table_sink = _t_sink.olap_table_sink; @@ -1243,49 +1281,12 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer)); } + _init_row_distribution(); + _inited = true; return Status::OK(); } -Status VTabletWriter::_automatic_create_partition() { - SCOPED_TIMER(_add_partition_request_timer); - TCreatePartitionRequest request; - TCreatePartitionResult result; - request.__set_txn_id(_txn_id); - request.__set_db_id(_vpartition->db_id()); - request.__set_table_id(_vpartition->table_id()); - request.__set_partitionValues(_partitions_need_create); - - VLOG(1) << "automatic partition rpc begin request " << request; - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; - int time_out = _state->execution_timeout() * 1000; - RETURN_IF_ERROR(ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &result](FrontendServiceConnection& client) { - client->createPartition(result, request); - }, - time_out)); - - Status status(Status::create(result.status)); - VLOG(1) << "automatic partition rpc end response " << result; - if (result.status.status_code == TStatusCode::OK) { - // add new created partitions - RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); - - // add new tablet locations. it will use by address. so add to pool - auto* new_locations = _pool->add(new std::vector(result.tablets)); - _location->add_locations(*new_locations); - - // update new node info - _nodes_info->add_nodes(result.nodes); - - // incremental open node channel - RETURN_IF_ERROR(_incremental_open_node_channel(result.partitions)); - } - - return status; -} - Status VTabletWriter::_incremental_open_node_channel( const std::vector& partitions) { // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions. @@ -1337,125 +1338,11 @@ Status VTabletWriter::_incremental_open_node_channel( return Status::OK(); } -// Generate channel payload for sinking data to differenct node channel -// Payload = std::pair, std::vector>; -// first = row_id, second = vector -void VTabletWriter::_generate_row_distribution_payload( - ChannelDistributionPayload& channel_to_payload, - const std::vector& partitions, - const std::vector& tablet_indexes, const std::vector& skip, - size_t row_cnt) { - for (int row_idx = 0; row_idx < row_cnt; row_idx++) { - if (skip[row_idx]) { - continue; - } - const auto& partition = partitions[row_idx]; - const auto& tablet_index = tablet_indexes[row_idx]; - - for (int index_num = 0; index_num < partition->indexes.size(); - ++index_num) { // partition->indexes = [index, tablets...] - - auto tablet_id = partition->indexes[index_num].tablets[tablet_index]; - LOG(WARNING) << "got tablet it " << tablet_id << " at row " << row_idx; - auto it = _channels[index_num]->_channels_by_tablet.find( - tablet_id); // (tablet_id, VNodeChannel) where this tablet locate - - DCHECK(it != _channels[index_num]->_channels_by_tablet.end()) - << "unknown tablet, tablet_id=" << tablet_index; - - std::vector>& tablet_locations = it->second; - std::unordered_map& payloads_this_index = - channel_to_payload[index_num]; // payloads of this index in every node - - for (const auto& locate_node : tablet_locations) { - auto payload_it = - payloads_this_index.find(locate_node.get()); // - if (payload_it == payloads_this_index.end()) { - auto [tmp_it, _] = payloads_this_index.emplace( - locate_node.get(), - Payload {std::make_unique(), - std::vector()}); - payload_it = tmp_it; - payload_it->second.first->reserve(row_cnt); - payload_it->second.second.reserve(row_cnt); - } - payload_it->second.first->push_back(row_idx); - payload_it->second.second.push_back(tablet_id); - } - _number_output_rows++; - } - } -} - -Status VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized::Block* block, - ChannelDistributionPayload& channel_to_payload, - size_t num_rows, bool has_filtered_rows) { - // only need to calculate one value for single partition. - std::vector partitions {1, nullptr}; - std::vector skip; - skip.resize(1); - std::vector tablet_indexes; - tablet_indexes.resize(1); - bool stop_processing = false; - - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, 1, partitions, tablet_indexes, - stop_processing, skip)); - - const auto* partition = partitions[0]; - const auto& tablet_index = tablet_indexes[0]; - - if (partition == nullptr) { - return Status::OK(); - } - for (int j = 0; j < partition->indexes.size(); ++j) { - auto tid = partition->indexes[j].tablets[tablet_index]; - auto it = _channels[j]->_channels_by_tablet.find(tid); - DCHECK(it != _channels[j]->_channels_by_tablet.end()) - << "unknown tablet, tablet_id=" << tablet_index; - int64_t row_cnt = 0; - for (const auto& channel : it->second) { - if (!channel_to_payload[j].contains(channel.get())) { - channel_to_payload[j].insert( - {channel.get(), Payload {std::make_unique(), - std::vector()}}); - } - auto& selector = channel_to_payload[j][channel.get()].first; - auto& tablet_ids = channel_to_payload[j][channel.get()].second; - for (int32_t i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { - continue; - } - selector->push_back(i); - } - tablet_ids.resize(selector->size(), tid); - row_cnt = selector->size(); - } - _number_output_rows += row_cnt; - } - return Status::OK(); -} - std::pair VTabletWriter::_get_partition_function() { return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()}; } -void VTabletWriter::_save_missing_values(vectorized::ColumnPtr col, - vectorized::DataTypePtr value_type, - std::vector filter) { - _partitions_need_create.clear(); - std::set deduper; - // de-duplication - for (auto row : filter) { - deduper.emplace(value_type->to_string(*col, row)); - } - for (auto& value : deduper) { - TStringLiteral node; - node.value = value; - _partitions_need_create.emplace_back(std::vector {node}); // only 1 partition column now - } -} - Status VTabletWriter::_cancel_channel_and_check_intolerable_failure( Status status, const std::string& err_msg, const std::shared_ptr ich, const std::shared_ptr nch) { @@ -1701,117 +1588,6 @@ Status VTabletWriter::close(Status exec_status) { return _close_status; } -Status VTabletWriter::_generate_rows_distribution(vectorized::Block& input_block, - std::shared_ptr& block, - int64_t& filtered_rows, bool& has_filtered_rows, - ChannelDistributionPayload& channel_to_payload) { - auto rows = input_block.rows(); - - int64_t prev_filtered_rows = - _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows(); - RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( - _state, &input_block, block, _vec_output_expr_ctxs, rows, has_filtered_rows)); - - SCOPED_RAW_TIMER(&_send_data_ns); - // This is just for passing compilation. - bool stop_processing = false; - channel_to_payload.resize(_channels.size()); - _tablet_finder->clear_for_new_batch(); - _row_distribution_watch.start(); - auto num_rows = block->rows(); - _tablet_finder->filter_bitmap().Reset(num_rows); - size_t partition_num = _vpartition->get_partitions().size(); - if (!_vpartition->is_auto_partition() && partition_num == 1 && - _tablet_finder->is_find_tablet_every_sink()) { - RETURN_IF_ERROR(_single_partition_generate(_state, block.get(), channel_to_payload, - num_rows, has_filtered_rows)); - } else { - // if there's projection of partition calc, we need to calc it first. - auto [part_ctx, part_func] = _get_partition_function(); - int result_idx = -1; - if (_vpartition->is_projection_partition()) { - // calc the start value of missing partition ranges. - RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &result_idx)); - VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(); - // change the column to compare to transformed. - _vpartition->set_transformed_slots({(uint16_t)result_idx}); - } - - if (_vpartition->is_auto_partition()) { - std::vector partition_keys = _vpartition->get_partition_keys(); - //TODO: use loop to create missing_vals for multi column. - CHECK(partition_keys.size() == 1) - << "now support only 1 partition column for auto partitions."; - auto partition_col = block->get_by_position(partition_keys[0]); - - std::vector missing_map; // indice of missing values in partition_col - missing_map.reserve(partition_col.column->size()); - - // try to find tablet and save missing value - std::vector partitions {num_rows, nullptr}; - std::vector skip; - skip.resize(num_rows); - std::vector tablet_indexes; - tablet_indexes.resize(num_rows); - - //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time. - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, - tablet_indexes, stop_processing, skip, - &missing_map)); - - if (missing_map.empty()) { - // we don't calculate it distribution when have missing values - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - skip[i] = skip[i] || _block_convertor->filter_map()[i]; - } - } - _generate_row_distribution_payload(channel_to_payload, partitions, tablet_indexes, - skip, num_rows); - } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create - auto return_type = part_func->data_type(); - - // expose the data column - vectorized::ColumnPtr range_left_col = block->get_by_position(result_idx).column; - if (const auto* nullable = - check_and_get_column(*range_left_col)) { - range_left_col = nullable->get_nested_column_ptr(); - return_type = - assert_cast(return_type.get()) - ->get_nested_type(); - } - // calc the end value and save them. - _save_missing_values(range_left_col, return_type, missing_map); - // then call FE to create it. then FragmentExecutor will redo the load. - RETURN_IF_ERROR(_automatic_create_partition()); - // In the next round, we will _generate_row_distribution_payload again to get right payload of new tablet - LOG(INFO) << "Auto created partition. Send block again."; - return Status::NeedSendAgain(""); - } // creating done - } else { // not auto partition - std::vector partitions {num_rows, nullptr}; - std::vector skip; - skip.resize(num_rows); - std::vector tablet_indexes; - tablet_indexes.resize(num_rows); - - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, - tablet_indexes, stop_processing, skip)); - - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - skip[i] = skip[i] || _block_convertor->filter_map()[i]; - } - } - _generate_row_distribution_payload(channel_to_payload, partitions, tablet_indexes, skip, - num_rows); - } - } - _row_distribution_watch.stop(); - filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() - prev_filtered_rows; - return Status::OK(); -} - Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); @@ -1831,7 +1607,8 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { std::shared_ptr block; bool has_filtered_rows = false; int64_t filtered_rows = 0; - RETURN_IF_ERROR(_generate_rows_distribution(input_block, block, filtered_rows, has_filtered_rows, channel_to_payload)); + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(input_block, block, filtered_rows, + has_filtered_rows, channel_to_payload)); _number_input_rows += rows; // update incrementally so that FE can get the progress. diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index d2e1d6a46ccfd40..fd421db6f1a57b9 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -74,6 +74,7 @@ #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" #include "vec/runtime/vfile_format_transformer.h" +#include "vec/sink/vrow_distribution.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" #include "vec/sink/writer/async_result_writer.h" @@ -485,6 +486,7 @@ class IndexChannel { private: friend class VNodeChannel; friend class VTabletWriter; + friend class VRowDistribution; VTabletWriter* _parent; int64_t _index_id; @@ -546,12 +548,16 @@ class VTabletWriter final : public AsyncResultWriter { bool is_close_done(); + Status on_partitions_created(TCreatePartitionResult* result); + private: friend class VNodeChannel; friend class IndexChannel; using ChannelDistributionPayload = std::vector>; + void _init_row_distribution(); + Status _init(RuntimeState* state, RuntimeProfile* profile); // payload for every row @@ -572,10 +578,10 @@ class VTabletWriter final : public AsyncResultWriter { const std::shared_ptr ich, const std::shared_ptr nch); - void _cancel_all_channel(Status status); - std::pair _get_partition_function(); + void _cancel_all_channel(Status status); + void _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type, std::vector filter); @@ -693,6 +699,9 @@ class VTabletWriter final : public AsyncResultWriter { RuntimeProfile* _profile = nullptr; // not owned, set when open bool _group_commit = false; std::shared_ptr _wal_writer = nullptr; + + VRowDistribution _row_distribution; + int64_t _tb_id; int64_t _db_id; int64_t _wal_id;