Mutate the right row group metadata
When using BloomFilterPosition::AfterRowGroup, the writer only wrote the Bloom filter
offset to a temporary clone of the metadata, so the Bloom filter was never
seen by readers.
progval committed Jun 10, 2024
1 parent 6effa7f commit f23759a
Showing 3 changed files with 84 additions and 64 deletions.
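
The root cause is a mutate-the-clone bug: the Bloom filter offsets were written into a temporary thrift conversion of the row group metadata (`row_group_metadata.to_thrift()` below) rather than into the metadata that later reaches the file footer. A minimal self-contained sketch of the pattern, using hypothetical simplified types rather than the parquet crate's real structs:

    // Hypothetical simplified metadata type, not the parquet crate's real struct.
    #[derive(Clone)]
    struct ChunkMeta {
        bloom_filter_offset: Option<i64>,
    }

    fn main() {
        let mut stored = vec![ChunkMeta { bloom_filter_offset: None }];

        // Buggy shape: convert/clone, then mutate the copy. The stored
        // metadata that is later serialized never sees the offset.
        let mut temp = stored.clone();
        temp[0].bloom_filter_offset = Some(42);
        assert_eq!(stored[0].bloom_filter_offset, None); // readers find nothing

        // Fixed shape: mutate the stored metadata in place, which is what
        // the new RowGroupMetaData::columns_mut() accessor makes possible.
        stored[0].bloom_filter_offset = Some(42);
        assert_eq!(stored[0].bloom_filter_offset, Some(42));
    }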
19 changes: 8 additions & 11 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -43,8 +43,8 @@ use crate::column::writer::{
};
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
-use crate::file::properties::{BloomFilterPosition, WriterProperties, WriterPropertiesPtr};
+use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
+use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
@@ -185,7 +185,7 @@ impl<W: Write + Send> ArrowWriter<W> {
    }

    /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

@@ -263,12 +263,7 @@ impl<W: Write + Send> ArrowWriter<W> {
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
-        let row_group_metadata = row_group_writer.close()?;
-        match self.writer.properties().bloom_filter_position() {
-            BloomFilterPosition::AfterRowGroup =>
-                self.writer.write_bloom_filters(&mut [row_group_metadata.to_thrift()])?,
-            BloomFilterPosition::End => (),
-        }
+        row_group_writer.close()?;
        Ok(())
    }

@@ -1044,7 +1039,9 @@ mod tests {
    use crate::file::metadata::ParquetMetaData;
    use crate::file::page_index::index::Index;
    use crate::file::page_index::index_reader::read_pages_locations;
-    use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
+    use crate::file::properties::{
+        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
+    };
    use crate::file::serialized_reader::ReadOptionsBuilder;
    use crate::file::{
        reader::{FileReader, SerializedFileReader},
@@ -1761,7 +1758,7 @@ mod tests {
            .set_dictionary_page_size_limit(dictionary_size.max(1))
            .set_encoding(*encoding)
            .set_bloom_filter_enabled(bloom_filter)
-            .set_bloom_filter_position(BloomFilterPosition::End)
+            .set_bloom_filter_position(BloomFilterPosition::AfterRowGroup)
            .build();

        files.push(roundtrip_opts(&expected_batch, props))

5 changes: 5 additions & 0 deletions parquet/src/file/metadata.rs
@@ -324,6 +324,11 @@ impl RowGroupMetaData {
        &self.columns
    }

+    /// Returns mutable slice of column chunk metadata.
+    pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
+        &mut self.columns
+    }
+
    /// Number of rows in this row group.
    pub fn num_rows(&self) -> i64 {
        self.num_rows

124 changes: 71 additions & 53 deletions parquet/src/file/writer.rs
@@ -34,8 +34,9 @@ use crate::column::{
};
use crate::data_type::DataType;
use crate::errors::{ParquetError, Result};
+use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
use crate::file::reader::ChunkReader;
-use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC};
+use crate::file::{metadata::*, PARQUET_MAGIC};
use crate::schema::types::{self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};

/// A wrapper around a [`Write`] that keeps track of the number
@@ -115,9 +116,10 @@ pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()
/// - the row group metadata
/// - the column index for each column chunk
/// - the offset index for each column chunk
-pub type OnCloseRowGroup<'a> = Box<
+pub type OnCloseRowGroup<'a, W> = Box<
    dyn FnOnce(
-            RowGroupMetaDataPtr,
+            &'a mut TrackedWrite<W>,
+            RowGroupMetaData,
            Vec<Option<Sbbf>>,
            Vec<Option<ColumnIndex>>,
            Vec<Option<OffsetIndex>>,
@@ -143,7 +145,7 @@ pub struct SerializedFileWriter<W: Write> {
    schema: TypePtr,
    descr: SchemaDescPtr,
    props: WriterPropertiesPtr,
-    row_groups: Vec<RowGroupMetaDataPtr>,
+    row_groups: Vec<RowGroupMetaData>,
    bloom_filters: Vec<Vec<Option<Sbbf>>>,
    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
@@ -197,18 +199,28 @@ impl<W: Write + Send> SerializedFileWriter<W> {

        self.row_group_index += 1;

+        let bloom_filter_position = self.properties().bloom_filter_position();
        let row_groups = &mut self.row_groups;
        let row_bloom_filters = &mut self.bloom_filters;
        let row_column_indexes = &mut self.column_indexes;
        let row_offset_indexes = &mut self.offset_indexes;
-        let on_close =
-            |metadata, row_group_bloom_filter, row_group_column_index, row_group_offset_index| {
-                row_groups.push(metadata);
-                row_bloom_filters.push(row_group_bloom_filter);
-                row_column_indexes.push(row_group_column_index);
-                row_offset_indexes.push(row_group_offset_index);
-                Ok(())
+        let on_close = move |buf,
+                             mut metadata,
+                             row_group_bloom_filter,
+                             row_group_column_index,
+                             row_group_offset_index| {
+            row_bloom_filters.push(row_group_bloom_filter);
+            row_column_indexes.push(row_group_column_index);
+            row_offset_indexes.push(row_group_offset_index);
+            match bloom_filter_position {
+                BloomFilterPosition::AfterRowGroup => {
+                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
+                }
+                BloomFilterPosition::End => (),
+            };
+            row_groups.push(metadata);
+            Ok(())
        };

        let row_group_writer = SerializedRowGroupWriter::new(
            self.descr.clone(),
@@ -221,7 +233,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
    }

    /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        &self.row_groups
    }

@@ -273,40 +285,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
        Ok(())
    }

-    /// Serialize all the bloom filter to the file
-    pub fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
-        // iter row group
-        // iter each column
-        // write bloom filter to the file
-        for row_group in row_groups.iter_mut() {
-            let row_group_idx: u16 = row_group
-                .ordinal
-                .expect("Missing row group ordinal")
-                .try_into()
-                .expect("Negative row group ordinal");
-            let row_group_idx = row_group_idx as usize;
-            for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
-                match self.bloom_filters[row_group_idx][column_idx].take() {
-                    Some(bloom_filter) => {
-                        let start_offset = self.buf.bytes_written();
-                        bloom_filter.write(&mut self.buf)?;
-                        let end_offset = self.buf.bytes_written();
-                        // set offset and index for bloom filter
-                        let column_chunk_meta = column_chunk
-                            .meta_data
-                            .as_mut()
-                            .expect("can't have bloom filter without column metadata");
-                        column_chunk_meta.bloom_filter_offset = Some(start_offset as i64);
-                        column_chunk_meta.bloom_filter_length =
-                            Some((end_offset - start_offset) as i32);
-                    }
-                    None => {}
-                }
-            }
-        }
-        Ok(())
-    }
-
    /// Serialize all the column index to the file
    fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
        // iter row group
@@ -337,14 +315,17 @@ impl<W: Write + Send> SerializedFileWriter<W> {
        self.finished = true;
        let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();

+        for row_group in &mut self.row_groups {
+            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
+        }
+
        let mut row_groups = self
            .row_groups
            .as_slice()
            .iter()
            .map(|v| v.to_thrift())
            .collect::<Vec<_>>();

-        self.write_bloom_filters(&mut row_groups)?;
        // Write column indexes and offset indexes
        self.write_column_indexes(&mut row_groups)?;
        self.write_offset_indexes(&mut row_groups)?;
@@ -449,6 +430,43 @@ impl<W: Write + Send> SerializedFileWriter<W> {
    }
}

+/// Serialize all the bloom filters of the given row group to the given buffer,
+/// and returns the updated row group metadata.
+fn write_bloom_filters<W: Write + Send>(
+    buf: &mut TrackedWrite<W>,
+    bloom_filters: &mut Vec<Vec<Option<Sbbf>>>,
+    row_group: &mut RowGroupMetaData,
+) -> Result<()> {
+    // iter row group
+    // iter each column
+    // write bloom filter to the file
+
+    let row_group_idx: u16 = row_group
+        .ordinal()
+        .expect("Missing row group ordinal")
+        .try_into()
+        .expect("Negative row group ordinal");
+    let row_group_idx = row_group_idx as usize;
+    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
+        match bloom_filters[row_group_idx][column_idx].take() {
+            Some(bloom_filter) => {
+                let start_offset = buf.bytes_written();
+                bloom_filter.write(&mut *buf)?;
+                let end_offset = buf.bytes_written();
+                // set offset and index for bloom filter
+                *column_chunk = column_chunk
+                    .clone()
+                    .into_builder()
+                    .set_bloom_filter_offset(Some(start_offset as i64))
+                    .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
+                    .build()?;
+            }
+            None => {}
+        }
+    }
+    Ok(())
+}

/// Parquet row group writer API.
/// Provides methods to access column writers in an iterator-like fashion, order is
/// guaranteed to match the order of schema leaves (column descriptors).
@@ -474,7 +492,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
    offset_indexes: Vec<Option<OffsetIndex>>,
    row_group_index: i16,
    file_offset: i64,
-    on_close: Option<OnCloseRowGroup<'a>>,
+    on_close: Option<OnCloseRowGroup<'a, W>>,
}

impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
Expand All @@ -491,7 +509,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
properties: WriterPropertiesPtr,
buf: &'a mut TrackedWrite<W>,
row_group_index: i16,
on_close: Option<OnCloseRowGroup<'a>>,
on_close: Option<OnCloseRowGroup<'a, W>>,
) -> Self {
let num_columns = schema_descr.num_columns();
let file_offset = buf.bytes_written() as i64;
@@ -675,12 +693,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
                .set_file_offset(self.file_offset)
                .build()?;

-            let metadata = Arc::new(row_group_metadata);
-            self.row_group_metadata = Some(metadata.clone());
+            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));

            if let Some(on_close) = self.on_close.take() {
                on_close(
-                    metadata,
+                    self.buf,
+                    row_group_metadata,
                    self.bloom_filters,
                    self.column_indexes,
                    self.offset_indexes,
@@ -1452,7 +1470,7 @@ mod tests {
            assert_eq!(flushed.len(), idx + 1);
            assert_eq!(Some(idx as i16), last_group.ordinal());
            assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
-            assert_eq!(flushed[idx].as_ref(), last_group.as_ref());
+            assert_eq!(flushed[idx], Arc::unwrap_or_clone(last_group));
        }
        let file_metadata = file_writer.close().unwrap();

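For context, a round trip that exercises the fixed path looks like the following. This is a sketch against the parquet and arrow-array crates as of this commit; the in-memory Vec<u8> sink and the sample column are illustrative assumptions, not part of the change.

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::{BloomFilterPosition, WriterProperties};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Sample data; the column name and values are arbitrary.
        let batch = RecordBatch::try_from_iter([(
            "id",
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
        )])?;

        // With this fix, AfterRowGroup writes each Bloom filter as soon as
        // its row group is flushed and records the offset in the metadata
        // that actually reaches the footer, so readers can find it.
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_position(BloomFilterPosition::AfterRowGroup)
            .build();

        let mut buffer = Vec::new(); // in-memory sink for the sketch
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }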
