From d65240c12067924919fb50c079b961ec828a6100 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 21 May 2024 03:14:47 -0400 Subject: [PATCH] Refine ParquetRecordBatchReaderBuilder docs (#5774) * Refine ParquetRecordBatchReaderBuilder docs * fix link * Suggest using new(), add example --- parquet/src/arrow/arrow_reader/mod.rs | 12 +++---- parquet/src/arrow/async_reader/mod.rs | 51 +++++++++++++++++++++++---- 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index db75c54bf5d0..adfd4fa4c4cc 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -44,14 +44,14 @@ use crate::file::page_index::index_reader; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; -/// A generic builder for constructing sync or async arrow parquet readers. This is not intended -/// to be used directly, instead you should use the specialization for the type of reader -/// you wish to use +/// Builder for constructing parquet readers into arrow. /// -/// * For a synchronous API - [`ParquetRecordBatchReaderBuilder`] -/// * For an asynchronous API - [`ParquetRecordBatchStreamBuilder`] +/// Most users should use one of the following specializations: /// -/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder +/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`] +/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`] +/// +/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new pub struct ArrowReaderBuilder { pub(crate) input: T, diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index c8a9f82c32c0..1e298c654975 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -228,7 +228,7 @@ impl ArrowReaderMetadata { /// breaking the pre-existing ParquetRecordBatchStreamBuilder API pub struct AsyncReader(T); -/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file +/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file /// /// In particular, this handles reading the parquet file metadata, allowing consumers /// to use this information to select what specific columns, row groups, etc... @@ -239,6 +239,37 @@ pub type ParquetRecordBatchStreamBuilder = ArrowReaderBuilder> impl ParquetRecordBatchStreamBuilder { /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file + /// + /// # Example + /// + /// ``` + /// # use std::fs::metadata; + /// # use std::sync::Arc; + /// # use bytes::Bytes; + /// # use arrow_array::{Int32Array, RecordBatch}; + /// # use arrow_schema::{DataType, Field, Schema}; + /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata; + /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder}; + /// # use tempfile::tempfile; + /// # use futures::StreamExt; + /// # #[tokio::main(flavor="current_thread")] + /// # async fn main() { + /// # + /// # let mut file = tempfile().unwrap(); + /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)])); + /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap(); + /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap(); + /// # writer.write(&batch).unwrap(); + /// # writer.close().unwrap(); + /// // Open async file containing parquet data + /// let mut file = tokio::fs::File::from_std(file); + /// // construct the reader + /// let mut reader = ParquetRecordBatchStreamBuilder::new(file) + /// .await.unwrap().build().unwrap(); + /// // Read batche + /// let batch: RecordBatch = reader.next().await.unwrap().unwrap(); + /// # } + /// ``` pub async fn new(input: T) -> Result { Self::new_with_options(input, Default::default()).await } @@ -253,7 +284,9 @@ impl ParquetRecordBatchStreamBuilder { /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`] /// /// This allows loading metadata once and using it to create multiple builders with - /// potentially different settings + /// potentially different settings, that can be read in parallel. + /// + /// # Example of reading from multiple streams in parallel /// /// ``` /// # use std::fs::metadata; @@ -268,23 +301,29 @@ impl ParquetRecordBatchStreamBuilder { /// # #[tokio::main(flavor="current_thread")] /// # async fn main() { /// # - /// let mut file = tempfile().unwrap(); + /// # let mut file = tempfile().unwrap(); /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)])); /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap(); /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap(); /// # writer.write(&batch).unwrap(); /// # writer.close().unwrap(); - /// # + /// // open file with parquet data /// let mut file = tokio::fs::File::from_std(file); + /// // load metadata once /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap(); + /// // create two readers, a and b, from the same underlying file + /// // without reading the metadata again /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata( /// file.try_clone().await.unwrap(), /// meta.clone() /// ).build().unwrap(); /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap(); /// - /// // Should be able to read from both in parallel - /// assert_eq!(a.next().await.unwrap().unwrap(), b.next().await.unwrap().unwrap()); + /// // Can read batches from both readers in parallel + /// assert_eq!( + /// a.next().await.unwrap().unwrap(), + /// b.next().await.unwrap().unwrap(), + /// ); /// # } /// ``` pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {