Skip to content

Commit

Permalink
feat: add build partitions method
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Apr 30, 2024
1 parent e7e5dc6 commit 8f9c9d2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
4 changes: 1 addition & 3 deletions src/mito2/src/sst/parquet/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext};

/// A partition of a parquet SST. Now it is a row group.
#[allow(dead_code)]
pub(crate) struct Partition {
pub struct Partition {
/// Shared context.
context: PartitionContextRef,
/// Index of the row group in the SST.
Expand All @@ -43,7 +42,6 @@ pub(crate) struct Partition {

impl Partition {
/// Creates a new partition.
#[allow(dead_code)]
pub(crate) fn new(
context: PartitionContextRef,
row_group_idx: usize,
Expand Down
31 changes: 26 additions & 5 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::sst::file::FileHandle;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::partition::{PartitionContext, PartitionContextRef};
use crate::sst::parquet::partition::{Partition, PartitionContext, PartitionContextRef};
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::row_selection_from_row_ranges;
use crate::sst::parquet::stats::RowGroupPruningStats;
Expand Down Expand Up @@ -146,10 +146,29 @@ impl ParquetReaderBuilder {
self
}

/// Builds and initializes a [ParquetReader].
/// Builds a [ParquetReader].
///
/// This needs to perform IO operation.
pub async fn build(&self) -> Result<ParquetReader> {
let (context, row_groups) = self.build_reader_input().await?;
ParquetReader::new(context, row_groups).await
}

/// Builds [Partition]s to read and pushes them to `partitions`.
#[allow(dead_code)]
pub async fn build_partitions(&self, partitions: &mut Vec<Partition>) -> Result<()> {
let (context, row_groups) = self.build_reader_input().await?;
for (row_group_idx, row_selection) in row_groups {
let partition = Partition::new(context.clone(), row_group_idx, row_selection);
partitions.push(partition);
}
Ok(())
}

/// Builds a [PartitionContext] and collects row groups to read.
///
/// This needs to perform IO operation.
async fn build_reader_input(&self) -> Result<(PartitionContextRef, RowGroupMap)> {
let start = Instant::now();

let file_path = self.file_handle.file_path(&self.file_dir);
Expand Down Expand Up @@ -225,7 +244,7 @@ impl ParquetReaderBuilder {
);

let context = PartitionContext::new(reader_builder, filters, read_format, codec);
ParquetReader::new(Arc::new(context), row_groups).await
Ok((Arc::new(context), row_groups))
}

/// Decodes region metadata from key value.
Expand Down Expand Up @@ -599,12 +618,14 @@ impl SimpleFilterContext {
}
}

type RowGroupMap = BTreeMap<usize, Option<RowSelection>>;

/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
/// Partition context.
context: PartitionContextRef,
/// Indices of row groups to read, along with their respective row selections.
row_groups: BTreeMap<usize, Option<RowSelection>>,
row_groups: RowGroupMap,
/// Reader of current row group.
reader_state: ReaderState,
}
Expand Down Expand Up @@ -638,7 +659,7 @@ impl BatchReader for ParquetReader {
}
}

// The reader is exhaused.
// The reader is exhausted.
reader.metrics.scan_cost += start.elapsed();
self.reader_state = ReaderState::Exhausted(std::mem::take(&mut reader.metrics));
Ok(None)
Expand Down

0 comments on commit 8f9c9d2

Please sign in to comment.