Skip to content

Commit

Permalink
Return None when Parquet page indexes are not present in file (apache#6639)
Browse files Browse the repository at this point in the history

* return none for missing page indexes

* return option from page index read functions

* update docs
  • Loading branch information
etseidl authored Nov 24, 2024
1 parent f033e4f commit 73a0c26
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 55 deletions.
4 changes: 1 addition & 3 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3584,9 +3584,7 @@ mod tests {
.unwrap();
// Although `Vec<Vec<PageLocation>>` of each row group is empty,
// we should read the file successfully.
// FIXME: this test will fail when metadata parsing returns `None` for missing page
// indexes. https://github.com/apache/arrow-rs/issues/6447
assert!(builder.metadata().offset_index().unwrap()[0].is_empty());
assert!(builder.metadata().offset_index().is_none());
let reader = builder.build().unwrap();
let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(batches.len(), 1);
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1743,7 +1743,7 @@ mod tests {
"Expected a dictionary page"
);

let offset_indexes = read_offset_indexes(&file, column).unwrap();
let offset_indexes = read_offset_indexes(&file, column).unwrap().unwrap();

let page_locations = offset_indexes[0].page_locations.clone();

Expand Down
6 changes: 2 additions & 4 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,6 @@ mod tests {
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::ArrowWriter;
use crate::file::metadata::ParquetMetaDataReader;
use crate::file::page_index::index_reader;
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
use arrow::error::Result as ArrowResult;
Expand Down Expand Up @@ -1566,12 +1565,11 @@ mod tests {
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.parse_and_finish(&data)
.unwrap();

let offset_index =
index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
.expect("reading offset index");
let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();

let mut metadata_builder = metadata.into_builder();
let mut row_groups = metadata_builder.take_row_groups();
Expand Down
15 changes: 0 additions & 15 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ impl ParquetMetaDataReader {

// Get bounds needed for page indexes (if any are present in the file).
let Some(range) = self.range_for_page_index() else {
self.empty_page_indexes();
return Ok(());
};

Expand Down Expand Up @@ -477,20 +476,6 @@ impl ParquetMetaDataReader {
Ok(())
}

/// Set the column_index and offset_indexes to empty `Vec` for backwards compatibility
///
/// See <https://github.com/apache/arrow-rs/pull/6451> for details
fn empty_page_indexes(&mut self) {
let metadata = self.metadata.as_mut().unwrap();
let num_row_groups = metadata.num_row_groups();
if self.column_index {
metadata.set_column_index(Some(vec![vec![]; num_row_groups]));
}
if self.offset_index {
metadata.set_offset_index(Some(vec![vec![]; num_row_groups]));
}
}

fn range_for_page_index(&self) -> Option<Range<usize>> {
// sanity check
self.metadata.as_ref()?;
Expand Down
48 changes: 26 additions & 22 deletions parquet/src/file/page_index/index_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,35 +43,37 @@ pub(crate) fn acc_range(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Opt
///
/// Returns a vector of `index[column_number]`.
///
/// Returns an empty vector if this row group does not contain a
/// [`ColumnIndex`].
/// Returns `None` if this row group does not contain a [`ColumnIndex`].
///
/// See [Page Index Documentation] for more details.
///
/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub fn read_columns_indexes<R: ChunkReader>(
reader: &R,
chunks: &[ColumnChunkMetaData],
) -> Result<Vec<Index>, ParquetError> {
) -> Result<Option<Vec<Index>>, ParquetError> {
let fetch = chunks
.iter()
.fold(None, |range, c| acc_range(range, c.column_index_range()));

let fetch = match fetch {
Some(r) => r,
None => return Ok(vec![Index::NONE; chunks.len()]),
None => return Ok(None),
};

let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];

chunks
.iter()
.map(|c| match c.column_index_range() {
Some(r) => decode_column_index(get(r), c.column_type()),
None => Ok(Index::NONE),
})
.collect()
Some(
chunks
.iter()
.map(|c| match c.column_index_range() {
Some(r) => decode_column_index(get(r), c.column_type()),
None => Ok(Index::NONE),
})
.collect(),
)
.transpose()
}

/// Reads [`OffsetIndex`], per-page [`PageLocation`] for all columns of a row
Expand Down Expand Up @@ -116,35 +118,37 @@ pub fn read_pages_locations<R: ChunkReader>(
///
/// Returns a vector of `offset_index[column_number]`.
///
/// Returns an empty vector if this row group does not contain an
/// [`OffsetIndex`].
/// Returns `None` if this row group does not contain an [`OffsetIndex`].
///
/// See [Page Index Documentation] for more details.
///
/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub fn read_offset_indexes<R: ChunkReader>(
reader: &R,
chunks: &[ColumnChunkMetaData],
) -> Result<Vec<OffsetIndexMetaData>, ParquetError> {
) -> Result<Option<Vec<OffsetIndexMetaData>>, ParquetError> {
let fetch = chunks
.iter()
.fold(None, |range, c| acc_range(range, c.offset_index_range()));

let fetch = match fetch {
Some(r) => r,
None => return Ok(vec![]),
None => return Ok(None),
};

let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];

chunks
.iter()
.map(|c| match c.offset_index_range() {
Some(r) => decode_offset_index(get(r)),
None => Err(general_err!("missing offset index")),
})
.collect()
Some(
chunks
.iter()
.map(|c| match c.offset_index_range() {
Some(r) => decode_offset_index(get(r)),
None => Err(general_err!("missing offset index")),
})
.collect(),
)
.transpose()
}

pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
Expand Down
22 changes: 12 additions & 10 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1223,8 +1223,8 @@ mod tests {
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);
assert!(metadata.column_index().is_none());
assert!(metadata.offset_index().is_none());

// false, true predicate
let test_file = get_test_file("alltypes_tiny_pages.parquet");
Expand All @@ -1236,8 +1236,8 @@ mod tests {
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);
assert!(metadata.column_index().is_none());
assert!(metadata.offset_index().is_none());

// false, false predicate
let test_file = get_test_file("alltypes_tiny_pages.parquet");
Expand All @@ -1249,8 +1249,8 @@ mod tests {
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);
assert!(metadata.column_index().is_none());
assert!(metadata.offset_index().is_none());
Ok(())
}

Expand Down Expand Up @@ -1340,13 +1340,15 @@ mod tests {
let columns = metadata.row_group(0).columns();
let reversed: Vec<_> = columns.iter().cloned().rev().collect();

let a = read_columns_indexes(&test_file, columns).unwrap();
let mut b = read_columns_indexes(&test_file, &reversed).unwrap();
let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
let mut b = read_columns_indexes(&test_file, &reversed)
.unwrap()
.unwrap();
b.reverse();
assert_eq!(a, b);

let a = read_offset_indexes(&test_file, columns).unwrap();
let mut b = read_offset_indexes(&test_file, &reversed).unwrap();
let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
b.reverse();
assert_eq!(a, b);
}
Expand Down

0 comments on commit 73a0c26

Please sign in to comment.