Skip to content

Commit

Permalink
[refactor](sink) split generate_row_distribution into parts
Browse files Browse the repository at this point in the history
make code much more readable.
  • Loading branch information
dataroaring committed Nov 6, 2023
1 parent 933fec6 commit 58b8687
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 83 deletions.
184 changes: 102 additions & 82 deletions be/src/vec/sink/vrow_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void VRowDistribution::_generate_rows_distribution_payload(
ChannelDistributionPayload& channel_to_payload,
const std::vector<VOlapTablePartition*>& partitions,
const std::vector<uint32_t>& tablet_indexes, const std::vector<bool>& skip,
size_t rows_cnt) {
size_t row_cnt) {
for (int row_idx = 0; row_idx < row_cnt; row_idx++) {
if (skip[row_idx]) {
continue;
Expand Down Expand Up @@ -130,7 +130,8 @@ void VRowDistribution::_generate_rows_distribution_payload(

Status VRowDistribution::_single_partition_generate(vectorized::Block* block,
ChannelDistributionPayload& channel_to_payload,
size_t num_rows, bool has_filtered_rows) {
bool has_filtered_rows) {
auto num_rows = block->rows();
// only need to calculate one value for single partition.
std::vector<VOlapTablePartition*> partitions {1, nullptr};
std::vector<bool> skip;
Expand Down Expand Up @@ -176,6 +177,100 @@ Status VRowDistribution::_single_partition_generate(vectorized::Block* block,
return Status::OK();
}

/// Builds the per-channel payload for a block when the table is not auto-partitioned.
///
/// For every row of `block` the tablet finder resolves a partition and a tablet
/// index and may mark the row as skipped. Rows already rejected by the block
/// convertor are additionally skipped when `has_filtered_rows` is set. The
/// surviving rows are then appended to `channel_to_payload`.
///
/// @param block               converted block whose rows are distributed.
/// @param has_filtered_rows   true when the block convertor's filter map must be
///                            merged into the skip list.
/// @param channel_to_payload  output; filled by _generate_rows_distribution_payload().
/// @return Status::OK() on success, or the error reported by find_tablets().
///
/// NOTE(review): before this function was split out, the caller evaluated the
/// partition projection (when _vpartition->is_projection_partition()) ahead of
/// this lookup. Confirm that non-auto-partition tables never use a projection.
Status VRowDistribution::_generate_rows_distribution_for_non_auto_parititon(
        std::shared_ptr<vectorized::Block>& block, bool has_filtered_rows,
        ChannelDistributionPayload& channel_to_payload) {
    const auto num_rows = block->rows();

    // One slot per row; find_tablets() fills these in place.
    std::vector<VOlapTablePartition*> partitions(num_rows, nullptr);
    std::vector<bool> skip(num_rows);
    std::vector<uint32_t> tablet_indexes(num_rows);

    // Required by the find_tablets() signature; the value is not consumed here.
    bool stop_processing = false;
    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions,
                                                 tablet_indexes, stop_processing, skip));

    if (has_filtered_rows) {
        // Fetch the filter map once and use an unsigned index so the comparison
        // against the row count is not a signed/unsigned mix.
        const auto& filter_map = _block_convertor->filter_map();
        for (size_t i = 0; i < num_rows; ++i) {
            skip[i] = skip[i] || filter_map[i];
        }
    }

    _generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes, skip,
                                        num_rows);
    return Status::OK();
}

/// Builds the per-channel payload for a block when the table is auto-partitioned.
///
/// Steps:
///  1. If the partition key is a projection, evaluate it on `block` and make the
///     partition lookup compare against the computed column. This runs BEFORE the
///     tablet lookup, matching the order the caller used before this function was
///     split out.
///  2. Resolve partition and tablet for every row, collecting the indices of rows
///     whose partition does not exist yet.
///  3. With no missing partition: merge the convertor's filter map into the skip
///     list (when `has_filtered_rows`) and fill `channel_to_payload`.
///  4. Otherwise: save the missing values, call _automatic_create_partition() and
///     return Status::NeedSendAgain so the block is sent again.
///
/// @param block               converted block; a column may be appended to it by
///                            the partition function.
/// @param has_filtered_rows   true when the block convertor's filter map must be
///                            merged into the skip list.
/// @param channel_to_payload  output; only filled when no partition is missing.
/// @return Status::OK() when the payload was generated, Status::NeedSendAgain
///         after partitions were created, or the first error encountered.
Status VRowDistribution::_generate_rows_distribution_for_auto_parititon(
        std::shared_ptr<vectorized::Block>& block, bool has_filtered_rows,
        ChannelDistributionPayload& channel_to_payload) {
    const auto num_rows = block->rows();
    const auto& partition_keys = _vpartition->get_partition_keys();
    //TODO: use loop to create missing_vals for multi column.
    CHECK(partition_keys.size() == 1)
            << "now support only 1 partition column for auto partitions.";

    // if there's projection of partition calc, we need to calc it first, so that
    // find_tablets() below compares against the transformed column.
    auto [part_ctx, part_func] = _get_partition_function();
    int result_idx = -1;
    if (_vpartition->is_projection_partition()) {
        // calc the start value of missing partition ranges.
        RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &result_idx));
        VLOG_DEBUG << "Partition-calculated block:" << block->dump_data();
        // change the column to compare to transformed.
        _vpartition->set_transformed_slots({static_cast<uint16_t>(result_idx)});
    }

    // indices of rows whose partition value has no partition yet.
    std::vector<int64_t> missing_map;
    // Read the size through the returned element directly instead of copying it.
    missing_map.reserve(block->get_by_position(partition_keys[0]).column->size());

    // try to find tablet and save missing value
    std::vector<VOlapTablePartition*> partitions(num_rows, nullptr);
    std::vector<bool> skip(num_rows);
    std::vector<uint32_t> tablet_indexes(num_rows);

    // Required by the find_tablets() signature; the value is not consumed here.
    bool stop_processing = false;
    //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time.
    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions,
                                                 tablet_indexes, stop_processing, skip,
                                                 &missing_map));

    if (missing_map.empty()) {
        // Every row has a partition: the distribution can be calculated now.
        if (has_filtered_rows) {
            const auto& filter_map = _block_convertor->filter_map();
            for (size_t i = 0; i < num_rows; ++i) {
                skip[i] = skip[i] || filter_map[i];
            }
        }
        _generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes,
                                            skip, num_rows);
        return Status::OK();
    }

    // for missing partition keys, calc the missing partition and save in _partitions_need_create
    if (result_idx < 0) {
        // Without a projection result there is no column to read the range start
        // from; indexing the block with -1 would be out of bounds.
        return Status::InternalError(
                "auto partition has missing partitions but no partition function result column");
    }

    auto return_type = part_func->data_type();

    // expose the data column
    vectorized::ColumnPtr range_left_col = block->get_by_position(result_idx).column;
    if (const auto* nullable =
                check_and_get_column<vectorized::ColumnNullable>(*range_left_col)) {
        range_left_col = nullable->get_nested_column_ptr();
        return_type = assert_cast<const vectorized::DataTypeNullable*>(return_type.get())
                              ->get_nested_type();
    }
    // calc the end value and save them.
    _save_missing_values(range_left_col, return_type, missing_map);
    // then call FE to create it. then FragmentExecutor will redo the load.
    RETURN_IF_ERROR(_automatic_create_partition());
    // In the next round, we will _generate_rows_distribution_payload again to get right payload of new tablet
    LOG(INFO) << "Auto created partition. Send block again.";
    return Status::NeedSendAgain("");
}

Status VRowDistribution::generate_rows_distribution(vectorized::Block& input_block,
std::shared_ptr<vectorized::Block>& block,
int64_t& filtered_rows, bool& has_filtered_rows,
Expand All @@ -187,8 +282,6 @@ Status VRowDistribution::generate_rows_distribution(vectorized::Block& input_blo
RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
_state, &input_block, block, *_vec_output_expr_ctxs, rows, has_filtered_rows));

// This is just for passing compilation.
bool stop_processing = false;
channel_to_payload.resize(_channels->size());
_tablet_finder->clear_for_new_batch();
_row_distribution_watch.start();
Expand All @@ -198,87 +291,14 @@ Status VRowDistribution::generate_rows_distribution(vectorized::Block& input_blo
if (!_vpartition->is_auto_partition() && partition_num == 1 &&
_tablet_finder->is_find_tablet_every_sink()) {
RETURN_IF_ERROR(_single_partition_generate(block.get(), channel_to_payload,
num_rows, has_filtered_rows));
has_filtered_rows));
} else {
// if there's projection of partition calc, we need to calc it first.
auto [part_ctx, part_func] = _get_partition_function();
int result_idx = -1;
if (_vpartition->is_projection_partition()) {
// calc the start value of missing partition ranges.
RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &result_idx));
VLOG_DEBUG << "Partition-calculated block:" << block->dump_data();
// change the column to compare to transformed.
_vpartition->set_transformed_slots({(uint16_t)result_idx});
}

if (_vpartition->is_auto_partition()) {
std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
//TODO: use loop to create missing_vals for multi column.
CHECK(partition_keys.size() == 1)
<< "now support only 1 partition column for auto partitions.";
auto partition_col = block->get_by_position(partition_keys[0]);

std::vector<int64_t> missing_map; // indice of missing values in partition_col
missing_map.reserve(partition_col.column->size());

// try to find tablet and save missing value
std::vector<VOlapTablePartition*> partitions {num_rows, nullptr};
std::vector<bool> skip;
skip.resize(num_rows);
std::vector<uint32_t> tablet_indexes;
tablet_indexes.resize(num_rows);

//TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time.
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions,
tablet_indexes, stop_processing, skip,
&missing_map));

if (missing_map.empty()) {
// we don't calculate it distribution when have missing values
if (has_filtered_rows) {
for (int i = 0; i < num_rows; i++) {
skip[i] = skip[i] || _block_convertor->filter_map()[i];
}
}
_generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes,
skip, num_rows);
} else { // for missing partition keys, calc the missing partition and save in _partitions_need_create
auto return_type = part_func->data_type();

// expose the data column
vectorized::ColumnPtr range_left_col = block->get_by_position(result_idx).column;
if (const auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*range_left_col)) {
range_left_col = nullable->get_nested_column_ptr();
return_type =
assert_cast<const vectorized::DataTypeNullable*>(return_type.get())
->get_nested_type();
}
// calc the end value and save them.
_save_missing_values(range_left_col, return_type, missing_map);
// then call FE to create it. then FragmentExecutor will redo the load.
RETURN_IF_ERROR(_automatic_create_partition());
// In the next round, we will _generate_rows_distribution_payload again to get right payload of new tablet
LOG(INFO) << "Auto created partition. Send block again.";
return Status::NeedSendAgain("");
} // creating done
RETURN_IF_ERROR(_generate_rows_distribution_for_auto_parititon(block, has_filtered_rows,
channel_to_payload));
} else { // not auto partition
std::vector<VOlapTablePartition*> partitions {num_rows, nullptr};
std::vector<bool> skip;
skip.resize(num_rows);
std::vector<uint32_t> tablet_indexes;
tablet_indexes.resize(num_rows);

RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions,
tablet_indexes, stop_processing, skip));

if (has_filtered_rows) {
for (int i = 0; i < num_rows; i++) {
skip[i] = skip[i] || _block_convertor->filter_map()[i];
}
}
_generate_rows_distribution_payload(channel_to_payload, partitions, tablet_indexes, skip,
num_rows);
RETURN_IF_ERROR(_generate_rows_distribution_for_non_auto_parititon(block, has_filtered_rows,
channel_to_payload));
}
}
_row_distribution_watch.stop();
Expand Down
10 changes: 9 additions & 1 deletion be/src/vec/sink/vrow_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,15 @@ class VRowDistribution {

Status _single_partition_generate(vectorized::Block* block,
ChannelDistributionPayload& channel_to_payload,
size_t num_rows, bool has_filtered_rows);
bool has_filtered_rows);

// Row distribution for auto-partitioned tables. Looks up partition and tablet
// for every row of `block`. When no partition value is missing, fills
// `channel_to_payload` (honoring the convertor's filter map if
// `has_filtered_rows`). Otherwise saves the missing values, calls
// _automatic_create_partition() and returns Status::NeedSendAgain.
Status _generate_rows_distribution_for_auto_parititon(
std::shared_ptr<vectorized::Block>& block, bool has_filtered_rows,
ChannelDistributionPayload& channel_to_payload);

// Row distribution for tables that are not auto-partitioned. Looks up
// partition and tablet for every row of `block`, merges the convertor's filter
// map into the skip list when `has_filtered_rows` is set, and fills
// `channel_to_payload`. Returns the error from the tablet lookup, if any.
Status _generate_rows_distribution_for_non_auto_parititon(
std::shared_ptr<vectorized::Block>& block, bool has_filtered_rows,
ChannelDistributionPayload& channel_to_payload);

void _generate_rows_distribution_payload(
ChannelDistributionPayload& channel_to_payload,
Expand Down

0 comments on commit 58b8687

Please sign in to comment.