Suggest using new(), add example
alamb committed May 21, 2024
1 parent 96642ac commit eac0495
Showing 2 changed files with 47 additions and 8 deletions.
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -49,9 +49,9 @@ pub use selection::{RowSelection, RowSelector};
/// Most users should use one of the following specializations:
///
/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
- /// * `async` API: [`ParquetRecordBatchStreamBuilder::new_with_metadata`]
+ /// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
///
- /// [`ParquetRecordBatchStreamBuilder::new_with_metadata`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new_with_metadata
+ /// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
pub struct ArrowReaderBuilder<T> {
pub(crate) input: T,

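For contrast with the `async` builder this commit documents, here is a minimal sketch of the synchronous API mentioned above, `ParquetRecordBatchReaderBuilder::try_new`; the path `data.parquet` and the `main` wrapper are illustrative assumptions:

```rust
use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open a local parquet file ("data.parquet" is an assumed placeholder)
    let file = File::open("data.parquet")?;
    // `try_new` reads the file footer and returns a configurable builder
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    // The built reader is a blocking `Iterator` of `Result<RecordBatch>`
    let mut reader = builder.build()?;
    if let Some(batch) = reader.next() {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```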
51 changes: 45 additions & 6 deletions parquet/src/arrow/async_reader/mod.rs
@@ -228,7 +228,7 @@ impl ArrowReaderMetadata
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);

- /// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
+ /// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
///
/// In particular, this handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
@@ -239,6 +239,37 @@ pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
///
/// # Example
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{Int32Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
/// # use tempfile::tempfile;
/// # use futures::StreamExt;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
/// # let mut file = tempfile().unwrap();
/// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
/// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
/// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
/// # writer.write(&batch).unwrap();
/// # writer.close().unwrap();
/// // Open async file containing parquet data
/// let file = tokio::fs::File::from_std(file);
/// // Construct the reader
/// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
///     .await.unwrap().build().unwrap();
/// // Read a batch of records
/// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
/// # }
/// ```
pub async fn new(input: T) -> Result<Self> {
Self::new_with_options(input, Default::default()).await
}
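Because `new` delegates to `new_with_options` with `Default::default()`, a caller that needs non-default behavior can pass an `ArrowReaderOptions` directly. A minimal sketch; the helper name `open_with_page_index` is hypothetical:

```rust
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::errors::Result;

// Hypothetical helper: behaves like `new`, but also asks the reader to
// load the page index, enabling finer-grained row selection later.
async fn open_with_page_index(
    file: tokio::fs::File,
) -> Result<ParquetRecordBatchStreamBuilder<tokio::fs::File>> {
    let options = ArrowReaderOptions::new().with_page_index(true);
    ParquetRecordBatchStreamBuilder::new_with_options(file, options).await
}
```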
@@ -253,7 +284,9 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
///
/// This allows loading metadata once and using it to create multiple builders with
- /// potentially different settings
+ /// potentially different settings, whose streams can then be read in parallel.
+ ///
+ /// # Example of reading from multiple streams in parallel
///
/// ```
/// # use std::fs::metadata;
@@ -268,23 +301,29 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
- /// let mut file = tempfile().unwrap();
+ /// # let mut file = tempfile().unwrap();
/// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
/// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
/// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
/// # writer.write(&batch).unwrap();
/// # writer.close().unwrap();
/// #
/// // open file with parquet data
/// let mut file = tokio::fs::File::from_std(file);
/// // load metadata once
/// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
/// // create two readers, a and b, from the same underlying file
/// // without reading the metadata again
/// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
/// file.try_clone().await.unwrap(),
/// meta.clone()
/// ).build().unwrap();
/// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
///
- /// // Should be able to read from both in parallel
- /// assert_eq!(a.next().await.unwrap().unwrap(), b.next().await.unwrap().unwrap());
+ /// // Can read batches from both readers in parallel
+ /// assert_eq!(
+ ///     a.next().await.unwrap().unwrap(),
+ ///     b.next().await.unwrap().unwrap(),
+ /// );
/// # }
/// ```
pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
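To make the "potentially different settings" above concrete, the sketch below loads the metadata once and then configures the two builders differently; the helper `split_readers`, the projected column, and the batch size are illustrative assumptions:

```rust
use parquet::arrow::arrow_reader::ArrowReaderMetadata;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

// Hypothetical helper: share one metadata load across two differently
// configured readers of the same file.
async fn split_readers(
    mut file: tokio::fs::File,
) -> Result<(), Box<dyn std::error::Error>> {
    let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
    // Reader `a` projects only the first leaf column...
    let mask = ProjectionMask::leaves(meta.metadata().file_metadata().schema_descr(), [0]);
    let _a = ParquetRecordBatchStreamBuilder::new_with_metadata(
        file.try_clone().await?,
        meta.clone(),
    )
    .with_projection(mask)
    .build()?;
    // ...while reader `b` reads every column in smaller batches
    let _b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta)
        .with_batch_size(1024)
        .build()?;
    Ok(())
}
```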
