Skip to content

Commit

Permalink
[bug](scan) Fix missing sync rowsets in cloud mode (apache#31756)
Browse files Browse the repository at this point in the history
  • Loading branch information
platoneko committed Mar 4, 2024
1 parent a47d15f commit dae59ad
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 37 deletions.
14 changes: 0 additions & 14 deletions be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

#include "parallel_scanner_builder.h"

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "olap/rowset/beta_rowset.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "vec/exec/scan/new_olap_scanner.h"
Expand All @@ -44,17 +41,6 @@ Status ParallelScannerBuilder<ParentType>::_build_scanners_by_rowid(
std::list<VScannerSPtr>& scanners) {
DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);

if (config::is_cloud_mode()) {
std::vector<std::function<Status()>> tasks;
tasks.reserve(_tablets.size());
for (auto&& [tablet, version] : _tablets) {
tasks.emplace_back([tablet, version]() {
return std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
});
}
RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
}

for (auto&& [tablet, version] : _tablets) {
DCHECK(_all_rowsets.contains(tablet->tablet_id()));
auto& rowsets = _all_rowsets[tablet->tablet_id()];
Expand Down
45 changes: 25 additions & 20 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,34 +264,38 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
state()->query_options().resource_limit.__isset.cpu_limit;

std::vector<TabletWithVersion> tablets;
tablets.reserve(_scan_ranges.size());
for (auto&& scan_range : _scan_ranges) {
// TODO(plat1ko): Get cloud tablet in parallel
auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
int64_t version = 0;
std::from_chars(scan_range->version.data(),
scan_range->version.data() + scan_range->version.size(), version);
tablets.emplace_back(std::move(tablet), version);
}

if (config::is_cloud_mode()) {
std::vector<std::function<Status()>> tasks;
tasks.reserve(_scan_ranges.size());
for (auto&& [tablet, version] : tablets) {
tasks.emplace_back([tablet, version]() {
return std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
});
}
RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
}

if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
p._push_down_agg_type == TPushAggOp::NONE) {
std::vector<TabletWithVersion> tablets;
bool is_dup_mow_key = true;
for (auto&& scan_range : _scan_ranges) {
auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
for (auto&& [tablet, _] : tablets) {
is_dup_mow_key =
tablet->keys_type() == DUP_KEYS || (tablet->keys_type() == UNIQUE_KEYS &&
tablet->enable_unique_key_merge_on_write());
if (!is_dup_mow_key) {
break;
}

int64_t version = 0;
std::from_chars(scan_range->version.data(),
scan_range->version.data() + scan_range->version.size(), version);
tablets.emplace_back(TabletWithVersion {std::move(tablet), version});
}

if (config::is_cloud_mode()) {
std::vector<std::function<Status()>> tasks;
tasks.reserve(tablets.size());
for (auto&& [tablet, version] : tablets) {
tasks.emplace_back([tablet, version]() {
return std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
});
}
RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
}

if (is_dup_mow_key) {
Expand Down Expand Up @@ -351,9 +355,10 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
});
RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
scanner->set_compound_filters(_compound_filters);
scanners->push_back(scanner);
scanners->push_back(std::move(scanner));
return Status::OK();
};

for (auto& scan_range : _scan_ranges) {
auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
int64_t version = 0;
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1303,8 +1303,8 @@ Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) {
std::list<VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
// Init scanner wrapper
for (auto it = scanners.begin(); it != scanners.end(); ++it) {
_scanners.emplace_back(std::make_shared<ScannerDelegate>(*it));
for (auto& scanner : scanners) {
_scanners.emplace_back(std::make_shared<ScannerDelegate>(scanner));
}
if (scanners.empty()) {
_eos = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ testGroups = "p0"
testDirectories = "ddl_p0,database_p0,load,load_p0,query_p0,table_p0,account_p0,autobucket,bitmap_functions,bloom_filter_p0,cast_decimal_to_boolean,cast_double_to_decimal,compression_p0,connector_p0,correctness,correctness_p0,csv_header_p0,data_model_p0,database_p0,datatype_p0,delete_p0,demo_p0,empty_relation,export_p0,external_table_p0,fault_injection_p0,flink_connector_p0,insert_overwrite_p0,insert_p0,internal_schema_p0,javaudf_p0,job_p0,json_p0,jsonb_p0,meta_action_p0,metrics_p0,mtmv_p0,mysql_fulltext,mysql_ssl_p0,mysql_tupleconvert_p0,mysqldump_p0,nereids_arith_p0,nereids_function_p0,nereids_p0,nereids_rules_p0,nereids_syntax_p0,nereids_tpcds_p0,nereids_tpch_p0,partition_p0,performance_p0,postgres,query_profile,row_store,show_p0,source_p0,sql_block_rule_p0,ssb_unique_load_zstd_p0,ssb_unique_sql_zstd_p0,statistics,table_p0,tpch_unique_sql_zstd_p0,trino_p0,types,types_p0,update,version_p0,view_p0,with_clause_p0,workload_manager_p0,schema_change_p0,variant_p0,variant_github_events_p0_new,variant_github_events_p0,unique_with_mow_p0"
//exclude groups and exclude suites is more prior than include groups and include suites.
excludeSuites = "test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel"
excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster"
excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,point_query_p0,nereids_rules_p0/mv"

0 comments on commit dae59ad

Please sign in to comment.