diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index f05beedda28b1da..53784f0bd9bd037 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -85,7 +85,7 @@ void VRowDistribution::_generate_rows_distribution_payload( ChannelDistributionPayload& channel_to_payload, const std::vector& partitions, const std::vector& tablet_indexes, const std::vector& skip, - size_t rows_cnt) { + size_t row_cnt) { for (int row_idx = 0; row_idx < row_cnt; row_idx++) { if (skip[row_idx]) { continue; @@ -130,7 +130,8 @@ void VRowDistribution::_generate_rows_distribution_payload( Status VRowDistribution::_single_partition_generate(vectorized::Block* block, ChannelDistributionPayload& channel_to_payload, - size_t num_rows, bool has_filtered_rows) { + bool has_filtered_rows) { + auto num_rows = block->rows(); // only need to calculate one value for single partition. std::vector partitions {1, nullptr}; std::vector skip; @@ -176,6 +177,100 @@ Status VRowDistribution::_single_partition_generate(vectorized::Block* block, return Status::OK(); } +Status VRowDistribution::_generate_rows_distribution_for_non_auto_parititon( + std::shared_ptr& block, bool has_filtered_rows, + ChannelDistributionPayload& channel_to_payload) { + auto num_rows = block->rows(); + std::vector partitions {num_rows, nullptr}; + std::vector skip; + skip.resize(num_rows); + std::vector tablet_indexes; + tablet_indexes.resize(num_rows); + + bool stop_processing = false; + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, + tablet_indexes, stop_processing, skip)); + + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + skip[i] = skip[i] || _block_convertor->filter_map()[i]; + } + } + _generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes, skip, + num_rows); + return Status::OK(); +} + +Status VRowDistribution::_generate_rows_distribution_for_auto_parititon( + std::shared_ptr& block, bool has_filtered_rows, + ChannelDistributionPayload& channel_to_payload) { + auto num_rows = block->rows(); + std::vector partition_keys = _vpartition->get_partition_keys(); + //TODO: use loop to create missing_vals for multi column. + CHECK(partition_keys.size() == 1) + << "now support only 1 partition column for auto partitions."; + auto partition_col = block->get_by_position(partition_keys[0]); + + std::vector missing_map; // indice of missing values in partition_col + missing_map.reserve(partition_col.column->size()); + + // try to find tablet and save missing value + std::vector partitions {num_rows, nullptr}; + std::vector skip; + skip.resize(num_rows); + std::vector tablet_indexes; + tablet_indexes.resize(num_rows); + + bool stop_processing = false; + //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time. + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, + tablet_indexes, stop_processing, skip, + &missing_map)); + + if (missing_map.empty()) { + // we don't calculate it distribution when have missing values + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + skip[i] = skip[i] || _block_convertor->filter_map()[i]; + } + } + _generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes, + skip, num_rows); + } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create + // if there's projection of partition calc, we need to calc it first. + auto [part_ctx, part_func] = _get_partition_function(); + int result_idx = -1; + if (_vpartition->is_projection_partition()) { + // calc the start value of missing partition ranges. + RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &result_idx)); + VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(); + // change the column to compare to transformed. + _vpartition->set_transformed_slots({(uint16_t)result_idx}); + } + + auto return_type = part_func->data_type(); + + // expose the data column + vectorized::ColumnPtr range_left_col = block->get_by_position(result_idx).column; + if (const auto* nullable = + check_and_get_column(*range_left_col)) { + range_left_col = nullable->get_nested_column_ptr(); + return_type = + assert_cast(return_type.get()) + ->get_nested_type(); + } + // calc the end value and save them. + _save_missing_values(range_left_col, return_type, missing_map); + // then call FE to create it. then FragmentExecutor will redo the load. + RETURN_IF_ERROR(_automatic_create_partition()); + // In the next round, we will _generate_rows_distribution_payload again to get right payload of new tablet + LOG(INFO) << "Auto created partition. Send block again."; + return Status::NeedSendAgain(""); + } // creating done + + return Status::OK(); +} + Status VRowDistribution::generate_rows_distribution(vectorized::Block& input_block, std::shared_ptr& block, int64_t& filtered_rows, bool& has_filtered_rows, @@ -187,8 +282,6 @@ Status VRowDistribution::generate_rows_distribution(vectorized::Block& input_blo RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( _state, &input_block, block, *_vec_output_expr_ctxs, rows, has_filtered_rows)); - // This is just for passing compilation. - bool stop_processing = false; channel_to_payload.resize(_channels->size()); _tablet_finder->clear_for_new_batch(); _row_distribution_watch.start(); @@ -198,87 +291,14 @@ Status VRowDistribution::generate_rows_distribution(vectorized::Block& input_blo if (!_vpartition->is_auto_partition() && partition_num == 1 && _tablet_finder->is_find_tablet_every_sink()) { RETURN_IF_ERROR(_single_partition_generate(block.get(), channel_to_payload, - num_rows, has_filtered_rows)); + has_filtered_rows)); } else { - // if there's projection of partition calc, we need to calc it first. - auto [part_ctx, part_func] = _get_partition_function(); - int result_idx = -1; - if (_vpartition->is_projection_partition()) { - // calc the start value of missing partition ranges. - RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &result_idx)); - VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(); - // change the column to compare to transformed. - _vpartition->set_transformed_slots({(uint16_t)result_idx}); - } - if (_vpartition->is_auto_partition()) { - std::vector partition_keys = _vpartition->get_partition_keys(); - //TODO: use loop to create missing_vals for multi column. - CHECK(partition_keys.size() == 1) - << "now support only 1 partition column for auto partitions."; - auto partition_col = block->get_by_position(partition_keys[0]); - - std::vector missing_map; // indice of missing values in partition_col - missing_map.reserve(partition_col.column->size()); - - // try to find tablet and save missing value - std::vector partitions {num_rows, nullptr}; - std::vector skip; - skip.resize(num_rows); - std::vector tablet_indexes; - tablet_indexes.resize(num_rows); - - //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time. - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, - tablet_indexes, stop_processing, skip, - &missing_map)); - - if (missing_map.empty()) { - // we don't calculate it distribution when have missing values - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - skip[i] = skip[i] || _block_convertor->filter_map()[i]; - } - } - _generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes, - skip, num_rows); - } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create - auto return_type = part_func->data_type(); - - // expose the data column - vectorized::ColumnPtr range_left_col = block->get_by_position(result_idx).column; - if (const auto* nullable = - check_and_get_column(*range_left_col)) { - range_left_col = nullable->get_nested_column_ptr(); - return_type = - assert_cast(return_type.get()) - ->get_nested_type(); - } - // calc the end value and save them. - _save_missing_values(range_left_col, return_type, missing_map); - // then call FE to create it. then FragmentExecutor will redo the load. - RETURN_IF_ERROR(_automatic_create_partition()); - // In the next round, we will _generate_rows_distribution_payload again to get right payload of new tablet - LOG(INFO) << "Auto created partition. Send block again."; - return Status::NeedSendAgain(""); - } // creating done + RETURN_IF_ERROR(_generate_rows_distribution_for_auto_parititon(block, has_filtered_rows, + channel_to_payload)); } else { // not auto partition - std::vector partitions {num_rows, nullptr}; - std::vector skip; - skip.resize(num_rows); - std::vector tablet_indexes; - tablet_indexes.resize(num_rows); - - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, - tablet_indexes, stop_processing, skip)); - - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - skip[i] = skip[i] || _block_convertor->filter_map()[i]; - } - } - _generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes, skip, - num_rows); + RETURN_IF_ERROR(_generate_rows_distribution_for_non_auto_parititon(block, has_filtered_rows, + channel_to_payload)); } } _row_distribution_watch.stop(); diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index 5bb63e646007176..69a2d3908a8eb57 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -99,7 +99,15 @@ class VRowDistribution { Status _single_partition_generate(vectorized::Block* block, ChannelDistributionPayload& channel_to_payload, - size_t num_rows, bool has_filtered_rows); + bool has_filtered_rows); + + Status _generate_rows_distribution_for_auto_parititon( + std::shared_ptr& block, bool has_filtered_rows, + ChannelDistributionPayload& channel_to_payload); + + Status _generate_rows_distribution_for_non_auto_parititon( + std::shared_ptr& block, bool has_filtered_rows, + ChannelDistributionPayload& channel_to_payload); void _generate_rows_distribution_payload( ChannelDistributionPayload& channel_to_payload,