Skip to content

Commit

Permalink
[Improve](topn opt) avoid crash when rpc returned row contains duplicated row entry (apache#29872)
Browse files Browse the repository at this point in the history

1. Add more info to help trace a potential bug and avoid a crash
2. Use the correct permutation size when calling `column->permute`
  • Loading branch information
eldenmoon authored Jan 15, 2024
1 parent d1f3d41 commit 0ffd6cf
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions be/src/exec/rowid_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "bthread/countdown_event.h"
#include "common/config.h"
#include "common/consts.h"
#include "common/exception.h"
#include "exec/tablet_info.h" // DorisNodesInfo
#include "olap/olap_common.h"
#include "olap/tablet_schema.h"
Expand Down Expand Up @@ -230,7 +231,10 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr& column_row_ids,
std::vector<PRowLocation> rows_locs;
rows_locs.reserve(rows_locs.size());
RETURN_IF_ERROR(_merge_rpc_results(mget_req, resps, cntls, res_block, &rows_locs));

if (rows_locs.size() != res_block->rows()) {
return Status::InternalError("Miss matched return row loc count {}, expected {}, input {}",
rows_locs.size(), res_block->rows(), column_row_ids->size());
}
// Final sort by row_ids sequence, since row_ids is already sorted if need
std::map<GlobalRowLoacation, size_t> positions;
for (size_t i = 0; i < rows_locs.size(); ++i) {
Expand All @@ -240,17 +244,22 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr& column_row_ids,
rows_locs[i].ordinal_id());
positions[grl] = i;
};
// TODO remove this warning code
if (positions.size() < rows_locs.size()) {
LOG(WARNING) << "contains duplicated row entry";
}
vectorized::IColumn::Permutation permutation;
permutation.reserve(column_row_ids->size());
for (size_t i = 0; i < column_row_ids->size(); ++i) {
auto location =
reinterpret_cast<const GlobalRowLoacation*>(column_row_ids->get_data_at(i).data);
permutation.push_back(positions[*location]);
}
size_t num_rows = res_block->rows();
// Check row consistency
RETURN_IF_CATCH_EXCEPTION(res_block->check_number_of_rows());
for (size_t i = 0; i < res_block->columns(); ++i) {
res_block->get_by_position(i).column =
res_block->get_by_position(i).column->permute(permutation, num_rows);
res_block->get_by_position(i).column->permute(permutation, permutation.size());
}
// shrink for char type
std::vector<size_t> char_type_idx;
Expand Down

0 comments on commit 0ffd6cf

Please sign in to comment.