diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index b0b72bde3bf1..de723cae1e3d 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -14,6 +14,7 @@ //! SST in parquet format. +pub(crate) mod file_range; mod format; pub(crate) mod helper; pub(crate) mod metadata; diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs new file mode 100644 index 000000000000..f385ea992e0e --- /dev/null +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -0,0 +1,186 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Structs and functions for reading ranges from a parquet file. A file range +//! is usually a row group in a parquet file. + +use std::ops::BitAnd; +use std::sync::Arc; + +use api::v1::SemanticType; +use datatypes::arrow::array::BooleanArray; +use datatypes::arrow::buffer::BooleanBuffer; +use parquet::arrow::arrow_reader::RowSelection; +use snafu::ResultExt; + +use crate::error::{FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result}; +use crate::read::Batch; +use crate::row_converter::{McmpRowCodec, RowCodec}; +use crate::sst::parquet::format::ReadFormat; +use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext}; + +/// A range of a parquet SST. Now it is a row group. +/// We can read different file ranges in parallel. +pub struct FileRange { + /// Shared context. + context: FileRangeContextRef, + /// Index of the row group in the SST. + row_group_idx: usize, + /// Row selection for the row group. `None` means all rows. + row_selection: Option, +} + +impl FileRange { + /// Creates a new [FileRange]. + pub(crate) fn new( + context: FileRangeContextRef, + row_group_idx: usize, + row_selection: Option, + ) -> Self { + Self { + context, + row_group_idx, + row_selection, + } + } + + /// Returns a reader to read the [FileRange]. + #[allow(dead_code)] + pub(crate) async fn reader(&self) -> Result { + let parquet_reader = self + .context + .reader_builder + .build(self.row_group_idx, self.row_selection.clone()) + .await?; + + Ok(RowGroupReader::new(self.context.clone(), parquet_reader)) + } +} + +/// Context shared by ranges of the same parquet SST. +pub(crate) struct FileRangeContext { + // Row group reader builder for the file. + reader_builder: RowGroupReaderBuilder, + /// Filters pushed down. + filters: Vec, + /// Helper to read the SST. + read_format: ReadFormat, + /// Decoder for primary keys + codec: McmpRowCodec, +} + +pub(crate) type FileRangeContextRef = Arc; + +impl FileRangeContext { + /// Creates a new [FileRangeContext]. + pub(crate) fn new( + reader_builder: RowGroupReaderBuilder, + filters: Vec, + read_format: ReadFormat, + codec: McmpRowCodec, + ) -> Self { + Self { + reader_builder, + filters, + read_format, + codec, + } + } + + /// Returns the path of the file to read. + pub(crate) fn file_path(&self) -> &str { + self.reader_builder.file_path() + } + + /// Returns filters pushed down. + pub(crate) fn filters(&self) -> &[SimpleFilterContext] { + &self.filters + } + + /// Returns the format helper. + pub(crate) fn read_format(&self) -> &ReadFormat { + &self.read_format + } + + /// Returns the reader builder. + pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder { + &self.reader_builder + } + + /// TRY THE BEST to perform pushed down predicate precisely on the input batch. + /// Return the filtered batch. If the entire batch is filtered out, return None. + /// + /// Supported filter expr type is defined in [SimpleFilterEvaluator]. + /// + /// When a filter is referencing primary key column, this method will decode + /// the primary key and put it into the batch. + pub(crate) fn precise_filter(&self, mut input: Batch) -> Result> { + let mut mask = BooleanBuffer::new_set(input.num_rows()); + + // Run filter one by one and combine them result + // TODO(ruihang): run primary key filter first. It may short circuit other filters + for filter in &self.filters { + let result = match filter.semantic_type() { + SemanticType::Tag => { + let pk_values = if let Some(pk_values) = input.pk_values() { + pk_values + } else { + input.set_pk_values(self.codec.decode(input.primary_key())?); + input.pk_values().unwrap() + }; + // Safety: this is a primary key + let pk_index = self + .read_format + .metadata() + .primary_key_index(filter.column_id()) + .unwrap(); + let pk_value = pk_values[pk_index] + .try_to_scalar_value(filter.data_type()) + .context(FieldTypeMismatchSnafu)?; + if filter + .filter() + .evaluate_scalar(&pk_value) + .context(FilterRecordBatchSnafu)? + { + continue; + } else { + // PK not match means the entire batch is filtered out. + return Ok(None); + } + } + SemanticType::Field => { + let Some(field_index) = self.read_format.field_index_by_id(filter.column_id()) + else { + continue; + }; + let field_col = &input.fields()[field_index].data; + filter + .filter() + .evaluate_vector(field_col) + .context(FilterRecordBatchSnafu)? + } + SemanticType::Timestamp => filter + .filter() + .evaluate_vector(input.timestamps()) + .context(FilterRecordBatchSnafu)?, + }; + + mask = mask.bitand(&result); + } + + input.filter(&BooleanArray::from(mask).into())?; + + Ok(Some(input)) + } +} diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index dd083047e07c..efc61d89de26 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -121,16 +121,19 @@ pub(crate) struct ReadFormat { /// Field column id to its index in `schema` (SST schema). /// In SST schema, fields are stored in the front of the schema. field_id_to_index: HashMap, + /// Indices of columns to read from the SST. It contains all internal columns. + projection_indices: Vec, /// Field column id to their index in the projected schema ( /// the schema of [Batch]). - /// - /// This field is set at the first call to [convert_record_batch](Self::convert_record_batch). - field_id_to_projected_index: Option>, + field_id_to_projected_index: HashMap, } impl ReadFormat { - /// Creates a helper with existing `metadata`. - pub(crate) fn new(metadata: RegionMetadataRef) -> ReadFormat { + /// Creates a helper with existing `metadata` and `column_ids` to read. + pub(crate) fn new( + metadata: RegionMetadataRef, + column_ids: impl Iterator, + ) -> ReadFormat { let field_id_to_index: HashMap<_, _> = metadata .field_columns() .enumerate() @@ -138,11 +141,42 @@ impl ReadFormat { .collect(); let arrow_schema = to_sst_arrow_schema(&metadata); + // Maps column id of a projected field to its index in SST. + let mut projected_field_id_index: Vec<_> = column_ids + .filter_map(|column_id| { + // Only apply projection to fields. + field_id_to_index + .get(&column_id) + .copied() + .map(|index| (column_id, index)) + }) + .collect(); + let mut projection_indices: Vec<_> = projected_field_id_index + .iter() + .map(|(_column_id, index)| *index) + // We need to add all fixed position columns. + .chain(arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM..arrow_schema.fields.len()) + .collect(); + projection_indices.sort_unstable(); + + // Sort fields by their indices in the SST. Then the order of fields is their order + // in the Batch. + projected_field_id_index.sort_unstable_by_key(|x| x.1); + // Because the SST put fields before other columns, we don't need to consider other + // columns. + let field_id_to_projected_index = projected_field_id_index + .into_iter() + .map(|(column_id, _)| column_id) + .enumerate() + .map(|(index, column_id)| (column_id, index)) + .collect(); + ReadFormat { metadata, arrow_schema, field_id_to_index, - field_id_to_projected_index: None, + projection_indices, + field_id_to_projected_index, } } @@ -159,35 +193,16 @@ impl ReadFormat { &self.metadata } - /// Gets sorted projection indices to read `columns` from parquet files. - /// - /// This function ignores columns not in `metadata` to for compatibility between - /// different schemas. - pub(crate) fn projection_indices( - &self, - columns: impl IntoIterator, - ) -> Vec { - let mut indices: Vec<_> = columns - .into_iter() - .filter_map(|column_id| { - // Only apply projection to fields. - self.field_id_to_index.get(&column_id).copied() - }) - // We need to add all fixed position columns. - .chain( - self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM - ..self.arrow_schema.fields.len(), - ) - .collect(); - indices.sort_unstable(); - indices + /// Gets sorted projection indices to read. + pub(crate) fn projection_indices(&self) -> &[usize] { + &self.projection_indices } /// Convert a arrow record batch into `batches`. /// /// Note that the `record_batch` may only contains a subset of columns if it is projected. pub(crate) fn convert_record_batch( - &mut self, + &self, record_batch: &RecordBatch, batches: &mut VecDeque, ) -> Result<()> { @@ -204,10 +219,6 @@ impl ReadFormat { } ); - if self.field_id_to_projected_index.is_none() { - self.init_id_to_projected_index(record_batch); - } - let mut fixed_pos_columns = record_batch .columns() .iter() @@ -270,19 +281,6 @@ impl ReadFormat { Ok(()) } - fn init_id_to_projected_index(&mut self, record_batch: &RecordBatch) { - let mut name_to_projected_index = HashMap::new(); - for (index, field) in record_batch.schema().fields().iter().enumerate() { - let Some(column) = self.metadata.column_by_name(field.name()) else { - continue; - }; - if column.semantic_type == SemanticType::Field { - name_to_projected_index.insert(column.column_id, index); - } - } - self.field_id_to_projected_index = Some(name_to_projected_index); - } - /// Returns min values of specific column in row groups. pub(crate) fn min_values( &self, @@ -513,13 +511,8 @@ impl ReadFormat { } /// Index of a field column by its column id. - /// This function is only available after the first call to - /// [convert_record_batch](Self::convert_record_batch). Otherwise - /// it always return `None` pub fn field_index_by_id(&self, column_id: ColumnId) -> Option { - self.field_id_to_projected_index - .as_ref() - .and_then(|m| m.get(&column_id).copied()) + self.field_id_to_projected_index.get(&column_id).copied() } } @@ -753,18 +746,18 @@ mod tests { #[test] fn test_projection_indices() { let metadata = build_test_region_metadata(); - let read_format = ReadFormat::new(metadata); // Only read tag1 - assert_eq!(vec![2, 3, 4, 5], read_format.projection_indices([3])); + let read_format = ReadFormat::new(metadata.clone(), [3].iter().copied()); + assert_eq!(&[2, 3, 4, 5], read_format.projection_indices()); // Only read field1 - assert_eq!(vec![0, 2, 3, 4, 5], read_format.projection_indices([4])); + let read_format = ReadFormat::new(metadata.clone(), [4].iter().copied()); + assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices()); // Only read ts - assert_eq!(vec![2, 3, 4, 5], read_format.projection_indices([5])); + let read_format = ReadFormat::new(metadata.clone(), [5].iter().copied()); + assert_eq!(&[2, 3, 4, 5], read_format.projection_indices()); // Read field0, tag0, ts - assert_eq!( - vec![1, 2, 3, 4, 5], - read_format.projection_indices([2, 1, 5]) - ); + let read_format = ReadFormat::new(metadata, [2, 1, 5].iter().copied()); + assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices()); } #[test] @@ -805,7 +798,12 @@ mod tests { fn test_convert_empty_record_batch() { let metadata = build_test_region_metadata(); let arrow_schema = build_test_arrow_schema(); - let mut read_format = ReadFormat::new(metadata); + let column_ids: Vec<_> = metadata + .column_metadatas + .iter() + .map(|col| col.column_id) + .collect(); + let read_format = ReadFormat::new(metadata, column_ids.iter().copied()); assert_eq!(arrow_schema, *read_format.arrow_schema()); let record_batch = RecordBatch::new_empty(arrow_schema); @@ -819,7 +817,12 @@ mod tests { #[test] fn test_convert_record_batch() { let metadata = build_test_region_metadata(); - let mut read_format = ReadFormat::new(metadata); + let column_ids: Vec<_> = metadata + .column_metadatas + .iter() + .map(|col| col.column_id) + .collect(); + let read_format = ReadFormat::new(metadata, column_ids.iter().copied()); let columns: Vec = vec![ Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1 diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 72ec6c0528dd..dff995b76ba6 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -15,7 +15,6 @@ //! Parquet reader. use std::collections::{BTreeMap, VecDeque}; -use std::ops::BitAnd; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -24,9 +23,9 @@ use async_trait::async_trait; use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::{debug, warn}; use common_time::range::TimestampRange; -use datafusion_common::arrow::array::BooleanArray; -use datafusion_common::arrow::buffer::BooleanBuffer; +use datafusion_expr::Expr; use datatypes::arrow::record_batch::RecordBatch; +use datatypes::data_type::ConcreteDataType; use itertools::Itertools; use object_store::ObjectStore; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; @@ -40,17 +39,17 @@ use table::predicate::Predicate; use crate::cache::CacheManagerRef; use crate::error::{ - ArrowReaderSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, InvalidMetadataSnafu, - InvalidParquetSnafu, ReadParquetSnafu, Result, + ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result, }; use crate::metrics::{ PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED, }; use crate::read::{Batch, BatchReader}; -use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; +use crate::row_converter::{McmpRowCodec, SortField}; use crate::sst::file::FileHandle; use crate::sst::index::applier::SstIndexApplierRef; +use crate::sst::parquet::file_range::{FileRange, FileRangeContext, FileRangeContextRef}; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::row_group::InMemoryRowGroup; @@ -147,10 +146,30 @@ impl ParquetReaderBuilder { self } - /// Builds and initializes a [ParquetReader]. + /// Builds a [ParquetReader]. /// /// This needs to perform IO operation. pub async fn build(&self) -> Result { + let (context, row_groups) = self.build_reader_input().await?; + ParquetReader::new(context, row_groups).await + } + + /// Builds [FileRange]s to read and pushes them to `file_ranges`. + #[allow(dead_code)] + pub async fn build_file_ranges(&self, file_ranges: &mut Vec) -> Result<()> { + let (context, row_groups) = self.build_reader_input().await?; + file_ranges.reserve_exact(row_groups.len()); + for (row_group_idx, row_selection) in row_groups { + let file_range = FileRange::new(context.clone(), row_group_idx, row_selection); + file_ranges.push(file_range); + } + Ok(()) + } + + /// Builds a [FileRangeContext] and collects row groups to read. + /// + /// This needs to perform IO operation. + async fn build_reader_input(&self) -> Result<(FileRangeContextRef, RowGroupMap)> { let start = Instant::now(); let file_path = self.file_handle.file_path(&self.file_dir); @@ -159,18 +178,28 @@ impl ParquetReaderBuilder { let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?; // Decodes region metadata. let key_value_meta = parquet_meta.file_metadata().key_value_metadata(); - let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?; - let read_format = ReadFormat::new(Arc::new(region_meta)); + // Gets the metadata stored in the SST. + let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?); + let read_format = if let Some(column_ids) = &self.projection { + ReadFormat::new(region_meta.clone(), column_ids.iter().copied()) + } else { + // Lists all column ids to read, we always use the expected metadata if possible. + let expected_meta = self.expected_metadata.as_ref().unwrap_or(®ion_meta); + ReadFormat::new( + region_meta.clone(), + expected_meta + .column_metadatas + .iter() + .map(|col| col.column_id), + ) + }; // Computes the projection mask. let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); - let projection_mask = if let Some(column_ids) = self.projection.as_ref() { - let indices = read_format.projection_indices(column_ids.iter().copied()); - // Now we assumes we don't have nested schemas. - ProjectionMask::roots(parquet_schema_desc, indices) - } else { - ProjectionMask::all() - }; + let indices = read_format.projection_indices(); + // Now we assumes we don't have nested schemas. + // TODO(yingwen): Revisit this if we introduce nested types such as JSON type. + let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied()); // Computes the field levels. let hint = Some(read_format.arrow_schema().fields()); @@ -196,16 +225,21 @@ impl ParquetReaderBuilder { metrics.build_cost = start.elapsed(); - let predicate = if let Some(predicate) = &self.predicate { + let filters = if let Some(predicate) = &self.predicate { predicate .exprs() .iter() - .filter_map(|expr| SimpleFilterEvaluator::try_new(expr.df_expr())) + .filter_map(|expr| { + SimpleFilterContext::new_opt( + ®ion_meta, + self.expected_metadata.as_deref(), + expr.df_expr(), + ) + }) .collect::>() } else { vec![] }; - let codec = McmpRowCodec::new( read_format .metadata() @@ -214,16 +248,8 @@ impl ParquetReaderBuilder { .collect(), ); - Ok(ParquetReader { - row_groups, - read_format, - reader_builder, - predicate, - current_reader: None, - batches: VecDeque::new(), - codec, - metrics, - }) + let context = FileRangeContext::new(reader_builder, filters, read_format, codec); + Ok((Arc::new(context), row_groups)) } /// Decodes region metadata from key value. @@ -451,7 +477,7 @@ struct Metrics { } /// Builder to build a [ParquetRecordBatchReader] for a row group. -struct RowGroupReaderBuilder { +pub(crate) struct RowGroupReaderBuilder { /// SST file to read. /// /// Holds the file handle to avoid the file purge purge it. @@ -472,13 +498,13 @@ struct RowGroupReaderBuilder { impl RowGroupReaderBuilder { /// Path of the file to read. - fn file_path(&self) -> &str { + pub(crate) fn file_path(&self) -> &str { &self.file_path } /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`. - async fn build( - &mut self, + pub(crate) async fn build( + &self, row_group_idx: usize, row_selection: Option, ) -> Result { @@ -513,158 +539,285 @@ impl RowGroupReaderBuilder { } } +/// The state of a [ParquetReader]. +enum ReaderState { + /// The reader is reading a row group. + Readable(RowGroupReader), + /// The reader is exhausted. + Exhausted(Metrics), +} + +impl ReaderState { + /// Returns the metrics of the reader. + fn metrics(&self) -> &Metrics { + match self { + ReaderState::Readable(reader) => &reader.metrics, + ReaderState::Exhausted(m) => m, + } + } +} + +/// Context to evaluate the column filter. +pub(crate) struct SimpleFilterContext { + /// Filter to evaluate. + filter: SimpleFilterEvaluator, + /// Id of the column to evaluate. + column_id: ColumnId, + /// Semantic type of the column. + semantic_type: SemanticType, + /// The data type of the column. + data_type: ConcreteDataType, +} + +impl SimpleFilterContext { + /// Creates a context for the `expr`. + /// + /// Returns None if the column to filter doesn't exist in the SST metadata or the + /// expected metadata. + fn new_opt( + sst_meta: &RegionMetadataRef, + expected_meta: Option<&RegionMetadata>, + expr: &Expr, + ) -> Option { + let filter = SimpleFilterEvaluator::try_new(expr)?; + let column_metadata = match expected_meta { + Some(meta) => { + // Gets the column metadata from the expected metadata. + let column = meta.column_by_name(filter.column_name())?; + // Checks if the column is present in the SST metadata. We still uses the + // column from the expected metadata. + let sst_column = sst_meta.column_by_id(column.column_id)?; + debug_assert_eq!(column.semantic_type, sst_column.semantic_type); + + column + } + None => sst_meta.column_by_name(filter.column_name())?, + }; + + Some(Self { + filter, + column_id: column_metadata.column_id, + semantic_type: column_metadata.semantic_type, + data_type: column_metadata.column_schema.data_type.clone(), + }) + } + + /// Returns the filter to evaluate. + pub(crate) fn filter(&self) -> &SimpleFilterEvaluator { + &self.filter + } + + /// Returns the column id. + pub(crate) fn column_id(&self) -> ColumnId { + self.column_id + } + + /// Returns the semantic type of the column. + pub(crate) fn semantic_type(&self) -> SemanticType { + self.semantic_type + } + + /// Returns the data type of the column. + pub(crate) fn data_type(&self) -> &ConcreteDataType { + &self.data_type + } +} + +type RowGroupMap = BTreeMap>; + /// Parquet batch reader to read our SST format. pub struct ParquetReader { + /// File range context. + context: FileRangeContextRef, /// Indices of row groups to read, along with their respective row selections. - row_groups: BTreeMap>, - /// Helper to read record batches. - /// - /// Not `None` if [ParquetReader::stream] is not `None`. - read_format: ReadFormat, - /// Builder to build row group readers. - /// - /// The builder contains the file handle, so don't drop the builder while using - /// the [ParquetReader]. - reader_builder: RowGroupReaderBuilder, - /// Predicate pushed down to this reader. - predicate: Vec, + row_groups: RowGroupMap, /// Reader of current row group. - current_reader: Option, - /// Buffered batches to return. - batches: VecDeque, - /// Decoder for primary keys - codec: McmpRowCodec, - /// Local metrics. - metrics: Metrics, + reader_state: ReaderState, } #[async_trait] impl BatchReader for ParquetReader { async fn next_batch(&mut self) -> Result> { + let ReaderState::Readable(reader) = &mut self.reader_state else { + return Ok(None); + }; + let start = Instant::now(); - if let Some(batch) = self.batches.pop_front() { - self.metrics.scan_cost += start.elapsed(); - self.metrics.num_rows += batch.num_rows(); + // We don't collect the elapsed time if the reader returns an error. + if let Some(batch) = reader.next_batch().await? { + reader.metrics.scan_cost += start.elapsed(); return Ok(Some(batch)); } - // We need to fetch next record batch and convert it to batches. - while self.batches.is_empty() { - let Some(record_batch) = self.fetch_next_record_batch().await? else { - self.metrics.scan_cost += start.elapsed(); - return Ok(None); - }; - self.metrics.num_record_batches += 1; - - self.read_format - .convert_record_batch(&record_batch, &mut self.batches)?; - self.prune_batches()?; - self.metrics.num_batches += self.batches.len(); + // No more items in current row group, reads next row group. + while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() { + let parquet_reader = self + .context + .reader_builder() + .build(row_group_idx, row_selection) + .await?; + // Resets the parquet reader. + reader.reset_reader(parquet_reader); + if let Some(batch) = reader.next_batch().await? { + reader.metrics.scan_cost += start.elapsed(); + return Ok(Some(batch)); + } } - let batch = self.batches.pop_front(); - self.metrics.scan_cost += start.elapsed(); - self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); - Ok(batch) + + // The reader is exhausted. + reader.metrics.scan_cost += start.elapsed(); + self.reader_state = ReaderState::Exhausted(std::mem::take(&mut reader.metrics)); + Ok(None) } } impl Drop for ParquetReader { fn drop(&mut self) { + let metrics = self.reader_state.metrics(); debug!( "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}", - self.reader_builder.file_handle.region_id(), - self.reader_builder.file_handle.file_id(), - self.reader_builder.file_handle.time_range(), - self.metrics.num_row_groups_before_filtering - - self.metrics.num_row_groups_inverted_index_filtered - - self.metrics.num_row_groups_min_max_filtered, - self.metrics.num_row_groups_before_filtering, - self.metrics + self.context.reader_builder().file_handle.region_id(), + self.context.reader_builder().file_handle.file_id(), + self.context.reader_builder().file_handle.time_range(), + metrics.num_row_groups_before_filtering + - metrics.num_row_groups_inverted_index_filtered + - metrics.num_row_groups_min_max_filtered, + metrics.num_row_groups_before_filtering, + metrics ); // Report metrics. READ_STAGE_ELAPSED .with_label_values(&["build_parquet_reader"]) - .observe(self.metrics.build_cost.as_secs_f64()); + .observe(metrics.build_cost.as_secs_f64()); READ_STAGE_ELAPSED .with_label_values(&["scan_row_groups"]) - .observe(self.metrics.scan_cost.as_secs_f64()); + .observe(metrics.scan_cost.as_secs_f64()); READ_ROWS_TOTAL .with_label_values(&["parquet"]) - .inc_by(self.metrics.num_rows as u64); + .inc_by(metrics.num_rows as u64); READ_ROW_GROUPS_TOTAL .with_label_values(&["before_filtering"]) - .inc_by(self.metrics.num_row_groups_before_filtering as u64); + .inc_by(metrics.num_row_groups_before_filtering as u64); READ_ROW_GROUPS_TOTAL .with_label_values(&["inverted_index_filtered"]) - .inc_by(self.metrics.num_row_groups_inverted_index_filtered as u64); + .inc_by(metrics.num_row_groups_inverted_index_filtered as u64); READ_ROW_GROUPS_TOTAL .with_label_values(&["minmax_index_filtered"]) - .inc_by(self.metrics.num_row_groups_min_max_filtered as u64); + .inc_by(metrics.num_row_groups_min_max_filtered as u64); PRECISE_FILTER_ROWS_TOTAL .with_label_values(&["parquet"]) - .inc_by(self.metrics.num_rows_precise_filtered as u64); + .inc_by(metrics.num_rows_precise_filtered as u64); READ_ROWS_IN_ROW_GROUP_TOTAL .with_label_values(&["before_filtering"]) - .inc_by(self.metrics.num_rows_in_row_group_before_filtering as u64); + .inc_by(metrics.num_rows_in_row_group_before_filtering as u64); READ_ROWS_IN_ROW_GROUP_TOTAL .with_label_values(&["inverted_index_filtered"]) - .inc_by(self.metrics.num_rows_in_row_group_inverted_index_filtered as u64); + .inc_by(metrics.num_rows_in_row_group_inverted_index_filtered as u64); } } impl ParquetReader { + /// Creates a new reader. + async fn new( + context: FileRangeContextRef, + mut row_groups: BTreeMap>, + ) -> Result { + // No more items in current row group, reads next row group. + let reader_state = if let Some((row_group_idx, row_selection)) = row_groups.pop_first() { + let parquet_reader = context + .reader_builder() + .build(row_group_idx, row_selection) + .await?; + ReaderState::Readable(RowGroupReader::new(context.clone(), parquet_reader)) + } else { + ReaderState::Exhausted(Metrics::default()) + }; + + Ok(ParquetReader { + context, + row_groups, + reader_state, + }) + } + /// Returns the metadata of the SST. pub fn metadata(&self) -> &RegionMetadataRef { - self.read_format.metadata() + self.context.read_format().metadata() } - /// Tries to fetch next [RecordBatch] from the reader. - /// - /// If the reader is exhausted, reads next row group. - async fn fetch_next_record_batch(&mut self) -> Result> { - if let Some(row_group_reader) = &mut self.current_reader { - if let Some(record_batch) = - row_group_reader - .next() - .transpose() - .context(ArrowReaderSnafu { - path: self.reader_builder.file_path(), - })? - { - return Ok(Some(record_batch)); - } + #[cfg(test)] + pub fn parquet_metadata(&self) -> Arc { + self.context.reader_builder().parquet_meta.clone() + } +} + +/// Reader to read a row group of a parquet file. +pub(crate) struct RowGroupReader { + /// Context for file ranges. + context: FileRangeContextRef, + /// Inner parquet reader. + reader: ParquetRecordBatchReader, + /// Buffered batches to return. + batches: VecDeque, + /// Local scan metrics. + metrics: Metrics, +} + +impl RowGroupReader { + /// Creates a new reader. + pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self { + Self { + context, + reader, + batches: VecDeque::new(), + metrics: Metrics::default(), } + } - // No more items in current row group, reads next row group. - while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() { - let mut row_group_reader = self - .reader_builder - .build(row_group_idx, row_selection) - .await?; - let Some(record_batch) = - row_group_reader - .next() - .transpose() - .context(ArrowReaderSnafu { - path: self.reader_builder.file_path(), - })? - else { - continue; + /// Resets the parquet reader. + fn reset_reader(&mut self, reader: ParquetRecordBatchReader) { + self.reader = reader; + } + + /// Tries to fetch next [Batch] from the reader. + async fn next_batch(&mut self) -> Result> { + if let Some(batch) = self.batches.pop_front() { + self.metrics.num_rows += batch.num_rows(); + return Ok(Some(batch)); + } + + // We need to fetch next record batch and convert it to batches. + while self.batches.is_empty() { + let Some(record_batch) = self.fetch_next_record_batch()? else { + return Ok(None); }; + self.metrics.num_record_batches += 1; - // Sets current reader to this reader. - self.current_reader = Some(row_group_reader); - return Ok(Some(record_batch)); + self.context + .read_format() + .convert_record_batch(&record_batch, &mut self.batches)?; + self.prune_batches()?; + self.metrics.num_batches += self.batches.len(); } + let batch = self.batches.pop_front(); + self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); + Ok(batch) + } - Ok(None) + /// Tries to fetch next [RecordBatch] from the reader. + /// + /// If the reader is exhausted, reads next row group. + fn fetch_next_record_batch(&mut self) -> Result> { + self.reader.next().transpose().context(ArrowReaderSnafu { + path: self.context.file_path(), + }) } /// Prunes batches by the pushed down predicate. fn prune_batches(&mut self) -> Result<()> { // fast path - if self.predicate.is_empty() { + if self.context.filters().is_empty() { return Ok(()); } @@ -672,7 +825,7 @@ impl ParquetReader { let batches = std::mem::take(&mut self.batches); for batch in batches { let num_rows_before_filter = batch.num_rows(); - let Some(batch_filtered) = self.precise_filter(batch)? else { + let Some(batch_filtered) = self.context.precise_filter(batch)? else { // the entire batch is filtered out self.metrics.num_rows_precise_filtered += num_rows_before_filter; continue; @@ -690,81 +843,4 @@ impl ParquetReader { Ok(()) } - - /// TRY THE BEST to perform pushed down predicate precisely on the input batch. - /// Return the filtered batch. If the entire batch is filtered out, return None. - /// - /// Supported filter expr type is defined in [SimpleFilterEvaluator]. - /// - /// When a filter is referencing primary key column, this method will decode - /// the primary key and put it into the batch. - fn precise_filter(&self, mut input: Batch) -> Result> { - let mut mask = BooleanBuffer::new_set(input.num_rows()); - - // Run filter one by one and combine them result - // TODO(ruihang): run primary key filter first. It may short circuit other filters - for filter in &self.predicate { - let column_name = filter.column_name(); - let Some(column_metadata) = self.read_format.metadata().column_by_name(column_name) - else { - // column not found, skip - // in situation like an column is added later - continue; - }; - let result = match column_metadata.semantic_type { - SemanticType::Tag => { - let pk_values = if let Some(pk_values) = input.pk_values() { - pk_values - } else { - input.set_pk_values(self.codec.decode(input.primary_key())?); - input.pk_values().unwrap() - }; - // Safety: this is a primary key - let pk_index = self - .read_format - .metadata() - .primary_key_index(column_metadata.column_id) - .unwrap(); - let pk_value = pk_values[pk_index] - .try_to_scalar_value(&column_metadata.column_schema.data_type) - .context(FieldTypeMismatchSnafu)?; - if filter - .evaluate_scalar(&pk_value) - .context(FilterRecordBatchSnafu)? - { - continue; - } else { - // PK not match means the entire batch is filtered out. - return Ok(None); - } - } - SemanticType::Field => { - let Some(field_index) = self - .read_format - .field_index_by_id(column_metadata.column_id) - else { - continue; - }; - let field_col = &input.fields()[field_index].data; - filter - .evaluate_vector(field_col) - .context(FilterRecordBatchSnafu)? - } - SemanticType::Timestamp => filter - .evaluate_vector(input.timestamps()) - .context(FilterRecordBatchSnafu)?, - }; - - mask = mask.bitand(&result); - } - - input.filter(&BooleanArray::from(mask).into())?; - - Ok(Some(input)) - } - - #[cfg(test)] - pub fn parquet_metadata(&self) -> Arc { - self.reader_builder.parquet_meta.clone() - } }