diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index e9797d1a40ae7c..46568ef417897c 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -17,9 +17,6 @@ #include "parallel_scanner_builder.h" -#include "cloud/cloud_meta_mgr.h" -#include "cloud/cloud_tablet.h" -#include "cloud/config.h" #include "olap/rowset/beta_rowset.h" #include "pipeline/exec/olap_scan_operator.h" #include "vec/exec/scan/new_olap_scanner.h" @@ -44,17 +41,6 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid( std::list& scanners) { DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner); - if (config::is_cloud_mode()) { - std::vector> tasks; - tasks.reserve(_tablets.size()); - for (auto&& [tablet, version] : _tablets) { - tasks.emplace_back([tablet, version]() { - return std::dynamic_pointer_cast(tablet)->sync_rowsets(version); - }); - } - RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10)); - } - for (auto&& [tablet, version] : _tablets) { DCHECK(_all_rowsets.contains(tablet->tablet_id())); auto& rowsets = _all_rowsets[tablet->tablet_id()]; diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index cc9270f08093d1..e33293c721506f 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -264,34 +264,38 @@ Status OlapScanLocalState::_init_scanners(std::list* s bool has_cpu_limit = state()->query_options().__isset.resource_limit && state()->query_options().resource_limit.__isset.cpu_limit; + std::vector tablets; + tablets.reserve(_scan_ranges.size()); + for (auto&& scan_range : _scan_ranges) { + // TODO(plat1ko): Get cloud tablet in parallel + auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); + int64_t version = 0; + std::from_chars(scan_range->version.data(), + scan_range->version.data() + scan_range->version.size(), version); + tablets.emplace_back(std::move(tablet), version); + } + + if (config::is_cloud_mode()) { + std::vector> tasks; + tasks.reserve(_scan_ranges.size()); + for (auto&& [tablet, version] : tablets) { + tasks.emplace_back([tablet, version]() { + return std::dynamic_pointer_cast(tablet)->sync_rowsets(version); + }); + } + RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10)); + } + if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit && p._push_down_agg_type == TPushAggOp::NONE) { - std::vector tablets; bool is_dup_mow_key = true; - for (auto&& scan_range : _scan_ranges) { - auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); + for (auto&& [tablet, _] : tablets) { is_dup_mow_key = tablet->keys_type() == DUP_KEYS || (tablet->keys_type() == UNIQUE_KEYS && tablet->enable_unique_key_merge_on_write()); if (!is_dup_mow_key) { break; } - - int64_t version = 0; - std::from_chars(scan_range->version.data(), - scan_range->version.data() + scan_range->version.size(), version); - tablets.emplace_back(TabletWithVersion {std::move(tablet), version}); - } - - if (config::is_cloud_mode()) { - std::vector> tasks; - tasks.reserve(tablets.size()); - for (auto&& [tablet, version] : tablets) { - tasks.emplace_back([tablet, version]() { - return std::dynamic_pointer_cast(tablet)->sync_rowsets(version); - }); - } - RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10)); } if (is_dup_mow_key) { @@ -351,9 +355,10 @@ Status OlapScanLocalState::_init_scanners(std::list* s }); RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts)); scanner->set_compound_filters(_compound_filters); - scanners->push_back(scanner); + scanners->push_back(std::move(scanner)); return Status::OK(); }; + for (auto& scan_range : _scan_ranges) { auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); int64_t version = 0; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index d08fff4ba9b7fa..cb9240014fdb9c 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -1303,8 +1303,8 @@ Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) { std::list scanners; RETURN_IF_ERROR(_init_scanners(&scanners)); // Init scanner wrapper - for (auto it = scanners.begin(); it != scanners.end(); ++it) { - _scanners.emplace_back(std::make_shared(*it)); + for (auto& scanner : scanners) { + _scanners.emplace_back(std::make_shared(scanner)); } if (scanners.empty()) { _eos = true; diff --git a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy index ba7214cc43ccbb..8e8e4d1378449c 100644 --- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy +++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy @@ -19,4 +19,4 @@ testGroups = "p0" testDirectories = "ddl_p0,database_p0,load,load_p0,query_p0,table_p0,account_p0,autobucket,bitmap_functions,bloom_filter_p0,cast_decimal_to_boolean,cast_double_to_decimal,compression_p0,connector_p0,correctness,correctness_p0,csv_header_p0,data_model_p0,database_p0,datatype_p0,delete_p0,demo_p0,empty_relation,export_p0,external_table_p0,fault_injection_p0,flink_connector_p0,insert_overwrite_p0,insert_p0,internal_schema_p0,javaudf_p0,job_p0,json_p0,jsonb_p0,meta_action_p0,metrics_p0,mtmv_p0,mysql_fulltext,mysql_ssl_p0,mysql_tupleconvert_p0,mysqldump_p0,nereids_arith_p0,nereids_function_p0,nereids_p0,nereids_rules_p0,nereids_syntax_p0,nereids_tpcds_p0,nereids_tpch_p0,partition_p0,performance_p0,postgres,query_profile,row_store,show_p0,source_p0,sql_block_rule_p0,ssb_unique_load_zstd_p0,ssb_unique_sql_zstd_p0,statistics,table_p0,tpch_unique_sql_zstd_p0,trino_p0,types,types_p0,update,version_p0,view_p0,with_clause_p0,workload_manager_p0,schema_change_p0,variant_p0,variant_github_events_p0_new,variant_github_events_p0,unique_with_mow_p0" //exclude groups and exclude suites is more prior than include groups and include suites. excludeSuites = "test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel" -excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster" +excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,point_query_p0,nereids_rules_p0/mv"