Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A new interface for Scalar Functions #7978

Closed
wants to merge 1 commit into from

Conversation

2010YOUY01
Copy link
Contributor

Which issue does this PR close?

1.1 in #7977

Rationale for this change

See #7977

What changes are included in this PR?

  1. New interface for ScalarFunctions ScalarFunctionDef
  2. Introduce a new Logical Expr node for the new interface (as a transition step, eventually both current built-in functions and scalar UDFs will be all using this as the internal Expr representation)
  3. Build an adapter between current built-in functions and the new interface (impl ScalarFunctionDef for BuiltinScalarFunctions), and replace SQL execution code for built-in functions with the new interface

Remaining tasks

  • Fix one broken tpcds test case (the error seems not straightforward 😢 )
  • More doc comments and examples for the new interface after finalizing and agreeing on the design

Are these changes tested?

Should be covered by existing SQL tests

Are there any user-facing changes?

New UDF interface

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions physical-expr Physical Expressions optimizer Optimizer rules core Core DataFusion crate substrait labels Oct 30, 2023
/// `execute()` and `execute_raw()` are two possible alternative for function definition:
/// If returns `false`, `execute()` will be used for execution;
/// If returns `true`, `execute_raw()` will be called.
fn use_execute_raw_instead(&self) -> bool {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rational for this:
built-in functions now have two kinds of implementation

  1. execute() -- It's a more common and easy-to-implement interface for UDFs, and can be converted to the more general execute_raw() case using make_scalar_function()
  2. execute_raw() -- Fewer existing functions are directly implemented using this interface

Though a single execute_raw() can cover all existing cases, this design can make the general case easier to implement for UDFs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have a single execute method that takes ColumnarValues and put in the documentation how to go from ColumnarValue --> ArrayRef for simple, initial implementation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we should make the initial implementation concise

}

/// Defines the return type behavior of a function.
pub enum FunctionReturnType {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now like 99% of built-in functions are either SameAsFirstArg or FixedType, only very rare array functions can only be defined using lambda. This way can make the new interface a little bit easier to use.
(also possible to extend to address #7657)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about a signature like this:

pub trait ScalarFunctionDef: Any + Sync + Send + std::fmt::Debug {
...
    /// What type will this function return, given arguments of the specified input types?
    /// By default, returns the same type as the first argument
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
      arg_types.get(0)
        .ok_or_else(Error ("Implementation of Function {} did not specify a return type, and there are no arguments"))
  }
...
}

Then I think most function implementations can be left as the default or as

impl ScalarFunctionDef for Foo {
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
      Ok(DataType::Utf8)
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The returning enum FunctionReturnType approach's advantage is being able to extend to solve #7657, otherwise we have to extend the function interface to address that issue (though I'm not sure if that requirement is common, should we consider that case?)
And its limitation is harder to use when the return type is actually some complex lambda, but only for very few array functions.

self.monotonicity()
}

// execution functions are defined in `BuiltinScalarFunctionWrapper`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All execution code for BuiltinScalarFunction are in phys-expr crate (which depends on this crate), so they're defined elsewhere

pub(crate) struct BuiltinScalarFunctionWrapper {
func: Arc<dyn ScalarFunctionDef>,
// functions like `now()` requires per-execution properties
execution_props: ExecutionProps,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few functions like now() require the core to pass some information to it, when migrating those functions, we can extend the trait ScalarFunctionDef with a optional method set_execution_props(exec_props: ExecutionProps), as the mechanism to let core pass data to functions defined outside the core

fn monotonicity(&self) -> Option<FuncMonotonicity>;

// ===============================
// OPTIONAL METHODS START BELOW
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This trait consists of mandatory and optional methods, it can get a bit lengthy...
An alternative implementation is trait inheritance(e.g. pub trait ScalarFunctionExtended: ScalarFunctionDef, it seems won't be much clearer than the current one.
We can add more docs/examples to make it more straightforward later

@alamb
Copy link
Contributor

alamb commented Oct 30, 2023

Thank you @2010YOUY01 -- I plan to review this carefully tomorrow.

cc @viirya and others with whom we have talked about making functions more modular

@alamb
Copy link
Contributor

alamb commented Oct 31, 2023

As somewhat of an aside, I implemented a user defined function today (a backwards compatible implementation of to_timestamp) and it is quite annoying to have to provide the different callbacks as Arc around dynamic functions. Thank you for pushing this forward

Example User Defined Function

//! Implementation of `to_timestamp` function that
//! overrides the built in version in DataFusion because the semantics changed
//! upstream: <https://github.com/apache/arrow-datafusion/pull/7844>
use std::sync::Arc;

use arrow::datatypes::DataType;
use arrow::datatypes::TimeUnit;
use datafusion::common::internal_err;
use datafusion::logical_expr::{ReturnTypeFunction, Signature};
use datafusion::physical_expr::datetime_expressions;
use datafusion::physical_expr::expressions::cast_column;
use datafusion::{
    error::DataFusionError,
    logical_expr::{ScalarFunctionImplementation, ScalarUDF, Volatility},
    physical_plan::ColumnarValue,
};
use once_cell::sync::Lazy;

/// The name of the function
pub const TO_TIMESTAMP_FUNCTION_NAME: &str = "to_timestamp";

/// Implementation of to_timestamp
pub(crate) static TO_TIMESTAMP_UDF: Lazy<Arc<ScalarUDF>> = Lazy::new(|| {
    Arc::new(ScalarUDF::new(
        TO_TIMESTAMP_FUNCTION_NAME,
        &Signature::uniform(
            1,
            vec![
                DataType::Int64,
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                DataType::Timestamp(TimeUnit::Microsecond, None),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                DataType::Timestamp(TimeUnit::Second, None),
                DataType::Utf8,
            ],
            Volatility::Immutable,
        ),
        &TO_TIMESTAMP_RETURN_TYPE,
        &TO_TIMESTAMP_IMPL,
    ))
});

static TO_TIMESTAMP_RETURN_TYPE: Lazy<ReturnTypeFunction> = Lazy::new(|| {
    let func =
        |_arg_types: &[DataType]| Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)));

    Arc::new(func)
});

static TO_TIMESTAMP_IMPL: Lazy<ScalarFunctionImplementation> = Lazy::new(|| {
    let func = |args: &[ColumnarValue]| {
        if args.len() != 1 {
            return internal_err!("to_timestamp expected 1 argument, got {}", args.len());
        }

        match args[0].data_type() {
            // call through to arrow cast kernel
            DataType::Int64 | DataType::Timestamp(_, _) => cast_column(
                &args[0],
                &DataType::Timestamp(TimeUnit::Nanosecond, None),
                None,
            ),
            DataType::Utf8 => datetime_expressions::to_timestamp_nanos(args),
            dt => internal_err!("to_timestamp does not support argument type '{dt}'"),
        }
    };

    Arc::new(func)
});

// https://github.com/apache/arrow-datafusion/pull/7844

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @2010YOUY01 -- this is looking like a great first start.

One thing that came to mind when I was reviewing this PR was what about simply moving BuiltInScalarFunctions entirely into the physical_expr crate (and remove Expr::ScalarFunction). We would have to extend ScalarUDF... but that would have the nice property of keeping things uniform

}

/// Defines the return type behavior of a function.
pub enum FunctionReturnType {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about a signature like this:

pub trait ScalarFunctionDef: Any + Sync + Send + std::fmt::Debug {
...
    /// What type will this function return, given arguments of the specified input types?
    /// By default, returns the same type as the first argument
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
      arg_types.get(0)
        .ok_or_else(Error ("Implementation of Function {} did not specify a return type, and there are no arguments"))
  }
...
}

Then I think most function implementations can be left as the default or as

impl ScalarFunctionDef for Foo {
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
      Ok(DataType::Utf8)
    }
}

@@ -150,6 +151,9 @@ pub enum Expr {
Sort(Sort),
/// Represents the call of a built-in scalar function with a set of arguments.
ScalarFunction(ScalarFunction),
/// Represents the call of a built-in scalar function with a set of arguments,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about calling this ScalarFunction2 or ScalarFunctionDyn to try and describe the difference? It might be confusing to see ScalarFunction and ScalarFunctionDef

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than add a parallel implementation I would love to just change

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarFunction {
    /// The function
    pub fun: built_in_function::BuiltinScalarFunction,
    /// List of expressions to feed to the functions as arguments
    pub args: Vec<Expr>,
}

to something like

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarFunction {
    /// The function
    pub fun: Arc<dyn ScalarFunctionDef>,
    /// List of expressions to feed to the functions as arguments
    pub args: Vec<Expr>,
}

use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

// TODO(PR): add doc comments
pub trait ScalarFunctionDef: Any + Sync + Send + std::fmt::Debug {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about calling this trait ScalarFunction rather than ScalarFunctionDef? I know there are already several other things called ScalarFunction but that would also keep it in line with things like WindowFunction https://docs.rs/datafusion/latest/datafusion/index.html?search=WindowFunction

fn as_any(&self) -> &dyn Any;

// May return 1 or more name as aliasing
fn name(&self) -> &[&str];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend name(&self) ->&str that returns one name, and then a second API that returns optional aliases

/// returns any alias names this function is known by. Defaults to empty list
fn aliases(&self) -> &[&str] { &[] }
``

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach looks better 👍🏼


fn return_type(&self) -> FunctionReturnType;

fn execute(&self, _args: &[ArrayRef]) -> Result<ArrayRef> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be

Suggested change
fn execute(&self, _args: &[ArrayRef]) -> Result<ArrayRef> {
fn execute(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {

/// `execute()` and `execute_raw()` are two possible alternative for function definition:
/// If returns `false`, `execute()` will be used for execution;
/// If returns `true`, `execute_raw()` will be called.
fn use_execute_raw_instead(&self) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have a single execute method that takes ColumnarValues and put in the documentation how to go from ColumnarValue --> ArrayRef for simple, initial implementation

/// The function body (`execute()` in `ScalarFunctionDef`) now are all defined in
/// `physical-expr` crate, so the new interface implementation are defined separately
/// in `BuiltinScalarFunctionWrapper`
impl ScalarFunctionDef for BuiltinScalarFunction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we'll be able to impl this as long as BuiltInScalarFunction is split across two crates.

@2010YOUY01
Copy link
Contributor Author

2010YOUY01 commented Nov 1, 2023

Thank you @2010YOUY01 -- this is looking like a great first start.

One thing that came to mind when I was reviewing this PR was what about simply moving BuiltInScalarFunctions entirely into the physical_expr crate (and remove Expr::ScalarFunction). We would have to extend ScalarUDF... but that would have the nice property of keeping things uniform

Thank you for your review.

Moving BuiltinScalarFunctions entirely into phys-expr crate and extend ScalarUDF sounds like a good idea, I will experiment if this approach is doable

@alamb
Copy link
Contributor

alamb commented Nov 1, 2023

Moving BuiltinScalarFunctions entirely into phys-expr crate and extend ScalarUDF sounds like a good idea, I will experiment if this approach is doable

Thank you @2010YOUY01 -- I am pretty excited that this approach has the benefits of

  1. Minimizes API changes
  2. Ensures that UDFs have the same breadth of functionality as built in functions

Some steps that might get us there might be to make the fields of ScalarUDF non pub, which would be a breaking change, but would would then let us extend the API in a non breaking way

let udf = ScalarUDF::new(..)
  .with_aliases(["foo", "bar"]);

Over time, we could then even introduce a new trait API (and internally change how ScalarUDF is implemented) without making additional breaking changes.

@alamb alamb marked this pull request as draft November 1, 2023 16:19
@alamb
Copy link
Contributor

alamb commented Nov 2, 2023

@2010YOUY01 what do you think about this PR: #8039 (make the fields non pub)? Then I was thinking of trying to split out some of the BuiltInScalarFunctions into their own crate (array_functions perhaps)

@alamb
Copy link
Contributor

alamb commented Feb 7, 2024

I think we have merged a version of this work now, thank you @2010YOUY01 for all the help with the planning and implementation

@alamb alamb closed this Feb 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Physical Expressions sql SQL Planner substrait
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants