diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs index ea529b74f610..a80255f413ee 100644 --- a/parquet/src/arrow/arrow_reader/filter.rs +++ b/parquet/src/arrow/arrow_reader/filter.rs @@ -29,8 +29,10 @@ pub trait ArrowPredicate: Send + 'static { /// Evaluate this predicate for the given [`RecordBatch`] containing the columns /// identified by [`Self::projection`] /// - /// Rows that are `true` in the returned [`BooleanArray`] will be returned by the - /// parquet reader, whereas rows that are `false` or `Null` will not be + /// Must return a [`BooleanArray`] that has the same length as the input + /// `batch` where each row indicates whether the row should be returned: + /// * `true`:the row should be returned + /// * `false` or `null`: the row should not be returned fn evaluate(&mut self, batch: RecordBatch) -> Result; } diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 77de83994078..52d7249a290e 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -677,11 +677,16 @@ pub(crate) fn apply_range( selection } -/// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`] +/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating +/// which rows to return. /// -/// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the -/// returned [`RowSelection`] will be the conjunction of this and -/// the rows selected by `predicate` +/// `input_selection`: Optional pre-existing selection. If `Some`, then the +/// final [`RowSelection`] will be the conjunction of it and the rows selected +/// by `predicate`. +/// +/// Note: A pre-existing selection may come from evaluating a previous predicate +/// or if the [`ParquetRecordBatchReader`] specified an explicit +/// [`RowSelection`] in addition to one or more predicates. pub(crate) fn evaluate_predicate( batch_size: usize, array_reader: Box, @@ -691,7 +696,16 @@ pub(crate) fn evaluate_predicate( let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone()); let mut filters = vec![]; for maybe_batch in reader { - let filter = predicate.evaluate(maybe_batch?)?; + let maybe_batch = maybe_batch?; + let input_rows = maybe_batch.num_rows(); + let filter = predicate.evaluate(maybe_batch)?; + // Since user supplied predicate, check error here to catch bugs quickly + if filter.len() != input_rows { + return Err(arrow_err!( + "ArrowPredicate predicate returned {} rows, expected {input_rows}", + filter.len() + )); + } match filter.null_count() { 0 => filters.push(filter), _ => filters.push(prep_null_mask_filter(&filter)), diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index cebf3f9d38b6..82f21461290b 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -241,16 +241,22 @@ impl RowSelection { selectors: remaining, } } - /// Given a [`RowSelection`] computed under `self`, returns the [`RowSelection`] - /// representing their conjunction + /// returns a [`RowSelection`] representing rows that are selected in both + /// input [`RowSelection`]s. /// - /// For example: + /// This is equivalent to the logical `AND` / conjunction of the two + /// selections. + /// + /// # Example + /// If `N` means the row is not selected, and `Y` means it is + /// selected: /// + /// ```text /// self: NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY /// other: YYYYYNNNNYYYYYYYYYYYYY YYNNN /// /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN - /// + /// ``` /// /// # Panics ///