Skip to content

Commit

Permalink
Move many udf implementations from invoke to invoke_batch (#13491)
Browse files Browse the repository at this point in the history
* Added support for `ScalarUDFImpl::invoke_with_return_type`, which passes the return type computed for the UDF instance to the invocation

* Move from `invoke` to `invoke_batch`

* ex

* of

* docs

* fx

* fx

* fx

* fix

* fix

* Do not yet deprecate invoke_batch, add docs to invoke_with_args

* add ticket reference

* fix

* fix

* fix

* fix

* fmt

* fmt

* remove invoke

* fix agg

* unused

* update func docs

* update tests and remove deprecation

* remove dep

* oops

* internal as vec

* dep

* fixup

* fixup

* fix

* fix

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
joseph-isaacs and alamb authored Nov 26, 2024
1 parent ed227c0 commit 25cb812
Show file tree
Hide file tree
Showing 144 changed files with 1,079 additions and 410 deletions.
6 changes: 5 additions & 1 deletion datafusion-examples/examples/advanced_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ impl ScalarUDFImpl for PowUdf {
///
/// However, it also means the implementation is more complex than when
/// using `create_udf`.
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
// DataFusion has arranged for the correct inputs to be passed to this
// function, but we check again to make sure
assert_eq!(args.len(), 2);
Expand Down
8 changes: 5 additions & 3 deletions datafusion-examples/examples/function_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{exec_err, internal_err, DataFusionError};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{CreateFunction, Expr, ScalarUDF, ScalarUDFImpl, Signature};
use datafusion_expr::{
CreateFunction, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
};

/// This example shows how to utilize [FunctionFactory] to implement simple
/// SQL-macro like functions using a `CREATE FUNCTION` statement. The same
Expand Down Expand Up @@ -132,9 +134,9 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
Ok(self.return_type.clone())
}

fn invoke(
fn invoke_with_args(
&self,
_args: &[datafusion_expr::ColumnarValue],
_args: ScalarFunctionArgs,
) -> Result<datafusion_expr::ColumnarValue> {
// Since this function is always simplified to another expression, it
// should never actually be invoked
Expand Down
6 changes: 5 additions & 1 deletion datafusion-examples/examples/optimizer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ impl ScalarUDFImpl for MyEq {
Ok(DataType::Boolean)
}

fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
_args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
// this example simply returns "true" which is not what a real
// implementation would do.
Ok(ColumnarValue::Scalar(ScalarValue::from(true)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,11 @@ mod tests {
Ok(DataType::Int32)
}

fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
_args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
unimplemented!("DummyUDF::invoke")
}
}
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/tests/fuzz_cases/equivalence/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,11 @@ impl ScalarUDFImpl for TestScalarUDF {
Ok(input[0].sort_properties)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;

let arr: ArrayRef = match args[0].data_type() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,10 +520,6 @@ impl ScalarUDFImpl for AddIndexToStringVolatileScalarUDF {
Ok(self.return_type.clone())
}

fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
not_impl_err!("index_with_offset function does not accept arguments")
}

fn invoke_batch(
&self,
args: &[ColumnarValue],
Expand Down Expand Up @@ -720,7 +716,11 @@ impl ScalarUDFImpl for CastToI64UDF {
Ok(ExprSimplifyResult::Simplified(new_expr))
}

fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
_args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
unimplemented!("Function should have been simplified prior to evaluation")
}
}
Expand Down Expand Up @@ -848,7 +848,11 @@ impl ScalarUDFImpl for TakeUDF {
}

// The actual implementation
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
let take_idx = match &args[2] {
ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) if v < &2 => *v as usize,
_ => unreachable!(),
Expand Down Expand Up @@ -956,7 +960,11 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
Ok(self.return_type.clone())
}

fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
_args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
internal_err!("This function should not get invoked!")
}

Expand Down Expand Up @@ -1240,7 +1248,11 @@ impl ScalarUDFImpl for MyRegexUdf {
}
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
match args {
[ColumnarValue::Scalar(ScalarValue::Utf8(value))] => {
Ok(ColumnarValue::Scalar(ScalarValue::Boolean(
Expand Down
7 changes: 5 additions & 2 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2389,7 +2389,7 @@ mod test {
use crate::expr_fn::col;
use crate::{
case, lit, qualified_wildcard, wildcard, wildcard_with_options, ColumnarValue,
ScalarUDF, ScalarUDFImpl, Volatility,
ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Volatility,
};
use sqlparser::ast;
use sqlparser::ast::{Ident, IdentWithAlias};
Expand Down Expand Up @@ -2518,7 +2518,10 @@ mod test {
Ok(DataType::Utf8)
}

fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_with_args(
&self,
_args: ScalarFunctionArgs,
) -> Result<ColumnarValue> {
Ok(ColumnarValue::Scalar(ScalarValue::from("a")))
}
}
Expand Down
6 changes: 5 additions & 1 deletion datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,11 @@ impl ScalarUDFImpl for SimpleScalarUDF {
Ok(self.return_type.clone())
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
(self.fun)(args)
}
}
Expand Down
3 changes: 0 additions & 3 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,11 @@ impl ScalarUDF {
self.inner.is_nullable(args, schema)
}

#[deprecated(since = "43.0.0", note = "Use `invoke_with_args` instead")]
pub fn invoke_batch(
&self,
args: &[ColumnarValue],
number_rows: usize,
) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke_batch(args, number_rows)
}

Expand Down Expand Up @@ -544,7 +542,6 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
/// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments
/// to arrays, which will likely be simpler code, but be slower.
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.invoke_batch(args.args, args.number_rows)
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/functions-nested/benches/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ fn criterion_benchmark(c: &mut Criterion) {

b.iter(|| {
black_box(
#[allow(deprecated)] // TODO use invoke_batch
// TODO use invoke_with_args
map_udf()
.invoke(&[keys.clone(), values.clone()])
.invoke_batch(&[keys.clone(), values.clone()], 1)
.expect("map should work on valid values"),
);
});
Expand Down
18 changes: 15 additions & 3 deletions datafusion/functions-nested/src/array_has.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ impl ScalarUDFImpl for ArrayHas {
Ok(DataType::Boolean)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
match &args[1] {
ColumnarValue::Array(array_needle) => {
// the needle is already an array, convert the haystack to an array of the same length
Expand Down Expand Up @@ -321,7 +325,11 @@ impl ScalarUDFImpl for ArrayHasAll {
Ok(DataType::Boolean)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(array_has_all_inner)(args)
}

Expand Down Expand Up @@ -401,7 +409,11 @@ impl ScalarUDFImpl for ArrayHasAny {
Ok(DataType::Boolean)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(array_has_any_inner)(args)
}

Expand Down
6 changes: 5 additions & 1 deletion datafusion/functions-nested/src/cardinality.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ impl ScalarUDFImpl for Cardinality {
})
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(cardinality_inner)(args)
}

Expand Down
18 changes: 15 additions & 3 deletions datafusion/functions-nested/src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ impl ScalarUDFImpl for ArrayAppend {
Ok(arg_types[0].clone())
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(array_append_inner)(args)
}

Expand Down Expand Up @@ -181,7 +185,11 @@ impl ScalarUDFImpl for ArrayPrepend {
Ok(arg_types[1].clone())
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(array_prepend_inner)(args)
}

Expand Down Expand Up @@ -300,7 +308,11 @@ impl ScalarUDFImpl for ArrayConcat {
Ok(expr_type)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(array_concat_inner)(args)
}

Expand Down
12 changes: 10 additions & 2 deletions datafusion/functions-nested/src/dimension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ impl ScalarUDFImpl for ArrayDims {
})
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(array_dims_inner)(args)
}

Expand Down Expand Up @@ -165,7 +169,11 @@ impl ScalarUDFImpl for ArrayNdims {
})
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(array_ndims_inner)(args)
}

Expand Down
6 changes: 5 additions & 1 deletion datafusion/functions-nested/src/distance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ impl ScalarUDFImpl for ArrayDistance {
Ok(result)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(array_distance_inner)(args)
}

Expand Down
6 changes: 5 additions & 1 deletion datafusion/functions-nested/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ impl ScalarUDFImpl for ArrayEmpty {
})
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(array_empty_inner)(args)
}

Expand Down
6 changes: 5 additions & 1 deletion datafusion/functions-nested/src/except.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ impl ScalarUDFImpl for ArrayExcept {
}
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(array_except_inner)(args)
}

Expand Down
Loading

0 comments on commit 25cb812

Please sign in to comment.