diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 49b529facc9d..bcbaae0d2462 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -138,7 +138,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aef82843a0ec9f8b19567445ad2421ceeb1d711514384bdd3d49fe37102ee13" dependencies = [ "bigdecimal", - "bzip2", + "bzip2 0.4.4", "crc32fast", "digest", "libflate", @@ -389,7 +389,7 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522" dependencies = [ - "bzip2", + "bzip2 0.4.4", "flate2", "futures-core", "futures-io", @@ -906,6 +906,16 @@ dependencies = [ "libc", ] +[[package]] +name = "bzip2" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bafdbf26611df8c14810e268ddceda071c297570a5fb360ceddf617fe417ef58" +dependencies = [ + "bzip2-sys", + "libc", +] + [[package]] name = "bzip2-sys" version = "0.1.11+1.0.8" @@ -1210,7 +1220,7 @@ dependencies = [ "async-compression", "async-trait", "bytes", - "bzip2", + "bzip2 0.5.0", "chrono", "dashmap", "datafusion-catalog", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 01bf03f32e8e..9bf530a9d6ac 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -92,7 +92,7 @@ async-compression = { version = "0.4.0", features = [ ], optional = true } async-trait = { workspace = true } bytes = { workspace = true } -bzip2 = { version = "0.4.3", optional = true } +bzip2 = { version = "0.5.0", optional = true } chrono = { workspace = true } dashmap = { workspace = true } datafusion-catalog = { workspace = true } diff --git a/datafusion/core/src/catalog_common/information_schema.rs b/datafusion/core/src/catalog_common/information_schema.rs index 1d4a3c15f7ca..ce3092acfdf1 100644 --- a/datafusion/core/src/catalog_common/information_schema.rs +++ b/datafusion/core/src/catalog_common/information_schema.rs @@ -34,7 +34,7 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; -use arrow_array::builder::BooleanBuilder; +use arrow_array::builder::{BooleanBuilder, UInt8Builder}; use async_trait::async_trait; use datafusion_common::error::Result; use datafusion_common::DataFusionError; @@ -247,6 +247,7 @@ impl InformationSchemaConfig { return_type, "SCALAR", udf.documentation().map(|d| d.description.to_string()), + udf.documentation().map(|d| d.syntax_example.to_string()), ) } } @@ -266,6 +267,7 @@ impl InformationSchemaConfig { return_type, "AGGREGATE", udaf.documentation().map(|d| d.description.to_string()), + udaf.documentation().map(|d| d.syntax_example.to_string()), ) } } @@ -285,6 +287,7 @@ impl InformationSchemaConfig { return_type, "WINDOW", udwf.documentation().map(|d| d.description.to_string()), + udwf.documentation().map(|d| d.syntax_example.to_string()), ) } } @@ -308,7 +311,8 @@ impl InformationSchemaConfig { args: Option<&Vec<(String, String)>>, arg_types: Vec<String>, return_type: Option<String>, - is_variadic: bool| { + is_variadic: bool, + rid: u8| { for (position, type_name) in arg_types.iter().enumerate() { let param_name = args.and_then(|a| a.get(position).map(|arg| arg.0.as_str())); @@ -322,6 +326,7 @@ impl InformationSchemaConfig { type_name, None::<&str>, is_variadic, + rid, ); } if let Some(return_type) = return_type { @@ -335,6 +340,7 @@ impl InformationSchemaConfig { return_type.as_str(), None::<&str>, false, + rid, ); } }; @@ -342,13 +348,14 @@ impl
InformationSchemaConfig { for (func_name, udf) in udfs { let args = udf.documentation().and_then(|d| d.arguments.clone()); let combinations = get_udf_args_and_return_types(udf)?; - for (arg_types, return_type) in combinations { + for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() { add_parameters( func_name, args.as_ref(), arg_types, return_type, Self::is_variadic(udf.signature()), + rid as u8, ); } } @@ -356,13 +363,14 @@ impl InformationSchemaConfig { for (func_name, udaf) in udafs { let args = udaf.documentation().and_then(|d| d.arguments.clone()); let combinations = get_udaf_args_and_return_types(udaf)?; - for (arg_types, return_type) in combinations { + for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() { add_parameters( func_name, args.as_ref(), arg_types, return_type, Self::is_variadic(udaf.signature()), + rid as u8, ); } } @@ -370,13 +378,14 @@ impl InformationSchemaConfig { for (func_name, udwf) in udwfs { let args = udwf.documentation().and_then(|d| d.arguments.clone()); let combinations = get_udwf_args_and_return_types(udwf)?; - for (arg_types, return_type) in combinations { + for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() { add_parameters( func_name, args.as_ref(), arg_types, return_type, Self::is_variadic(udwf.signature()), + rid as u8, ); } } @@ -1095,6 +1104,7 @@ impl InformationSchemaRoutines { Field::new("data_type", DataType::Utf8, true), Field::new("function_type", DataType::Utf8, true), Field::new("description", DataType::Utf8, true), + Field::new("syntax_example", DataType::Utf8, true), ])); Self { schema, config } } @@ -1114,6 +1124,7 @@ impl InformationSchemaRoutines { data_type: StringBuilder::new(), function_type: StringBuilder::new(), description: StringBuilder::new(), + syntax_example: StringBuilder::new(), } } } @@ -1131,6 +1142,7 @@ struct InformationSchemaRoutinesBuilder { data_type: StringBuilder, function_type: StringBuilder, description: StringBuilder, + syntax_example: StringBuilder, } impl InformationSchemaRoutinesBuilder { @@ -1145,6 +1157,7 @@ impl InformationSchemaRoutinesBuilder { data_type: Option<String>, function_type: impl AsRef<str>, description: Option<String>, + syntax_example: Option<String>, ) { self.specific_catalog.append_value(catalog_name.as_ref()); self.specific_schema.append_value(schema_name.as_ref()); @@ -1157,6 +1170,7 @@ impl InformationSchemaRoutinesBuilder { self.data_type.append_option(data_type.as_ref()); self.function_type.append_value(function_type.as_ref()); self.description.append_option(description); + self.syntax_example.append_option(syntax_example); } fn finish(&mut self) -> RecordBatch { @@ -1174,6 +1188,7 @@ impl InformationSchemaRoutinesBuilder { Arc::new(self.data_type.finish()), Arc::new(self.function_type.finish()), Arc::new(self.description.finish()), + Arc::new(self.syntax_example.finish()), ], ) .unwrap() @@ -1222,6 +1237,12 @@ impl InformationSchemaParameters { Field::new("data_type", DataType::Utf8, false), Field::new("parameter_default", DataType::Utf8, true), Field::new("is_variadic", DataType::Boolean, false), + // `rid` (short for `routine id`) is used to differentiate parameters from different signatures + // (It serves as the group-by key when generating the `SHOW FUNCTIONS` query).
+ // For example, the following signatures have different `rid` values: + // - `date_trunc(Utf8, Timestamp(Microsecond, Some("+TZ"))) -> Timestamp(Microsecond, Some("+TZ"))` + // - `date_trunc(Utf8View, Timestamp(Nanosecond, None)) -> Timestamp(Nanosecond, None)` + Field::new("rid", DataType::UInt8, false), ])); Self { schema, config } @@ -1239,7 +1260,7 @@ impl InformationSchemaParameters { data_type: StringBuilder::new(), parameter_default: StringBuilder::new(), is_variadic: BooleanBuilder::new(), - inserted: HashSet::new(), + rid: UInt8Builder::new(), } } } @@ -1255,8 +1276,7 @@ struct InformationSchemaParametersBuilder { data_type: StringBuilder, parameter_default: StringBuilder, is_variadic: BooleanBuilder, - // use HashSet to avoid duplicate rows. The key is (specific_name, ordinal_position, parameter_mode, data_type) - inserted: HashSet<(String, u64, String, String)>, + rid: UInt8Builder, } impl InformationSchemaParametersBuilder { @@ -1272,25 +1292,19 @@ impl InformationSchemaParametersBuilder { data_type: impl AsRef<str>, parameter_default: Option<impl AsRef<str>>, is_variadic: bool, + rid: u8, ) { - let key = ( - specific_name.as_ref().to_string(), - ordinal_position, - parameter_mode.as_ref().to_string(), - data_type.as_ref().to_string(), - ); - if self.inserted.insert(key) { - self.specific_catalog - .append_value(specific_catalog.as_ref()); - self.specific_schema.append_value(specific_schema.as_ref()); - self.specific_name.append_value(specific_name.as_ref()); - self.ordinal_position.append_value(ordinal_position); - self.parameter_mode.append_value(parameter_mode.as_ref()); - self.parameter_name.append_option(parameter_name.as_ref()); - self.data_type.append_value(data_type.as_ref()); - self.parameter_default.append_option(parameter_default); - self.is_variadic.append_value(is_variadic); - } + self.specific_catalog + .append_value(specific_catalog.as_ref()); + self.specific_schema.append_value(specific_schema.as_ref()); + self.specific_name.append_value(specific_name.as_ref()); + self.ordinal_position.append_value(ordinal_position); + self.parameter_mode.append_value(parameter_mode.as_ref()); + self.parameter_name.append_option(parameter_name.as_ref()); + self.data_type.append_value(data_type.as_ref()); + self.parameter_default.append_option(parameter_default); + self.is_variadic.append_value(is_variadic); + self.rid.append_value(rid); } fn finish(&mut self) -> RecordBatch { @@ -1306,6 +1320,7 @@ impl InformationSchemaParametersBuilder { Arc::new(self.data_type.finish()), Arc::new(self.parameter_default.finish()), Arc::new(self.is_variadic.finish()), + Arc::new(self.rid.finish()), ], ) .unwrap() diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index e5ce28e73806..b4167900d4c2 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -106,6 +106,28 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result +#[cfg(feature = "compression")] +struct AutoFinishBzEncoder(BzEncoder<File>); + +#[cfg(feature = "compression")] +impl Write for AutoFinishBzEncoder { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + self.0.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.0.flush() + } + } + +#[cfg(feature = "compression")] +impl Drop for AutoFinishBzEncoder { + fn drop(&mut self) { + let _ = self.0.try_finish(); + } +} + /// Returns file groups [`Vec<Vec<PartitionedFile>>`] for scanning `partitions` of `filename` pub fn partitioned_file_groups( path: &str, @@ -147,9 +169,10 @@ pub fn partitioned_file_groups( Box::new(encoder) } #[cfg(feature = "compression")] - FileCompressionType::BZIP2 => {
Box::new(BzEncoder::new(file, BzCompression::default())) - } + FileCompressionType::BZIP2 => Box::new(AutoFinishBzEncoder(BzEncoder::new( + file, + BzCompression::default(), + ))), #[cfg(not(feature = "compression"))] FileCompressionType::GZIP | FileCompressionType::BZIP2 @@ -183,8 +206,8 @@ pub fn partitioned_file_groups( } } - // Must drop the stream before creating ObjectMeta below as drop - // triggers finish for ZstdEncoder which writes additional data + // Must drop the stream before creating ObjectMeta below as drop triggers + // finish for ZstdEncoder/BzEncoder which writes additional data for mut w in writers.into_iter() { w.flush().unwrap(); } diff --git a/datafusion/functions-nested/src/distance.rs b/datafusion/functions-nested/src/distance.rs index 381ddeb59a0b..704e840da3f5 100644 --- a/datafusion/functions-nested/src/distance.rs +++ b/datafusion/functions-nested/src/distance.rs @@ -17,24 +17,23 @@ //! [ScalarUDFImpl] definitions for array_distance function. -use crate::utils::{downcast_arg, make_scalar_function}; +use crate::utils::make_scalar_function; use arrow_array::{ Array, ArrayRef, Float64Array, LargeListArray, ListArray, OffsetSizeTrait, }; use arrow_schema::DataType; use arrow_schema::DataType::{FixedSizeList, Float64, LargeList, List}; -use core::any::type_name; use datafusion_common::cast::{ as_float32_array, as_float64_array, as_generic_list_array, as_int32_array, as_int64_array, }; use datafusion_common::utils::coerced_fixed_size_list_to_list; -use datafusion_common::DataFusionError; -use datafusion_common::{exec_err, Result}; +use datafusion_common::{exec_err, internal_datafusion_err, Result}; use datafusion_expr::scalar_doc_sections::DOC_SECTION_ARRAY; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; +use datafusion_functions::{downcast_arg, downcast_named_arg}; use std::any::Any; use std::sync::{Arc, OnceLock}; diff --git a/datafusion/functions-nested/src/length.rs b/datafusion/functions-nested/src/length.rs index 3f92cb3ebb21..2f03842cbeeb 100644 --- a/datafusion/functions-nested/src/length.rs +++ b/datafusion/functions-nested/src/length.rs @@ -17,20 +17,19 @@ //! [`ScalarUDFImpl`] definitions for array_length function. 
-use crate::utils::{downcast_arg, make_scalar_function}; +use crate::utils::make_scalar_function; use arrow_array::{ Array, ArrayRef, Int64Array, LargeListArray, ListArray, OffsetSizeTrait, UInt64Array, }; use arrow_schema::DataType; use arrow_schema::DataType::{FixedSizeList, LargeList, List, UInt64}; -use core::any::type_name; use datafusion_common::cast::{as_generic_list_array, as_int64_array}; -use datafusion_common::DataFusionError; -use datafusion_common::{exec_err, plan_err, Result}; +use datafusion_common::{exec_err, internal_datafusion_err, plan_err, Result}; use datafusion_expr::scalar_doc_sections::DOC_SECTION_ARRAY; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; +use datafusion_functions::{downcast_arg, downcast_named_arg}; use std::any::Any; use std::sync::{Arc, OnceLock}; diff --git a/datafusion/functions-nested/src/string.rs b/datafusion/functions-nested/src/string.rs index 143a3d06a32a..9288b374dacb 100644 --- a/datafusion/functions-nested/src/string.rs +++ b/datafusion/functions-nested/src/string.rs @@ -26,11 +26,13 @@ use arrow::array::{ use arrow::datatypes::{DataType, Field}; use datafusion_expr::TypeSignature; -use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; +use datafusion_common::{ + internal_datafusion_err, not_impl_err, plan_err, DataFusionError, Result, +}; -use std::any::{type_name, Any}; +use std::any::Any; -use crate::utils::{downcast_arg, make_scalar_function}; +use crate::utils::make_scalar_function; use arrow::compute::cast; use arrow_array::builder::{ArrayBuilder, LargeStringBuilder, StringViewBuilder}; use arrow_array::cast::AsArray; @@ -45,6 +47,7 @@ use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; use datafusion_functions::strings::StringArrayType; +use datafusion_functions::{downcast_arg, downcast_named_arg}; use std::sync::{Arc, OnceLock}; macro_rules! call_array_function { diff --git a/datafusion/functions-nested/src/utils.rs b/datafusion/functions-nested/src/utils.rs index 4b7b5ebd8ba1..c54d6d49cecc 100644 --- a/datafusion/functions-nested/src/utils.rs +++ b/datafusion/functions-nested/src/utils.rs @@ -28,23 +28,12 @@ use arrow_array::{ use arrow_buffer::OffsetBuffer; use arrow_schema::{Field, Fields}; use datafusion_common::cast::{as_large_list_array, as_list_array}; -use datafusion_common::{exec_err, internal_err, plan_err, Result, ScalarValue}; +use datafusion_common::{ + exec_err, internal_datafusion_err, internal_err, plan_err, Result, ScalarValue, +}; -use core::any::type_name; -use datafusion_common::DataFusionError; use datafusion_expr::ColumnarValue; - -macro_rules! downcast_arg { - ($ARG:expr, $ARRAY_TYPE:ident) => {{ - $ARG.as_any().downcast_ref::<$ARRAY_TYPE>().ok_or_else(|| { - DataFusionError::Internal(format!( - "could not cast to {}", - type_name::<$ARRAY_TYPE>() - )) - })? - }}; -} -pub(crate) use downcast_arg; +use datafusion_functions::{downcast_arg, downcast_named_arg}; pub(crate) fn check_datatypes(name: &str, args: &[&ArrayRef]) -> Result<()> { let data_type = args[0].data_type(); diff --git a/datafusion/functions/src/core/greatest.rs b/datafusion/functions/src/core/greatest.rs index 3ea2eadf22ee..e91ec2b0c4d8 100644 --- a/datafusion/functions/src/core/greatest.rs +++ b/datafusion/functions/src/core/greatest.rs @@ -15,20 +15,19 @@ // specific language governing permissions and limitations // under the License. 
-use arrow::array::{make_comparator, Array, ArrayRef, BooleanArray}; +use crate::core::greatest_least_utils::GreatestLeastOperator; +use arrow::array::{make_comparator, Array, BooleanArray}; use arrow::compute::kernels::cmp; -use arrow::compute::kernels::zip::zip; use arrow::compute::SortOptions; use arrow::datatypes::DataType; use arrow_buffer::BooleanBuffer; -use datafusion_common::{exec_err, plan_err, Result, ScalarValue}; +use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_doc::Documentation; -use datafusion_expr::binary::type_union_resolution; use datafusion_expr::scalar_doc_sections::DOC_SECTION_CONDITIONAL; use datafusion_expr::ColumnarValue; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; use std::any::Any; -use std::sync::{Arc, OnceLock}; +use std::sync::OnceLock; const SORT_OPTIONS: SortOptions = SortOptions { // We want greatest first @@ -57,79 +56,57 @@ impl GreatestFunc { } } -fn get_logical_null_count(arr: &dyn Array) -> usize { - arr.logical_nulls() - .map(|n| n.null_count()) - .unwrap_or_default() -} +impl GreatestLeastOperator for GreatestFunc { + const NAME: &'static str = "greatest"; -/// Return boolean array where `arr[i] = lhs[i] >= rhs[i]` for all i, where `arr` is the result array -/// Nulls are always considered smaller than any other value -fn get_larger(lhs: &dyn Array, rhs: &dyn Array) -> Result<BooleanArray> { - // Fast path: - // If both arrays are not nested, have the same length and no nulls, we can use the faster vectorised kernel - // - If both arrays are not nested: Nested types, such as lists, are not supported as the null semantics are not well-defined. - // - both array does not have any nulls: cmp::gt_eq will return null if any of the input is null while we want to return false in that case - if !lhs.data_type().is_nested() - && get_logical_null_count(lhs) == 0 - && get_logical_null_count(rhs) == 0 - { - return cmp::gt_eq(&lhs, &rhs).map_err(|e| e.into()); - } + fn keep_scalar<'a>( + lhs: &'a ScalarValue, + rhs: &'a ScalarValue, + ) -> Result<&'a ScalarValue> { + if !lhs.data_type().is_nested() { + return if lhs >= rhs { Ok(lhs) } else { Ok(rhs) }; + } - let cmp = make_comparator(lhs, rhs, SORT_OPTIONS)?; + // If complex type we can't compare directly as we want null values to be smaller + let cmp = make_comparator( + lhs.to_array()?.as_ref(), + rhs.to_array()?.as_ref(), + SORT_OPTIONS, + )?; - if lhs.len() != rhs.len() { - return exec_err!( - "All arrays should have the same length for greatest comparison" - ); + if cmp(0, 0).is_ge() { + Ok(lhs) + } else { + Ok(rhs) + } } - let values = BooleanBuffer::collect_bool(lhs.len(), |i| cmp(i, i).is_ge()); - - // No nulls as we only want to keep the values that are larger, its either true or false - Ok(BooleanArray::new(values, None)) -} - -/// Return array where the largest value at each index is kept -fn keep_larger(lhs: ArrayRef, rhs: ArrayRef) -> Result<ArrayRef> { - // True for values that we should keep from the left array - let keep_lhs = get_larger(lhs.as_ref(), rhs.as_ref())?; - - let larger = zip(&keep_lhs, &lhs, &rhs)?; + /// Return boolean array where `arr[i] = lhs[i] >= rhs[i]` for all i, where `arr` is the result array + /// Nulls are always considered smaller than any other value + fn get_indexes_to_keep(lhs: &dyn Array, rhs: &dyn Array) -> Result<BooleanArray> { + // Fast path: + // If both arrays are not nested, have the same length and no nulls, we can use the faster vectorised kernel + // - If both arrays are not nested: Nested types, such as lists, are not supported as the null semantics
are not well-defined. + // - neither array has any nulls: cmp::gt_eq will return null if any input is null, while we want to return false in that case + if !lhs.data_type().is_nested() + && lhs.logical_null_count() == 0 + && rhs.logical_null_count() == 0 + { + return cmp::gt_eq(&lhs, &rhs).map_err(|e| e.into()); + } - Ok(larger) -} + let cmp = make_comparator(lhs, rhs, SORT_OPTIONS)?; -fn keep_larger_scalar<'a>( - lhs: &'a ScalarValue, - rhs: &'a ScalarValue, -) -> Result<&'a ScalarValue> { - if !lhs.data_type().is_nested() { - return if lhs >= rhs { Ok(lhs) } else { Ok(rhs) }; - } - - // If complex type we can't compare directly as we want null values to be smaller - let cmp = make_comparator( - lhs.to_array()?.as_ref(), - rhs.to_array()?.as_ref(), - SORT_OPTIONS, - )?; + if lhs.len() != rhs.len() { + return internal_err!( + "All arrays should have the same length for greatest comparison" + ); + } - if cmp(0, 0).is_ge() { - Ok(lhs) - } else { - Ok(rhs) - } -} + let values = BooleanBuffer::collect_bool(lhs.len(), |i| cmp(i, i).is_ge()); -fn find_coerced_type(data_types: &[DataType]) -> Result<DataType> { - if data_types.is_empty() { - plan_err!("greatest was called without any arguments. It requires at least 1.") - } else if let Some(coerced_type) = type_union_resolution(data_types) { - Ok(coerced_type) - } else { - plan_err!("Cannot find a common type for arguments") + // No nulls as we only want to keep the values that are larger, it's either true or false + Ok(BooleanArray::new(values, None)) } } @@ -151,74 +128,12 @@ impl ScalarUDFImpl for GreatestFunc { } fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> { - if args.is_empty() { - return exec_err!( - "greatest was called with no arguments. It requires at least 1." - ); - } - - // Some engines (e.g.
SQL Server) allow greatest with single arg, it's a noop - if args.len() == 1 { - return Ok(args[0].clone()); - } - - // Split to scalars and arrays for later optimization - let (scalars, arrays): (Vec<_>, Vec<_>) = args.iter().partition(|x| match x { - ColumnarValue::Scalar(_) => true, - ColumnarValue::Array(_) => false, - }); - - let mut arrays_iter = arrays.iter().map(|x| match x { - ColumnarValue::Array(a) => a, - _ => unreachable!(), - }); - - let first_array = arrays_iter.next(); - - let mut largest: ArrayRef; - - // Optimization: merge all scalars into one to avoid recomputing - if !scalars.is_empty() { - let mut scalars_iter = scalars.iter().map(|x| match x { - ColumnarValue::Scalar(s) => s, - _ => unreachable!(), - }); - - // We have at least one scalar - let mut largest_scalar = scalars_iter.next().unwrap(); - - for scalar in scalars_iter { - largest_scalar = keep_larger_scalar(largest_scalar, scalar)?; - } - - // If we only have scalars, return the largest one - if arrays.is_empty() { - return Ok(ColumnarValue::Scalar(largest_scalar.clone())); - } - - // We have at least one array - let first_array = first_array.unwrap(); - - // Start with the largest value - largest = keep_larger( - Arc::clone(first_array), - largest_scalar.to_array_of_size(first_array.len())?, - )?; - } else { - // If we only have arrays, start with the first array - // (We must have at least one array) - largest = Arc::clone(first_array.unwrap()); - } - - for array in arrays_iter { - largest = keep_larger(Arc::clone(array), largest)?; - } - - Ok(ColumnarValue::Array(largest)) + super::greatest_least_utils::execute_conditional::<Self>(args) } fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> { - let coerced_type = find_coerced_type(arg_types)?; + let coerced_type = + super::greatest_least_utils::find_coerced_type::<Self>(arg_types)?; Ok(vec![coerced_type; arg_types.len()]) } diff --git a/datafusion/functions/src/core/greatest_least_utils.rs b/datafusion/functions/src/core/greatest_least_utils.rs new file mode 100644 index 000000000000..46b3645e703a --- /dev/null +++ b/datafusion/functions/src/core/greatest_least_utils.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use arrow::array::{Array, ArrayRef, BooleanArray}; +use arrow::compute::kernels::zip::zip; +use arrow::datatypes::DataType; +use datafusion_common::{internal_err, plan_err, Result, ScalarValue}; +use datafusion_expr_common::columnar_value::ColumnarValue; +use datafusion_expr_common::type_coercion::binary::type_union_resolution; +use std::sync::Arc; + +pub(super) trait GreatestLeastOperator { + const NAME: &'static str; + + fn keep_scalar<'a>( + lhs: &'a ScalarValue, + rhs: &'a ScalarValue, + ) -> Result<&'a ScalarValue>; + + /// Return array with true for values that we should keep from the lhs array + fn get_indexes_to_keep(lhs: &dyn Array, rhs: &dyn Array) -> Result<BooleanArray>; +} + +fn keep_array<Op: GreatestLeastOperator>( + lhs: ArrayRef, + rhs: ArrayRef, +) -> Result<ArrayRef> { + // True for values that we should keep from the left array + let keep_lhs = Op::get_indexes_to_keep(lhs.as_ref(), rhs.as_ref())?; + + let result = zip(&keep_lhs, &lhs, &rhs)?; + + Ok(result) +} + +pub(super) fn execute_conditional<Op: GreatestLeastOperator>( + args: &[ColumnarValue], +) -> Result<ColumnarValue> { + if args.is_empty() { + return internal_err!( + "{} was called with no arguments. It requires at least 1.", + Op::NAME + ); + } + + // Some engines (e.g. SQL Server) allow greatest/least with a single arg; it's a no-op + if args.len() == 1 { + return Ok(args[0].clone()); + } + + // Split to scalars and arrays for later optimization + let (scalars, arrays): (Vec<_>, Vec<_>) = args.iter().partition(|x| match x { + ColumnarValue::Scalar(_) => true, + ColumnarValue::Array(_) => false, + }); + + let mut arrays_iter = arrays.iter().map(|x| match x { + ColumnarValue::Array(a) => a, + _ => unreachable!(), + }); + + let first_array = arrays_iter.next(); + + let mut result: ArrayRef; + + // Optimization: merge all scalars into one to avoid recomputing (constant folding) + if !scalars.is_empty() { + let mut scalars_iter = scalars.iter().map(|x| match x { + ColumnarValue::Scalar(s) => s, + _ => unreachable!(), + }); + + // We have at least one scalar + let mut result_scalar = scalars_iter.next().unwrap(); + + for scalar in scalars_iter { + result_scalar = Op::keep_scalar(result_scalar, scalar)?; + } + + // If we only have scalars, return the one that we should keep (largest/smallest) + if arrays.is_empty() { + return Ok(ColumnarValue::Scalar(result_scalar.clone())); + } + + // We have at least one array + let first_array = first_array.unwrap(); + + // Start with the result value + result = keep_array::<Op>( + Arc::clone(first_array), + result_scalar.to_array_of_size(first_array.len())?, + )?; + } else { + // If we only have arrays, start with the first array + // (We must have at least one array) + result = Arc::clone(first_array.unwrap()); + } + + for array in arrays_iter { + result = keep_array::<Op>(Arc::clone(array), result)?; + } + + Ok(ColumnarValue::Array(result)) +} + +pub(super) fn find_coerced_type<Op: GreatestLeastOperator>( + data_types: &[DataType], +) -> Result<DataType> { + if data_types.is_empty() { + plan_err!( + "{} was called without any arguments. It requires at least 1.", + Op::NAME + ) + } else if let Some(coerced_type) = type_union_resolution(data_types) { + Ok(coerced_type) + } else { + plan_err!("Cannot find a common type for arguments") + } +} diff --git a/datafusion/functions/src/core/least.rs b/datafusion/functions/src/core/least.rs new file mode 100644 index 000000000000..b9ea65cdb732 --- /dev/null +++ b/datafusion/functions/src/core/least.rs @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::core::greatest_least_utils::GreatestLeastOperator; +use arrow::array::{make_comparator, Array, BooleanArray}; +use arrow::compute::kernels::cmp; +use arrow::compute::SortOptions; +use arrow::datatypes::DataType; +use arrow_buffer::BooleanBuffer; +use datafusion_common::{internal_err, Result, ScalarValue}; +use datafusion_doc::Documentation; +use datafusion_expr::scalar_doc_sections::DOC_SECTION_CONDITIONAL; +use datafusion_expr::ColumnarValue; +use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; +use std::any::Any; +use std::sync::OnceLock; + +const SORT_OPTIONS: SortOptions = SortOptions { + // Having the smallest result first + descending: false, + + // NULL will be greater than any other value + nulls_first: false, +}; + +#[derive(Debug)] +pub struct LeastFunc { + signature: Signature, +} + +impl Default for LeastFunc { + fn default() -> Self { + LeastFunc::new() + } +} + +impl LeastFunc { + pub fn new() -> Self { + Self { + signature: Signature::user_defined(Volatility::Immutable), + } + } +} + +impl GreatestLeastOperator for LeastFunc { + const NAME: &'static str = "least"; + + fn keep_scalar<'a>( + lhs: &'a ScalarValue, + rhs: &'a ScalarValue, + ) -> Result<&'a ScalarValue> { + // Manual checking for nulls as: + // 1. If we're going to use <=, in Rust None is smaller than Some(T), which we don't want + // 2. And we can't use make_comparator, as null has no natural order (Arrow error) + if lhs.is_null() { + return Ok(rhs); + } + + if rhs.is_null() { + return Ok(lhs); + } + + if !lhs.data_type().is_nested() { + return if lhs <= rhs { Ok(lhs) } else { Ok(rhs) }; + } + + // Not using <= as in Rust None is smaller than Some(T) + + // If complex type we can't compare directly as we want null values to be larger + let cmp = make_comparator( + lhs.to_array()?.as_ref(), + rhs.to_array()?.as_ref(), + SORT_OPTIONS, + )?; + + if cmp(0, 0).is_le() { + Ok(lhs) + } else { + Ok(rhs) + } + } + + /// Return boolean array where `arr[i] = lhs[i] <= rhs[i]` for all i, where `arr` is the result array + /// Nulls are always considered larger than any other value + fn get_indexes_to_keep(lhs: &dyn Array, rhs: &dyn Array) -> Result<BooleanArray> { + // Fast path: + // If both arrays are not nested, have the same length and no nulls, we can use the faster vectorised kernel + // - If both arrays are not nested: Nested types, such as lists, are not supported as the null semantics are not well-defined.
+ // - neither array has any nulls: cmp::lt_eq will return null if any input is null, while we want to return false in that case + if !lhs.data_type().is_nested() + && lhs.logical_null_count() == 0 + && rhs.logical_null_count() == 0 + { + return cmp::lt_eq(&lhs, &rhs).map_err(|e| e.into()); + } + + let cmp = make_comparator(lhs, rhs, SORT_OPTIONS)?; + + if lhs.len() != rhs.len() { + return internal_err!( + "All arrays should have the same length for least comparison" + ); + } + + let values = BooleanBuffer::collect_bool(lhs.len(), |i| cmp(i, i).is_le()); + + // No nulls as we only want to keep the values that are smaller, it's either true or false + Ok(BooleanArray::new(values, None)) + } +} + +impl ScalarUDFImpl for LeastFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "least" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { + Ok(arg_types[0].clone()) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> { + super::greatest_least_utils::execute_conditional::<Self>(args) + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> { + let coerced_type = + super::greatest_least_utils::find_coerced_type::<Self>(arg_types)?; + + Ok(vec![coerced_type; arg_types.len()]) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(get_smallest_doc()) + } +} +static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new(); + +fn get_smallest_doc() -> &'static Documentation { + DOCUMENTATION.get_or_init(|| { + Documentation::builder( + DOC_SECTION_CONDITIONAL, + "Returns the smallest value in a list of expressions. Returns _null_ if all expressions are _null_.", + "least(expression1[, ..., expression_n])") + .with_sql_example(r#"```sql +> select least(4, 7, 5); ++---------------------------+ +| least(4,7,5) | ++---------------------------+ +| 4 | ++---------------------------+ +```"#, + ) + .with_argument( + "expression1, expression_n", + "Expressions to compare and return the smallest value. Can be a constant, column, or function, and any combination of arithmetic operators. Pass as many expression arguments as necessary."
+ ) + .build() + }) +} + +#[cfg(test)] +mod test { + use crate::core::least::LeastFunc; + use arrow::datatypes::DataType; + use datafusion_expr::ScalarUDFImpl; + + #[test] + fn test_least_return_types_without_common_supertype_in_arg_type() { + let least = LeastFunc::new(); + let return_type = least + .coerce_types(&[DataType::Decimal128(10, 3), DataType::Decimal128(10, 4)]) + .unwrap(); + assert_eq!( + return_type, + vec![DataType::Decimal128(11, 4), DataType::Decimal128(11, 4)] + ); + } +} diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs index bd8305cd56d8..ba8255d2e472 100644 --- a/datafusion/functions/src/core/mod.rs +++ b/datafusion/functions/src/core/mod.rs @@ -26,6 +26,8 @@ pub mod coalesce; pub mod expr_ext; pub mod getfield; pub mod greatest; +mod greatest_least_utils; +pub mod least; pub mod named_struct; pub mod nullif; pub mod nvl; @@ -45,6 +47,7 @@ make_udf_function!(named_struct::NamedStructFunc, named_struct); make_udf_function!(getfield::GetFieldFunc, get_field); make_udf_function!(coalesce::CoalesceFunc, coalesce); make_udf_function!(greatest::GreatestFunc, greatest); +make_udf_function!(least::LeastFunc, least); make_udf_function!(version::VersionFunc, version); pub mod expr_fn { @@ -86,6 +89,10 @@ pub mod expr_fn { greatest, "Returns `greatest(args...)`, which evaluates to the greatest value in the list of expressions or NULL if all the expressions are NULL", args, ),( + least, + "Returns `least(args...)`, which evaluates to the smallest value in the list of expressions or NULL if all the expressions are NULL", + args, )); #[doc = "Returns the value of the field with the given name from the struct"] @@ -113,6 +120,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> { get_field(), coalesce(), greatest(), + least(), version(), r#struct(), ] diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs index 82308601490c..48eff4fcd423 100644 --- a/datafusion/functions/src/macros.rs +++ b/datafusion/functions/src/macros.rs @@ -109,24 +109,37 @@ macro_rules! make_stub_package { }; } -/// Downcast an argument to a specific array type, returning an internal error +/// Downcast a named argument to a specific array type, returning an internal error /// if the cast fails /// /// $ARG: ArrayRef /// $NAME: name of the argument (for error messages) /// $ARRAY_TYPE: the type of array to cast the argument to -macro_rules! downcast_arg { +#[macro_export] +macro_rules! downcast_named_arg { ($ARG:expr, $NAME:expr, $ARRAY_TYPE:ident) => {{ $ARG.as_any().downcast_ref::<$ARRAY_TYPE>().ok_or_else(|| { - DataFusionError::Internal(format!( + internal_datafusion_err!( "could not cast {} to {}", $NAME, std::any::type_name::<$ARRAY_TYPE>() - )) + ) })? }}; } +/// Downcast an argument to a specific array type, returning an internal error +/// if the cast fails +/// +/// $ARG: ArrayRef +/// $ARRAY_TYPE: the type of array to cast the argument to +#[macro_export] +macro_rules! downcast_arg { + ($ARG:expr, $ARRAY_TYPE:ident) => {{ + downcast_named_arg!($ARG, "", $ARRAY_TYPE) + }}; +} + /// Macro to create a unary math UDF.
/// /// A unary math function takes an argument of type Float32 or Float64, diff --git a/datafusion/functions/src/math/abs.rs b/datafusion/functions/src/math/abs.rs index e3d448083e26..1af5e0dfaf37 100644 --- a/datafusion/functions/src/math/abs.rs +++ b/datafusion/functions/src/math/abs.rs @@ -26,7 +26,7 @@ use arrow::array::{ }; use arrow::datatypes::DataType; use arrow::error::ArrowError; -use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result}; +use datafusion_common::{exec_err, internal_datafusion_err, not_impl_err, Result}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ @@ -39,7 +39,7 @@ type MathArrayFunction = fn(&Vec<ArrayRef>) -> Result<ArrayRef>; macro_rules! make_abs_function { ($ARRAY_TYPE:ident) => {{ |args: &Vec<ArrayRef>| { - let array = downcast_arg!(&args[0], "abs arg", $ARRAY_TYPE); + let array = downcast_named_arg!(&args[0], "abs arg", $ARRAY_TYPE); let res: $ARRAY_TYPE = array.unary(|x| x.abs()); Ok(Arc::new(res) as ArrayRef) } @@ -49,7 +49,7 @@ macro_rules! make_try_abs_function { ($ARRAY_TYPE:ident) => {{ |args: &Vec<ArrayRef>| { - let array = downcast_arg!(&args[0], "abs arg", $ARRAY_TYPE); + let array = downcast_named_arg!(&args[0], "abs arg", $ARRAY_TYPE); let res: $ARRAY_TYPE = array.try_unary(|x| { x.checked_abs().ok_or_else(|| { ArrowError::ComputeError(format!( @@ -67,7 +67,7 @@ macro_rules! make_decimal_abs_function { ($ARRAY_TYPE:ident) => {{ |args: &Vec<ArrayRef>| { - let array = downcast_arg!(&args[0], "abs arg", $ARRAY_TYPE); + let array = downcast_named_arg!(&args[0], "abs arg", $ARRAY_TYPE); let res: $ARRAY_TYPE = array .unary(|x| x.wrapping_abs()) .with_data_type(args[0].data_type().clone()); diff --git a/datafusion/functions/src/math/factorial.rs b/datafusion/functions/src/math/factorial.rs index 083936eb185a..fcc6cb9f067c 100644 --- a/datafusion/functions/src/math/factorial.rs +++ b/datafusion/functions/src/math/factorial.rs @@ -26,7 +26,9 @@ use arrow::datatypes::DataType; use arrow::datatypes::DataType::Int64; use crate::utils::make_scalar_function; -use datafusion_common::{arrow_datafusion_err, exec_err, DataFusionError, Result}; +use datafusion_common::{ + arrow_datafusion_err, exec_err, internal_datafusion_err, DataFusionError, Result, +}; use datafusion_expr::scalar_doc_sections::DOC_SECTION_MATH; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, @@ -99,7 +101,7 @@ fn get_factorial_doc() -> &'static Documentation { fn factorial(args: &[ArrayRef]) -> Result<ArrayRef> { match args[0].data_type() { Int64 => { - let arg = downcast_arg!((&args[0]), "value", Int64Array); + let arg = downcast_named_arg!((&args[0]), "value", Int64Array); Ok(arg .iter() .map(|a| match a { diff --git a/datafusion/functions/src/math/gcd.rs b/datafusion/functions/src/math/gcd.rs index f4119cd975ab..36c90889666c 100644 --- a/datafusion/functions/src/math/gcd.rs +++ b/datafusion/functions/src/math/gcd.rs @@ -25,7 +25,9 @@ use arrow::datatypes::DataType; use arrow::datatypes::DataType::Int64; use crate::utils::make_scalar_function; -use datafusion_common::{arrow_datafusion_err, exec_err, DataFusionError, Result}; +use datafusion_common::{ + arrow_datafusion_err, exec_err, internal_datafusion_err, DataFusionError, Result, +}; use datafusion_expr::scalar_doc_sections::DOC_SECTION_MATH; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, @@ -100,8 +102,8 @@ fn get_gcd_doc() ->
&'static Documentation { fn gcd(args: &[ArrayRef]) -> Result<ArrayRef> { match args[0].data_type() { Int64 => { - let arg1 = downcast_arg!(&args[0], "x", Int64Array); - let arg2 = downcast_arg!(&args[1], "y", Int64Array); + let arg1 = downcast_named_arg!(&args[0], "x", Int64Array); + let arg2 = downcast_named_arg!(&args[1], "y", Int64Array); Ok(arg1 .iter() diff --git a/datafusion/functions/src/math/lcm.rs b/datafusion/functions/src/math/lcm.rs index 4e5a9b64f6f5..6e9a2a123f3f 100644 --- a/datafusion/functions/src/math/lcm.rs +++ b/datafusion/functions/src/math/lcm.rs @@ -23,7 +23,9 @@ use arrow::datatypes::DataType; use arrow::datatypes::DataType::Int64; use arrow::error::ArrowError; -use datafusion_common::{arrow_datafusion_err, exec_err, DataFusionError, Result}; +use datafusion_common::{ + arrow_datafusion_err, exec_err, internal_datafusion_err, DataFusionError, Result, +}; use datafusion_expr::scalar_doc_sections::DOC_SECTION_MATH; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, @@ -121,8 +123,8 @@ fn lcm(args: &[ArrayRef]) -> Result<ArrayRef> { match args[0].data_type() { Int64 => { - let arg1 = downcast_arg!(&args[0], "x", Int64Array); - let arg2 = downcast_arg!(&args[1], "y", Int64Array); + let arg1 = downcast_named_arg!(&args[0], "x", Int64Array); + let arg2 = downcast_named_arg!(&args[1], "y", Int64Array); Ok(arg1 .iter() diff --git a/datafusion/functions/src/math/power.rs b/datafusion/functions/src/math/power.rs index 92dd8966b66c..296b2dd3fece 100644 --- a/datafusion/functions/src/math/power.rs +++ b/datafusion/functions/src/math/power.rs @@ -24,8 +24,8 @@ use super::log::LogFunc; use arrow::array::{ArrayRef, AsArray, Int64Array}; use arrow::datatypes::{ArrowNativeTypeOp, DataType, Float64Type}; use datafusion_common::{ - arrow_datafusion_err, exec_datafusion_err, exec_err, plan_datafusion_err, - DataFusionError, Result, ScalarValue, + arrow_datafusion_err, exec_datafusion_err, exec_err, internal_datafusion_err, + plan_datafusion_err, DataFusionError, Result, ScalarValue, }; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::scalar_doc_sections::DOC_SECTION_MATH; @@ -103,8 +103,8 @@ impl ScalarUDFImpl for PowerFunc { Arc::new(result) as _ } DataType::Int64 => { - let bases = downcast_arg!(&args[0], "base", Int64Array); - let exponents = downcast_arg!(&args[1], "exponent", Int64Array); + let bases = downcast_named_arg!(&args[0], "base", Int64Array); + let exponents = downcast_named_arg!(&args[1], "exponent", Int64Array); bases .iter() .zip(exponents.iter()) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index f17b99d81d7b..b8cb7b313bc1 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1578,7 +1578,7 @@ impl SortMergeJoinStream { .append_nulls(num_rows); self.output_record_batches .batch_ids - .extend(vec![0; num_rows]); + .resize(self.output_record_batches.batch_ids.len() + num_rows, 0); self.output_record_batches.batches.push(record_batch); } @@ -1622,7 +1622,7 @@ impl SortMergeJoinStream { .append_nulls(num_rows); self.output_record_batches .batch_ids - .extend(vec![0; num_rows]); + .resize(self.output_record_batches.batch_ids.len() + num_rows, 0); self.output_record_batches.batches.push(record_batch); } buffered_batch.join_filter_not_matched_map.clear(); @@ -1757,10 +1757,10 @@ impl SortMergeJoinStream { self.output_record_batches.filter_mask.extend(pre_mask); }
self.output_record_batches.row_indices.extend(&left_indices); - self.output_record_batches.batch_ids.extend(vec![ - self.streamed_batch_counter.load(Relaxed); - left_indices.len() - ]); + self.output_record_batches.batch_ids.resize( + self.output_record_batches.batch_ids.len() + left_indices.len(), + self.streamed_batch_counter.load(Relaxed), + ); // For outer joins, we need to push the null joined rows to the output if // all joined rows are failed on the join filter. diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 458c1c29c0cf..258e234b35c7 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -99,19 +99,25 @@ pub(crate) struct SortPreservingMergeStream { /// Configuration parameter to enable round-robin selection of tied winners of loser tree. /// - /// To address the issue of unbalanced polling between partitions due to tie-breakers being based - /// on partition index, especially in cases of low cardinality, we are making changes to the winner - /// selection mechanism. Previously, partitions with smaller indices were consistently chosen as the winners, - /// leading to an uneven distribution of polling. This caused upstream operator buffers for the other partitions - /// to grow excessively, as they continued receiving data without consuming it. + /// This option controls the tie-breaker strategy and attempts to avoid the + /// issue of unbalanced polling between partitions. /// - /// For example, an upstream operator like a repartition execution would keep sending data to certain partitions, - /// but those partitions wouldn't consume the data if they weren't selected as winners. This resulted in inefficient buffer usage. + /// If `true`, when multiple partitions have the same value, the partition + /// that has the fewest poll counts is selected. This strategy ensures that + /// multiple partitions with the same value are chosen equally, distributing + /// the polling load in a round-robin fashion. This approach balances the + /// workload more effectively across partitions and avoids excessive buffer + /// growth. /// - /// To resolve this, we are modifying the tie-breaking logic. Instead of always choosing the partition with the smallest index, - /// we now select the partition that has the fewest poll counts for the same value. - /// This ensures that multiple partitions with the same value are chosen equally, distributing the polling load in a round-robin fashion. - /// This approach balances the workload more effectively across partitions and avoids excessive buffer growth. + /// If `false`, partitions with smaller indices are consistently chosen as + /// the winners, which can lead to an uneven distribution of polling and potentially + /// cause upstream operator buffers for the other partitions to grow + /// excessively, as they continue receiving data without consuming it. + /// + /// For example, an upstream operator like `RepartitionExec` would + /// keep sending data to certain partitions, but those partitions wouldn't + /// consume the data if they weren't selected as winners. This results in + /// inefficient buffer usage.
enable_round_robin_tie_breaker: bool, /// Flag indicating whether we are in the mode of round-robin diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 21597fb85662..adcb28e538fd 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Defines the sort preserving merge plan +//! [`SortPreservingMergeExec`] merges multiple sorted streams into one sorted stream. use std::any::Any; use std::sync::Arc; @@ -38,10 +38,22 @@ use log::{debug, trace}; /// Sort preserving merge execution plan /// -/// This takes an input execution plan and a list of sort expressions, and -/// provided each partition of the input plan is sorted with respect to -/// these sort expressions, this operator will yield a single partition -/// that is also sorted with respect to them +/// # Overview +/// +/// This operator implements a K-way merge. It is used to merge multiple sorted +/// streams into a single sorted stream and is highly optimized. +/// +/// ## Inputs: +/// +/// 1. A list of sort expressions +/// 2. An input plan, where each partition is sorted with respect to +/// these sort expressions. +/// +/// ## Output: +/// +/// 1. A single partition that is also sorted with respect to the expressions +/// +/// ## Diagram /// /// ```text /// ┌─────────────────────────┐ /// │ ╔═══╦═══╗ │ /// │ ║ A ║ B ║ ... │──┐ /// │ ╚═══╩═══╝ │ │ /// └─────────────────────────┘ │ ┌───────────────────┐ ┌───────────────────────────────┐ /// Stream 1 │ │ │ │ ╔═══╦═══╦═══╦═══╦═══╗ │ /// ├──▶│SortPreservingMerge├──▶│ ║ A ║ B ║ B ║ C ║ D ║ ... │ /// │ │ │ │ ╚═══╩═══╩═══╩═══╩═══╝ │ /// ┌─────────────────────────┐ │ └───────────────────┘ └─┬─────┴───────────────────────┘ /// │ ╔═══╦═══╗ │ │ /// │ ║ B ║ E ║ ... │──┘ │ -/// │ ╚═══╩═══╝ │ Note Stable Sort: the merged stream -└─────────────────────────┘ places equal rows from stream 1 +/// │ ╚═══╩═══╝ │ Stable sort if `enable_round_robin_repartition=false`: +└─────────────────────────┘ the merged stream places equal rows from stream 1 /// Stream 2 /// /// -/// Input Streams Output stream +/// Input Partitions Output Partition /// (sorted) (sorted) /// ``` /// @@ -70,7 +82,7 @@ /// the output and inputs are not polled again. #[derive(Debug, Clone)] pub struct SortPreservingMergeExec { - /// Input plan + /// Input plan with sorted partitions input: Arc<dyn ExecutionPlan>, /// Sort expressions expr: LexOrdering, @@ -80,7 +92,9 @@ pub struct SortPreservingMergeExec { fetch: Option<usize>, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, - /// Configuration parameter to enable round-robin selection of tied winners of loser tree. + /// Use round-robin selection of tied winners of loser tree + /// + /// See [`Self::with_round_robin_repartition`] for more information. enable_round_robin_repartition: bool, } @@ -105,6 +119,14 @@ impl SortPreservingMergeExec { } /// Sets the selection strategy of tied winners of the loser tree algorithm + /// + /// If `true` (the default), equal output rows are placed in the merged stream + /// in round robin fashion. This approach consumes input streams at more + /// even rates when there are many rows with the same sort key. + /// + /// If `false`, equal output rows are always placed in the merged stream in + /// the order of the inputs, resulting in potentially slower execution but a + /// stable output order.
pub fn with_round_robin_repartition( mut self, enable_round_robin_repartition: bool, @@ -128,7 +150,8 @@ impl SortPreservingMergeExec { self.fetch } - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + /// Creates the cache object that stores the plan properties + /// such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties( input: &Arc<dyn ExecutionPlan>, ordering: LexOrdering, diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 2178cc012a10..448d70760de1 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -120,6 +120,10 @@ impl<'a> StreamingMergeBuilder<'a> { self } + /// See [SortPreservingMergeExec::with_round_robin_repartition] for more + /// information. + /// + /// [SortPreservingMergeExec::with_round_robin_repartition]: crate::sorts::sort_preserving_merge::SortPreservingMergeExec::with_round_robin_repartition pub fn with_round_robin_tie_breaker( mut self, enable_round_robin_tie_breaker: bool, diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index e264b9083cc0..4fa359ebe00d 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -62,8 +62,8 @@ use sqlparser::ast::{ Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable, CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert, ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr, - ShowCreateObject, Statement, TableConstraint, TableFactor, TableWithJoins, - TransactionMode, UnaryOperator, Value, + ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor, + TableWithJoins, TransactionMode, UnaryOperator, Value, }; use sqlparser::parser::ParserError::ParserError; @@ -811,6 +811,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> { self.show_columns_to_plan(extended, full, table_name) } + Statement::ShowFunctions { filter, ..
} => { + self.show_functions_to_plan(filter) + } + Statement::Insert(Insert { or, into, @@ -1980,6 +1984,90 @@ impl<S: ContextProvider> SqlToRel<'_, S> { self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1 } + /// Rewrite `SHOW FUNCTIONS` to another SQL query + /// The query is based on the `information_schema.routines` and `information_schema.parameters` tables + /// + /// The output columns: + /// - function_name: The name of the function + /// - return_type: The return type of the function + /// - parameters: The names of the parameters (ordered by ordinal position) + /// - parameter_types: The types of the parameters (ordered by ordinal position) + /// - description: The description of the function (the description defined in the documentation) + /// - syntax_example: The syntax example of the function (the syntax example defined in the documentation) + fn show_functions_to_plan( + &self, + filter: Option<ShowStatementFilter>, + ) -> Result<LogicalPlan> { + let where_clause = if let Some(filter) = filter { + match filter { + ShowStatementFilter::Like(like) => { + format!("WHERE p.function_name like '{like}'") + } + _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"), + } + } else { + "".to_string() + }; + + let query = format!( + r#" +SELECT DISTINCT + p.*, + r.function_type function_type, + r.description description, + r.syntax_example syntax_example +FROM + ( + SELECT + i.specific_name function_name, + o.data_type return_type, + array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters, + array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types + FROM ( + SELECT + specific_catalog, + specific_schema, + specific_name, + ordinal_position, + parameter_name, + data_type, + rid + FROM + information_schema.parameters + WHERE + parameter_mode = 'IN' + ) i + JOIN + ( + SELECT + specific_catalog, + specific_schema, + specific_name, + ordinal_position, + parameter_name, + data_type, + rid + FROM + information_schema.parameters + WHERE + parameter_mode = 'OUT' + ) o + ON i.specific_catalog = o.specific_catalog + AND i.specific_schema = o.specific_schema + AND i.specific_name = o.specific_name + AND i.rid = o.rid + GROUP BY 1, 2, i.rid + ) as p +JOIN information_schema.routines r +ON p.function_name = r.routine_name +{where_clause} + "# + ); + let mut rewrite = DFParser::parse_sql(&query)?; + assert_eq!(rewrite.len(), 1); + self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1 + } + fn show_create_table_to_plan( &self, sql_table_name: ObjectName, diff --git a/datafusion/sqllogictest/test_files/functions.slt b/datafusion/sqllogictest/test_files/functions.slt index 4b770a19fe20..4213de0235e4 100644 --- a/datafusion/sqllogictest/test_files/functions.slt +++ b/datafusion/sqllogictest/test_files/functions.slt @@ -955,3 +955,201 @@ Infinity statement ok drop table t1 + +# test for least +statement ok +CREATE TABLE t1 (a int, b int, c int) as VALUES +(4, NULL, NULL), +(1, 2, 3), +(3, 1, 2), +(1, NULL, -1), +(NULL, NULL, NULL), +(3, 0, -1); + +query I +SELECT least(a, b, c) FROM t1 +---- +4 +1 +1 +-1 +NULL +-1 + +statement ok +drop table t1 + +query I +SELECT least(1) +---- +1 + +query I +SELECT least(1, 2) +---- +1 + +query I +SELECT least(3, 1) +---- +1 + +query ? +SELECT least(NULL) +---- +NULL + +query I +SELECT least(1, NULL, -1) +---- +-1 + +query I +SELECT least((3), (0), (-1)); +---- +-1 + +query ? +SELECT least([4, 3], [4, 2], [4, 4]); +---- +[4, 2] + +query ?
+SELECT least([2, 3], [1, 4], [5, 0]); +---- +[1, 4] + +query I +SELECT least(1::int, 2::text) +---- +1 + +query R +SELECT least(-1, 1, 2.3, 123456789, 3 + 5, -(-4)) +---- +-1 + +query R +SELECT least(-1.123, 1.21313, 2.3, 123456789.321, 3 + 5.3213, -(-4.3213), abs(-9)) +---- +-1.123 + +query R +SELECT least(-1, 1, 2.3, 123456789, 3 + 5, -(-4), abs(-9.0)) +---- +-1 + + +query error least does not support zero arguments +SELECT least() + +query I +SELECT least(4, 5, 7, 1, 2) +---- +1 + +query I +SELECT least(4, NULL, 7, 1, 2) +---- +1 + +query I +SELECT least(NULL, NULL, 7, NULL, 2) +---- +2 + +query I +SELECT least(NULL, NULL, NULL, NULL, 2) +---- +2 + +query I +SELECT least(2, NULL, NULL, NULL, NULL) +---- +2 + +query ? +SELECT least(NULL, NULL, NULL) +---- +NULL + +query I +SELECT least(2, '4') +---- +2 + +query T +SELECT least('foo', 'bar', 'foobar') +---- +bar + +query R +SELECT least(1, 1.2) +---- +1 + +statement ok +CREATE TABLE foo (a int) + +statement ok +INSERT INTO foo (a) VALUES (1) + +# Test homogeneous functions that can't be constant folded. +query I +SELECT least(NULL, a, 5, NULL) FROM foo +---- +1 + +query I +SELECT least(NULL, NULL, NULL, a, -1) FROM foo +---- +-1 + +statement ok +drop table foo + +query R +select least(arrow_cast('NAN','Float64'), arrow_cast('NAN','Float64')) +---- +NaN + +query R +select least(arrow_cast('NAN','Float64'), arrow_cast('NAN','Float32')) +---- +NaN + +query R +select least(arrow_cast('NAN','Float64'), '+Inf'::Double) +---- +Infinity + +query R +select least(arrow_cast('NAN','Float64'), NULL) +---- +NaN + +query R +select least(NULL, '+Inf'::Double) +---- +Infinity + +query R +select least(NULL, '-Inf'::Double) +---- +-Infinity + +statement ok +CREATE TABLE t1 (a double, b double, c double) as VALUES +(1, arrow_cast('NAN', 'Float64'), '+Inf'::Double), +(NULL, arrow_cast('NAN','Float64'), '+Inf'::Double), +(1, '+Inf'::Double, NULL); + +query R +SELECT least(a, b, c) FROM t1 +---- +1 +Infinity +1 + +statement ok +drop table t1 diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b25d78cc39ce..a02975b2ef85 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -623,19 +623,19 @@ select routine_name, data_type, function_type from information_schema.routines w string_agg LargeUtf8 AGGREGATE # test every function type are included in the result -query TTTTTTTBTTT rowsort -select * from information_schema.routines where routine_name = 'date_trunc' OR routine_name = 'string_agg' OR routine_name = 'rank'; ----- -datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Microsecond, None) SCALAR Truncates a timestamp value to a specified precision. -datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Microsecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. -datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Millisecond, None) SCALAR Truncates a timestamp value to a specified precision. -datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Millisecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. -datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Nanosecond, None) SCALAR Truncates a timestamp value to a specified precision.
-datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Nanosecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision.
-datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Second, None) SCALAR Truncates a timestamp value to a specified precision.
-datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Second, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision.
-datafusion public rank datafusion public rank FUNCTION true NULL WINDOW Returns the rank of the current row within its partition, allowing gaps between ranks. This function provides a ranking similar to `row_number`, but skips ranks for identical values.
-datafusion public string_agg datafusion public string_agg FUNCTION true LargeUtf8 AGGREGATE Concatenates the values of string expressions and places separator values between them.
+query TTTTTTTBTTTT rowsort
+select * from information_schema.routines where routine_name = 'date_trunc' OR routine_name = 'string_agg' OR routine_name = 'rank' ORDER BY routine_name
+----
+datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Microsecond, None) SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Microsecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Millisecond, None) SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Millisecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Nanosecond, None) SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Nanosecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Second, None) SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Second, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+datafusion public rank datafusion public rank FUNCTION true NULL WINDOW Returns the rank of the current row within its partition, allowing gaps between ranks. This function provides a ranking similar to `row_number`, but skips ranks for identical values. rank()
+datafusion public string_agg datafusion public string_agg FUNCTION true LargeUtf8 AGGREGATE Concatenates the values of string expressions and places separator values between them. string_agg(expression, delimiter)
 
 query B
 select is_deterministic from information_schema.routines where routine_name = 'now';
@@ -643,51 +643,111 @@ select is_deterministic from information_schema.routines where routine_name = 'n
 false
 
 # test every function type is included in the result
-query TTTITTTTB rowsort
-select * from information_schema.parameters where specific_name = 'date_trunc' OR specific_name = 'string_agg' OR specific_name = 'rank';
----
-datafusion public date_trunc 1 IN precision Utf8 NULL false
-datafusion public date_trunc 1 IN precision Utf8View NULL false
-datafusion public date_trunc 1 OUT NULL Timestamp(Microsecond, None) NULL false
-datafusion public date_trunc 1 OUT NULL Timestamp(Microsecond, Some("+TZ")) NULL false
-datafusion public date_trunc 1 OUT NULL Timestamp(Millisecond, None) NULL false
-datafusion public date_trunc 1 OUT NULL Timestamp(Millisecond, Some("+TZ")) NULL false
-datafusion public date_trunc 1 OUT NULL Timestamp(Nanosecond, None) NULL false
-datafusion public date_trunc 1 OUT NULL Timestamp(Nanosecond, Some("+TZ")) NULL false
-datafusion public date_trunc 1 OUT NULL Timestamp(Second, None) NULL false
-datafusion public date_trunc 1 OUT NULL Timestamp(Second, Some("+TZ")) NULL false
-datafusion public date_trunc 2 IN expression Timestamp(Microsecond, None) NULL false
-datafusion public date_trunc 2 IN expression Timestamp(Microsecond, Some("+TZ")) NULL false
-datafusion public date_trunc 2 IN expression Timestamp(Millisecond, None) NULL false
-datafusion public date_trunc 2 IN expression Timestamp(Millisecond, Some("+TZ")) NULL false
-datafusion public date_trunc 2 IN expression Timestamp(Nanosecond, None) NULL false
-datafusion public date_trunc 2 IN expression Timestamp(Nanosecond, Some("+TZ")) NULL false
-datafusion public date_trunc 2 IN expression Timestamp(Second, None) NULL false
-datafusion public date_trunc 2 IN expression Timestamp(Second, Some("+TZ")) NULL false
-datafusion public string_agg 1 IN expression LargeUtf8 NULL false
-datafusion public string_agg 1 OUT NULL LargeUtf8 NULL false
-datafusion public string_agg 2 IN delimiter LargeUtf8 NULL false
-datafusion public string_agg 2 IN delimiter Null NULL false
-datafusion public string_agg 2 IN delimiter Utf8 NULL false
+query TTTITTTTBI
+select * from information_schema.parameters where specific_name = 'date_trunc' OR specific_name = 'string_agg' OR specific_name = 'rank' ORDER BY specific_name, rid;
+----
+datafusion public date_trunc 1 IN precision Utf8 NULL false 0
+datafusion public date_trunc 2 IN expression Timestamp(Nanosecond, None) NULL false 0
+datafusion public date_trunc 1 OUT NULL Timestamp(Nanosecond, None) NULL false 0
+datafusion public date_trunc 1 IN precision Utf8View NULL false 1
+datafusion public date_trunc 2 IN expression Timestamp(Nanosecond, None) NULL false 1
+datafusion public date_trunc 1 OUT NULL Timestamp(Nanosecond, None) NULL false 1
+datafusion public date_trunc 1 IN precision Utf8 NULL false 2
+datafusion public date_trunc 2 IN expression Timestamp(Nanosecond, Some("+TZ")) NULL false 2
+datafusion public date_trunc 1 OUT NULL Timestamp(Nanosecond, Some("+TZ")) NULL false 2
+datafusion public date_trunc 1 IN precision Utf8View NULL false 3
+datafusion public date_trunc 2 IN expression Timestamp(Nanosecond, Some("+TZ")) NULL false 3
+datafusion public date_trunc 1 OUT NULL Timestamp(Nanosecond, Some("+TZ")) NULL false 3
+datafusion public date_trunc 1 IN precision Utf8 NULL false 4
+datafusion public date_trunc 2 IN expression Timestamp(Microsecond, None) NULL false 4
+datafusion public date_trunc 1 OUT NULL Timestamp(Microsecond, None) NULL false 4
+datafusion public date_trunc 1 IN precision Utf8View NULL false 5
+datafusion public date_trunc 2 IN expression Timestamp(Microsecond, None) NULL false 5
+datafusion public date_trunc 1 OUT NULL Timestamp(Microsecond, None) NULL false 5
+datafusion public date_trunc 1 IN precision Utf8 NULL false 6
+datafusion public date_trunc 2 IN expression Timestamp(Microsecond, Some("+TZ")) NULL false 6
+datafusion public date_trunc 1 OUT NULL Timestamp(Microsecond, Some("+TZ")) NULL false 6
+datafusion public date_trunc 1 IN precision Utf8View NULL false 7
+datafusion public date_trunc 2 IN expression Timestamp(Microsecond, Some("+TZ")) NULL false 7
+datafusion public date_trunc 1 OUT NULL Timestamp(Microsecond, Some("+TZ")) NULL false 7
+datafusion public date_trunc 1 IN precision Utf8 NULL false 8
+datafusion public date_trunc 2 IN expression Timestamp(Millisecond, None) NULL false 8
+datafusion public date_trunc 1 OUT NULL Timestamp(Millisecond, None) NULL false 8
+datafusion public date_trunc 1 IN precision Utf8View NULL false 9
+datafusion public date_trunc 2 IN expression Timestamp(Millisecond, None) NULL false 9
+datafusion public date_trunc 1 OUT NULL Timestamp(Millisecond, None) NULL false 9
+datafusion public date_trunc 1 IN precision Utf8 NULL false 10
+datafusion public date_trunc 2 IN expression Timestamp(Millisecond, Some("+TZ")) NULL false 10
+datafusion public date_trunc 1 OUT NULL Timestamp(Millisecond, Some("+TZ")) NULL false 10
+datafusion public date_trunc 1 IN precision Utf8View NULL false 11
+datafusion public date_trunc 2 IN expression Timestamp(Millisecond, Some("+TZ")) NULL false 11
+datafusion public date_trunc 1 OUT NULL Timestamp(Millisecond, Some("+TZ")) NULL false 11
+datafusion public date_trunc 1 IN precision Utf8 NULL false 12
+datafusion public date_trunc 2 IN expression Timestamp(Second, None) NULL false 12
+datafusion public date_trunc 1 OUT NULL Timestamp(Second, None) NULL false 12
+datafusion public date_trunc 1 IN precision Utf8View NULL false 13
+datafusion public date_trunc 2 IN expression Timestamp(Second, None) NULL false 13
+datafusion public date_trunc 1 OUT NULL Timestamp(Second, None) NULL false 13
+datafusion public date_trunc 1 IN precision Utf8 NULL false 14
+datafusion public date_trunc 2 IN expression Timestamp(Second, Some("+TZ")) NULL false 14
+datafusion public date_trunc 1 OUT NULL Timestamp(Second, Some("+TZ")) NULL false 14
+datafusion public date_trunc 1 IN precision Utf8View NULL false 15
+datafusion public date_trunc 2 IN expression Timestamp(Second, Some("+TZ")) NULL false 15
+datafusion public date_trunc 1 OUT NULL Timestamp(Second, Some("+TZ")) NULL false 15
+datafusion public string_agg 1 IN expression LargeUtf8 NULL false 0
+datafusion public string_agg 2 IN delimiter Utf8 NULL false 0
+datafusion public string_agg 1 OUT NULL LargeUtf8 NULL false 0
+datafusion public string_agg 1 IN expression LargeUtf8 NULL false 1
+datafusion public string_agg 2 IN delimiter LargeUtf8 NULL false 1
+datafusion public string_agg 1 OUT NULL LargeUtf8 NULL false 1
+datafusion public string_agg 1 IN expression LargeUtf8 NULL false 2
+datafusion public string_agg 2 IN delimiter Null NULL false 2
+datafusion public string_agg 1 OUT NULL LargeUtf8 NULL false 2
 
 # test variable length arguments
-query TTTB rowsort
-select specific_name, data_type, parameter_mode, is_variadic from information_schema.parameters where specific_name = 'concat';
+query TTTBI rowsort
+select specific_name, data_type, parameter_mode, is_variadic, rid from information_schema.parameters where specific_name = 'concat';
 ----
-concat LargeUtf8 IN true
-concat LargeUtf8 OUT false
-concat Utf8 IN true
-concat Utf8 OUT false
-concat Utf8View IN true
-concat Utf8View OUT false
+concat LargeUtf8 IN true 2
+concat LargeUtf8 OUT false 2
+concat Utf8 IN true 1
+concat Utf8 OUT false 1
+concat Utf8View IN true 0
+concat Utf8View OUT false 0
 
 # test coercion signature
-query TTIT rowsort
-select specific_name, data_type, ordinal_position, parameter_mode from information_schema.parameters where specific_name = 'repeat';
----
-repeat Int64 2 IN
-repeat LargeUtf8 1 IN
-repeat LargeUtf8 1 OUT
-repeat Utf8 1 IN
-repeat Utf8 1 OUT
-repeat Utf8View 1 IN
+query TTITI rowsort
+select specific_name, data_type, ordinal_position, parameter_mode, rid from information_schema.parameters where specific_name = 'repeat';
+----
+repeat Int64 2 IN 0
+repeat Int64 2 IN 1
+repeat Int64 2 IN 2
+repeat LargeUtf8 1 IN 1
+repeat LargeUtf8 1 OUT 1
+repeat Utf8 1 IN 0
+repeat Utf8 1 OUT 0
+repeat Utf8 1 OUT 2
+repeat Utf8View 1 IN 2
+
+# SHOW FUNCTIONS is rewritten to a join over information_schema.routines and
+# information_schema.parameters (see show_functions_to_plan)
+query TT??TTT rowsort
+show functions like 'date_trunc';
+----
+date_trunc Timestamp(Microsecond, None) [precision, expression] [Utf8, Timestamp(Microsecond, None)] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Microsecond, None) [precision, expression] [Utf8View, Timestamp(Microsecond, None)] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Microsecond, Some("+TZ")) [precision, expression] [Utf8, Timestamp(Microsecond, Some("+TZ"))] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Microsecond, Some("+TZ")) [precision, expression] [Utf8View, Timestamp(Microsecond, Some("+TZ"))] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Millisecond, None) [precision, expression] [Utf8, Timestamp(Millisecond, None)] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Millisecond, None) [precision, expression] [Utf8View, Timestamp(Millisecond, None)] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Millisecond, Some("+TZ")) [precision, expression] [Utf8, Timestamp(Millisecond, Some("+TZ"))] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Millisecond, Some("+TZ")) [precision, expression] [Utf8View, Timestamp(Millisecond, Some("+TZ"))] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Nanosecond, None) [precision, expression] [Utf8, Timestamp(Nanosecond, None)] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Nanosecond, None) [precision, expression] [Utf8View, Timestamp(Nanosecond, None)] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Nanosecond, Some("+TZ")) [precision, expression] [Utf8, Timestamp(Nanosecond, Some("+TZ"))] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Nanosecond, Some("+TZ")) [precision, expression] [Utf8View, Timestamp(Nanosecond, Some("+TZ"))] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Second, None) [precision, expression] [Utf8, Timestamp(Second, None)] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Second, None) [precision, expression] [Utf8View, Timestamp(Second, None)] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Second, Some("+TZ")) [precision, expression] [Utf8, Timestamp(Second, Some("+TZ"))] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+date_trunc Timestamp(Second, Some("+TZ")) [precision, expression] [Utf8View, Timestamp(Second, Some("+TZ"))] SCALAR Truncates a timestamp value to a specified precision. date_trunc(precision, expression)
+
+statement ok
+show functions
diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md
index 4e74cfc54ae5..2e4147f96e0f 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -549,6 +549,7 @@ trunc(numeric_expression[, decimal_places])
 - [coalesce](#coalesce)
 - [greatest](#greatest)
 - [ifnull](#ifnull)
+- [least](#least)
 - [nullif](#nullif)
 - [nvl](#nvl)
 - [nvl2](#nvl2)
@@ -603,6 +604,29 @@ greatest(expression1[, ..., expression_n])
 
 _Alias of [nvl](#nvl)._
 
+### `least`
+
+Returns the smallest value in a list of expressions. Returns _null_ if all expressions are _null_.
+
+```
+least(expression1[, ..., expression_n])
+```
+
+#### Arguments
+
+- **expression1, expression_n**: Expressions to compare and return the smallest value. Can be a constant, column, or function, and any combination of arithmetic operators. Pass as many expression arguments as necessary.
+
+#### Example
+
+```sql
+> select least(4, 7, 5);
++---------------------------+
+| least(4,7,5)              |
++---------------------------+
+| 4                         |
++---------------------------+
+```
+
 ### `nullif`
 
 Returns _null_ if _expression1_ equals _expression2_; otherwise it returns _expression1_.
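For reviewers trying this out locally, a short illustrative session (a sketch only: the exact rows returned depend on which functions are registered in the session; the pinned-down output is in the sqllogictest above):

```sql
-- List every registered function; the output columns are
-- function_name, return_type, parameters, parameter_types,
-- function_type, description, and syntax_example
SHOW FUNCTIONS;

-- Only a LIKE filter is supported; any other filter fails with
-- "Unsupported SHOW FUNCTIONS filter"
SHOW FUNCTIONS LIKE 'date_trunc';

-- The new syntax_example column can also be queried directly
SELECT routine_name, syntax_example
FROM information_schema.routines
WHERE routine_name = 'least';
```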