From 48054c032b60f32bb80a795c40ab73815539e52b Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 26 Apr 2024 15:32:43 +0800 Subject: [PATCH 01/14] chore: change `&mut self` to `&self` --- src/mito2/src/sst/parquet/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 72ec6c0528dd..b37f76b91287 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -478,7 +478,7 @@ impl RowGroupReaderBuilder { /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`. async fn build( - &mut self, + &self, row_group_idx: usize, row_selection: Option, ) -> Result { From 647cd55994a62be35c00fb2802c97ca9a3fdda89 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 26 Apr 2024 17:02:24 +0800 Subject: [PATCH 02/14] feat: define partition and partition context --- src/mito2/src/sst/parquet.rs | 1 + src/mito2/src/sst/parquet/partition.rs | 85 ++++++++++++++++++++++++++ src/mito2/src/sst/parquet/reader.rs | 4 +- 3 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 src/mito2/src/sst/parquet/partition.rs diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index b0b72bde3bf1..101f1942fe2a 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -18,6 +18,7 @@ mod format; pub(crate) mod helper; pub(crate) mod metadata; mod page_reader; +pub(crate) mod partition; pub mod reader; pub mod row_group; mod row_selection; diff --git a/src/mito2/src/sst/parquet/partition.rs b/src/mito2/src/sst/parquet/partition.rs new file mode 100644 index 000000000000..b48dca53cb0b --- /dev/null +++ b/src/mito2/src/sst/parquet/partition.rs @@ -0,0 +1,85 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Structs and functions for reading partitions from a parquet file. A partition +//! is usually a row group in a parquet file. + +use std::sync::Arc; + +use common_recordbatch::filter::SimpleFilterEvaluator; +use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; + +use crate::error::Result; +use crate::sst::parquet::format::ReadFormat; +use crate::sst::parquet::reader::RowGroupReaderBuilder; + +/// A partition of a parquet SST. Now it is a row group. +pub(crate) struct Partition { + /// Shared context. + context: PartitionContextRef, + /// Index of the row group in the SST. + row_group_idx: usize, + /// Row selection for the row group. `None` means all rows. + row_selection: Option, +} + +impl Partition { + /// Creates a new partition. + pub(crate) fn new( + context: PartitionContextRef, + row_group_idx: usize, + row_selection: Option, + ) -> Self { + Self { + context, + row_group_idx, + row_selection, + } + } + + /// Returns a reader to read the partition. 
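+    ///
+    /// A sketch of the intended call pattern (illustrative only; `partition`
+    /// and `process` are assumed to exist in the caller):
+    ///
+    /// ```ignore
+    /// let mut reader = partition.reader().await?;
+    /// // `ParquetRecordBatchReader` iterates `Result<RecordBatch, ArrowError>`.
+    /// while let Some(record_batch) = reader.next().transpose()? {
+    ///     process(record_batch);
+    /// }
+    /// ```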
+    pub(crate) async fn reader(&self) -> Result<ParquetRecordBatchReader> {
+        self.context
+            .reader_builder
+            .build(self.row_group_idx, self.row_selection.clone())
+            .await
+    }
+}
+
+/// Context shared by partitions of the same parquet SST.
+pub(crate) struct PartitionContext {
+    // Row group reader builder for the file.
+    reader_builder: RowGroupReaderBuilder,
+    /// Filters pushed down.
+    filters: Vec<SimpleFilterEvaluator>,
+    /// Helper to read the SST.
+    format: ReadFormat,
+}
+
+pub(crate) type PartitionContextRef = Arc<PartitionContext>;
+
+impl PartitionContext {
+    /// Creates a new partition context.
+    pub(crate) fn new(
+        reader_builder: RowGroupReaderBuilder,
+        filters: Vec<SimpleFilterEvaluator>,
+        format: ReadFormat,
+    ) -> Self {
+        Self {
+            reader_builder,
+            filters,
+            format,
+        }
+    }
+}
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index b37f76b91287..459336eec521 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -451,7 +451,7 @@ struct Metrics {
 }
 
 /// Builder to build a [ParquetRecordBatchReader] for a row group.
-struct RowGroupReaderBuilder {
+pub(crate) struct RowGroupReaderBuilder {
     /// SST file to read.
     ///
     /// Holds the file handle to avoid the file purger purging it.
@@ -477,7 +477,7 @@ impl RowGroupReaderBuilder {
     }
 
     /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
-    async fn build(
+    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {

From a6302fc26a58a807fb5cc6cf9c5f95b9bcb41d03 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Sun, 28 Apr 2024 15:21:09 +0800
Subject: [PATCH 03/14] refactor: move precise_filter to PartitionContext

---
 src/mito2/src/sst/parquet/partition.rs | 106 ++++++++++++++++++++++++-
 1 file changed, 102 insertions(+), 4 deletions(-)

diff --git a/src/mito2/src/sst/parquet/partition.rs b/src/mito2/src/sst/parquet/partition.rs
index b48dca53cb0b..b5c2dec868ee 100644
--- a/src/mito2/src/sst/parquet/partition.rs
+++ b/src/mito2/src/sst/parquet/partition.rs
@@ -15,12 +15,19 @@
 //! Structs and functions for reading partitions from a parquet file. A partition
 //! is usually a row group in a parquet file.
 
+use std::ops::BitAnd;
 use std::sync::Arc;
 
+use api::v1::SemanticType;
 use common_recordbatch::filter::SimpleFilterEvaluator;
+use datatypes::arrow::array::BooleanArray;
+use datatypes::arrow::buffer::BooleanBuffer;
 use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
+use snafu::ResultExt;
 
-use crate::error::Result;
+use crate::error::{FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result};
+use crate::read::Batch;
+use crate::row_converter::{McmpRowCodec, RowCodec};
 use crate::sst::parquet::format::ReadFormat;
 use crate::sst::parquet::reader::RowGroupReaderBuilder;
 
@@ -64,7 +71,9 @@ pub(crate) struct PartitionContext {
     /// Filters pushed down.
     filters: Vec<SimpleFilterEvaluator>,
     /// Helper to read the SST.
-    format: ReadFormat,
+    read_format: ReadFormat,
+    /// Decoder for primary keys.
+    codec: McmpRowCodec,
 }
 
 pub(crate) type PartitionContextRef = Arc<PartitionContext>;
@@ -74,12 +83,101 @@ impl PartitionContext {
     pub(crate) fn new(
         reader_builder: RowGroupReaderBuilder,
         filters: Vec<SimpleFilterEvaluator>,
-        format: ReadFormat,
+        read_format: ReadFormat,
+        codec: McmpRowCodec,
     ) -> Self {
         Self {
             reader_builder,
             filters,
-            format,
+            read_format,
+            codec,
         }
     }
+
+    /// Returns the path of the file to read.
+    pub(crate) fn file_path(&self) -> &str {
+        self.reader_builder.file_path()
+    }
+
+    /// Returns filters pushed down.
+    pub(crate) fn filters(&self) -> &[SimpleFilterEvaluator] {
+        &self.filters
+    }
+
+    /// Returns the format helper.
+    pub(crate) fn read_format(&self) -> &ReadFormat {
+        &self.read_format
+    }
+
+    /// TRY THE BEST to perform the pushed down predicates precisely on the input
+    /// batch. Returns the filtered batch, or `None` if the entire batch is
+    /// filtered out.
+    ///
+    /// Supported filter expr types are defined in [SimpleFilterEvaluator].
+    ///
+    /// When a filter references a primary key column, this method will decode
+    /// the primary key and put it into the batch.
+    pub(crate) fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
+        let mut mask = BooleanBuffer::new_set(input.num_rows());
+
+        // Run the filters one by one and combine their results.
+        // TODO(ruihang): run primary key filter first. It may short circuit other filters
+        for filter in &self.filters {
+            let column_name = filter.column_name();
+            let Some(column_metadata) = self.read_format.metadata().column_by_name(column_name)
+            else {
+                // Column not found, skip the filter,
+                // e.g. when the column is added later.
+                continue;
+            };
+            let result = match column_metadata.semantic_type {
+                SemanticType::Tag => {
+                    let pk_values = if let Some(pk_values) = input.pk_values() {
+                        pk_values
+                    } else {
+                        input.set_pk_values(self.codec.decode(input.primary_key())?);
+                        input.pk_values().unwrap()
+                    };
+                    // Safety: this is a primary key
+                    let pk_index = self
+                        .read_format
+                        .metadata()
+                        .primary_key_index(column_metadata.column_id)
+                        .unwrap();
+                    let pk_value = pk_values[pk_index]
+                        .try_to_scalar_value(&column_metadata.column_schema.data_type)
+                        .context(FieldTypeMismatchSnafu)?;
+                    if filter
+                        .evaluate_scalar(&pk_value)
+                        .context(FilterRecordBatchSnafu)?
+                    {
+                        continue;
+                    } else {
+                        // A mismatched PK means the entire batch is filtered out.
+                        return Ok(None);
+                    }
+                }
+                SemanticType::Field => {
+                    let Some(field_index) = self
+                        .read_format
+                        .field_index_by_id(column_metadata.column_id)
+                    else {
+                        continue;
+                    };
+                    let field_col = &input.fields()[field_index].data;
+                    filter
+                        .evaluate_vector(field_col)
+                        .context(FilterRecordBatchSnafu)?
+                }
+                SemanticType::Timestamp => filter
+                    .evaluate_vector(input.timestamps())
+                    .context(FilterRecordBatchSnafu)?,
+            };
+
+            mask = mask.bitand(&result);
+        }
+
+        input.filter(&BooleanArray::from(mask).into())?;
+
+        Ok(Some(input))
+    }
 }

From 628ad7925f1f06f08742ca0380f4593701058d52 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Sun, 28 Apr 2024 15:54:46 +0800
Subject: [PATCH 04/14] feat: filter wip

---
 src/mito2/src/sst/parquet/partition.rs |  1 +
 src/mito2/src/sst/parquet/reader.rs    | 99 +++++++++++++++++++++++++-
 2 files changed, 99 insertions(+), 1 deletion(-)

diff --git a/src/mito2/src/sst/parquet/partition.rs b/src/mito2/src/sst/parquet/partition.rs
index b5c2dec868ee..59a33462ba17 100644
--- a/src/mito2/src/sst/parquet/partition.rs
+++ b/src/mito2/src/sst/parquet/partition.rs
@@ -119,6 +119,7 @@ impl PartitionContext {
     pub(crate) fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
         let mut mask = BooleanBuffer::new_set(input.num_rows());
 
+        // FIXME(yingwen): We should use expected metadata to get the column id.
         // Run the filters one by one and combine their results.
         // TODO(ruihang): run primary key filter first.
It may short circuit other filters for filter in &self.filters { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 459336eec521..50b5c3a86a83 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -53,6 +53,7 @@ use crate::sst::file::FileHandle; use crate::sst::index::applier::SstIndexApplierRef; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::metadata::MetadataLoader; +use crate::sst::parquet::partition::PartitionContextRef; use crate::sst::parquet::row_group::InMemoryRowGroup; use crate::sst::parquet::row_selection::row_selection_from_row_ranges; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -423,6 +424,20 @@ impl ParquetReaderBuilder { } } +/// Simple filter with additional information for evaluating. +struct SimpleFilterContext { + /// The filter. + filter: SimpleFilterEvaluator, + /// The column id in the expected region metadata. + column_id: ColumnId, + /// The semantic type of the column. + semantic_type: SemanticType, + /// The index of the column in [Batch]. + /// If the semantic type is a tag, this is the index in the primary key. + /// If the semantic type is a field, this is the index in fields. + index_in_batch: usize, +} + /// Parquet reader metrics. #[derive(Debug, Default)] struct Metrics { @@ -472,7 +487,7 @@ pub(crate) struct RowGroupReaderBuilder { impl RowGroupReaderBuilder { /// Path of the file to read. - fn file_path(&self) -> &str { + pub(crate) fn file_path(&self) -> &str { &self.file_path } @@ -768,3 +783,85 @@ impl ParquetReader { self.reader_builder.parquet_meta.clone() } } + +/// Reader to read a row group of a parquet file. +pub(crate) struct ParquetRowGroupReader { + /// Inner parquet reader. + reader: ParquetRecordBatchReader, + /// Context of partitions. + context: PartitionContextRef, + /// Buffered batches to return. + batches: VecDeque, + /// Scan metrics. + metrics: Metrics, +} + +impl ParquetRowGroupReader { + /// Tries to fetch next [Batch] from the reader. + async fn next_batch(&mut self) -> Result> { + let start = Instant::now(); + if let Some(batch) = self.batches.pop_front() { + self.metrics.scan_cost += start.elapsed(); + self.metrics.num_rows += batch.num_rows(); + return Ok(Some(batch)); + } + + // We need to fetch next record batch and convert it to batches. + while self.batches.is_empty() { + let Some(record_batch) = self.fetch_next_record_batch()? else { + self.metrics.scan_cost += start.elapsed(); + return Ok(None); + }; + self.metrics.num_record_batches += 1; + + self.context + .read_format() + .convert_record_batch(&record_batch, &mut self.batches)?; + self.prune_batches()?; + self.metrics.num_batches += self.batches.len(); + } + let batch = self.batches.pop_front(); + self.metrics.scan_cost += start.elapsed(); + self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); + Ok(batch) + } + + /// Tries to fetch next [RecordBatch] from the reader. + /// + /// If the reader is exhausted, reads next row group. + fn fetch_next_record_batch(&mut self) -> Result> { + self.reader.next().transpose().context(ArrowReaderSnafu { + path: self.context.file_path(), + }) + } + + /// Prunes batches by the pushed down predicate. 
+ fn prune_batches(&mut self) -> Result<()> { + // fast path + if self.context.filters().is_empty() { + return Ok(()); + } + + let mut new_batches = VecDeque::new(); + let batches = std::mem::take(&mut self.batches); + for batch in batches { + let num_rows_before_filter = batch.num_rows(); + let Some(batch_filtered) = self.context.precise_filter(batch)? else { + // the entire batch is filtered out + self.metrics.num_rows_precise_filtered += num_rows_before_filter; + continue; + }; + + // update metric + let filtered_rows = num_rows_before_filter - batch_filtered.num_rows(); + self.metrics.num_rows_precise_filtered += filtered_rows; + + if !batch_filtered.is_empty() { + new_batches.push_back(batch_filtered); + } + } + self.batches = new_batches; + + Ok(()) + } +} From 7c9dadfdc187e1676c48c60b56bbbe5977ef05bf Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 28 Apr 2024 20:57:08 +0800 Subject: [PATCH 05/14] feat: compute projection and fields in format --- src/mito2/src/sst/parquet/format.rs | 125 ++++++++++++++-------------- src/mito2/src/sst/parquet/reader.rs | 38 ++++----- 2 files changed, 79 insertions(+), 84 deletions(-) diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index dd083047e07c..11a354c0fc7b 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -121,16 +121,16 @@ pub(crate) struct ReadFormat { /// Field column id to its index in `schema` (SST schema). /// In SST schema, fields are stored in the front of the schema. field_id_to_index: HashMap, + /// Indices of columns to read from the SST. It contains all internal columns. + projection_indices: Vec, /// Field column id to their index in the projected schema ( /// the schema of [Batch]). - /// - /// This field is set at the first call to [convert_record_batch](Self::convert_record_batch). - field_id_to_projected_index: Option>, + field_id_to_projected_index: HashMap, } impl ReadFormat { - /// Creates a helper with existing `metadata`. - pub(crate) fn new(metadata: RegionMetadataRef) -> ReadFormat { + /// Creates a helper with existing `metadata` and `column_ids` to read. + pub(crate) fn new(metadata: RegionMetadataRef, column_ids: &[ColumnId]) -> ReadFormat { let field_id_to_index: HashMap<_, _> = metadata .field_columns() .enumerate() @@ -138,11 +138,43 @@ impl ReadFormat { .collect(); let arrow_schema = to_sst_arrow_schema(&metadata); + // Maps column id of a projected field to its index in SST. + let mut projected_field_id_index: Vec<_> = column_ids + .into_iter() + .filter_map(|column_id| { + // Only apply projection to fields. + field_id_to_index + .get(column_id) + .copied() + .map(|index| (*column_id, index)) + }) + .collect(); + let mut projection_indices: Vec<_> = projected_field_id_index + .iter() + .map(|(_column_id, index)| *index) + // We need to add all fixed position columns. + .chain(arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM..arrow_schema.fields.len()) + .collect(); + projection_indices.sort_unstable(); + + // Sort fields by their indices in the SST. Then the order of fields is their order + // in the Batch. + projected_field_id_index.sort_unstable_by_key(|x| x.1); + // Because the SST put fields before other columns, we don't need to consider other + // columns. 
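+        // Worked example (illustrative): if fields with column ids [a, b, c]
+        // sit at SST indices [0, 1, 2] and the projection asks for [c, a],
+        // then `projected_field_id_index` is [(c, 2), (a, 0)]; sorting by the
+        // SST index gives [(a, 0), (c, 2)], so the map becomes {a: 0, c: 1}:
+        // field `a` is the first field column of a `Batch`, `c` the second.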
+ let field_id_to_projected_index = projected_field_id_index + .into_iter() + .map(|(column_id, _)| column_id) + .enumerate() + .map(|(index, column_id)| (column_id, index)) + .collect(); + ReadFormat { metadata, arrow_schema, field_id_to_index, - field_id_to_projected_index: None, + projection_indices, + field_id_to_projected_index, } } @@ -159,35 +191,16 @@ impl ReadFormat { &self.metadata } - /// Gets sorted projection indices to read `columns` from parquet files. - /// - /// This function ignores columns not in `metadata` to for compatibility between - /// different schemas. - pub(crate) fn projection_indices( - &self, - columns: impl IntoIterator, - ) -> Vec { - let mut indices: Vec<_> = columns - .into_iter() - .filter_map(|column_id| { - // Only apply projection to fields. - self.field_id_to_index.get(&column_id).copied() - }) - // We need to add all fixed position columns. - .chain( - self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM - ..self.arrow_schema.fields.len(), - ) - .collect(); - indices.sort_unstable(); - indices + /// Gets sorted projection indices to read. + pub(crate) fn projection_indices(&self) -> &[usize] { + &self.projection_indices } /// Convert a arrow record batch into `batches`. /// /// Note that the `record_batch` may only contains a subset of columns if it is projected. pub(crate) fn convert_record_batch( - &mut self, + &self, record_batch: &RecordBatch, batches: &mut VecDeque, ) -> Result<()> { @@ -204,10 +217,6 @@ impl ReadFormat { } ); - if self.field_id_to_projected_index.is_none() { - self.init_id_to_projected_index(record_batch); - } - let mut fixed_pos_columns = record_batch .columns() .iter() @@ -270,19 +279,6 @@ impl ReadFormat { Ok(()) } - fn init_id_to_projected_index(&mut self, record_batch: &RecordBatch) { - let mut name_to_projected_index = HashMap::new(); - for (index, field) in record_batch.schema().fields().iter().enumerate() { - let Some(column) = self.metadata.column_by_name(field.name()) else { - continue; - }; - if column.semantic_type == SemanticType::Field { - name_to_projected_index.insert(column.column_id, index); - } - } - self.field_id_to_projected_index = Some(name_to_projected_index); - } - /// Returns min values of specific column in row groups. pub(crate) fn min_values( &self, @@ -513,13 +509,8 @@ impl ReadFormat { } /// Index of a field column by its column id. - /// This function is only available after the first call to - /// [convert_record_batch](Self::convert_record_batch). 
Otherwise - /// it always return `None` pub fn field_index_by_id(&self, column_id: ColumnId) -> Option { - self.field_id_to_projected_index - .as_ref() - .and_then(|m| m.get(&column_id).copied()) + self.field_id_to_projected_index.get(&column_id).copied() } } @@ -753,18 +744,18 @@ mod tests { #[test] fn test_projection_indices() { let metadata = build_test_region_metadata(); - let read_format = ReadFormat::new(metadata); // Only read tag1 - assert_eq!(vec![2, 3, 4, 5], read_format.projection_indices([3])); + let read_format = ReadFormat::new(metadata.clone(), &[3]); + assert_eq!(&[2, 3, 4, 5], read_format.projection_indices()); // Only read field1 - assert_eq!(vec![0, 2, 3, 4, 5], read_format.projection_indices([4])); + let read_format = ReadFormat::new(metadata.clone(), &[4]); + assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices()); // Only read ts - assert_eq!(vec![2, 3, 4, 5], read_format.projection_indices([5])); + let read_format = ReadFormat::new(metadata.clone(), &[5]); + assert_eq!(&[2, 3, 4, 5], read_format.projection_indices()); // Read field0, tag0, ts - assert_eq!( - vec![1, 2, 3, 4, 5], - read_format.projection_indices([2, 1, 5]) - ); + let read_format = ReadFormat::new(metadata, &[2, 1, 5]); + assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices()); } #[test] @@ -805,7 +796,12 @@ mod tests { fn test_convert_empty_record_batch() { let metadata = build_test_region_metadata(); let arrow_schema = build_test_arrow_schema(); - let mut read_format = ReadFormat::new(metadata); + let column_ids: Vec<_> = metadata + .column_metadatas + .iter() + .map(|col| col.column_id) + .collect(); + let read_format = ReadFormat::new(metadata, &column_ids); assert_eq!(arrow_schema, *read_format.arrow_schema()); let record_batch = RecordBatch::new_empty(arrow_schema); @@ -819,7 +815,12 @@ mod tests { #[test] fn test_convert_record_batch() { let metadata = build_test_region_metadata(); - let mut read_format = ReadFormat::new(metadata); + let column_ids: Vec<_> = metadata + .column_metadatas + .iter() + .map(|col| col.column_id) + .collect(); + let read_format = ReadFormat::new(metadata, &column_ids); let columns: Vec = vec![ Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1 diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 50b5c3a86a83..04b7d36cec9e 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -161,17 +161,25 @@ impl ParquetReaderBuilder { // Decodes region metadata. let key_value_meta = parquet_meta.file_metadata().key_value_metadata(); let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?; - let read_format = ReadFormat::new(Arc::new(region_meta)); + // Always list all column ids to read. + let column_ids = self.projection.clone().unwrap_or_else(|| { + let metadata = self + .expected_metadata + .as_deref() + .unwrap_or_else(|| ®ion_meta); + metadata + .column_metadatas + .iter() + .map(|col| col.column_id) + .collect() + }); + let read_format = ReadFormat::new(Arc::new(region_meta), &column_ids); // Computes the projection mask. let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); - let projection_mask = if let Some(column_ids) = self.projection.as_ref() { - let indices = read_format.projection_indices(column_ids.iter().copied()); - // Now we assumes we don't have nested schemas. 
- ProjectionMask::roots(parquet_schema_desc, indices) - } else { - ProjectionMask::all() - }; + let indices = read_format.projection_indices(); + // Now we assumes we don't have nested schemas. + let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied()); // Computes the field levels. let hint = Some(read_format.arrow_schema().fields()); @@ -424,20 +432,6 @@ impl ParquetReaderBuilder { } } -/// Simple filter with additional information for evaluating. -struct SimpleFilterContext { - /// The filter. - filter: SimpleFilterEvaluator, - /// The column id in the expected region metadata. - column_id: ColumnId, - /// The semantic type of the column. - semantic_type: SemanticType, - /// The index of the column in [Batch]. - /// If the semantic type is a tag, this is the index in the primary key. - /// If the semantic type is a field, this is the index in fields. - index_in_batch: usize, -} - /// Parquet reader metrics. #[derive(Debug, Default)] struct Metrics { From 7f1bbd542bfa18cebd8ac87462067f77bf88a0b0 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 28 Apr 2024 22:24:01 +0800 Subject: [PATCH 06/14] feat: use RowGroupReader to implement ParquetReader --- src/mito2/src/sst/parquet/partition.rs | 7 +- src/mito2/src/sst/parquet/reader.rs | 336 ++++++++----------------- 2 files changed, 117 insertions(+), 226 deletions(-) diff --git a/src/mito2/src/sst/parquet/partition.rs b/src/mito2/src/sst/parquet/partition.rs index 59a33462ba17..06d1476ffb24 100644 --- a/src/mito2/src/sst/parquet/partition.rs +++ b/src/mito2/src/sst/parquet/partition.rs @@ -109,6 +109,11 @@ impl PartitionContext { &self.read_format } + /// Returns the reader builder. + pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder { + &self.reader_builder + } + /// TRY THE BEST to perform pushed down predicate precisely on the input batch. /// Return the filtered batch. If the entire batch is filtered out, return None. /// @@ -119,11 +124,11 @@ impl PartitionContext { pub(crate) fn precise_filter(&self, mut input: Batch) -> Result> { let mut mask = BooleanBuffer::new_set(input.num_rows()); - // FIXME(yingwen): We should use expected metadata to get the column id. // Run filter one by one and combine them result // TODO(ruihang): run primary key filter first. It may short circuit other filters for filter in &self.filters { let column_name = filter.column_name(); + // FIXME(yingwen): We should use expected metadata to get the column id. let Some(column_metadata) = self.read_format.metadata().column_by_name(column_name) else { // column not found, skip diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 04b7d36cec9e..06206c29f7b5 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -15,17 +15,13 @@ //! Parquet reader. 
use std::collections::{BTreeMap, VecDeque}; -use std::ops::BitAnd; use std::sync::Arc; use std::time::{Duration, Instant}; -use api::v1::SemanticType; use async_trait::async_trait; use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::{debug, warn}; use common_time::range::TimestampRange; -use datafusion_common::arrow::array::BooleanArray; -use datafusion_common::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; use itertools::Itertools; use object_store::ObjectStore; @@ -40,20 +36,19 @@ use table::predicate::Predicate; use crate::cache::CacheManagerRef; use crate::error::{ - ArrowReaderSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, InvalidMetadataSnafu, - InvalidParquetSnafu, ReadParquetSnafu, Result, + ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result, }; use crate::metrics::{ PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED, }; use crate::read::{Batch, BatchReader}; -use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; +use crate::row_converter::{McmpRowCodec, SortField}; use crate::sst::file::FileHandle; use crate::sst::index::applier::SstIndexApplierRef; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::metadata::MetadataLoader; -use crate::sst::parquet::partition::PartitionContextRef; +use crate::sst::parquet::partition::{PartitionContext, PartitionContextRef}; use crate::sst::parquet::row_group::InMemoryRowGroup; use crate::sst::parquet::row_selection::row_selection_from_row_ranges; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -205,7 +200,7 @@ impl ParquetReaderBuilder { metrics.build_cost = start.elapsed(); - let predicate = if let Some(predicate) = &self.predicate { + let filters = if let Some(predicate) = &self.predicate { predicate .exprs() .iter() @@ -214,7 +209,6 @@ impl ParquetReaderBuilder { } else { vec![] }; - let codec = McmpRowCodec::new( read_format .metadata() @@ -223,16 +217,8 @@ impl ParquetReaderBuilder { .collect(), ); - Ok(ParquetReader { - row_groups, - read_format, - reader_builder, - predicate, - current_reader: None, - batches: VecDeque::new(), - codec, - metrics, - }) + let context = PartitionContext::new(reader_builder, filters, read_format, codec); + ParquetReader::new(Arc::new(context), row_groups).await } /// Decodes region metadata from key value. @@ -522,280 +508,182 @@ impl RowGroupReaderBuilder { } } +/// The state of a [ParquetReader]. +enum ReaderState { + /// The reader is reading a row group. + Readable(RowGroupReader), + /// The reader is exhausted. + Exhausted(Metrics), +} + +impl ReaderState { + /// Returns the metrics of the reader. + fn metrics(&self) -> &Metrics { + match self { + ReaderState::Readable(reader) => &reader.metrics, + ReaderState::Exhausted(m) => &m, + } + } +} + /// Parquet batch reader to read our SST format. pub struct ParquetReader { + /// Partition context. + context: PartitionContextRef, /// Indices of row groups to read, along with their respective row selections. row_groups: BTreeMap>, - /// Helper to read record batches. - /// - /// Not `None` if [ParquetReader::stream] is not `None`. - read_format: ReadFormat, - /// Builder to build row group readers. - /// - /// The builder contains the file handle, so don't drop the builder while using - /// the [ParquetReader]. - reader_builder: RowGroupReaderBuilder, - /// Predicate pushed down to this reader. - predicate: Vec, /// Reader of current row group. 
- current_reader: Option, - /// Buffered batches to return. - batches: VecDeque, - /// Decoder for primary keys - codec: McmpRowCodec, - /// Local metrics. - metrics: Metrics, + reader_state: ReaderState, } #[async_trait] impl BatchReader for ParquetReader { async fn next_batch(&mut self) -> Result> { + let ReaderState::Readable(reader) = &mut self.reader_state else { + return Ok(None); + }; + let start = Instant::now(); - if let Some(batch) = self.batches.pop_front() { - self.metrics.scan_cost += start.elapsed(); - self.metrics.num_rows += batch.num_rows(); + // We don't collect the elapsed time if the reader returns an error. + if let Some(batch) = reader.next_batch().await? { + reader.metrics.scan_cost += start.elapsed(); return Ok(Some(batch)); } - // We need to fetch next record batch and convert it to batches. - while self.batches.is_empty() { - let Some(record_batch) = self.fetch_next_record_batch().await? else { - self.metrics.scan_cost += start.elapsed(); - return Ok(None); - }; - self.metrics.num_record_batches += 1; - - self.read_format - .convert_record_batch(&record_batch, &mut self.batches)?; - self.prune_batches()?; - self.metrics.num_batches += self.batches.len(); + // No more items in current row group, reads next row group. + while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() { + let parquet_reader = self + .context + .reader_builder() + .build(row_group_idx, row_selection) + .await?; + // Resets the parquet reader. + reader.reset_reader(parquet_reader); + if let Some(batch) = reader.next_batch().await? { + reader.metrics.scan_cost += start.elapsed(); + return Ok(Some(batch)); + } } - let batch = self.batches.pop_front(); - self.metrics.scan_cost += start.elapsed(); - self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); - Ok(batch) + + // The reader is exhaused. + reader.metrics.scan_cost += start.elapsed(); + self.reader_state = ReaderState::Exhausted(std::mem::take(&mut reader.metrics)); + Ok(None) } } impl Drop for ParquetReader { fn drop(&mut self) { + let metrics = self.reader_state.metrics(); debug!( "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}", - self.reader_builder.file_handle.region_id(), - self.reader_builder.file_handle.file_id(), - self.reader_builder.file_handle.time_range(), - self.metrics.num_row_groups_before_filtering - - self.metrics.num_row_groups_inverted_index_filtered - - self.metrics.num_row_groups_min_max_filtered, - self.metrics.num_row_groups_before_filtering, - self.metrics + self.context.reader_builder().file_handle.region_id(), + self.context.reader_builder().file_handle.file_id(), + self.context.reader_builder().file_handle.time_range(), + metrics.num_row_groups_before_filtering + - metrics.num_row_groups_inverted_index_filtered + - metrics.num_row_groups_min_max_filtered, + metrics.num_row_groups_before_filtering, + metrics ); // Report metrics. 
READ_STAGE_ELAPSED .with_label_values(&["build_parquet_reader"]) - .observe(self.metrics.build_cost.as_secs_f64()); + .observe(metrics.build_cost.as_secs_f64()); READ_STAGE_ELAPSED .with_label_values(&["scan_row_groups"]) - .observe(self.metrics.scan_cost.as_secs_f64()); + .observe(metrics.scan_cost.as_secs_f64()); READ_ROWS_TOTAL .with_label_values(&["parquet"]) - .inc_by(self.metrics.num_rows as u64); + .inc_by(metrics.num_rows as u64); READ_ROW_GROUPS_TOTAL .with_label_values(&["before_filtering"]) - .inc_by(self.metrics.num_row_groups_before_filtering as u64); + .inc_by(metrics.num_row_groups_before_filtering as u64); READ_ROW_GROUPS_TOTAL .with_label_values(&["inverted_index_filtered"]) - .inc_by(self.metrics.num_row_groups_inverted_index_filtered as u64); + .inc_by(metrics.num_row_groups_inverted_index_filtered as u64); READ_ROW_GROUPS_TOTAL .with_label_values(&["minmax_index_filtered"]) - .inc_by(self.metrics.num_row_groups_min_max_filtered as u64); + .inc_by(metrics.num_row_groups_min_max_filtered as u64); PRECISE_FILTER_ROWS_TOTAL .with_label_values(&["parquet"]) - .inc_by(self.metrics.num_rows_precise_filtered as u64); + .inc_by(metrics.num_rows_precise_filtered as u64); READ_ROWS_IN_ROW_GROUP_TOTAL .with_label_values(&["before_filtering"]) - .inc_by(self.metrics.num_rows_in_row_group_before_filtering as u64); + .inc_by(metrics.num_rows_in_row_group_before_filtering as u64); READ_ROWS_IN_ROW_GROUP_TOTAL .with_label_values(&["inverted_index_filtered"]) - .inc_by(self.metrics.num_rows_in_row_group_inverted_index_filtered as u64); + .inc_by(metrics.num_rows_in_row_group_inverted_index_filtered as u64); } } impl ParquetReader { - /// Returns the metadata of the SST. - pub fn metadata(&self) -> &RegionMetadataRef { - self.read_format.metadata() - } - - /// Tries to fetch next [RecordBatch] from the reader. - /// - /// If the reader is exhausted, reads next row group. - async fn fetch_next_record_batch(&mut self) -> Result> { - if let Some(row_group_reader) = &mut self.current_reader { - if let Some(record_batch) = - row_group_reader - .next() - .transpose() - .context(ArrowReaderSnafu { - path: self.reader_builder.file_path(), - })? - { - return Ok(Some(record_batch)); - } - } - + /// Creates a new reader. + async fn new( + context: PartitionContextRef, + mut row_groups: BTreeMap>, + ) -> Result { // No more items in current row group, reads next row group. - while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() { - let mut row_group_reader = self - .reader_builder + let reader_state = if let Some((row_group_idx, row_selection)) = row_groups.pop_first() { + let parquet_reader = context + .reader_builder() .build(row_group_idx, row_selection) .await?; - let Some(record_batch) = - row_group_reader - .next() - .transpose() - .context(ArrowReaderSnafu { - path: self.reader_builder.file_path(), - })? - else { - continue; - }; - - // Sets current reader to this reader. - self.current_reader = Some(row_group_reader); - return Ok(Some(record_batch)); - } - - Ok(None) - } - - /// Prunes batches by the pushed down predicate. - fn prune_batches(&mut self) -> Result<()> { - // fast path - if self.predicate.is_empty() { - return Ok(()); - } - - let mut new_batches = VecDeque::new(); - let batches = std::mem::take(&mut self.batches); - for batch in batches { - let num_rows_before_filter = batch.num_rows(); - let Some(batch_filtered) = self.precise_filter(batch)? 
else { - // the entire batch is filtered out - self.metrics.num_rows_precise_filtered += num_rows_before_filter; - continue; - }; - - // update metric - let filtered_rows = num_rows_before_filter - batch_filtered.num_rows(); - self.metrics.num_rows_precise_filtered += filtered_rows; - - if !batch_filtered.is_empty() { - new_batches.push_back(batch_filtered); - } - } - self.batches = new_batches; + ReaderState::Readable(RowGroupReader::new(context.clone(), parquet_reader)) + } else { + ReaderState::Exhausted(Metrics::default()) + }; - Ok(()) + Ok(ParquetReader { + context, + row_groups, + reader_state, + }) } - /// TRY THE BEST to perform pushed down predicate precisely on the input batch. - /// Return the filtered batch. If the entire batch is filtered out, return None. - /// - /// Supported filter expr type is defined in [SimpleFilterEvaluator]. - /// - /// When a filter is referencing primary key column, this method will decode - /// the primary key and put it into the batch. - fn precise_filter(&self, mut input: Batch) -> Result> { - let mut mask = BooleanBuffer::new_set(input.num_rows()); - - // Run filter one by one and combine them result - // TODO(ruihang): run primary key filter first. It may short circuit other filters - for filter in &self.predicate { - let column_name = filter.column_name(); - let Some(column_metadata) = self.read_format.metadata().column_by_name(column_name) - else { - // column not found, skip - // in situation like an column is added later - continue; - }; - let result = match column_metadata.semantic_type { - SemanticType::Tag => { - let pk_values = if let Some(pk_values) = input.pk_values() { - pk_values - } else { - input.set_pk_values(self.codec.decode(input.primary_key())?); - input.pk_values().unwrap() - }; - // Safety: this is a primary key - let pk_index = self - .read_format - .metadata() - .primary_key_index(column_metadata.column_id) - .unwrap(); - let pk_value = pk_values[pk_index] - .try_to_scalar_value(&column_metadata.column_schema.data_type) - .context(FieldTypeMismatchSnafu)?; - if filter - .evaluate_scalar(&pk_value) - .context(FilterRecordBatchSnafu)? - { - continue; - } else { - // PK not match means the entire batch is filtered out. - return Ok(None); - } - } - SemanticType::Field => { - let Some(field_index) = self - .read_format - .field_index_by_id(column_metadata.column_id) - else { - continue; - }; - let field_col = &input.fields()[field_index].data; - filter - .evaluate_vector(field_col) - .context(FilterRecordBatchSnafu)? - } - SemanticType::Timestamp => filter - .evaluate_vector(input.timestamps()) - .context(FilterRecordBatchSnafu)?, - }; - - mask = mask.bitand(&result); - } - - input.filter(&BooleanArray::from(mask).into())?; - - Ok(Some(input)) + /// Returns the metadata of the SST. + pub fn metadata(&self) -> &RegionMetadataRef { + self.context.read_format().metadata() } #[cfg(test)] pub fn parquet_metadata(&self) -> Arc { - self.reader_builder.parquet_meta.clone() + self.context.reader_builder().parquet_meta.clone() } } /// Reader to read a row group of a parquet file. -pub(crate) struct ParquetRowGroupReader { - /// Inner parquet reader. - reader: ParquetRecordBatchReader, +pub(crate) struct RowGroupReader { /// Context of partitions. context: PartitionContextRef, + /// Inner parquet reader. + reader: ParquetRecordBatchReader, /// Buffered batches to return. batches: VecDeque, - /// Scan metrics. + /// Local scan metrics. metrics: Metrics, } -impl ParquetRowGroupReader { +impl RowGroupReader { + /// Creates a new reader. 
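+    ///
+    /// How [ParquetReader] drives it (illustrative sketch, not part of this
+    /// patch): a single `RowGroupReader` is reused across row groups via
+    /// `reset_reader`, so its buffered metrics accumulate for the whole file:
+    ///
+    /// ```ignore
+    /// let mut reader = RowGroupReader::new(context.clone(), first_row_group);
+    /// while let Some(batch) = reader.next_batch().await? { /* ... */ }
+    /// reader.reset_reader(next_row_group); // switch to the next row group
+    /// ```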
+    fn new(context: PartitionContextRef, reader: ParquetRecordBatchReader) -> Self {
+        Self {
+            context,
+            reader,
+            batches: VecDeque::new(),
+            metrics: Metrics::default(),
+        }
+    }
+
+    /// Resets the parquet reader.
+    fn reset_reader(&mut self, reader: ParquetRecordBatchReader) {
+        self.reader = reader;
+    }
+
+    /// Tries to fetch next [Batch] from the reader.
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        if let Some(batch) = self.batches.pop_front() {
+            self.metrics.num_rows += batch.num_rows();
+            return Ok(Some(batch));
+        }
+
+        // We need to fetch next record batch and convert it to batches.
+        while self.batches.is_empty() {
+            let Some(record_batch) = self.fetch_next_record_batch()? else {
+                return Ok(None);
+            };
+            self.metrics.num_record_batches += 1;
+
+            self.context
+                .read_format()
+                .convert_record_batch(&record_batch, &mut self.batches)?;
+            self.prune_batches()?;
+            self.metrics.num_batches += self.batches.len();
+        }
+        let batch = self.batches.pop_front();
+        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
+        Ok(batch)
+    }

From 478db9d2eed328d60860edba9f25894f2021eb3c Mon Sep 17 00:00:00 2001
From: evenyag
Date: Mon, 29 Apr 2024 21:21:31 +0800
Subject: [PATCH 07/14] fix: use expected meta to get column id for filters

---
 src/mito2/src/sst/parquet/partition.rs | 29 ++++-----
 src/mito2/src/sst/parquet/reader.rs    | 90 ++++++++++++++++++++++++--
 2 files changed, 94 insertions(+), 25 deletions(-)

diff --git a/src/mito2/src/sst/parquet/partition.rs b/src/mito2/src/sst/parquet/partition.rs
index 06d1476ffb24..353c4c0720bc 100644
--- a/src/mito2/src/sst/parquet/partition.rs
+++ b/src/mito2/src/sst/parquet/partition.rs
@@ -29,7 +29,7 @@ use crate::error::{FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result};
 use crate::read::Batch;
 use crate::row_converter::{McmpRowCodec, RowCodec};
 use crate::sst::parquet::format::ReadFormat;
-use crate::sst::parquet::reader::RowGroupReaderBuilder;
+use crate::sst::parquet::reader::{RowGroupReaderBuilder, SimpleFilterContext};
 
 /// A partition of a parquet SST. Now it is a row group.
 pub(crate) struct Partition {
@@ -69,7 +69,7 @@ pub(crate) struct PartitionContext {
     // Row group reader builder for the file.
     reader_builder: RowGroupReaderBuilder,
     /// Filters pushed down.
-    filters: Vec<SimpleFilterEvaluator>,
+    filters: Vec<SimpleFilterContext>,
     /// Helper to read the SST.
     read_format: ReadFormat,
     /// Decoder for primary keys.
@@ -82,7 +82,7 @@ impl PartitionContext {
     /// Creates a new partition context.
     pub(crate) fn new(
         reader_builder: RowGroupReaderBuilder,
-        filters: Vec<SimpleFilterEvaluator>,
+        filters: Vec<SimpleFilterContext>,
         read_format: ReadFormat,
         codec: McmpRowCodec,
     ) -> Self {
@@ -100,7 +100,7 @@ impl PartitionContext {
     }
 
     /// Returns filters pushed down.
-    pub(crate) fn filters(&self) -> &[SimpleFilterEvaluator] {
+    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
         &self.filters
     }
 
@@ -119,7 +119,6 @@ impl PartitionContext {
     pub(crate) fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
         let mut mask = BooleanBuffer::new_set(input.num_rows());
 
-        // FIXME(yingwen): We should use expected metadata to get the column id.
- let Some(column_metadata) = self.read_format.metadata().column_by_name(column_name) - else { - // column not found, skip - // in situation like an column is added later - continue; - }; - let result = match column_metadata.semantic_type { + let result = match filter.semantic_type() { SemanticType::Tag => { let pk_values = if let Some(pk_values) = input.pk_values() { pk_values @@ -147,12 +139,13 @@ impl PartitionContext { let pk_index = self .read_format .metadata() - .primary_key_index(column_metadata.column_id) + .primary_key_index(filter.column_id()) .unwrap(); let pk_value = pk_values[pk_index] - .try_to_scalar_value(&column_metadata.column_schema.data_type) + .try_to_scalar_value(filter.data_type()) .context(FieldTypeMismatchSnafu)?; if filter + .filter() .evaluate_scalar(&pk_value) .context(FilterRecordBatchSnafu)? { @@ -163,18 +156,18 @@ impl PartitionContext { } } SemanticType::Field => { - let Some(field_index) = self - .read_format - .field_index_by_id(column_metadata.column_id) + let Some(field_index) = self.read_format.field_index_by_id(filter.column_id()) else { continue; }; let field_col = &input.fields()[field_index].data; filter + .filter() .evaluate_vector(field_col) .context(FilterRecordBatchSnafu)? } SemanticType::Timestamp => filter + .filter() .evaluate_vector(input.timestamps()) .context(FilterRecordBatchSnafu)?, }; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 06206c29f7b5..1f29986e4284 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -18,11 +18,14 @@ use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; +use api::v1::SemanticType; use async_trait::async_trait; use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::{debug, warn}; use common_time::range::TimestampRange; +use datafusion_expr::Expr; use datatypes::arrow::record_batch::RecordBatch; +use datatypes::data_type::ConcreteDataType; use itertools::Itertools; use object_store::ObjectStore; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; @@ -155,20 +158,21 @@ impl ParquetReaderBuilder { let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?; // Decodes region metadata. let key_value_meta = parquet_meta.file_metadata().key_value_metadata(); - let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?; - // Always list all column ids to read. + // Gets the metadata stored in the SST. + let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?); + // Lists all column ids to read, we always use the expected metadata if possible. let column_ids = self.projection.clone().unwrap_or_else(|| { - let metadata = self + let expected_meta = self .expected_metadata - .as_deref() + .as_ref() .unwrap_or_else(|| ®ion_meta); - metadata + expected_meta .column_metadatas .iter() .map(|col| col.column_id) .collect() }); - let read_format = ReadFormat::new(Arc::new(region_meta), &column_ids); + let read_format = ReadFormat::new(region_meta.clone(), &column_ids); // Computes the projection mask. 
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); @@ -204,7 +208,13 @@ impl ParquetReaderBuilder { predicate .exprs() .iter() - .filter_map(|expr| SimpleFilterEvaluator::try_new(expr.df_expr())) + .filter_map(|expr| { + SimpleFilterContext::new_opt( + ®ion_meta, + self.expected_metadata.as_deref(), + expr.df_expr(), + ) + }) .collect::>() } else { vec![] @@ -526,6 +536,72 @@ impl ReaderState { } } +/// Context to evaluate the column filter. +pub(crate) struct SimpleFilterContext { + /// Filter to evaluate. + filter: SimpleFilterEvaluator, + /// Id of the column to evaluate. + column_id: ColumnId, + /// Semantic type of the column. + semantic_type: SemanticType, + /// The data type of the column. + data_type: ConcreteDataType, +} + +impl SimpleFilterContext { + /// Creates a context for the `expr`. + /// + /// Returns None if the column to filter doesn't exist in the SST metadata or the + /// expected metadata. + fn new_opt( + sst_meta: &RegionMetadataRef, + expected_meta: Option<&RegionMetadata>, + expr: &Expr, + ) -> Option { + let filter = SimpleFilterEvaluator::try_new(expr)?; + let column_metadata = match expected_meta { + Some(meta) => { + // Gets the column metadata from the expected metadata. + let column = meta.column_by_name(filter.column_name())?; + // Checks if the column is present in the SST metadata. We still uses the + // column from the expected metadata. + let sst_column = sst_meta.column_by_id(column.column_id)?; + debug_assert_eq!(column.semantic_type, sst_column.semantic_type); + + column + } + None => sst_meta.column_by_name(filter.column_name())?, + }; + + Some(Self { + filter, + column_id: column_metadata.column_id, + semantic_type: column_metadata.semantic_type, + data_type: column_metadata.column_schema.data_type.clone(), + }) + } + + /// Returns the filter to evaluate. + pub(crate) fn filter(&self) -> &SimpleFilterEvaluator { + &self.filter + } + + /// Returns the column id. + pub(crate) fn column_id(&self) -> ColumnId { + self.column_id + } + + /// Returns the semantic type of the column. + pub(crate) fn semantic_type(&self) -> SemanticType { + self.semantic_type + } + + /// Returns the data type of the column. + pub(crate) fn data_type(&self) -> &ConcreteDataType { + &self.data_type + } +} + /// Parquet batch reader to read our SST format. pub struct ParquetReader { /// Partition context. 
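The mask-combining technique that `precise_filter` relies on can be reduced to a
small standalone program. This sketch uses the arrow crate directly (mito2
re-exports it as `datatypes::arrow`); the column values and predicates are made
up for illustration:

    use std::ops::BitAnd;

    use arrow::array::{BooleanArray, Int64Array};
    use arrow::buffer::BooleanBuffer;
    use arrow::compute::filter;
    use arrow::compute::kernels::cmp::{gt, lt};

    fn main() {
        let col = Int64Array::from(vec![1, 5, 8, 3, 9]);
        // Start from an all-true mask, like `BooleanBuffer::new_set(num_rows)`
        // in `precise_filter`.
        let mut mask = BooleanBuffer::new_set(col.len());
        // AND in the result of each pushed down predicate: 2 < v < 9.
        let gt_2 = gt(&col, &Int64Array::new_scalar(2)).unwrap();
        let lt_9 = lt(&col, &Int64Array::new_scalar(9)).unwrap();
        mask = mask.bitand(gt_2.values());
        mask = mask.bitand(lt_9.values());
        // Apply the combined mask once, like `input.filter(...)` on a `Batch`.
        let kept = filter(&col, &BooleanArray::from(mask)).unwrap();
        println!("{kept:?}"); // keeps [5, 8, 3]
    }
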
From 13ec5542a982bb66ac7dc61872b1702fcca6008e Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 29 Apr 2024 21:24:58 +0800 Subject: [PATCH 08/14] feat: partition returns row group reader --- src/mito2/src/sst/parquet/partition.rs | 17 +++++++++++------ src/mito2/src/sst/parquet/reader.rs | 2 +- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/mito2/src/sst/parquet/partition.rs b/src/mito2/src/sst/parquet/partition.rs index 353c4c0720bc..942c13fc3b62 100644 --- a/src/mito2/src/sst/parquet/partition.rs +++ b/src/mito2/src/sst/parquet/partition.rs @@ -19,19 +19,19 @@ use std::ops::BitAnd; use std::sync::Arc; use api::v1::SemanticType; -use common_recordbatch::filter::SimpleFilterEvaluator; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::buffer::BooleanBuffer; -use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; +use parquet::arrow::arrow_reader::RowSelection; use snafu::ResultExt; use crate::error::{FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result}; use crate::read::Batch; use crate::row_converter::{McmpRowCodec, RowCodec}; use crate::sst::parquet::format::ReadFormat; -use crate::sst::parquet::reader::{RowGroupReaderBuilder, SimpleFilterContext}; +use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext}; /// A partition of a parquet SST. Now it is a row group. +#[allow(dead_code)] pub(crate) struct Partition { /// Shared context. context: PartitionContextRef, @@ -43,6 +43,7 @@ pub(crate) struct Partition { impl Partition { /// Creates a new partition. + #[allow(dead_code)] pub(crate) fn new( context: PartitionContextRef, row_group_idx: usize, @@ -56,11 +57,15 @@ impl Partition { } /// Returns a reader to read the partition. - pub(crate) async fn reader(&self) -> Result { - self.context + #[allow(dead_code)] + pub(crate) async fn reader(&self) -> Result { + let parquet_reader = self + .context .reader_builder .build(self.row_group_idx, self.row_selection.clone()) - .await + .await?; + + Ok(RowGroupReader::new(self.context.clone(), parquet_reader)) } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 1f29986e4284..a7a104344f30 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -743,7 +743,7 @@ pub(crate) struct RowGroupReader { impl RowGroupReader { /// Creates a new reader. - fn new(context: PartitionContextRef, reader: ParquetRecordBatchReader) -> Self { + pub(crate) fn new(context: PartitionContextRef, reader: ParquetRecordBatchReader) -> Self { Self { context, reader, From dec6b71daf6dc93dbeb0d5702f5089214be8615b Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 29 Apr 2024 21:28:18 +0800 Subject: [PATCH 09/14] style: fix clippy --- src/mito2/src/sst/parquet/format.rs | 2 +- src/mito2/src/sst/parquet/reader.rs | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 11a354c0fc7b..f9d5473a57d1 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -140,7 +140,7 @@ impl ReadFormat { // Maps column id of a projected field to its index in SST. let mut projected_field_id_index: Vec<_> = column_ids - .into_iter() + .iter() .filter_map(|column_id| { // Only apply projection to fields. 
field_id_to_index diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index a7a104344f30..14354e0065b9 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -162,10 +162,7 @@ impl ParquetReaderBuilder { let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?); // Lists all column ids to read, we always use the expected metadata if possible. let column_ids = self.projection.clone().unwrap_or_else(|| { - let expected_meta = self - .expected_metadata - .as_ref() - .unwrap_or_else(|| ®ion_meta); + let expected_meta = self.expected_metadata.as_ref().unwrap_or(®ion_meta); expected_meta .column_metadatas .iter() @@ -531,7 +528,7 @@ impl ReaderState { fn metrics(&self) -> &Metrics { match self { ReaderState::Readable(reader) => &reader.metrics, - ReaderState::Exhausted(m) => &m, + ReaderState::Exhausted(m) => m, } } } From 66f3ce1a1e9cbeabc195dc3c84c3c51486b940ba Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 30 Apr 2024 11:43:19 +0800 Subject: [PATCH 10/14] feat: add build partitions method --- src/mito2/src/sst/parquet/partition.rs | 4 +--- src/mito2/src/sst/parquet/reader.rs | 31 +++++++++++++++++++++----- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/sst/parquet/partition.rs b/src/mito2/src/sst/parquet/partition.rs index 942c13fc3b62..354a34aa6bd7 100644 --- a/src/mito2/src/sst/parquet/partition.rs +++ b/src/mito2/src/sst/parquet/partition.rs @@ -31,8 +31,7 @@ use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext}; /// A partition of a parquet SST. Now it is a row group. -#[allow(dead_code)] -pub(crate) struct Partition { +pub struct Partition { /// Shared context. context: PartitionContextRef, /// Index of the row group in the SST. @@ -43,7 +42,6 @@ pub(crate) struct Partition { impl Partition { /// Creates a new partition. - #[allow(dead_code)] pub(crate) fn new( context: PartitionContextRef, row_group_idx: usize, diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 14354e0065b9..a31c6bdb9258 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -51,7 +51,7 @@ use crate::sst::file::FileHandle; use crate::sst::index::applier::SstIndexApplierRef; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::metadata::MetadataLoader; -use crate::sst::parquet::partition::{PartitionContext, PartitionContextRef}; +use crate::sst::parquet::partition::{Partition, PartitionContext, PartitionContextRef}; use crate::sst::parquet::row_group::InMemoryRowGroup; use crate::sst::parquet::row_selection::row_selection_from_row_ranges; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -146,10 +146,29 @@ impl ParquetReaderBuilder { self } - /// Builds and initializes a [ParquetReader]. + /// Builds a [ParquetReader]. /// /// This needs to perform IO operation. pub async fn build(&self) -> Result { + let (context, row_groups) = self.build_reader_input().await?; + ParquetReader::new(context, row_groups).await + } + + /// Builds [Partition]s to read and pushes them to `partitions`. 
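+    ///
+    /// The point of exposing partitions is that each row group can be read
+    /// independently. A sketch of a parallel scan (illustrative only; the
+    /// spawn helper and error plumbing are assumed):
+    ///
+    /// ```ignore
+    /// let mut partitions = Vec::new();
+    /// builder.build_partitions(&mut partitions).await?;
+    /// for partition in partitions {
+    ///     common_runtime::spawn_read(async move {
+    ///         let mut reader = partition.reader().await?;
+    ///         while let Some(batch) = reader.next_batch().await? { /* ... */ }
+    ///         Ok(())
+    ///     });
+    /// }
+    /// ```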
+ #[allow(dead_code)] + pub async fn build_partitions(&self, partitions: &mut Vec) -> Result<()> { + let (context, row_groups) = self.build_reader_input().await?; + for (row_group_idx, row_selection) in row_groups { + let partition = Partition::new(context.clone(), row_group_idx, row_selection); + partitions.push(partition); + } + Ok(()) + } + + /// Builds a [PartitionContext] and collects row groups to read. + /// + /// This needs to perform IO operation. + async fn build_reader_input(&self) -> Result<(PartitionContextRef, RowGroupMap)> { let start = Instant::now(); let file_path = self.file_handle.file_path(&self.file_dir); @@ -225,7 +244,7 @@ impl ParquetReaderBuilder { ); let context = PartitionContext::new(reader_builder, filters, read_format, codec); - ParquetReader::new(Arc::new(context), row_groups).await + Ok((Arc::new(context), row_groups)) } /// Decodes region metadata from key value. @@ -599,12 +618,14 @@ impl SimpleFilterContext { } } +type RowGroupMap = BTreeMap>; + /// Parquet batch reader to read our SST format. pub struct ParquetReader { /// Partition context. context: PartitionContextRef, /// Indices of row groups to read, along with their respective row selections. - row_groups: BTreeMap>, + row_groups: RowGroupMap, /// Reader of current row group. reader_state: ReaderState, } @@ -638,7 +659,7 @@ impl BatchReader for ParquetReader { } } - // The reader is exhaused. + // The reader is exhausted. reader.metrics.scan_cost += start.elapsed(); self.reader_state = ReaderState::Exhausted(std::mem::take(&mut reader.metrics)); Ok(None) From 97bc9b584ac4c13d6b8f372b3def95f020738a6d Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 30 Apr 2024 14:49:40 +0800 Subject: [PATCH 11/14] docs: comment --- src/mito2/src/sst/parquet/partition.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mito2/src/sst/parquet/partition.rs b/src/mito2/src/sst/parquet/partition.rs index 354a34aa6bd7..fa0e938ee677 100644 --- a/src/mito2/src/sst/parquet/partition.rs +++ b/src/mito2/src/sst/parquet/partition.rs @@ -31,6 +31,7 @@ use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext}; /// A partition of a parquet SST. Now it is a row group. +/// We can read different partitions in parallel. pub struct Partition { /// Shared context. context: PartitionContextRef, From 907fae70cc7fb4b2a740c33b9223816317edba09 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 30 Apr 2024 16:48:45 +0800 Subject: [PATCH 12/14] refactor: rename Partition to FileRange --- src/mito2/src/sst/parquet.rs | 2 +- .../parquet/{partition.rs => file_range.rs} | 28 +++++++++---------- src/mito2/src/sst/parquet/reader.rs | 28 +++++++++---------- 3 files changed, 29 insertions(+), 29 deletions(-) rename src/mito2/src/sst/parquet/{partition.rs => file_range.rs} (90%) diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 101f1942fe2a..de723cae1e3d 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -14,11 +14,11 @@ //! SST in parquet format. 
+pub(crate) mod file_range;
 mod format;
 pub(crate) mod helper;
 pub(crate) mod metadata;
 mod page_reader;
-pub(crate) mod partition;
 pub mod reader;
 pub mod row_group;
 mod row_selection;

diff --git a/src/mito2/src/sst/parquet/partition.rs b/src/mito2/src/sst/parquet/file_range.rs
similarity index 90%
rename from src/mito2/src/sst/parquet/partition.rs
rename to src/mito2/src/sst/parquet/file_range.rs
index fa0e938ee677..f385ea992e0e 100644
--- a/src/mito2/src/sst/parquet/partition.rs
+++ b/src/mito2/src/sst/parquet/file_range.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//! Structs and functions for reading partitions from a parquet file. A partition
+//! Structs and functions for reading ranges from a parquet file. A file range
 //! is usually a row group in a parquet file.
 
 use std::ops::BitAnd;
@@ -30,21 +30,21 @@ use crate::row_converter::{McmpRowCodec, RowCodec};
 use crate::sst::parquet::format::ReadFormat;
 use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext};
 
-/// A partition of a parquet SST. Now it is a row group.
-/// We can read different partitions in parallel.
-pub struct Partition {
+/// A range of a parquet SST. Now it is a row group.
+/// We can read different file ranges in parallel.
+pub struct FileRange {
     /// Shared context.
-    context: PartitionContextRef,
+    context: FileRangeContextRef,
     /// Index of the row group in the SST.
     row_group_idx: usize,
     /// Row selection for the row group. `None` means all rows.
     row_selection: Option<RowSelection>,
 }
 
-impl Partition {
-    /// Creates a new partition.
+impl FileRange {
+    /// Creates a new [FileRange].
     pub(crate) fn new(
-        context: PartitionContextRef,
+        context: FileRangeContextRef,
         row_group_idx: usize,
         row_selection: Option<RowSelection>,
     ) -> Self {
@@ -55,7 +55,7 @@
         }
     }
 
-    /// Returns a reader to read the partition.
+    /// Returns a reader to read the [FileRange].
     #[allow(dead_code)]
     pub(crate) async fn reader(&self) -> Result<RowGroupReader> {
         let parquet_reader = self
@@ -68,8 +68,8 @@
     }
 }
 
-/// Context shared by partitions of the same parquet SST.
-pub(crate) struct PartitionContext {
+/// Context shared by ranges of the same parquet SST.
+pub(crate) struct FileRangeContext {
     // Row group reader builder for the file.
     reader_builder: RowGroupReaderBuilder,
     /// Filters pushed down.
@@ -80,10 +80,10 @@
     codec: McmpRowCodec,
 }
 
-pub(crate) type PartitionContextRef = Arc<PartitionContext>;
+pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
 
-impl PartitionContext {
-    /// Creates a new partition context.
+impl FileRangeContext {
+    /// Creates a new [FileRangeContext].
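+    ///
+    /// One context is built per SST file and shared by all of its
+    /// [FileRange]s through a [FileRangeContextRef]. A hypothetical
+    /// construction sketch (illustrative only; the four inputs are
+    /// assembled while opening the file):
+    ///
+    /// ```ignore
+    /// let context = Arc::new(FileRangeContext::new(
+    ///     reader_builder, // RowGroupReaderBuilder for this SST
+    ///     filters,        // pushed-down SimpleFilterContext list
+    ///     read_format,    // ReadFormat helper for the SST schema
+    ///     codec,          // McmpRowCodec that decodes primary keys
+    /// ));
+    /// let range = FileRange::new(context.clone(), 0, None);
+    /// ```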
     pub(crate) fn new(
         reader_builder: RowGroupReaderBuilder,
         filters: Vec<SimpleFilterContext>,

diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index a31c6bdb9258..b6e054a7d8a3 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -49,9 +49,9 @@ use crate::read::{Batch, BatchReader};
 use crate::row_converter::{McmpRowCodec, SortField};
 use crate::sst::file::FileHandle;
 use crate::sst::index::applier::SstIndexApplierRef;
+use crate::sst::parquet::file_range::{FileRange, FileRangeContext, FileRangeContextRef};
 use crate::sst::parquet::format::ReadFormat;
 use crate::sst::parquet::metadata::MetadataLoader;
-use crate::sst::parquet::partition::{Partition, PartitionContext, PartitionContextRef};
 use crate::sst::parquet::row_group::InMemoryRowGroup;
 use crate::sst::parquet::row_selection::row_selection_from_row_ranges;
 use crate::sst::parquet::stats::RowGroupPruningStats;
@@ -154,21 +154,21 @@ impl ParquetReaderBuilder {
         ParquetReader::new(context, row_groups).await
     }
 
-    /// Builds [Partition]s to read and pushes them to `partitions`.
+    /// Builds [FileRange]s to read and pushes them to `file_ranges`.
     #[allow(dead_code)]
-    pub async fn build_partitions(&self, partitions: &mut Vec<Partition>) -> Result<()> {
+    pub async fn build_file_ranges(&self, file_ranges: &mut Vec<FileRange>) -> Result<()> {
         let (context, row_groups) = self.build_reader_input().await?;
         for (row_group_idx, row_selection) in row_groups {
-            let partition = Partition::new(context.clone(), row_group_idx, row_selection);
-            partitions.push(partition);
+            let file_range = FileRange::new(context.clone(), row_group_idx, row_selection);
+            file_ranges.push(file_range);
         }
         Ok(())
     }
 
-    /// Builds a [PartitionContext] and collects row groups to read.
+    /// Builds a [FileRangeContext] and collects row groups to read.
     ///
     /// This needs to perform IO operation.
-    async fn build_reader_input(&self) -> Result<(PartitionContextRef, RowGroupMap)> {
+    async fn build_reader_input(&self) -> Result<(FileRangeContextRef, RowGroupMap)> {
         let start = Instant::now();
 
         let file_path = self.file_handle.file_path(&self.file_dir);
@@ -243,7 +243,7 @@ impl ParquetReaderBuilder {
         );
 
-        let context = PartitionContext::new(reader_builder, filters, read_format, codec);
+        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);
         Ok((Arc::new(context), row_groups))
     }
 
@@ -622,8 +622,8 @@ type RowGroupMap = BTreeMap<usize, Option<RowSelection>>;
 
 /// Parquet batch reader to read our SST format.
 pub struct ParquetReader {
-    /// Partition context.
-    context: PartitionContextRef,
+    /// File range context.
+    context: FileRangeContextRef,
     /// Indices of row groups to read, along with their respective row selections.
     row_groups: RowGroupMap,
     /// Reader of current row group.
     reader_state: ReaderState,
 }
@@ -715,7 +715,7 @@ impl Drop for ParquetReader {
 impl ParquetReader {
     /// Creates a new reader.
     async fn new(
-        context: PartitionContextRef,
+        context: FileRangeContextRef,
         mut row_groups: BTreeMap<usize, Option<RowSelection>>,
     ) -> Result<Self> {
         // No more items in current row group, reads next row group.
@@ -749,8 +749,8 @@ impl ParquetReader {
 
 /// Reader to read a row group of a parquet file.
 pub(crate) struct RowGroupReader {
-    /// Context of partitions.
-    context: PartitionContextRef,
+    /// Context for file ranges.
+    context: FileRangeContextRef,
     /// Inner parquet reader.
     reader: ParquetRecordBatchReader,
     /// Buffered batches to return.
@@ -761,7 +761,7 @@ pub(crate) struct RowGroupReader {
 
 impl RowGroupReader {
     /// Creates a new reader.
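+    ///
+    /// A construction sketch (illustrative only): `parquet_reader` is the
+    /// [ParquetRecordBatchReader] that [RowGroupReaderBuilder::build]
+    /// returns for one row group, while the shared context supplies the
+    /// read format and pushed-down filters.
+    ///
+    /// ```ignore
+    /// let reader = RowGroupReader::new(context.clone(), parquet_reader);
+    /// ```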
-    pub(crate) fn new(context: PartitionContextRef, reader: ParquetRecordBatchReader) -> Self {
+    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
         Self {
             context,
             reader,

From bd66d9dc8c658713d57924419d20978458facde7 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Tue, 7 May 2024 19:44:26 +0800
Subject: [PATCH 13/14] chore: address CR comments

---
 src/mito2/src/sst/parquet/reader.rs | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index b6e054a7d8a3..e97df797bd77 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -158,6 +158,7 @@ impl ParquetReaderBuilder {
     #[allow(dead_code)]
     pub async fn build_file_ranges(&self, file_ranges: &mut Vec<FileRange>) -> Result<()> {
         let (context, row_groups) = self.build_reader_input().await?;
+        file_ranges.reserve_exact(row_groups.len());
         for (row_group_idx, row_selection) in row_groups {
             let file_range = FileRange::new(context.clone(), row_group_idx, row_selection);
             file_ranges.push(file_range);
@@ -179,21 +180,24 @@ impl ParquetReaderBuilder {
         let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
         // Gets the metadata stored in the SST.
         let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
-        // Lists all column ids to read, we always use the expected metadata if possible.
-        let column_ids = self.projection.clone().unwrap_or_else(|| {
+        let read_format = if let Some(column_ids) = &self.projection {
+            ReadFormat::new(region_meta.clone(), column_ids)
+        } else {
+            // Lists all column ids to read, we always use the expected metadata if possible.
             let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
-            expected_meta
+            let column_ids: Vec<_> = expected_meta
                 .column_metadatas
                 .iter()
                 .map(|col| col.column_id)
-                .collect()
-        });
-        let read_format = ReadFormat::new(region_meta.clone(), &column_ids);
+                .collect();
+            ReadFormat::new(region_meta.clone(), &column_ids)
+        };
 
         // Computes the projection mask.
         let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
         let indices = read_format.projection_indices();
         // Now we assumes we don't have nested schemas.
+        // TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
         let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());
 
         // Computes the field levels.

From 53396e10979b2657bdcadb017976629abd4638f3 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Tue, 7 May 2024 20:04:35 +0800
Subject: [PATCH 14/14] feat: avoid allocating column ids while constructing
 ReadFormat

---
 src/mito2/src/sst/parquet/format.rs | 22 ++++++++++++----------
 src/mito2/src/sst/parquet/reader.rs | 15 ++++++++-------
 2 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index f9d5473a57d1..efc61d89de26 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -130,7 +130,10 @@ pub(crate) struct ReadFormat {
 
 impl ReadFormat {
     /// Creates a helper with existing `metadata` and `column_ids` to read.
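+    ///
+    /// `column_ids` is now an iterator, so callers can pass column ids
+    /// through without collecting them into an intermediate `Vec`. For
+    /// example (mirroring the tests below, illustrative only):
+    ///
+    /// ```ignore
+    /// let read_format = ReadFormat::new(metadata.clone(), [2, 1, 5].iter().copied());
+    /// assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
+    /// ```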
-    pub(crate) fn new(metadata: RegionMetadataRef, column_ids: &[ColumnId]) -> ReadFormat {
+    pub(crate) fn new(
+        metadata: RegionMetadataRef,
+        column_ids: impl Iterator<Item = ColumnId>,
+    ) -> ReadFormat {
         let field_id_to_index: HashMap<_, _> = metadata
             .field_columns()
             .enumerate()
@@ -140,13 +143,12 @@ impl ReadFormat {
 
         // Maps column id of a projected field to its index in SST.
         let mut projected_field_id_index: Vec<_> = column_ids
-            .iter()
             .filter_map(|column_id| {
                 // Only apply projection to fields.
                 field_id_to_index
-                    .get(column_id)
+                    .get(&column_id)
                     .copied()
-                    .map(|index| (*column_id, index))
+                    .map(|index| (column_id, index))
             })
             .collect();
         let mut projection_indices: Vec<_> = projected_field_id_index
@@ -745,16 +747,16 @@ fn test_projection_indices() {
         let metadata = build_test_region_metadata();
         // Only read tag1
-        let read_format = ReadFormat::new(metadata.clone(), &[3]);
+        let read_format = ReadFormat::new(metadata.clone(), [3].iter().copied());
         assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
         // Only read field1
-        let read_format = ReadFormat::new(metadata.clone(), &[4]);
+        let read_format = ReadFormat::new(metadata.clone(), [4].iter().copied());
         assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
         // Only read ts
-        let read_format = ReadFormat::new(metadata.clone(), &[5]);
+        let read_format = ReadFormat::new(metadata.clone(), [5].iter().copied());
         assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
         // Read field0, tag0, ts
-        let read_format = ReadFormat::new(metadata, &[2, 1, 5]);
+        let read_format = ReadFormat::new(metadata, [2, 1, 5].iter().copied());
         assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
     }
 
@@ -801,7 +803,7 @@ mod tests {
             .iter()
             .map(|col| col.column_id)
             .collect();
-        let read_format = ReadFormat::new(metadata, &column_ids);
+        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
         assert_eq!(arrow_schema, *read_format.arrow_schema());
 
         let record_batch = RecordBatch::new_empty(arrow_schema);
@@ -820,7 +822,7 @@ mod tests {
            .iter()
             .map(|col| col.column_id)
             .collect();
-        let read_format = ReadFormat::new(metadata, &column_ids);
+        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
 
         let columns: Vec<ArrayRef> = vec![
             Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1

diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index e97df797bd77..dff995b76ba6 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -181,16 +181,17 @@ impl ParquetReaderBuilder {
         // Gets the metadata stored in the SST.
         let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
         let read_format = if let Some(column_ids) = &self.projection {
-            ReadFormat::new(region_meta.clone(), column_ids)
+            ReadFormat::new(region_meta.clone(), column_ids.iter().copied())
         } else {
             // Lists all column ids to read, we always use the expected metadata if possible.
             let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
-            let column_ids: Vec<_> = expected_meta
-                .column_metadatas
-                .iter()
-                .map(|col| col.column_id)
-                .collect();
-            ReadFormat::new(region_meta.clone(), &column_ids)
+            ReadFormat::new(
+                region_meta.clone(),
+                expected_meta
+                    .column_metadatas
+                    .iter()
+                    .map(|col| col.column_id),
+            )
         };
 
         // Computes the projection mask.