Skip to content

Commit

Permalink
Simplify parquet statistics generation (#5183)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Dec 8, 2023
1 parent 2a213bc commit 7e28913
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 85 deletions.
40 changes: 14 additions & 26 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::data_type::{AsBytes, ByteArray, Int32Type};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{WriterProperties, WriterVersion};
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
Expand Down Expand Up @@ -379,6 +379,7 @@ impl DictEncoder {
pub struct ByteArrayEncoder {
fallback: FallbackEncoder,
dict_encoder: Option<DictEncoder>,
statistics_enabled: EnabledStatistics,
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
bloom_filter: Option<Sbbf>,
Expand All @@ -387,24 +388,6 @@ pub struct ByteArrayEncoder {
impl ColumnValueEncoder for ByteArrayEncoder {
type T = ByteArray;
type Values = dyn Array;

/// Computes the minimum and maximum byte-array values in `values`,
/// optionally restricted to the positions listed in `value_indices`.
///
/// Returns `None` when no comparable values are found.
fn min_max(
    &self,
    values: &dyn Array,
    value_indices: Option<&[usize]>,
) -> Option<(Self::T, Self::T)> {
    if let Some(indices) = value_indices {
        // Only the selected positions contribute to the statistics
        let selected = indices.iter().cloned();
        downcast_op!(values.data_type(), values, compute_min_max, selected)
    } else {
        // No index filter: scan every element of the array
        let num_values = Array::len(values);
        downcast_op!(values.data_type(), values, compute_min_max, 0..num_values)
    }
}

/// Hands any accumulated bloom filter to the caller, leaving `None` behind.
fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
    std::mem::take(&mut self.bloom_filter)
}
Expand All @@ -424,12 +407,15 @@ impl ColumnValueEncoder for ByteArrayEncoder {
.map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
.transpose()?;

let statistics_enabled = props.statistics_enabled(descr.path());

Ok(Self {
fallback,
statistics_enabled,
bloom_filter,
dict_encoder: dictionary,
min_value: None,
max_value: None,
bloom_filter,
})
}

Expand Down Expand Up @@ -498,13 +484,15 @@ where
T: ArrayAccessor + Copy,
T::Item: Copy + Ord + AsRef<[u8]>,
{
if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
encoder.min_value = Some(min);
}
if encoder.statistics_enabled != EnabledStatistics::None {
if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
encoder.min_value = Some(min);
}

if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
encoder.max_value = Some(max);
if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
encoder.max_value = Some(max);
}
}
}

Expand Down
29 changes: 8 additions & 21 deletions parquet/src/column/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,6 @@ pub trait ColumnValueEncoder {
/// The values encoded by this encoder
type Values: ColumnValues + ?Sized;

/// Returns the min and max values in this collection, skipping any NaN values
///
/// Returns `None` if no values found
fn min_max(
&self,
values: &Self::Values,
value_indices: Option<&[usize]>,
) -> Option<(Self::T, Self::T)>;

/// Create a new [`ColumnValueEncoder`]
fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
where
Expand Down Expand Up @@ -136,8 +127,15 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
}

impl<T: DataType> ColumnValueEncoderImpl<T> {
/// Returns the `(min, max)` pair over `values`, or over the subset selected
/// by `value_indices` when one is provided; `None` if nothing was compared.
fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
    if let Some(indices) = value_indices {
        // Restrict the scan to the requested positions
        get_min_max(&self.descr, indices.iter().map(|&i| &values[i]))
    } else {
        get_min_max(&self.descr, values.iter())
    }
}

fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
if self.statistics_enabled == EnabledStatistics::Page
if self.statistics_enabled != EnabledStatistics::None
// INTERVAL has undefined sort order, so don't write min/max stats for it
&& self.descr.converted_type() != ConvertedType::INTERVAL
{
Expand Down Expand Up @@ -166,17 +164,6 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {

type Values = [T::T];

/// Returns the `(min, max)` pair over `values`, or over the subset selected
/// by `value_indices` when one is provided; `None` if nothing was compared.
fn min_max(
    &self,
    values: &Self::Values,
    value_indices: Option<&[usize]>,
) -> Option<(Self::T, Self::T)> {
    if let Some(indices) = value_indices {
        // Restrict the scan to the requested positions
        get_min_max(&self.descr, indices.iter().map(|&i| &values[i]))
    } else {
        get_min_max(&self.descr, values.iter())
    }
}

/// Hands any accumulated bloom filter to the caller, leaving `None` behind.
fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
    std::mem::take(&mut self.bloom_filter)
}
Expand Down
60 changes: 22 additions & 38 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,28 +329,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
None => values.len(),
};

// If only computing chunk-level statistics compute them here, page-level statistics
// are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
// [`Self::add_data_page`]
if self.statistics_enabled == EnabledStatistics::Chunk
// INTERVAL has undefined sort order, so don't write min/max stats for it
&& self.descr.converted_type() != ConvertedType::INTERVAL
{
match (min, max) {
(Some(min), Some(max)) => {
update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
}
(None, Some(_)) | (Some(_), None) => {
panic!("min/max should be both set or both None")
}
(None, None) => {
if let Some((min, max)) = self.encoder.min_max(values, value_indices) {
update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
}
}
};
if let Some(min) = min {
update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
}
if let Some(max) = max {
update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
}

// We can only set the distinct count if there are no other writes
Expand Down Expand Up @@ -764,22 +747,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

let page_statistics = if let (Some(min), Some(max)) =
(values_data.min_value, values_data.max_value)
{
// Update chunk level statistics
update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

(self.statistics_enabled == EnabledStatistics::Page).then_some(ValueStatistics::new(
Some(min),
Some(max),
None,
self.page_metrics.num_page_nulls,
false,
))
} else {
None
let page_statistics = match (values_data.min_value, values_data.max_value) {
(Some(min), Some(max)) => {
// Update chunk level statistics
update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

(self.statistics_enabled == EnabledStatistics::Page).then_some(
ValueStatistics::new(
Some(min),
Some(max),
None,
self.page_metrics.num_page_nulls,
false,
),
)
}
_ => None,
};

// update column and offset index
Expand Down

0 comments on commit 7e28913

Please sign in to comment.