Skip to content

Commit

Permalink
[feature](sql-block-rule) sql block rule support external table (apac…
Browse files Browse the repository at this point in the history
…he#37041)

Let SQL Block rule feature support external table:
```
create sql_block_rule r1 properties("partition_num" = "100", "global" = "true");
create sql_block_rule r2 properties("tablet_num" = "100", "global" = "true");
create sql_block_rule r3 properties("cardinality" = "100", "global" = "true");
```

For external table, `tablet_num` equals to `splitNum`
  • Loading branch information
morningman authored Jul 12, 2024
1 parent fa43afc commit bd05cba
Show file tree
Hide file tree
Showing 15 changed files with 132 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public void createScanRangeLocations() throws UserException {
if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) {
return;
}
inputSplitsNum = numApproximateSplits();
selectedSplitNum = numApproximateSplits();

TFileType locationType;
FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
Expand All @@ -336,7 +336,7 @@ public void createScanRangeLocations() throws UserException {
} else {
locationType = getLocationType(fileSplit.getPath().toString());
}
totalFileSize = fileSplit.getLength() * inputSplitsNum;
totalFileSize = fileSplit.getLength() * selectedSplitNum;
long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime();
// Not accurate, only used to estimate concurrency.
int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends();
Expand Down Expand Up @@ -364,7 +364,7 @@ public void createScanRangeLocations() throws UserException {
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
inputSplitsNum = inputSplits.size();
selectedSplitNum = inputSplits.size();
if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,8 @@ public abstract class FileScanNode extends ExternalScanNode {
public static final long DEFAULT_SPLIT_SIZE = 8 * 1024 * 1024; // 8MB

// For explain
protected long inputSplitsNum = 0;
protected long totalFileSize = 0;
protected long totalPartitionNum = 0;
protected long readPartitionNum = 0;
protected long fileSplitSize;
public long rowCount = 0;

Expand Down Expand Up @@ -119,9 +117,9 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
if (isBatchMode()) {
output.append("(approximate)");
}
output.append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
output.append("inputSplitNum=").append(selectedSplitNum).append(", totalFileSize=")
.append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n");
output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum)
output.append(prefix).append("partition=").append(selectedPartitionNum).append("/").append(totalPartitionNum)
.append("\n");

if (detailLevel == TExplainLevel.VERBOSE) {
Expand Down Expand Up @@ -311,8 +309,4 @@ protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
long fileLength = last.getOffset() + last.getLength() - 1L;
throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
}

public long getReadPartitionNum() {
return this.readPartitionNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ protected List<HivePartition> getPartitions() throws AnalysisException {
partitionItems = selectedPartitions.selectedPartitions.values();
}
Preconditions.checkNotNull(partitionItems);
this.readPartitionNum = partitionItems.size();
this.selectedPartitionNum = partitionItems.size();

// get partitions from cache
List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(partitionItems.size());
Expand All @@ -198,7 +198,7 @@ protected List<HivePartition> getPartitions() throws AnalysisException {
hmsTable.getRemoteTable().getSd().getInputFormat(),
hmsTable.getRemoteTable().getSd().getLocation(), null, Maps.newHashMap());
this.totalPartitionNum = 1;
this.readPartitionNum = 1;
this.selectedPartitionNum = 1;
resPartitions.add(dummyPartition);
}
if (ConnectContext.get().getExecutor() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private List<HivePartition> getPrunedPartitions(
partitionValues.getSingleColumnRangeMap(),
true);
Collection<Long> filteredPartitionIds = pruner.prune();
this.readPartitionNum = filteredPartitionIds.size();
this.selectedPartitionNum = filteredPartitionIds.size();
// 3. get partitions from cache
String dbName = hmsTable.getDbName();
String tblName = hmsTable.getName();
Expand All @@ -305,7 +305,7 @@ private List<HivePartition> getPrunedPartitions(
hmsTable.getRemoteTable().getSd().getInputFormat(),
hmsTable.getRemoteTable().getSd().getLocation(), null, Maps.newHashMap());
this.totalPartitionNum = 1;
this.readPartitionNum = 1;
this.selectedPartitionNum = 1;
return Lists.newArrayList(dummyPartition);
}

Expand Down Expand Up @@ -497,7 +497,7 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
return super.getNodeExplainString(prefix, detailLevel);
} else {
return super.getNodeExplainString(prefix, detailLevel)
+ String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum.get(), inputSplitsNum);
+ String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum.get(), selectedSplitNum);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ private List<Split> doGetSplits() throws UserException {
throw new UserException(e.getMessage(), e.getCause());
}

readPartitionNum = partitionPathSet.size();
selectedPartitionNum = partitionPathSet.size();

return splits;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private List<String> getPrunedPartitionSpecs() throws AnalysisException {
partitionValues.getSingleColumnRangeMap(),
false);
Collection<Long> filteredPartitionIds = pruner.prune();
this.readPartitionNum = filteredPartitionIds.size();
this.selectedPartitionNum = filteredPartitionIds.size();
// get partitions from cache
Map<Long, String> partitionIdToNameMap = partitionValues.getPartitionIdToNameMap();
filteredPartitionIds.forEach(id -> result.add(partitionIdToNameMap.get(id)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public List<Split> getSplits() throws UserException {
}
splitStats.add(splitStat);
}
this.readPartitionNum = selectedPartitionValues.size();
this.selectedPartitionNum = selectedPartitionValues.size();
// TODO: get total partition number
return splits;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void finalize(Analyzer analyzer) throws UserException {
LoadScanProvider scanProvider = scanProviders.get(i);
finalizeParamsForLoad(context, analyzer);
createScanRangeLocations(context, scanProvider, localBackendPolicy);
this.inputSplitsNum += scanProvider.getInputSplitNum();
this.selectedSplitNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,8 @@ public class OlapScanNode extends ScanNode {
private boolean canTurnOnPreAggr = true;
private boolean forceOpenPreAgg = false;
private OlapTable olapTable = null;
private long selectedTabletsNum = 0;
private long totalTabletsNum = 0;
private long selectedIndexId = -1;
private int selectedPartitionNum = 0;
private Collection<Long> selectedPartitionIds = Lists.newArrayList();
private long totalBytes = 0;
// tablet id to single replica bytes
Expand Down Expand Up @@ -295,14 +293,6 @@ public void setForceOpenPreAgg(boolean forceOpenPreAgg) {
this.forceOpenPreAgg = forceOpenPreAgg;
}

public Integer getSelectedPartitionNum() {
return selectedPartitionNum;
}

public Long getSelectedTabletsNum() {
return selectedTabletsNum;
}

public SortInfo getSortInfo() {
return sortInfo;
}
Expand Down Expand Up @@ -1218,7 +1208,7 @@ private void computeTabletInfo() throws UserException {
}

totalTabletsNum += selectedTable.getTablets().size();
selectedTabletsNum += tablets.size();
selectedSplitNum += tablets.size();
addScanRangeLocations(partition, tablets);
}
}
Expand Down Expand Up @@ -1380,7 +1370,7 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
.collect(Collectors.joining(","));
output.append(prefix).append(String.format("partitions=%s/%s (%s)", selectedPartitionNum,
olapTable.getPartitions().size(), selectedPartitions)).append("\n");
output.append(prefix).append(String.format("tablets=%s/%s", selectedTabletsNum, totalTabletsNum));
output.append(prefix).append(String.format("tablets=%s/%s", selectedSplitNum, totalTabletsNum));
// We print up to 3 tablet, and we print "..." if the number is more than 3
if (scanTabletIds.size() > 3) {
List<Long> firstTenTabletIds = scanTabletIds.subList(0, 3);
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
protected PartitionInfo partitionsInfo = null;
protected SplitAssignment splitAssignment = null;

protected long selectedPartitionNum = 0;
protected long selectedSplitNum = 0;

// create a mapping between output slot's id and project expr
Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();

Expand Down Expand Up @@ -833,4 +836,11 @@ public boolean useTopnFilter() {
return !topnFilterSortNodes.isEmpty();
}

public long getSelectedPartitionNum() {
return selectedPartitionNum;
}

public long getSelectedSplitNum() {
return selectedSplitNum;
}
}
11 changes: 6 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileScanNode;
import org.apache.doris.datasource.jdbc.client.JdbcClientException;
import org.apache.doris.datasource.tvf.source.TVFScanNode;
import org.apache.doris.load.EtlJobType;
Expand Down Expand Up @@ -658,13 +659,13 @@ public void checkBlockRulesByScan(Planner planner) throws AnalysisException {
}
List<ScanNode> scanNodeList = planner.getScanNodes();
for (ScanNode scanNode : scanNodeList) {
if (scanNode instanceof OlapScanNode) {
OlapScanNode olapScanNode = (OlapScanNode) scanNode;
if (scanNode instanceof OlapScanNode || scanNode instanceof FileScanNode) {
Env.getCurrentEnv().getSqlBlockRuleMgr().checkLimitations(
olapScanNode.getSelectedPartitionNum().longValue(),
olapScanNode.getSelectedTabletsNum(),
olapScanNode.getCardinality(),
scanNode.getSelectedPartitionNum(),
scanNode.getSelectedSplitNum(),
scanNode.getCardinality(),
context.getQualifiedUser());

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ private CacheTable buildCacheTableForOlapScanNode(OlapScanNode node) {
private CacheTable buildCacheTableForHiveScanNode(HiveScanNode node) {
CacheTable cacheTable = new CacheTable();
cacheTable.table = node.getTargetTable();
cacheTable.partitionNum = node.getReadPartitionNum();
cacheTable.partitionNum = node.getSelectedPartitionNum();
cacheTable.latestPartitionTime = cacheTable.table.getUpdateTime();
TableIf tableIf = cacheTable.table;
DatabaseIf database = tableIf.getDatabase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void sqlAggWithColocateTable() throws Exception {
Assert.assertTrue(scanNodeList.get(0) instanceof OlapScanNode);
OlapScanNode olapScanNode = (OlapScanNode) scanNodeList.get(0);
Assert.assertEquals(olapScanNode.getSelectedPartitionIds().size(), 2);
long selectedTablet = Deencapsulation.getField(olapScanNode, "selectedTabletsNum");
long selectedTablet = Deencapsulation.getField(olapScanNode, "selectedSplitNum");
Assert.assertEquals(selectedTablet, 2);

List<QueryStatisticsItem.FragmentInstanceInfo> instanceInfo = coordinator.getFragmentInstanceInfos();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql01 --
6179 21 4 20 5.00 4600.10 0.05 0.00 A F 1994-06-05 1994-07-27 1994-06-26 COLLECT COD MAIL silent deposits. furiously us chicago
6273 2552 1 51 33.00 31384.65 0.04 0.08 R F 1995-04-23 1995-05-02 1995-05-13 DELIVER IN PERSON TRUCK ges. unusual, pending packages accordi us chicago
8645 7554 4 53 34.00 32403.70 0.03 0.03 N O 1996-12-29 1997-01-25 1997-01-16 TAKE BACK RETURN FOB ackages are carefully above the jp tokyo
5121 7580 6 79 2.00 1958.14 0.04 0.07 R F 1992-08-10 1992-06-28 1992-08-11 NONE FOB final, regular account us washington
2883 2592 1 91 33.00 32705.97 0.08 0.07 R F 1995-02-26 1995-03-04 1995-03-01 NONE RAIL s. final i cn shanghai
807 5150 7 149 19.00 19933.66 0.08 0.05 A F 1994-02-10 1994-02-20 1994-03-06 NONE SHIP ns haggle quickly across the furi cn beijing
4452 7650 2 149 47.00 49309.58 0.01 0.06 A F 1994-10-08 1994-08-09 1994-10-09 TAKE BACK RETURN TRUCK ts. slyly regular cour us washington
4102 5176 5 175 32.00 34405.44 0.08 0.01 N O 1996-05-14 1996-04-29 1996-05-29 NONE RAIL the even requests; regular pinto us washington
2117 2680 6 179 27.00 29137.59 0.09 0.08 N O 1997-06-30 1997-06-27 1997-07-11 TAKE BACK RETURN REG AIR the carefully ironic ideas cn shanghai
548 7683 3 182 21.00 22725.78 0.03 0.08 A F 1995-01-13 1994-12-18 1995-01-25 NONE AIR ideas. special accounts above the furiou cn beijing

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_external_sql_block_rule", "external_docker,hive,external_docker_hive,p0,external") {
String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("diable Hive test.")
return;
}

String hivePrefix = "hive2";
String catalog_name = "test_${hivePrefix}_external_sql_block_rule";
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")

sql """drop catalog if exists ${catalog_name} """

sql """CREATE CATALOG ${catalog_name} PROPERTIES (
'type'='hms',
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
'hadoop.username' = 'hive'
);"""

sql "use ${catalog_name}.`default`";
qt_sql01 """select * from parquet_partition_table order by l_linenumber,l_orderkey limit 10;"""

sql """drop sql_block_rule if exists external_hive_partition"""
sql """create sql_block_rule external_hive_partition properties("partition_num" = "3", "global" = "false");"""
sql """drop sql_block_rule if exists external_hive_partition2"""
sql """create sql_block_rule external_hive_partition2 properties("tablet_num" = "3", "global" = "false");"""
sql """drop sql_block_rule if exists external_hive_partition3"""
sql """create sql_block_rule external_hive_partition3 properties("cardinality" = "3", "global" = "false");"""
// create 3 users
sql """drop user if exists external_block_user1"""
sql """create user external_block_user1;"""
sql """SET PROPERTY FOR 'external_block_user1' 'sql_block_rules' = 'external_hive_partition';"""
sql """grant all on *.*.* to external_block_user1;"""

sql """drop user if exists external_block_user2"""
sql """create user external_block_user2;"""
sql """SET PROPERTY FOR 'external_block_user2' 'sql_block_rules' = 'external_hive_partition2';"""
sql """grant all on *.*.* to external_block_user2;"""

sql """drop user if exists external_block_user3"""
sql """create user external_block_user3;"""
sql """SET PROPERTY FOR 'external_block_user3' 'sql_block_rules' = 'external_hive_partition3';"""
sql """grant all on *.*.* to external_block_user3;"""

// login as external_block_user1
def result1 = connect(user = 'external_block_user1', password = '', url = context.config.jdbcUrl) {
test {
sql """select * from ${catalog_name}.`default`.parquet_partition_table order by l_linenumber limit 10;"""
exception """sql hits sql block rule: external_hive_partition, reach partition_num : 3"""
}
}
// login as external_block_user2
def result2 = connect(user = 'external_block_user2', password = '', url = context.config.jdbcUrl) {
test {
sql """select * from ${catalog_name}.`default`.parquet_partition_table order by l_linenumber limit 10;"""
exception """sql hits sql block rule: external_hive_partition2, reach tablet_num : 3"""
}
}
// login as external_block_user3
def result3 = connect(user = 'external_block_user3', password = '', url = context.config.jdbcUrl) {
test {
sql """select * from ${catalog_name}.`default`.parquet_partition_table order by l_linenumber limit 10;"""
exception """sql hits sql block rule: external_hive_partition3, reach cardinality : 3"""
}
}
}

0 comments on commit bd05cba

Please sign in to comment.