From eac0495c925cbabe01a3c39f68a3a45057438db4 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 20 May 2024 20:11:07 -0400
Subject: [PATCH] Suggest using new(), add example

---
 parquet/src/arrow/arrow_reader/mod.rs |  4 +--
 parquet/src/arrow/async_reader/mod.rs | 51 +++++++++++++++++++++++----
 2 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 84e83f5dc653..2a2fee29c711 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -49,9 +49,9 @@ pub use selection::{RowSelection, RowSelector};
 /// Most users should use one of the following specializations:
 ///
 /// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
-/// * `async` API: [`ParquetRecordBatchStreamBuilder::new_with_metadata`]
+/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
 ///
-/// [`ParquetRecordBatchStreamBuilder::new_with_metadata`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_metadata
+/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
 pub struct ArrowReaderBuilder<T> {
     pub(crate) input: T,

diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index c8a9f82c32c0..1e298c654975 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -228,7 +228,7 @@ impl ArrowReaderMetadata {
 /// breaking the pre-existing ParquetRecordBatchStreamBuilder API
 pub struct AsyncReader<T>(T);

-/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
+/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
 ///
 /// In particular, this handles reading the parquet file metadata, allowing consumers
 /// to use this information to select what specific columns, row groups, etc...
@@ -239,6 +239,37 @@ pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

 impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use std::fs::metadata;
+    /// # use std::sync::Arc;
+    /// # use bytes::Bytes;
+    /// # use arrow_array::{Int32Array, RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Schema};
+    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
+    /// # use tempfile::tempfile;
+    /// # use futures::StreamExt;
+    /// # #[tokio::main(flavor="current_thread")]
+    /// # async fn main() {
+    /// #
+    /// # let mut file = tempfile().unwrap();
+    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
+    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
+    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
+    /// # writer.write(&batch).unwrap();
+    /// # writer.close().unwrap();
+    /// // Open the async file containing parquet data
+    /// let mut file = tokio::fs::File::from_std(file);
+    /// // Construct the reader
+    /// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
+    ///     .await.unwrap().build().unwrap();
+    /// // Read a batch
+    /// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
+    /// # }
+    /// ```
     pub async fn new(input: T) -> Result<Self> {
         Self::new_with_options(input, Default::default()).await
     }
@@ -253,7 +284,9 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
     ///
     /// This allows loading metadata once and using it to create multiple builders with
-    /// potentially different settings
+    /// potentially different settings; the resulting streams can then be read in parallel.
+    ///
+    /// # Example of reading from multiple streams in parallel
     ///
     /// ```
     /// # use std::fs::metadata;
@@ -268,23 +301,29 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// # #[tokio::main(flavor="current_thread")]
     /// # async fn main() {
     /// #
-    /// let mut file = tempfile().unwrap();
+    /// # let mut file = tempfile().unwrap();
     /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
     /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
     /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
     /// # writer.write(&batch).unwrap();
     /// # writer.close().unwrap();
-    /// #
+    /// // Open the file containing parquet data
     /// let mut file = tokio::fs::File::from_std(file);
+    /// // Load the metadata once
     /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
+    /// // Create two readers, a and b, from the same underlying file
+    /// // without reading the metadata again
     /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
     ///     file.try_clone().await.unwrap(),
     ///     meta.clone()
     /// ).build().unwrap();
     /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
     ///
-    /// // Should be able to read from both in parallel
-    /// assert_eq!(a.next().await.unwrap().unwrap(), b.next().await.unwrap().unwrap());
+    /// // Can read batches from both readers in parallel
+    /// assert_eq!(
+    ///     a.next().await.unwrap().unwrap(),
+    ///     b.next().await.unwrap().unwrap(),
+    /// );
     /// # }
     /// ```
     pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
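For comparison, the `ArrowReaderBuilder` docs touched above also point synchronous users at [`ParquetRecordBatchReaderBuilder::try_new`], which this patch leaves without an example. Below is a minimal sketch of that synchronous counterpart, not part of the patch: the temp-file setup mirrors the patch's doctests, and the `example.parquet` path is illustrative only.

```rust
use std::fs::File;
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;

fn main() {
    // Write a small parquet file so the example is self-contained
    // (path chosen for illustration)
    let path = std::env::temp_dir().join("example.parquet");
    let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    let mut writer =
        ArrowWriter::try_new(File::create(&path).unwrap(), schema.clone(), None).unwrap();
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // Synchronous read: no async runtime required, batches are produced
    // by a blocking Iterator rather than a Stream
    let file = File::open(&path).unwrap();
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)
        .unwrap()
        .build()
        .unwrap();
    for batch in reader {
        println!("read {} rows", batch.unwrap().num_rows());
    }
}
```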
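The builder docs also mention using the metadata to "select what specific columns, row groups, etc." to read. A sketch of how those selections compose on the `async` builder, assuming the standard `ArrowReaderBuilder` settings (`with_projection`, `with_row_groups`, `with_batch_size`); the two-column schema here is invented for illustration:

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use futures::StreamExt;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use tempfile::tempfile;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Setup: write a two-column file, following the pattern of the patch's doctests
    let mut file = tempfile().unwrap();
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(Int32Array::from(vec![4, 5, 6])),
        ],
    )
    .unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let file = tokio::fs::File::from_std(file);
    let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    // Project only the first leaf column ("a")...
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    // ...and restrict the read to row group 0, with a chosen batch size
    let mut stream = builder
        .with_projection(mask)
        .with_row_groups(vec![0])
        .with_batch_size(1024)
        .build()
        .unwrap();
    while let Some(batch) = stream.next().await {
        println!("read {} rows", batch.unwrap().num_rows());
    }
}
```

The same settings apply to the synchronous builder as well, since both are specializations of `ArrowReaderBuilder`.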