From 686e48f26841e418a229987a3bffb8992c9bb16e Mon Sep 17 00:00:00 2001
From: Tiewei Fang <43782773+BePPPower@users.noreply.github.com>
Date: Fri, 24 May 2024 12:05:52 +0800
Subject: [PATCH] [Enhancement](tvf) select tvf supports using resource (#35139)

Create an S3/HDFS resource once, and a TVF can then use it directly to
access the data source.
---
 .../ExternalFileTableValuedFunction.java      |  12 +-
 .../tvf/test_s3_tvf_with_resource.out         |  70 +++++++
 .../external_table_p0/tvf/test_s3_tvf.groovy  |   4 -
 .../tvf/test_s3_tvf_with_resource.groovy      | 178 ++++++++++++++++++
 4 files changed, 259 insertions(+), 5 deletions(-)
 create mode 100644 regression-test/data/external_table_p0/tvf/test_s3_tvf_with_resource.out
 create mode 100644 regression-test/suites/external_table_p0/tvf/test_s3_tvf_with_resource.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index d692e6c9d30009..10c57f6865567b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -26,6 +26,7 @@
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
@@ -174,9 +175,18 @@ protected void parseFile() throws AnalysisException {
 
     //The keys in properties map need to be lowercase.
     protected Map<String, String> parseCommonProperties(Map<String, String> properties) throws AnalysisException {
+        Map<String, String> mergedProperties = Maps.newHashMap();
+        if (properties.containsKey("resource")) {
+            Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(properties.get("resource"));
+            if (resource == null) {
+                throw new AnalysisException("Can not find resource: " + properties.get("resource"));
+            }
+            mergedProperties = resource.getCopiedProperties();
+        }
+        mergedProperties.putAll(properties);
         // Copy the properties, because we will remove the key from properties.
         Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
-        copiedProps.putAll(properties);
+        copiedProps.putAll(mergedProperties);
         String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "").toLowerCase();
         String defaultColumnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
 
diff --git a/regression-test/data/external_table_p0/tvf/test_s3_tvf_with_resource.out b/regression-test/data/external_table_p0/tvf/test_s3_tvf_with_resource.out
new file mode 100644
index 00000000000000..e7ed7f6af85804
--- /dev/null
+++ b/regression-test/data/external_table_p0/tvf/test_s3_tvf_with_resource.out
@@ -0,0 +1,70 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_base --
+1 doris1 18
+2 doris2 19
+3 doris3 99
+4 doris4 \N
+5 doris5 15
+
+-- !select_1 --
+1 doris1 18
+2 doris2 19
+3 doris3 99
+4 doris4 \N
+5 doris5 15
+
+-- !select_2 --
+1 doris1 18
+2 doris2 19
+3 doris3 99
+4 doris4 \N
+5 doris5 15
+
+-- !select_3 --
+1 doris1 18
+2 doris2 19
+3 doris3 99
+4 doris4 \N
+5 doris5 15
+
+-- !select_4 --
+12 abcdef 1.23
+12313 kkkkkk 5.77
+123456 abcdef 1.2
+126 abcdef 1.25
+13 abcdef 1.24
+156 ahef 1.26
+323456 oooodef 1.27
+5456 abadsasf 1.28
+723456 text 1.3
+823456 hive 1.32
+923456 helloworld 1.89
+
+-- !select_5 --
+12 abcdef 1.23
+12313 kkkkkk 5.77
+123456 abcdef 1.2
+126 abcdef 1.25
+13 abcdef 1.24
+156 ahef 1.26
+323456 oooodef 1.27
+5456 abadsasf 1.28
+723456 text 1.3
+823456 hive 1.32
+923456 helloworld 1.89
+
+-- !select_6 --
+12313 kkkkkk 5.77
+923456 helloworld 1.89
+
+-- !select_7 --
+12313 kkkkkk 5.77
+123456 abcdef 1.2
+126 abcdef 1.25
+156 ahef 1.26
+323456 oooodef 1.27
+5456 abadsasf 1.28
+723456 text 1.3
+823456 hive 1.32
+923456 helloworld 1.89
+

diff --git a/regression-test/suites/external_table_p0/tvf/test_s3_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_s3_tvf.groovy
index 4f09680ba53b7e..24dda4196d742c 100644
--- a/regression-test/suites/external_table_p0/tvf/test_s3_tvf.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_s3_tvf.groovy
@@ -124,7 +124,6 @@ suite("test_s3_tvf", "p0") {
             "s3.access_key"= "${ak}",
             "s3.secret_key" = "${sk}",
             "format" = "hive_text",
-            "use_path_style" = "true",
             "region" = "${region}"
         ) order by c1,c2,c3;
         """
@@ -137,7 +136,6 @@
             "s3.access_key"= "${ak}",
             "s3.secret_key" = "${sk}",
             "format" = "hive_text",
-            "use_path_style" = "true",
             "region" = "${region}",
             "csv_schema"="k1:int;k2:string;k3:double"
         ) order by k1,k2,k3;
         """
@@ -151,7 +149,6 @@
             "s3.access_key"= "${ak}",
             "s3.secret_key" = "${sk}",
             "format" = "hive_text",
-            "use_path_style" = "true",
             "region" = "${region}",
             "csv_schema"="k1:int;k2:string;k3:double"
         ) where k3 > 1.5 order by k3,k2,k1;
         """
@@ -165,7 +162,6 @@
             "s3.access_key"= "${ak}",
             "s3.secret_key" = "${sk}",
             "format" = "hive_text",
-            "use_path_style" = "true",
             "region" = "${region}",
             "csv_schema"="k1:int;k2:string;k3:double"
         ) where k1 > 100 order by k3,k2,k1;

diff --git a/regression-test/suites/external_table_p0/tvf/test_s3_tvf_with_resource.groovy b/regression-test/suites/external_table_p0/tvf/test_s3_tvf_with_resource.groovy
new file mode 100644
index 00000000000000..0f5f43ccb8e392
--- /dev/null
+++ b/regression-test/suites/external_table_p0/tvf/test_s3_tvf_with_resource.groovy
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_s3_tvf_with_resource", "p0") {
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+
+    def export_table_name = "test_s3_tvf_with_resource_export_test"
+    def outFilePath = "${bucket}/est_s3_tvf/export_test/exp_"
+    def resource_name = "test_s3_tvf_resource"
+
+
+    def create_table = {table_name ->
+        sql """ DROP TABLE IF EXISTS ${table_name} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                `user_id` LARGEINT NOT NULL COMMENT "user id",
+                `name` STRING COMMENT "user name",
+                `age` INT COMMENT "user age"
+            )
+            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+    }
+
+    def create_s3_resource = {
+        sql """ DROP RESOURCE IF EXISTS '${resource_name}' """
+        sql """
+            CREATE RESOURCE "${resource_name}"
+            PROPERTIES
+            (
+                "type" = "s3",
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.bucket" = "${bucket}"
+            );
+        """
+    }
+
+    def outfile_to_S3 = {
+        // select ... into outfile ...
+        def res = sql """
+            SELECT * FROM ${export_table_name} t ORDER BY user_id
+            INTO OUTFILE "s3://${outFilePath}"
+            FORMAT AS ORC
+            PROPERTIES (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key" = "${sk}",
+                "s3.access_key" = "${ak}"
+            );
+        """
+
+        return res[0][3]
+    }
+
+    // create table to export data
+    create_table(export_table_name)
+
+    // create s3 resource
+    create_s3_resource()
+
+    // insert data
+    sql """ insert into ${export_table_name} values (1, 'doris1', 18); """
+    sql """ insert into ${export_table_name} values (2, 'doris2', 19); """
+    sql """ insert into ${export_table_name} values (3, 'doris3', 99); """
+    sql """ insert into ${export_table_name} values (4, 'doris4', null); """
+    sql """ insert into ${export_table_name} values (5, 'doris5', 15); """
+
+    // test base data
+    qt_select_base """ SELECT * FROM ${export_table_name} t ORDER BY user_id; """
+
+    // test outfile to s3
+    def outfile_url = outfile_to_S3()
+
+    // 1. normal
+    try {
+        order_qt_select_1 """ SELECT * FROM S3 (
+                                "uri" = "http://${s3_endpoint}${outfile_url.substring(4, outfile_url.length() - 1)}0.orc",
+                                "format" = "orc",
+                                "use_path_style" = "true",
+                                "resource" = "${resource_name}"
+                            );
+                            """
+    } finally {
+    }
+
+
+    // 2. test endpoint property
+    try {
+        order_qt_select_2 """ SELECT * FROM S3 (
+                                "uri" = "http://${outfile_url.substring(5)}0.orc",
+                                "format" = "orc",
+                                "resource" = "${resource_name}"
+                            );
+                            """
+    } finally {
+    }
+
+    // 3. test use_path_style
+    try {
+        order_qt_select_3 """ SELECT * FROM S3 (
+                                "uri" = "http://${s3_endpoint}${outfile_url.substring(4, outfile_url.length() - 1)}0.orc",
+                                "format" = "orc",
+                                "use_path_style" = "true",
+                                "resource" = "${resource_name}"
+                            );
+                            """
+    } finally {
+    }
+
+    try {
+        order_qt_select_4 """ SELECT * FROM S3 (
+                                "uri" = "https://${bucket}.${s3_endpoint}/regression/tvf/test_hive_text.text",
+                                "format" = "hive_text",
+                                "resource" = "${resource_name}"
+                            ) order by c1,c2,c3;
+                            """
+    } finally {
+    }
+
+    try {
+        order_qt_select_5 """ SELECT * FROM S3 (
+                                "uri" = "https://${bucket}.${s3_endpoint}/regression/tvf/test_hive_text.text",
+                                "format" = "hive_text",
+                                "csv_schema"="k1:int;k2:string;k3:double",
+                                "resource" = "${resource_name}"
+                            ) order by k1,k2,k3;
+                            """
+    } finally {
+    }
+
+    try {
+        order_qt_select_6 """ SELECT * FROM S3 (
+                                "uri" = "https://${bucket}.${s3_endpoint}/regression/tvf/test_hive_text.text",
+                                "format" = "hive_text",
+                                "csv_schema"="k1:int;k2:string;k3:double",
+                                "resource" = "${resource_name}"
+                            ) where k3 > 1.5 order by k3,k2,k1;
+                            """
+    } finally {
+    }
+
+    try {
+        order_qt_select_7 """ SELECT * FROM S3 (
+                                "uri" = "https://${bucket}.${s3_endpoint}/regression/tvf/test_hive_text.text",
+                                "format" = "hive_text",
+                                "csv_schema"="k1:int;k2:string;k3:double",
+                                "resource" = "${resource_name}"
+                            ) where k1 > 100 order by k3,k2,k1;
+                            """
+    } finally {
+    }
+
+}
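
Note (not part of the patch): a minimal usage sketch of the "resource" property
this patch adds. The resource name, endpoint, region, bucket, and credential
values below are placeholders, not values from the patch.

    -- Create a reusable S3 resource once:
    CREATE RESOURCE "example_s3_resource"
    PROPERTIES
    (
        "type" = "s3",
        "s3.endpoint" = "s3.example.com",        -- placeholder endpoint
        "s3.region" = "us-east-1",               -- placeholder region
        "s3.access_key" = "<access_key>",        -- placeholder credential
        "s3.secret_key" = "<secret_key>",        -- placeholder credential
        "s3.bucket" = "example-bucket"           -- placeholder bucket
    );

    -- Then reference it from the S3 TVF instead of repeating the connection
    -- properties in every query:
    SELECT * FROM S3 (
        "uri" = "https://example-bucket.s3.example.com/path/to/file.orc",
        "format" = "orc",
        "resource" = "example_s3_resource"
    );

Because parseCommonProperties() calls mergedProperties.putAll(properties) after
copying the resource's properties, any property passed directly to the TVF
overrides the same property stored in the resource.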