-
Notifications
You must be signed in to change notification settings - Fork 838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
parquet::column::reader::GenericColumnReader::skip_records
still decompresses most data
#6454
Comments
Have you enabled the page index? |
Indeed. Or enabled v2 page headers? The issue seems to be that when skipping rows ( I don't think pages are uncompressed twice...it's just a result of the two paths through |
I think the documentation on https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.RowFilter.html is also instructive. Even if all the decode is working properly, I think the arrow reader may well decode certain pages twice. It is one of my theories about why pushing filters down doesn't make things always faster, but I have not had time to look into it in more detail |
See also #5523 Although I suspect in this case the issue is a lack of page index information for whatever reason |
I've taken a look at the reproducer linked from apache/datafusion#7845 (comment) and I'm not sure that predicate pushdown is going to be helpful here. The query is
What follows are some avenues to explore Potentially Incorrect DF ProjectionMask However, there is something fishy here. Given the query doesn't actually request any columns, the final The trace, however, would suggest DF is requesting columns in the final projection, I wonder if DF requests filter columns in the projection mask even when the filter that needs them has been pushed down? This is probably something that could/should be fixed. Adaptive Predicate Pushdown Currently all
The question then becomes what makes this judgement, currently I believe DF pushes everything down that it can. #5523 proposes adding some logic to the parquet reader to accept the pushed down predicate but choose to not use it for late materialization. This would have the effect of making it so that pushing down a predicate is no worse than not pushing it down, but it would not improve the best-case performance. Cache Decompressed Pages Currently when evaluating a predicate, even if those columns are to be used again, either in another predicate or the final projection mask, the decompressed pages are not kept around. Keeping them around would have the advantage of saving CPU cycles at the cost of potentially significant additional memory usage, especially if the predicate is very selective. Cache Filtered Columns A potentially better option would be to retain the filtered columns for later usage, however, aside from being quite complex to implement this still runs the risk of blowing the memory budget Lazy Predicate Evaluation The problem with both of the above caching strategies is that predicates are completely evaluated up-front. Whilst this makes the code much simpler, especially in async contexts, it has the major drawback that any caching strategy has to potentially retain a huge amount of decoded data. If instead we incrementally evaluated the filters as we went, we would be able to yield batches as we went. The one subtlety concerns object stores, where interleaving IO in the same way is likely to seriously hurt. We may need to no longer perform I/O pushdown based on This would also improve the performance of queries with limits that can't be fully pushed down. I will try to find some time to work on this over the next few days. |
FWIW, I quantified the memory usage vs query time with ClickBench query 21: The query: SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE '%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
As @tustvold predicted, we roughly gets 4x better performance while 4x more memory usage. |
👍 on this. I happened to noticed that each intersect/union of |
I agree this should be fixed. I will try and file a ticket to investigate |
Describe the bug
I noticed this while investigating apache/datafusion#7845 (comment).
The suggestion from @jayzhan211 and @alamb was that
datafusion.execution.parquet.pushdown_filters true
should improve performance of queries like this, but it seems to make them slower.I think the reason is that data is being decompressed twice (or data is being decompressed that shouldn't be), here's a screenshot from samply running on this code:
(You can view this flamegraph properly here)
You can see that there are two blocks of decompression work, the second one is associated with
parquet::column::reader::GenericColumnReader::skip_records
and happens after the first decompression chunk and running the query has completed.In particular you can se that there's a
read_new_page()
cal inparquet::column::reader::GenericColumnReader::skip_records
(line 335) that's taking a lot of time:My question is - could this second run of compression be avoided?
To Reproduce
Clone https://github.com/samuelcolvin/batson-perf, comment out one of the modes, compile with profiling enabled
cargo build --profile profiling
, run with samplysamply record ./target/profiling/batson-perf
Expected behavior
I would expect that
datafusion.execution.parquet.pushdown_filters true
was faster, I think the reason it's not is decompressing the data twice.Additional context
apache/datafusion#7845 (comment)
The text was updated successfully, but these errors were encountered: