Adaptive Parquet Predicate Pushdown #5523

Open
tustvold opened this issue Mar 17, 2024 · 6 comments
Labels
enhancement Any new improvement worthy of a entry in the changelog

Comments

@tustvold
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently, RowSelection stores a list of RowSelector. This is optimised for large runs of skipped or selected rows, which allows the selection to be pushed down to the underlying decoding machinery. Whilst this works very well for skipping data based on the page index, where the selections necessarily span thousands of rows, it can degrade in the presence of more granular predicate evaluation, e.g. as performed by ArrowPredicate.
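To make the concern concrete, here is a minimal, std-only sketch of a run-length selection. The `Run` type and `runs_from_mask` function are hypothetical stand-ins that only mirror the general shape of a selector (a row count plus a skip flag); they are not the parquet crate's types.

```rust
/// Simplified stand-in for a run-length selector: a run of consecutive rows
/// that is either skipped or selected. (Illustrative model only.)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Run {
    pub row_count: usize,
    pub skip: bool,
}

/// Convert a per-row predicate result (true = keep the row) into runs.
pub fn runs_from_mask(mask: &[bool]) -> Vec<Run> {
    let mut runs: Vec<Run> = Vec::new();
    for &selected in mask {
        match runs.last_mut() {
            // Same kind of run as the previous row: extend it.
            Some(last) if last.skip == !selected => last.row_count += 1,
            // First row, or the kind changed: start a new run.
            _ => runs.push(Run { row_count: 1, skip: !selected }),
        }
    }
    runs
}
```

With page-index style input (a few long runs) the run list stays tiny, but a predicate that keeps every other row produces one run per row, which is the degenerate case described above.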

Describe the solution you'd like

In a similar vein to #1248, we should have different strategies based on the selectivity of the predicate. In particular, I would like RowSelection to switch between a RowSelector approach that is pushed down to the underlying readers and a late-evaluation approach that stores a BooleanBuffer and applies it to the columns after the fact.
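One way such a switch could be sketched is below. Everything here is an assumption made for illustration: the `SelectionStrategy` enum, the `choose_strategy` function, and the use of average run length (with a caller-supplied `min_avg_run` threshold) as the decision signal. The issue itself does not prescribe a heuristic.

```rust
/// Hypothetical strategies for applying a row selection.
#[derive(Debug, PartialEq, Eq)]
pub enum SelectionStrategy {
    /// Push run-length selectors down to the readers (good for long runs).
    Selectors,
    /// Decode the rows and apply a boolean mask afterwards (good for short runs).
    Mask,
}

/// Pick a strategy from the average run length of the mask.
/// `min_avg_run` is an assumed tuning knob, not a value taken from the issue.
pub fn choose_strategy(mask: &[bool], min_avg_run: usize) -> SelectionStrategy {
    if mask.is_empty() {
        return SelectionStrategy::Selectors;
    }
    // A new run starts wherever two adjacent rows disagree.
    let runs = 1 + mask.windows(2).filter(|w| w[0] != w[1]).count();
    if mask.len() / runs >= min_avg_run {
        SelectionStrategy::Selectors
    } else {
        SelectionStrategy::Mask
    }
}
```

A real policy would probably need to be calibrated by benchmarks rather than a fixed threshold.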

Describe alternatives you've considered

Additional context

@XiangpengHao
Contributor

take

@XiangpengHao
Contributor

I'll take a look at this. Here are some of my plans:

  • Implement a benchmark, e.g., RowSelection with 100k selectors (as used in ClickBench q21), benchmark and_then, from_filters and intersection
  • Implement a new RowSelection that uses a BooleanBuffer as the backend. Compare the performance.
  • Decide on a policy for when to use each.
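As a rough illustration of what a mask-backed selection would need to support, the sketch below implements the two combinators named in the plan over plain `Vec<bool>`. This is a stand-in under stated assumptions: a real implementation would presumably use a packed bitmap such as BooleanBuffer, and the function signatures here are hypothetical rather than the existing RowSelection API.

```rust
/// Apply `second` to the rows that `first` selected.
/// `second` has one entry per row selected by `first`; the result has one
/// entry per original row.
pub fn and_then(first: &[bool], second: &[bool]) -> Vec<bool> {
    let mut inner = second.iter();
    first
        .iter()
        .map(|&selected| {
            if selected {
                // Consume the next decision for a row that survived `first`.
                *inner
                    .next()
                    .expect("second must cover every row selected by first")
            } else {
                false
            }
        })
        .collect()
}

/// Keep only the rows selected by both masks (masks over the same rows).
pub fn intersection(a: &[bool], b: &[bool]) -> Vec<bool> {
    a.iter().zip(b).map(|(&x, &y)| x && y).collect()
}
```

Both operations are a single linear pass regardless of how fragmented the selection is, which is the property a benchmark against the selector-based versions would be expected to probe.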

Some more context:
I had to switch to a boolean-buffer-based row selection to reduce the selection overhead in another project, so I already have the implementation more or less ready. The remaining work for me is to figure out how to contribute it back to arrow-rs.

@alamb
Contributor

alamb commented Oct 22, 2024

An important goal of this ticket, mentioned by @tustvold in #6454 (comment), is that evaluating predicates in RowFilter (aka pushed-down predicates) is never worse than decoding the columns first and then filtering them with the filter kernel.

If we are able to achieve this goal, query engines like DataFusion could always push all predicates down into the Parquet reader.

At the moment, it is sometimes faster to apply filters after reading the columns than via ArrowPredicate, so queries sometimes get slower when all predicates are pushed down.

Whether it makes sense to push a predicate down depends on its actual selectivity, which is only known for sure during evaluation.

Thus, I agree with the conclusion that implementing adaptivity in the lowest-level scan will achieve the goal.

@XiangpengHao
Contributor

XiangpengHao commented Oct 22, 2024

> is that evaluating predicates in RowFilter (aka pushed-down predicates) is never worse than decoding the columns first and then filtering them with the filter kernel

This is an excellent summary of the goal; it also aligns well with my current project.

Since I have gone quite far on this, I want to share some of the issues I have encountered:

  • Very fast row selection, as described in this ticket.
  • Avoid decoding the predicate columns twice, potentially at the cost of higher memory usage, as described in "parquet::column::reader::GenericColumnReader::skip_records still decompresses most data" (#6454 (comment)).
  • Adaptively slice or filter the resulting array, i.e., if the selection is sparse, we should filter/take; otherwise we should slice.
  • Coalesce the resulting record batches. Since the filter is pushed to ParquetExec, there is no FilterExec and therefore no CoalesceBatchesExec, so ParquetExec itself has to emit coalesced record batches.
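The slice-or-filter decision in the list above could be sketched as follows. The `Materialize` enum and both functions are hypothetical, and the rule used here (slice only when the selected rows form one contiguous range) is just one possible reading of "sparse versus dense". Plain slices stand in for Arrow arrays, so the "slice" branch copies here, whereas slicing an Arrow array would be zero-copy.

```rust
/// Hypothetical plan for materializing the selected rows of a decoded column.
#[derive(Debug, PartialEq, Eq)]
pub enum Materialize {
    /// Nothing selected.
    Empty,
    /// Selected rows form one contiguous range `[start, end)`.
    Slice { start: usize, end: usize },
    /// Selected rows are scattered: copy them out one by one.
    Filter,
}

pub fn plan(mask: &[bool]) -> Materialize {
    let first = match mask.iter().position(|&b| b) {
        Some(i) => i,
        None => return Materialize::Empty,
    };
    // At least one row is selected, so a last selected row exists.
    let last = mask.iter().rposition(|&b| b).unwrap();
    let selected = mask.iter().filter(|&&b| b).count();
    if selected == last - first + 1 {
        Materialize::Slice { start: first, end: last + 1 }
    } else {
        Materialize::Filter
    }
}

pub fn apply<T: Clone>(values: &[T], mask: &[bool]) -> Vec<T> {
    match plan(mask) {
        Materialize::Empty => Vec::new(),
        Materialize::Slice { start, end } => values[start..end].to_vec(),
        Materialize::Filter => values
            .iter()
            .zip(mask)
            .filter(|(_, keep)| **keep)
            .map(|(v, _)| v.clone())
            .collect(),
    }
}
```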

@alamb
Contributor

alamb commented Oct 22, 2024

> Coalesce the resulting record batches. Since the filter is pushed to ParquetExec, there is no FilterExec and therefore no CoalesceBatchesExec, so ParquetExec itself has to emit coalesced record batches.

There is a structure in datafusion for Coalescing which might help / provide inspiration / be good to port upstream: https://github.com/apache/datafusion/blob/c22abb4ac3f1af8bbdf176ef0198988fc7b0982c/datafusion/physical-plan/src/coalesce/mod.rs#L71
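For readers unfamiliar with the idea, a minimal coalescing sketch is shown below. It is not the DataFusion structure linked above: the `Coalescer` type, its methods, and the use of `Vec<T>` in place of record batches are all assumptions made for illustration.

```rust
/// Hypothetical coalescer: buffers small filtered batches and emits batches
/// of exactly `target` rows, with a final partial batch on `finish`.
pub struct Coalescer<T> {
    target: usize,
    buffer: Vec<T>,
}

impl<T> Coalescer<T> {
    pub fn new(target: usize) -> Self {
        assert!(target > 0, "target batch size must be positive");
        Self { target, buffer: Vec::new() }
    }

    /// Add a (possibly tiny) batch and return any batches that are now full.
    pub fn push(&mut self, batch: Vec<T>) -> Vec<Vec<T>> {
        self.buffer.extend(batch);
        let mut out = Vec::new();
        while self.buffer.len() >= self.target {
            // Keep the first `target` rows as a full batch, retain the rest.
            let rest = self.buffer.split_off(self.target);
            out.push(std::mem::replace(&mut self.buffer, rest));
        }
        out
    }

    /// Flush whatever is left at the end of the scan.
    pub fn finish(&mut self) -> Option<Vec<T>> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}
```

A version operating on real record batches would also have to concatenate per-column arrays, which this sketch leaves out.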

@alamb
Contributor

alamb commented Oct 22, 2024

> adaptive slice or filter the resulting array, i.e., if the selection is sparse, we should filter/take, otherwise we should slice.

I think this is what @tustvold mentioned: the filter kernels also adaptively decide between take, iterate, etc. based on the actual selection.
