Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SHOW FUNCTIONS #13799

Merged
merged 7 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 41 additions & 26 deletions datafusion/core/src/catalog_common/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use arrow_array::builder::BooleanBuilder;
use arrow_array::builder::{BooleanBuilder, UInt8Builder};
use async_trait::async_trait;
use datafusion_common::error::Result;
use datafusion_common::DataFusionError;
Expand Down Expand Up @@ -247,6 +247,7 @@ impl InformationSchemaConfig {
return_type,
"SCALAR",
udf.documentation().map(|d| d.description.to_string()),
udf.documentation().map(|d| d.syntax_example.to_string()),
)
}
}
Expand All @@ -266,6 +267,7 @@ impl InformationSchemaConfig {
return_type,
"AGGREGATE",
udaf.documentation().map(|d| d.description.to_string()),
udaf.documentation().map(|d| d.syntax_example.to_string()),
)
}
}
Expand All @@ -285,6 +287,7 @@ impl InformationSchemaConfig {
return_type,
"WINDOW",
udwf.documentation().map(|d| d.description.to_string()),
udwf.documentation().map(|d| d.syntax_example.to_string()),
)
}
}
Expand All @@ -308,7 +311,8 @@ impl InformationSchemaConfig {
args: Option<&Vec<(String, String)>>,
arg_types: Vec<String>,
return_type: Option<String>,
is_variadic: bool| {
is_variadic: bool,
rid: u8| {
for (position, type_name) in arg_types.iter().enumerate() {
let param_name =
args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
Expand All @@ -322,6 +326,7 @@ impl InformationSchemaConfig {
type_name,
None::<&str>,
is_variadic,
rid,
);
}
if let Some(return_type) = return_type {
Expand All @@ -335,48 +340,52 @@ impl InformationSchemaConfig {
return_type.as_str(),
None::<&str>,
false,
rid,
);
}
};

for (func_name, udf) in udfs {
let args = udf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udf_args_and_return_types(udf)?;
for (arg_types, return_type) in combinations {
for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udf.signature()),
rid as u8,
);
}
}

for (func_name, udaf) in udafs {
let args = udaf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udaf_args_and_return_types(udaf)?;
for (arg_types, return_type) in combinations {
for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udaf.signature()),
rid as u8,
);
}
}

for (func_name, udwf) in udwfs {
let args = udwf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udwf_args_and_return_types(udwf)?;
for (arg_types, return_type) in combinations {
for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udwf.signature()),
rid as u8,
);
}
}
Expand Down Expand Up @@ -1095,6 +1104,7 @@ impl InformationSchemaRoutines {
Field::new("data_type", DataType::Utf8, true),
Field::new("function_type", DataType::Utf8, true),
Field::new("description", DataType::Utf8, true),
Field::new("syntax_example", DataType::Utf8, true),
]));

Self { schema, config }
Expand All @@ -1114,6 +1124,7 @@ impl InformationSchemaRoutines {
data_type: StringBuilder::new(),
function_type: StringBuilder::new(),
description: StringBuilder::new(),
syntax_example: StringBuilder::new(),
}
}
}
Expand All @@ -1131,6 +1142,7 @@ struct InformationSchemaRoutinesBuilder {
data_type: StringBuilder,
function_type: StringBuilder,
description: StringBuilder,
syntax_example: StringBuilder,
}

impl InformationSchemaRoutinesBuilder {
Expand All @@ -1145,6 +1157,7 @@ impl InformationSchemaRoutinesBuilder {
data_type: Option<impl AsRef<str>>,
function_type: impl AsRef<str>,
description: Option<impl AsRef<str>>,
syntax_example: Option<impl AsRef<str>>,
) {
self.specific_catalog.append_value(catalog_name.as_ref());
self.specific_schema.append_value(schema_name.as_ref());
Expand All @@ -1157,6 +1170,7 @@ impl InformationSchemaRoutinesBuilder {
self.data_type.append_option(data_type.as_ref());
self.function_type.append_value(function_type.as_ref());
self.description.append_option(description);
self.syntax_example.append_option(syntax_example);
}

fn finish(&mut self) -> RecordBatch {
Expand All @@ -1174,6 +1188,7 @@ impl InformationSchemaRoutinesBuilder {
Arc::new(self.data_type.finish()),
Arc::new(self.function_type.finish()),
Arc::new(self.description.finish()),
Arc::new(self.syntax_example.finish()),
],
)
.unwrap()
Expand Down Expand Up @@ -1222,6 +1237,12 @@ impl InformationSchemaParameters {
Field::new("data_type", DataType::Utf8, false),
Field::new("parameter_default", DataType::Utf8, true),
Field::new("is_variadic", DataType::Boolean, false),
// `rid` (short for `routine id`) is used to differentiate parameters from different signatures
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for the documentation

// (It serves as the group-by key when generating the `SHOW FUNCTIONS` query).
// For example, the following signatures have different `rid` values:
// - `datetrunc(Utf8, Timestamp(Microsecond, Some("+TZ"))) -> Timestamp(Microsecond, Some("+TZ"))`
// - `datetrunc(Utf8View, Timestamp(Nanosecond, None)) -> Timestamp(Nanosecond, None)`
Field::new("rid", DataType::UInt8, false),
]));

Self { schema, config }
Expand All @@ -1239,7 +1260,7 @@ impl InformationSchemaParameters {
data_type: StringBuilder::new(),
parameter_default: StringBuilder::new(),
is_variadic: BooleanBuilder::new(),
inserted: HashSet::new(),
rid: UInt8Builder::new(),
}
}
}
Expand All @@ -1255,8 +1276,7 @@ struct InformationSchemaParametersBuilder {
data_type: StringBuilder,
parameter_default: StringBuilder,
is_variadic: BooleanBuilder,
// use HashSet to avoid duplicate rows. The key is (specific_name, ordinal_position, parameter_mode, data_type)
inserted: HashSet<(String, u64, String, String)>,
rid: UInt8Builder,
}

impl InformationSchemaParametersBuilder {
Expand All @@ -1272,25 +1292,19 @@ impl InformationSchemaParametersBuilder {
data_type: impl AsRef<str>,
parameter_default: Option<impl AsRef<str>>,
is_variadic: bool,
rid: u8,
) {
let key = (
specific_name.as_ref().to_string(),
ordinal_position,
parameter_mode.as_ref().to_string(),
data_type.as_ref().to_string(),
);
if self.inserted.insert(key) {
self.specific_catalog
.append_value(specific_catalog.as_ref());
self.specific_schema.append_value(specific_schema.as_ref());
self.specific_name.append_value(specific_name.as_ref());
self.ordinal_position.append_value(ordinal_position);
self.parameter_mode.append_value(parameter_mode.as_ref());
self.parameter_name.append_option(parameter_name.as_ref());
self.data_type.append_value(data_type.as_ref());
self.parameter_default.append_option(parameter_default);
self.is_variadic.append_value(is_variadic);
}
self.specific_catalog
.append_value(specific_catalog.as_ref());
self.specific_schema.append_value(specific_schema.as_ref());
self.specific_name.append_value(specific_name.as_ref());
self.ordinal_position.append_value(ordinal_position);
self.parameter_mode.append_value(parameter_mode.as_ref());
self.parameter_name.append_option(parameter_name.as_ref());
self.data_type.append_value(data_type.as_ref());
self.parameter_default.append_option(parameter_default);
self.is_variadic.append_value(is_variadic);
self.rid.append_value(rid);
}

fn finish(&mut self) -> RecordBatch {
Expand All @@ -1306,6 +1320,7 @@ impl InformationSchemaParametersBuilder {
Arc::new(self.data_type.finish()),
Arc::new(self.parameter_default.finish()),
Arc::new(self.is_variadic.finish()),
Arc::new(self.rid.finish()),
],
)
.unwrap()
Expand Down
88 changes: 88 additions & 0 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
filter,
} => self.show_columns_to_plan(extended, full, table_name, filter),

Statement::ShowFunctions { filter, .. } => {
self.show_functions_to_plan(filter)
}

Statement::Insert(Insert {
or,
into,
Expand Down Expand Up @@ -1879,6 +1883,90 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
}

/// Rewrite `SHOW FUNCTIONS` to another SQL query
/// The query is based on the `information_schema.routines` and `information_schema.parameters` tables
///
/// The output columns:
/// - function_name: The name of function
/// - return_type: The return type of the function
/// - parameters: The name of parameters (ordered by the ordinal position)
/// - parameter_types: The type of parameters (ordered by the ordinal position)
/// - description: The description of the function (the description defined in the document)
/// - syntax_example: The syntax_example of the function (the syntax_example defined in the document)
fn show_functions_to_plan(
&self,
filter: Option<ShowStatementFilter>,
) -> Result<LogicalPlan> {
let where_clause = if let Some(filter) = filter {
match filter {
ShowStatementFilter::Like(like) => {
format!("WHERE p.function_name like '{like}'")
}
_ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
}
} else {
"".to_string()
};

let query = format!(
r#"
SELECT DISTINCT
p.*,
r.function_type function_type,
r.description description,
r.syntax_example syntax_example
FROM
(
SELECT
i.specific_name function_name,
o.data_type return_type,
array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
FROM (
SELECT
specific_catalog,
specific_schema,
specific_name,
ordinal_position,
parameter_name,
data_type,
rid
FROM
information_schema.parameters
WHERE
parameter_mode = 'IN'
) i
JOIN
(
SELECT
specific_catalog,
specific_schema,
specific_name,
ordinal_position,
parameter_name,
data_type,
rid
FROM
information_schema.parameters
WHERE
parameter_mode = 'OUT'
) o
ON i.specific_catalog = o.specific_catalog
AND i.specific_schema = o.specific_schema
AND i.specific_name = o.specific_name
AND i.rid = o.rid
GROUP BY 1, 2, i.rid
) as p
JOIN information_schema.routines r
ON p.function_name = r.routine_name
{where_clause}
"#
);
let mut rewrite = DFParser::parse_sql(&query)?;
assert_eq!(rewrite.len(), 1);
self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
}

fn show_create_table_to_plan(
&self,
sql_table_name: ObjectName,
Expand Down
Loading
Loading