Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype implementing DataFusion functions / operators using arrow-udf liibrary #11413

Closed
alamb opened this issue Jul 11, 2024 · 18 comments
Closed
Assignees
Labels
enhancement New feature or request

Comments

@alamb
Copy link
Contributor

alamb commented Jul 11, 2024

Is your feature request related to a problem or challenge?

Related to the discussion on #11192 with @Xuanwo

RisingWave has a library for automatically creating vectorized implementations of functions (e.g. that operate on arrow arrays) from scalar implementations

The library is here: https://github.com/risingwavelabs/arrow-udf

A blog post describing it is here: https://risingwave.com/blog/simplifying-sql-function-implementation-with-rust-procedural-macro/

DataFusion uses macros to do something similar in binary.rs but they are pretty hard to read / understand in my opinon:

macro_rules! compute_utf8_op {
($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
let ll = $LEFT
.as_any()
.downcast_ref::<$DT>()
.expect("compute_op failed to downcast left side array");
let rr = $RIGHT
.as_any()
.downcast_ref::<$DT>()
.expect("compute_op failed to downcast right side array");
Ok(Arc::new(paste::expr! {[<$OP _utf8>]}(&ll, &rr)?))
}};
}

One main benefit I can see to switching to https://github.com/risingwavelabs/arrow-udf is that we could then extend arrow-udf to support Dictionary and StringView and maybe other types to generate fast kernels for multiple different array layouts.

Describe the solution you'd like

I think it would be great if someone could evaluate the feasibility of using the macros in https://github.com/risingwavelabs/arrow-udf to implement Datafusion's operations (and maybe eventually functions etc)

Describe alternatives you've considered

I suggest a POC that picks one or two functions (maybe string equality or regexp_match or something) and tries to use arrow-udfs function macro instead.

Here is an example of how to use it: https://docs.rs/arrow-udf/0.3.0/arrow_udf/

Additional context

No response

@alamb alamb added the enhancement New feature or request label Jul 11, 2024
@xinlifoobar
Copy link
Contributor

take

@xinlifoobar
Copy link
Contributor

xinlifoobar commented Jul 16, 2024

Sorry it takes longer than I expected to make this works end-to-end. I plan to make an ScalarUDF with arrow-udf as the completion of the prototype work.

From my perspective (feel free to correct me if I'm wrong),

Good points:

  • Provide uniform way to implement functions against record batches.
  • Code saving.

Bad points:

  • Due to the macro implementation, the global_registry features needs to be defined at the crate that references arrow-udf. otherwise, it would not work.
  • Difficult to leverage arrow infrastructures projects like arrow-string or arrow-ord.
  • Lack of support for operations against array and scalar.
  • By default all udf are private, lack of a way to reference the udf that could be used in e.g., ExprPlanner.

Neural:

  • The arrow-udf interfaces are targeting RecordBatch and Field while Datafusion uses ColumnarValue and Datatype. I'd vote for both implementations but thought of RecordBatch are more nature abstraction while take advantages of arrow.
  • Lack of support of Arrow types that Datafusion needs, e.g, Decimal128. This would be an issue if we plan to replace some generic function, e.g., eq.

I'd think we could replace some string functions, that are not supported by arrow-string by arrow-udf to get rid of macros like compute_utf8_op.

match &self.op {
IsDistinctFrom | IsNotDistinctFrom | Lt | LtEq | Gt | GtEq | Eq | NotEq
| Plus | Minus | Multiply | Divide | Modulo | LikeMatch | ILikeMatch
| NotLikeMatch | NotILikeMatch => unreachable!(),
And => {
if left_data_type == &DataType::Boolean {
boolean_op!(&left, &right, and_kleene)
} else {
internal_err!(
"Cannot evaluate binary expression {:?} with types {:?} and {:?}",
self.op,
left.data_type(),
right.data_type()
)
}
}
Or => {
if left_data_type == &DataType::Boolean {
boolean_op!(&left, &right, or_kleene)
} else {
internal_err!(
"Cannot evaluate binary expression {:?} with types {:?} and {:?}",
self.op,
left_data_type,
right_data_type
)
}
}
RegexMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, false, false)
}
RegexIMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, false, true)
}
RegexNotMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, true, false)
}
RegexNotIMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, true, true)
}
BitwiseAnd => bitwise_and_dyn(left, right),
BitwiseOr => bitwise_or_dyn(left, right),
BitwiseXor => bitwise_xor_dyn(left, right),
BitwiseShiftRight => bitwise_shift_right_dyn(left, right),
BitwiseShiftLeft => bitwise_shift_left_dyn(left, right),
StringConcat => binary_string_array_op!(left, right, concat_elements),
AtArrow | ArrowAt => {
unreachable!("ArrowAt and AtArrow should be rewritten to function")
}

An example would be

// declare concat
#[function("concat(string, string) -> string")]
#[function("concat(largestring, largestring) -> largestring")]
fn concat(lhs: &str, rhs: &str) -> String {
    format!("{}{}", lhs, rhs)
}

// reference concat
apply_udf(
    &ColumnarValue::Array(left),
    &ColumnarValue::Array(right),
    &Field::new("", DataType::Utf8, true),
    "concat",
)

CC @alamb

@xinlifoobar
Copy link
Contributor

Btw... the ColumnarValue introduced into datafusion 2 years ago. Considering:

  • ColumnarValue are always paired use with SchemaRef
  • RecordBatches are one or more ColumnarValues with schema.

Will it be better to use RecordBatches instead of ColumnValues in PhysicalExpr evaluate function? It will provide finely integrations with arrow-rs eco system.

@alamb
Copy link
Contributor Author

alamb commented Jul 17, 2024

Btw... the ColumnarValue introduced into datafusion 2 years ago. Considering:

  • ColumnarValue are always paired use with SchemaRef
  • RecordBatches are one or more ColumnarValues with schema.

Will it be better to use RecordBatches instead of ColumnValues in PhysicalExpr evaluate function? It will provide finely integrations with arrow-rs eco system.

One thing that ColumnarValue does well is represent single values efficiently (aka ScalarValue) which is a very important optimization for performance

I don't see any fundamental reason we couldn't use RecordBatch if we figured out a better way to represent single row

@alamb
Copy link
Contributor Author

alamb commented Jul 17, 2024

Sorry it takes longer than I expected to make this works end-to-end. I plan to make an ScalarUDF with arrow-udf as the completion of the prototype work.

Thank you so much @xinlifoobar -- this is really helpful and a great analysis (I think the pros/cons you identified make a lot of sense to me)

From what I can see, if we wanted to proceed with using arrow-udf in DataFusion we would need to address the shortcomings you identified above.

Here are some additional discussions

By default all udf are private, lack of a way to reference the udf that could be used in e.g., ExprPlanner.

I think this is part of the same concept as discussed on https://lists.apache.org/thread/x8wvlkfr0osl15o52rw85wom0p4v05x6 -- basically the arrow-udf library's scope is large enough to encompass things like a function registry that DataFusion already has

Lack of support for operations against array and scalar.

I do think being able to special case scalar value is a critical requirement for performance.

I will post about your findings on the mailing lists and let's see what the authors of arrow-udf have to say

cc @wangrunji0408 @Xuanwo

@alamb
Copy link
Contributor Author

alamb commented Jul 17, 2024

An example would be

FWIW this implementation of concat would likely perform pretty poorly compared to a hand written one as it will both create and allocate a new temporary String for each row (and then presumably copy that value into a final StringArray/LargeStringArray).

// declare concat
#[function("concat(string, string) -> string")]
#[function("concat(largestring, largestring) -> largestring")]
fn concat(lhs: &str, rhs: &str) -> String {
    format!("{}{}", lhs, rhs)
}

@wangrunji0408
Copy link

wangrunji0408 commented Jul 22, 2024

An example would be

FWIW this implementation of concat would likely perform pretty poorly compared to a hand written one as it will both create and allocate a new temporary String for each row (and then presumably copy that value into a final StringArray/LargeStringArray).

// declare concat
#[function("concat(string, string) -> string")]
#[function("concat(largestring, largestring) -> largestring")]
fn concat(lhs: &str, rhs: &str) -> String {
    format!("{}{}", lhs, rhs)
}

In this case, a writer-style return value is supported to avoid the overhead.

#[function("concat(string, string) -> string")]
#[function("concat(largestring, largestring) -> largestring")] // will be supported soon
fn concat(lhs: &str, rhs: &str, output: &mut impl std::fmt::Write) {
    write!(output, "{}{}", lhs, rhs).unwrap();
}

The same technique can be applied to binary and largebinary. However, it is not yet implemented due to the lack of impl std::io::Write for (Large)BinaryBuilder in arrow crate.

// to be supported
#[function("concat(binary, binary) -> binary")]
#[function("concat(largebinary, largebinary) -> largebinary")]
fn concat(lhs: &[u8], rhs: &[u8], output: &mut impl std::io::Write) {
    output.write_all(lhs).unwrap();
    output.write_all(rhs).unwrap();
}

wangrunji0408 added a commit to arrow-udf/arrow-udf that referenced this issue Jul 22, 2024
This PR will add an additional meta parameter `visibility` to
`arrow-udf`. I might want this to be added while working on
apache/datafusion#11413. Sometimes it is
better to reference the symbol directly instead of using the function
registry.

---------

Co-authored-by: Runji Wang <wangrunji0408@163.com>
@xxchan
Copy link
Contributor

xxchan commented Jul 24, 2024

To clarify, the arrow-udf project provides several things. (I just tried to explain it here arrow-udf/arrow-udf#55) For DataFusion, we are mainly interested in the #[function] macro (in the arrow-udf crate), right?

wangrunji0408 pushed a commit to arrow-udf/arrow-udf that referenced this issue Jul 24, 2024
I'm inspired by apache/datafusion#11413. It
seems they just need the function macro, not like "UDF", which surprised
me a little.

---------

Signed-off-by: xxchan <xxchan22f@gmail.com>
@alamb
Copy link
Contributor Author

alamb commented Jul 24, 2024

To clarify, the arrow-udf project provides several things. (I just tried to explain it here risingwavelabs/arrow-udf#55) For DataFusion, we are mainly interested in the #[function] macro (in the arrow-udf crate), right?

the #[function] macro seemed the obvious thing that would be easiest to apply to DataFusion. The rest of the system (for loading user defined functions at runtime with WASM and a function registry) might be interesting, but DataFusion already has similar features already, so switching would likely be a large API change for users for unclear benfits at the moment

@findepi
Copy link
Member

findepi commented Sep 12, 2024

it will both create and allocate a new temporary String for each row (and then presumably copy that value into a final StringArray/LargeStringArray).

// declare concat
#[function("concat(string, string) -> string")]
#[function("concat(largestring, largestring) -> largestring")]
fn concat(lhs: &str, rhs: &str) -> String {
    format!("{}{}", lhs, rhs)
}

I understand we're not concerned whether format! is the best way to concatenate two strings, but more about the fundamental fact that concat function allocates and returns a new String object every time.

I don't know how does #[function] macro work, but I assume it generates a ColumnarValue processor that handles ScalarValue directly and an Array value with a for loop. For the for loop code branch, is the compiler capable of optimizing away the concat's allocation?

@wangrunji0408
Copy link

@findepi

The #[function] macro doesn't process ColumnarValue as in datafusion. Instead it always outputs an array value.

A new String object will be allocated if the concat function is defined as above. However, you can prevent this by rewriting the function as follows, where the result is written directly to the array buffer.

#[function("concat(string, string) -> string")]
#[function("concat(largestring, largestring) -> largestring")]
fn concat(lhs: &str, rhs: &str, output: &mut impl std::fmt::Write) {
    write!(output, "{}{}", lhs, rhs).unwrap();
}

@findepi
Copy link
Member

findepi commented Sep 13, 2024

@wangrunji0408 thanks, that's valuable!

I spend quite some time today benchmarking various hypothetical implementations where concatenation logic is maximally separated from columnar (scalar/array) handling.

Here couple conclusions

  • hand-written1 concat(a, b) can easily be 18%2 faster than current variadic implementation
  • result array pre-sizing has very big impact on performance (~35%)
    • currently concat does this internally, together with doing the actual concatenation.
      However, if we go the "simple functions" path, the sizing can also be adaptive, especially if function implementation is given some scratch space for doing the math for adaptivity Specialized / Pre-compiled / Prepared ScalarUDFs #8051 (comment)
  • extracting logic to fn concat(a: &str, b: &str) -> String has x3-6 performance hit (compared to writing to MutableBuffer directly). I still struggle to accept that the compiler cannot inline that!
  • adapting &MutableBuffer to &std::fmt::Write with another throw-away struct is optimized away by the compiler. (and we also can rustly do impl std::fmt::Write for MutableBuffer). This means we should be able to provide zero-cost3 abstraction where function logic is implemented as
    concat(lhs: &str, rhs: &str, output: &mut impl std::fmt::Write)
    and the output writes directly back to MutableBuffer of the StringArray being constructed.
  • write!(output, "{}{}", a, b).unwrap()
    is significantly slower than
    output.write_str(a).unwrap();
    output.write_str(b).unwrap();
  • and
    format!("{}{}", a, b)
    is significantly slower than
    a.to_string() + b

Footnotes

  1. for hand-written i was using DF's StringArrayBuilder, but interacting with it's valueBuffer MutableBuffer directly. So bare-arrow in practice. https://gist.github.com/findepi/e7b53f8e06cde083205125d83f7ec615

  2. i was running benchmarks with time cargo bench --bench concat with slight modifications of the benchmark: two inputs arrays, no nulls since DF's concat has non-typical null handling which i didn't want to focus on

  3. pre-sizing aside

@alamb
Copy link
Contributor Author

alamb commented Sep 13, 2024

adapting &MutableBuffer to &std::fmt::Write with another throw-away struct is optimized away by the compiler. (and we also can rustly do impl std::fmt::Write for MutableBuffer). This means we should be able to provide zero-cost3 abstraction where function logic is implemented as

BTW StringBuilder already implements Write https://docs.rs/arrow/latest/arrow/array/builder/type.GenericStringBuilder.html

We are discussing something similar here:

@findepi
Copy link
Member

findepi commented Sep 13, 2024

BTW StringBuilder already implements Write

Thanks for pointing this out.
In my testing, with other things being constant, StringBuilder is slower than my hacked DF's StringArrayBuilder (by ~10% of the benchmark). My guess this is because of null handling, but didn't measure. (The function should be able to declare whether it can declare null result on non-null input, which means null handling can be elided away in the optimal case, without requiring the function body to deal with this.)

@alamb
Copy link
Contributor Author

alamb commented Sep 13, 2024

In my testing, with other things being constant, StringBuilder is slower than my hacked DF's StringArrayBuilder (by ~10% of the benchmark). My guess this is because of null handling, but didn't measure. (

That would make sense and was the rationale on the original inclusion from @JasonLi-cn (❤️ )

(The function should be able to declare whether it can declare null result on non-null input, which means null handling can be elided away in the optimal case, without requiring the function body to deal with this.)

I think the other thing that is important is if the function produces null on a null input (which has some term but I can't remember now).

With those two properties I do think you can skip most null handling.

This PR from @kazuyukitanimura in arrow may also help improve the situation for normal arrow StringBuilder apache/arrow-rs#6288

@findepi
Copy link
Member

findepi commented Sep 14, 2024

I think the other thing that is important is if the function produces null on a null input (which has some term but I can't remember now).

is this a null-call function?

A null-call function is an SQL-invoked function that is defined to return the null value if any of its input argu- ments is the null value. A null-call function is an SQL-invoked function whose specifies “RETURNS NULL ON NULL INPUT”.

With those two properties I do think you can skip most null handling.

yes. many functions are "return null on null; return non-null on non-null". for these null handling can be externalized, and the function "business logic" abstracted.

@alamb
Copy link
Contributor Author

alamb commented Sep 15, 2024

A null-call function is an SQL-invoked function that is defined to return the null value if any of its input argu- ments is the null value. A null-call function is an SQL-invoked function whose specifies “RETURNS NULL ON NULL INPUT”.

Indeed I was thinking of RETURNS NULL ON NULL INPUT (though I thought there was some other academic term like "null pure" or something, but I can't find it now).

With those two properties I do think you can skip most null handling.

yes. many functions are "return null on null; return non-null on non-null". for these null handling can be externalized, and the function "business logic" abstracted.

Exactly!

@alamb
Copy link
Contributor Author

alamb commented Nov 21, 2024

I think the prototype was done. Let's continue the discussion in #12635

@alamb alamb closed this as completed Nov 21, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
5 participants