-
Notifications
You must be signed in to change notification settings - Fork 830
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(parquet): Add next_row_group API for ParquetRecordBatchStream #6907
Conversation
Signed-off-by: Xuanwo <github@xuanwo.io>
/// - `Ok(None)` if the stream has ended. | ||
/// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`. | ||
/// - `Ok(Some(reader))` which holds all the data for the row group. | ||
pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if next_row_group
is the best name, open to other options.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is a good, clear name as it clearly explains what it does
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this PR does have a certain elegant simplicity to it, however, it doesn't really solve the separation of IO and compute given that reader_factory.read_factory
potentially performs CPU-bound parquet decoding as part of late materialization / filter pushdown. It also has no ability to be parallelised.
Given that this isn't adding a host of additional complexity, I don't object to merging this in, but I wanted to flag that a solution to that problem likely will require something a bit different.
pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> { | ||
loop { | ||
match &mut self.state { | ||
StreamState::Decoding(_) | StreamState::Reading(_) => unreachable!(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should probably return an error saying not to mix polling the stream and using this API
I agree it doesn't solve (nor claim to) separting CPU and compute. Also, neither does what is currently in the repo
I don't understand the assertion that this can't be parallelized. Do you mean there is now way to have concurrent outstanding As I understand it, once the reader is returned, reading from the returned stream actually decodes the parquet data so this PR would allow the next IO to be interleaved with actually decoding the data.
I think we could support concurrent download / decode on multiple row groups of the same file today by creating multiple |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI @etseidl |
Right this was in response to #6676 (comment) which instigated this PR. I'm mostly wary of merging an API if we're going to have to replace it in order to meet the desired use-case
The PR attests to be related to #5522 which concerns this Edit:
Yes, which is what Datafusion does today. It is somewhat arcane to get it to work, but is documented here |
I agree if we have some actual alternative in mind we should evaluate that prior to merging this PR It seems to me this PR makes it possible to interleave IO and decode which the current API does not. I agree it does not address the other parts of #5522 (like parallel decode of columns, for example). I updated the description to say it closed #6559 |
IN my opinion, even if we add some newer low level API there is still value to this higher one that permits interleaved download and decode, as described on |
I like this! This will work for what I need. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy for this to be merged, unless @Xuanwo it doesn't meet your requirements and you plan to add something different instead
Signed-off-by: Xuanwo <github@xuanwo.io>
fyi @thinkharderdev |
…pache#6907) * feat(parquet): Add next_row_group API for ParquetRecordBatchStream Signed-off-by: Xuanwo <github@xuanwo.io> * chore: Returning error instead of using unreachable Signed-off-by: Xuanwo <github@xuanwo.io> --------- Signed-off-by: Xuanwo <github@xuanwo.io>
Which issue does this PR close?
ParquetRecordBatchStream
API to fetch the next row group while decoding #6559ParquetRecordBatchStream
to load the next row group while decoding #6676Rationale for this change
Add
async fn next_row_group()
for ParquetRecordBatchStream so that users can fecth row groups based on their needs and decode the data seperately.This PR marks the first step in further decoupling the I/O and decoding processes of Parquet reading.
What changes are included in this PR?
Add new API:
Are there any user-facing changes?
Yes.