From e11f89c794e145d6ac4ebcd8d271b3d7abd1f6a0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 25 Sep 2024 10:56:16 -0400 Subject: [PATCH 1/6] Refactor PrimitiveGroupValueBuilder to use BooleanBuilder --- .../group_values/group_value_row.rs | 147 ++++++++++++------ 1 file changed, 99 insertions(+), 48 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_value_row.rs b/datafusion/physical-plan/src/aggregates/group_values/group_value_row.rs index ad8da37e7ca0..8ab4fed1ff44 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_value_row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_value_row.rs @@ -62,9 +62,14 @@ pub trait ArrayRowEq: Send + Sync { pub struct PrimitiveGroupValueBuilder { group_values: Vec, - nulls: Vec, - // whether the array contains at least one null, for fast non-null path - has_null: bool, + /// Nulls buffer for the group values -- + /// * None if we have seen no arrays yet + /// * Some if we have seen arrays and have at least one null + /// + /// Note this is an Arrow *VALIDITY* buffer (so it is false for nulls, true + /// for non-nulls) + nulls: Option, + /// If true, the input is guaranteed not to have nulls nullable: bool, } @@ -72,11 +77,13 @@ impl PrimitiveGroupValueBuilder where T: ArrowPrimitiveType, { + /// Create a new [`PrimitiveGroupValueBuilder`] + /// + /// If `nullable` is false, it means the input will never have nulls pub fn new(nullable: bool) -> Self { Self { group_values: vec![], - nulls: vec![], - has_null: false, + nulls: None, nullable, } } @@ -84,35 +91,52 @@ where impl ArrayRowEq for PrimitiveGroupValueBuilder { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { - // non-null fast path - // both non-null + // fast path when input has no nulls if !self.nullable { + debug_assert!(self.nulls.is_none()); return self.group_values[lhs_row] == array.as_primitive::().value(rhs_row); } - - // lhs is non-null - if self.nulls[lhs_row] { - if array.is_null(rhs_row) { + // slow path if the input could have nulls + if let Some(nulls) = self.nulls.as_ref() { + // if lhs_row is valid (non null), but array is null + let lhs_is_null = !nulls.get_bit(lhs_row); + let rhs_is_null = array.is_null(rhs_row); + if lhs_is_null != rhs_is_null { return false; } - - return self.group_values[lhs_row] - == array.as_primitive::().value(rhs_row); } - - array.is_null(rhs_row) + self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) } fn append_val(&mut self, array: &ArrayRef, row: usize) { - if self.nullable && array.is_null(row) { - self.group_values.push(T::default_value()); - self.nulls.push(false); - self.has_null = true; - } else { - let elem = array.as_primitive::().value(row); - self.group_values.push(elem); - self.nulls.push(true); + let is_null = array.is_null(row); + + match self.nulls.as_mut() { + Some(nulls) => { + if is_null { + nulls.append(false); + self.group_values.push(T::default_value()); + } else { + nulls.append(true); + self.group_values.push(array.as_primitive::().value(row)); + } + } + None => { + if is_null { + // have seen no nulls so far, this is the first null, + // need to create the nulls buffer for all currently valid values + let num_values = self.group_values.len(); + let mut nulls = BooleanBufferBuilder::new(num_values); + nulls.append_n(num_values, true); + nulls.append(false); // null + self.group_values.push(T::default_value()); + self.nulls = Some(nulls); + } else { + // Had no nulls so far, and this is not a null + self.group_values.push(array.as_primitive::().value(row)); + } + } } } @@ -121,37 +145,64 @@ impl ArrayRowEq for PrimitiveGroupValueBuilder { } fn size(&self) -> usize { - self.group_values.allocated_size() + self.nulls.allocated_size() + // BooleanBufferBuilder builder::capacity returns capacity in bits (not bytes) + let null_builder_size = self + .nulls + .as_ref() + .map(|nulls| nulls.capacity() / 8) + .unwrap_or(0); + self.group_values.allocated_size() + null_builder_size } fn build(self: Box) -> ArrayRef { - if self.has_null { - Arc::new(PrimitiveArray::::new( - ScalarBuffer::from(self.group_values), - Some(NullBuffer::from(self.nulls)), - )) - } else { - Arc::new(PrimitiveArray::::new( - ScalarBuffer::from(self.group_values), - None, - )) - } + let Self { + group_values, + nulls, + nullable: _, + } = *self; + let null_buffer = nulls.map(|mut nulls| NullBuffer::from(nulls.finish())); + + Arc::new(PrimitiveArray::::new( + ScalarBuffer::from(group_values), + null_buffer, + )) } fn take_n(&mut self, n: usize) -> ArrayRef { - if self.has_null { - let first_n = self.group_values.drain(0..n).collect::>(); - let first_n_nulls = self.nulls.drain(0..n).collect::>(); - Arc::new(PrimitiveArray::::new( - ScalarBuffer::from(first_n), - Some(NullBuffer::from(first_n_nulls)), - )) - } else { - let first_n = self.group_values.drain(0..n).collect::>(); - self.nulls.truncate(self.nulls.len() - n); - Arc::new(PrimitiveArray::::new(ScalarBuffer::from(first_n), None)) - } + let first_n = self.group_values.drain(0..n).collect::>(); + + let first_n_nulls = self.nulls.take().map(|nulls| { + let (first_n_nulls, remaining_nulls) = take_n_nulls(n, nulls); + self.nulls = Some(remaining_nulls); + first_n_nulls + }); + + Arc::new(PrimitiveArray::::new( + ScalarBuffer::from(first_n), + first_n_nulls, + )) + } +} + +/// Takes the first `n` nulls from the `nulls` buffer and +/// +/// Returns +/// * [`NullBuffer`] with the first `n` nulls +/// * [`BooleanBufferBuilder`] with the remaining nulls +fn take_n_nulls( + n: usize, + mut nulls: BooleanBufferBuilder, +) -> (NullBuffer, BooleanBufferBuilder) { + // Copy over the values at n..len-1 values to the start of a new builder + // (todo it would be great to use something like `set_bits` from arrow here. + let mut new_builder = BooleanBufferBuilder::new(nulls.len()); + for i in n..nulls.len() { + new_builder.append(nulls.get_bit(i)); } + // take only first n values from the original builder + nulls.truncate(n); + let null_buffer = NullBuffer::from(nulls.finish()); + (null_buffer, new_builder) } pub struct ByteGroupValueBuilder From 20f1ce52c505994490e1d2072e4d359ccfa2b906 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 28 Sep 2024 10:50:31 -0400 Subject: [PATCH 2/6] Refactor boolean buffer builder out --- .../aggregates/group_values/group_column.rs | 95 +++----------- .../src/aggregates/group_values/mod.rs | 1 + .../aggregates/group_values/null_builder.rs | 119 ++++++++++++++++++ 3 files changed, 134 insertions(+), 81 deletions(-) create mode 100644 datafusion/physical-plan/src/aggregates/group_values/null_builder.rs diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index b944b85254ee..249672cbcbfb 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -35,6 +35,7 @@ use datafusion_common::utils::proxy::VecAllocExt; use std::sync::Arc; use std::vec; +use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; /// Trait for group values column-wise row comparison @@ -62,15 +63,10 @@ pub trait GroupColumn: Send + Sync { pub struct PrimitiveGroupValueBuilder { group_values: Vec, - /// Nulls buffer for the group values -- - /// * None if we have seen no arrays yet - /// * Some if we have seen arrays and have at least one null - /// - /// Note this is an Arrow *VALIDITY* buffer (so it is false for nulls, true - /// for non-nulls) - nulls: Option, - /// If true, the input is guaranteed not to have nulls + /// If false, the input is guaranteed to have no nulls nullable: bool, + /// Null state + nulls: MaybeNullBufferBuilder, } impl PrimitiveGroupValueBuilder @@ -83,7 +79,7 @@ where pub fn new(nullable: bool) -> Self { Self { group_values: vec![], - nulls: None, + nulls: MaybeNullBufferBuilder::new(), nullable, } } @@ -93,51 +89,20 @@ impl GroupColumn for PrimitiveGroupValueBuilder { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { // fast path when input has no nulls if !self.nullable { - debug_assert!(self.nulls.is_none()); + debug_assert!(!self.nulls.has_nulls()); return self.group_values[lhs_row] == array.as_primitive::().value(rhs_row); } // slow path if the input could have nulls - if let Some(nulls) = self.nulls.as_ref() { - // if lhs_row is valid (non null), but array is null - let lhs_is_null = !nulls.get_bit(lhs_row); - let rhs_is_null = array.is_null(rhs_row); - if lhs_is_null != rhs_is_null { - return false; - } + if self.nulls.is_null(lhs_row) || array.is_null(rhs_row) { + false // comparing null to anything is not true + } else { + self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) } - self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) } fn append_val(&mut self, array: &ArrayRef, row: usize) { - let is_null = array.is_null(row); - - match self.nulls.as_mut() { - Some(nulls) => { - if is_null { - nulls.append(false); - self.group_values.push(T::default_value()); - } else { - nulls.append(true); - self.group_values.push(array.as_primitive::().value(row)); - } - } - None => { - if is_null { - // have seen no nulls so far, this is the first null, - // need to create the nulls buffer for all currently valid values - let num_values = self.group_values.len(); - let mut nulls = BooleanBufferBuilder::new(num_values); - nulls.append_n(num_values, true); - nulls.append(false); // null - self.group_values.push(T::default_value()); - self.nulls = Some(nulls); - } else { - // Had no nulls so far, and this is not a null - self.group_values.push(array.as_primitive::().value(row)); - } - } - } + self.nulls.append(array.is_null(row)) } fn len(&self) -> usize { @@ -146,12 +111,7 @@ impl GroupColumn for PrimitiveGroupValueBuilder { fn size(&self) -> usize { // BooleanBufferBuilder builder::capacity returns capacity in bits (not bytes) - let null_builder_size = self - .nulls - .as_ref() - .map(|nulls| nulls.capacity() / 8) - .unwrap_or(0); - self.group_values.allocated_size() + null_builder_size + self.group_values.allocated_size() + self.nulls.allocated_size() } fn build(self: Box) -> ArrayRef { @@ -160,22 +120,16 @@ impl GroupColumn for PrimitiveGroupValueBuilder { nulls, nullable: _, } = *self; - let null_buffer = nulls.map(|mut nulls| NullBuffer::from(nulls.finish())); Arc::new(PrimitiveArray::::new( ScalarBuffer::from(group_values), - null_buffer, + nulls.build(), )) } fn take_n(&mut self, n: usize) -> ArrayRef { let first_n = self.group_values.drain(0..n).collect::>(); - - let first_n_nulls = self.nulls.take().map(|nulls| { - let (first_n_nulls, remaining_nulls) = take_n_nulls(n, nulls); - self.nulls = Some(remaining_nulls); - first_n_nulls - }); + let first_n_nulls = self.nulls.take_n(n); Arc::new(PrimitiveArray::::new( ScalarBuffer::from(first_n), @@ -184,27 +138,6 @@ impl GroupColumn for PrimitiveGroupValueBuilder { } } -/// Takes the first `n` nulls from the `nulls` buffer and -/// -/// Returns -/// * [`NullBuffer`] with the first `n` nulls -/// * [`BooleanBufferBuilder`] with the remaining nulls -fn take_n_nulls( - n: usize, - mut nulls: BooleanBufferBuilder, -) -> (NullBuffer, BooleanBufferBuilder) { - // Copy over the values at n..len-1 values to the start of a new builder - // (todo it would be great to use something like `set_bits` from arrow here. - let mut new_builder = BooleanBufferBuilder::new(nulls.len()); - for i in n..nulls.len() { - new_builder.append(nulls.get_bit(i)); - } - // take only first n values from the original builder - nulls.truncate(n); - let null_buffer = NullBuffer::from(nulls.finish()); - (null_buffer, new_builder) -} - pub struct ByteGroupValueBuilder where O: OffsetSizeTrait, diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 9256631fa578..bc05e8a40516 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -36,6 +36,7 @@ use bytes::GroupValuesByes; use datafusion_physical_expr::binary_map::OutputType; mod group_column; +mod null_builder; /// An interning store for group keys pub trait GroupValues: Send { diff --git a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs new file mode 100644 index 000000000000..622f6e001784 --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; + +/// Support building up an optional null mask +#[derive(Debug)] +pub enum MaybeNullBufferBuilder { + /// seen `row_count` rows but no nulls yet + NoNulls { row_count: usize }, + /// have at least one null value + /// + /// Note this is an Arrow *VALIDITY* buffer (so it is false for nulls, true + /// for non-nulls) + Nulls(BooleanBufferBuilder), +} + +impl MaybeNullBufferBuilder { + /// Create a new builder + pub fn new() -> Self { + Self::NoNulls { row_count: 0 } + } + + /// Returns true if this builder is tracking any nulls + /// + /// This will return true if at least one null has been set via XXX + pub fn has_nulls(&self) -> bool { + matches!(self, Self::Nulls { .. }) + } + + /// Return true if the row at index `row` is null + pub fn is_null(&self, row: usize) -> bool { + match self { + Self::NoNulls { .. } => false, + // validity mask means a unset bit is NULL + Self::Nulls(nulls) => !nulls.get_bit(row), + } + } + + /// Set the nullness of the next row to `is_null` + /// + /// num_values is the current length of the rows being tracked + /// + /// If `value` is true, the row is null. + /// If `value` is false, the row is non null + pub fn append(&mut self, is_null: bool) { + match self { + Self::NoNulls { row_count } if !is_null => { + *row_count += 1; + } + Self::NoNulls { row_count } => { + // have seen no nulls so far, this is the first null, + // need to create the nulls buffer for all currently valid values + // alloc 2x the need given we push a new but immediately + let mut nulls = BooleanBufferBuilder::new(*row_count * 2); + nulls.append_n(*row_count, true); + nulls.append(false); + *self = Self::Nulls(nulls); + } + Self::Nulls(nulls) => nulls.append(is_null), + } + } + + /// return the number of heap allocated bytes used by this structure to store boolean values + pub fn allocated_size(&self) -> usize { + match self { + Self::NoNulls { .. } => 0, + Self::Nulls(nulls) => nulls.capacity() / 8, + } + } + + /// Return a NullBuffer representing the accumulated nulls so far + pub fn build(self) -> Option { + match self { + Self::NoNulls { .. } => None, + Self::Nulls(mut nulls) => Some(NullBuffer::from(nulls.finish())), + } + } + + /// Returns a NullBuffer representing the first `n` rows accumulated so far + /// shifting any remaining down by `n` + pub fn take_n(&mut self, n: usize) -> Option { + match self { + Self::NoNulls { row_count } => { + *row_count -= n; + None + } + Self::Nulls(nulls) => { + // Copy over the values at n..len-1 values to the start of a + // new builder and leave it in self + // + // TODO: it would be great to use something like `set_bits` from arrow here. + let mut new_builder = BooleanBufferBuilder::new(nulls.len()); + for i in n..nulls.len() { + new_builder.append(nulls.get_bit(i)); + } + std::mem::swap(&mut new_builder, nulls); + + // take only first n values from the original builder + new_builder.truncate(n); + Some(NullBuffer::from(new_builder.finish())) + } + } + } +} From ee7ed9dcfc5febd97e30224864d7a6f2725b8bd3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 28 Sep 2024 11:01:21 -0400 Subject: [PATCH 3/6] tweaks --- .../aggregates/group_values/group_column.rs | 8 ++++- .../aggregates/group_values/null_builder.rs | 29 ++++++++++--------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 249672cbcbfb..873ac986cfa6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -102,7 +102,13 @@ impl GroupColumn for PrimitiveGroupValueBuilder { } fn append_val(&mut self, array: &ArrayRef, row: usize) { - self.nulls.append(array.is_null(row)) + if array.is_null(row) { + self.nulls.append(true); + self.group_values.push(T::default_value()); + } else { + self.nulls.append(false); + self.group_values.push(array.as_primitive::().value(row)); + } } fn len(&self) -> usize { diff --git a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs index 622f6e001784..13e2e210292e 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs @@ -19,7 +19,7 @@ use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; /// Support building up an optional null mask #[derive(Debug)] -pub enum MaybeNullBufferBuilder { +pub(crate) enum MaybeNullBufferBuilder { /// seen `row_count` rows but no nulls yet NoNulls { row_count: usize }, /// have at least one null value @@ -47,7 +47,7 @@ impl MaybeNullBufferBuilder { match self { Self::NoNulls { .. } => false, // validity mask means a unset bit is NULL - Self::Nulls(nulls) => !nulls.get_bit(row), + Self::Nulls(builder) => !builder.get_bit(row), } } @@ -59,10 +59,7 @@ impl MaybeNullBufferBuilder { /// If `value` is false, the row is non null pub fn append(&mut self, is_null: bool) { match self { - Self::NoNulls { row_count } if !is_null => { - *row_count += 1; - } - Self::NoNulls { row_count } => { + Self::NoNulls { row_count } if is_null => { // have seen no nulls so far, this is the first null, // need to create the nulls buffer for all currently valid values // alloc 2x the need given we push a new but immediately @@ -71,15 +68,19 @@ impl MaybeNullBufferBuilder { nulls.append(false); *self = Self::Nulls(nulls); } - Self::Nulls(nulls) => nulls.append(is_null), + Self::NoNulls { row_count } => { + *row_count += 1; + } + Self::Nulls(builder) => builder.append(!is_null), } } + /// return the number of heap allocated bytes used by this structure to store boolean values pub fn allocated_size(&self) -> usize { match self { Self::NoNulls { .. } => 0, - Self::Nulls(nulls) => nulls.capacity() / 8, + Self::Nulls(builder) => builder.capacity() / 8, } } @@ -87,7 +88,7 @@ impl MaybeNullBufferBuilder { pub fn build(self) -> Option { match self { Self::NoNulls { .. } => None, - Self::Nulls(mut nulls) => Some(NullBuffer::from(nulls.finish())), + Self::Nulls(mut builder) => Some(NullBuffer::from(builder.finish())), } } @@ -99,16 +100,16 @@ impl MaybeNullBufferBuilder { *row_count -= n; None } - Self::Nulls(nulls) => { + Self::Nulls(builder) => { // Copy over the values at n..len-1 values to the start of a // new builder and leave it in self // // TODO: it would be great to use something like `set_bits` from arrow here. - let mut new_builder = BooleanBufferBuilder::new(nulls.len()); - for i in n..nulls.len() { - new_builder.append(nulls.get_bit(i)); + let mut new_builder = BooleanBufferBuilder::new(builder.len()); + for i in n..builder.len() { + new_builder.append(builder.get_bit(i)); } - std::mem::swap(&mut new_builder, nulls); + std::mem::swap(&mut new_builder, builder); // take only first n values from the original builder new_builder.truncate(n); From 9f879553276c3fdd7b4e21cb950c1aa4ed4b38ca Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 28 Sep 2024 11:05:46 -0400 Subject: [PATCH 4/6] tweak --- .../src/aggregates/group_values/group_column.rs | 13 ++++++------- .../src/aggregates/group_values/null_builder.rs | 3 +-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 873ac986cfa6..9a035362831f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -47,6 +47,8 @@ use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAP pub trait GroupColumn: Send + Sync { /// Returns equal if the row stored in this builder at `lhs_row` is equal to /// the row in `array` at `rhs_row` + /// + /// Note that this comparison returns true if both elements are NULL fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool; /// Appends the row at `row` in `array` to this builder fn append_val(&mut self, array: &ArrayRef, row: usize); @@ -90,14 +92,11 @@ impl GroupColumn for PrimitiveGroupValueBuilder { // fast path when input has no nulls if !self.nullable { debug_assert!(!self.nulls.has_nulls()); - return self.group_values[lhs_row] - == array.as_primitive::().value(rhs_row); - } - // slow path if the input could have nulls - if self.nulls.is_null(lhs_row) || array.is_null(rhs_row) { - false // comparing null to anything is not true - } else { self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) + } else { + // slow path if the input could have nulls + self.nulls.is_null(lhs_row) == array.is_null(rhs_row) + && self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) } } diff --git a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs index 13e2e210292e..cd369b018aed 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs @@ -68,14 +68,13 @@ impl MaybeNullBufferBuilder { nulls.append(false); *self = Self::Nulls(nulls); } - Self::NoNulls { row_count } => { + Self::NoNulls { row_count } => { *row_count += 1; } Self::Nulls(builder) => builder.append(!is_null), } } - /// return the number of heap allocated bytes used by this structure to store boolean values pub fn allocated_size(&self) -> usize { match self { From 5ef10386d08565eeb814b639da80c3bf9c617909 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 28 Sep 2024 11:23:16 -0400 Subject: [PATCH 5/6] simplify --- .../aggregates/group_values/group_column.rs | 71 ++++++++++++------- .../aggregates/group_values/null_builder.rs | 12 ++-- 2 files changed, 48 insertions(+), 35 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 9a035362831f..5917f87d6910 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -32,11 +32,10 @@ use arrow::datatypes::GenericBinaryType; use arrow::datatypes::GenericStringType; use datafusion_common::utils::proxy::VecAllocExt; -use std::sync::Arc; -use std::vec; - use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; +use std::sync::Arc; +use std::vec; /// Trait for group values column-wise row comparison /// @@ -65,10 +64,8 @@ pub trait GroupColumn: Send + Sync { pub struct PrimitiveGroupValueBuilder { group_values: Vec, - /// If false, the input is guaranteed to have no nulls - nullable: bool, - /// Null state - nulls: MaybeNullBufferBuilder, + /// Null state (when None, input is guaranteed not to have nulls) + nulls: Option, } impl PrimitiveGroupValueBuilder @@ -79,10 +76,15 @@ where /// /// If `nullable` is false, it means the input will never have nulls pub fn new(nullable: bool) -> Self { + let nulls = if nullable { + Some(MaybeNullBufferBuilder::new()) + } else { + None + }; + Self { group_values: vec![], - nulls: MaybeNullBufferBuilder::new(), - nullable, + nulls, } } } @@ -90,23 +92,32 @@ where impl GroupColumn for PrimitiveGroupValueBuilder { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { // fast path when input has no nulls - if !self.nullable { - debug_assert!(!self.nulls.has_nulls()); - self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) - } else { - // slow path if the input could have nulls - self.nulls.is_null(lhs_row) == array.is_null(rhs_row) - && self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) + match self.nulls.as_ref() { + None => { + self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) + } + Some(nulls) => { + // slower path if the input could have nulls + nulls.is_null(lhs_row) == array.is_null(rhs_row) + && self.group_values[lhs_row] + == array.as_primitive::().value(rhs_row) + } } } fn append_val(&mut self, array: &ArrayRef, row: usize) { - if array.is_null(row) { - self.nulls.append(true); - self.group_values.push(T::default_value()); - } else { - self.nulls.append(false); - self.group_values.push(array.as_primitive::().value(row)); + match self.nulls.as_mut() { + // input can't possibly have nulls, so don't worry about them + None => self.group_values.push(array.as_primitive::().value(row)), + Some(nulls) => { + if array.is_null(row) { + nulls.append(true); + self.group_values.push(T::default_value()); + } else { + nulls.append(false); + self.group_values.push(array.as_primitive::().value(row)); + } + } } } @@ -115,26 +126,32 @@ impl GroupColumn for PrimitiveGroupValueBuilder { } fn size(&self) -> usize { - // BooleanBufferBuilder builder::capacity returns capacity in bits (not bytes) - self.group_values.allocated_size() + self.nulls.allocated_size() + let nulls_size = self + .nulls + .as_ref() + .map(|nulls| nulls.allocated_size()) + .unwrap_or(0); + + self.group_values.allocated_size() + nulls_size } fn build(self: Box) -> ArrayRef { let Self { group_values, nulls, - nullable: _, } = *self; + let nulls = nulls.and_then(|nulls| nulls.build()); + Arc::new(PrimitiveArray::::new( ScalarBuffer::from(group_values), - nulls.build(), + nulls, )) } fn take_n(&mut self, n: usize) -> ArrayRef { let first_n = self.group_values.drain(0..n).collect::>(); - let first_n_nulls = self.nulls.take_n(n); + let first_n_nulls = self.nulls.as_mut().and_then(|nulls| nulls.take_n(n)); Arc::new(PrimitiveArray::::new( ScalarBuffer::from(first_n), diff --git a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs index cd369b018aed..0249390f38cd 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs @@ -17,7 +17,9 @@ use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; -/// Support building up an optional null mask +/// Builder for an (optional) null mask +/// +/// Optimized for avoid creating the bitmask when all values are non-null #[derive(Debug)] pub(crate) enum MaybeNullBufferBuilder { /// seen `row_count` rows but no nulls yet @@ -35,13 +37,6 @@ impl MaybeNullBufferBuilder { Self::NoNulls { row_count: 0 } } - /// Returns true if this builder is tracking any nulls - /// - /// This will return true if at least one null has been set via XXX - pub fn has_nulls(&self) -> bool { - matches!(self, Self::Nulls { .. }) - } - /// Return true if the row at index `row` is null pub fn is_null(&self, row: usize) -> bool { match self { @@ -79,6 +74,7 @@ impl MaybeNullBufferBuilder { pub fn allocated_size(&self) -> usize { match self { Self::NoNulls { .. } => 0, + // BooleanBufferBuilder builder::capacity returns capacity in bits (not bytes) Self::Nulls(builder) => builder.capacity() / 8, } } From 36a2003d00c6b2cf0c05b0775fdde24274f8009f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 29 Sep 2024 06:56:56 -0400 Subject: [PATCH 6/6] Add specializations for null / non null --- .../src/aggregates/group_values/column.rs | 79 +++++------- .../aggregates/group_values/group_column.rs | 120 +++++++++++------- 2 files changed, 108 insertions(+), 91 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 977b40922f7c..1565c483c24c 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -16,7 +16,8 @@ // under the License. use crate::aggregates::group_values::group_column::{ - ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder, + ByteGroupValueBuilder, GroupColumn, NonNullPrimitiveGroupValueBuilder, + PrimitiveGroupValueBuilder, }; use crate::aggregates::group_values::GroupValues; use ahash::RandomState; @@ -116,6 +117,26 @@ impl GroupValuesColumn { } } +/// instantiates a [`PrimitiveGroupValueBuilder`] or +/// [`NonNullPrimitiveGroupValueBuilder`] and pushes it into $v +/// +/// Arguments: +/// `$v`: the vector to push the new builder into +/// `$nullable`: whether the input can contains nulls +/// `$t`: the primitive type of the builder +/// +macro_rules! instantiate_primitive { + ($v:expr, $nullable:expr, $t:ty) => { + if $nullable { + let b = PrimitiveGroupValueBuilder::<$t>::new(); + $v.push(Box::new(b) as _) + } else { + let b = NonNullPrimitiveGroupValueBuilder::<$t>::new(); + $v.push(Box::new(b) as _) + } + }; +} + impl GroupValues for GroupValuesColumn { fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { let n_rows = cols[0].len(); @@ -126,54 +147,22 @@ impl GroupValues for GroupValuesColumn { for f in self.schema.fields().iter() { let nullable = f.is_nullable(); match f.data_type() { - &DataType::Int8 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::Int16 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::Int32 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::Int64 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::UInt8 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::UInt16 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::UInt32 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::UInt64 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } + &DataType::Int8 => instantiate_primitive!(v, nullable, Int8Type), + &DataType::Int16 => instantiate_primitive!(v, nullable, Int16Type), + &DataType::Int32 => instantiate_primitive!(v, nullable, Int32Type), + &DataType::Int64 => instantiate_primitive!(v, nullable, Int64Type), + &DataType::UInt8 => instantiate_primitive!(v, nullable, UInt8Type), + &DataType::UInt16 => instantiate_primitive!(v, nullable, UInt16Type), + &DataType::UInt32 => instantiate_primitive!(v, nullable, UInt32Type), + &DataType::UInt64 => instantiate_primitive!(v, nullable, UInt64Type), &DataType::Float32 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) + instantiate_primitive!(v, nullable, Float32Type) } &DataType::Float64 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::Date32 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::Date64 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) + instantiate_primitive!(v, nullable, Float64Type) } + &DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type), + &DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type), &DataType::Utf8 => { let b = ByteGroupValueBuilder::::new(OutputType::Utf8); v.push(Box::new(b) as _) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 5917f87d6910..bde871836258 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -62,62 +62,96 @@ pub trait GroupColumn: Send + Sync { fn take_n(&mut self, n: usize) -> ArrayRef; } +/// Stores a collection of primitive group values which are known to have no nulls +#[derive(Debug)] +pub struct NonNullPrimitiveGroupValueBuilder { + group_values: Vec, +} + +impl NonNullPrimitiveGroupValueBuilder +where + T: ArrowPrimitiveType, +{ + pub fn new() -> Self { + Self { + group_values: vec![], + } + } +} + +impl GroupColumn for NonNullPrimitiveGroupValueBuilder { + fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + // know input has no nulls + self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) + } + + fn append_val(&mut self, array: &ArrayRef, row: usize) { + // input can't possibly have nulls, so don't worry about them + self.group_values.push(array.as_primitive::().value(row)) + } + + fn len(&self) -> usize { + self.group_values.len() + } + + fn size(&self) -> usize { + self.group_values.allocated_size() + } + + fn build(self: Box) -> ArrayRef { + let Self { group_values } = *self; + + let nulls = None; + + Arc::new(PrimitiveArray::::new( + ScalarBuffer::from(group_values), + nulls, + )) + } + + fn take_n(&mut self, n: usize) -> ArrayRef { + let first_n = self.group_values.drain(0..n).collect::>(); + let first_n_nulls = None; + + Arc::new(PrimitiveArray::::new( + ScalarBuffer::from(first_n), + first_n_nulls, + )) + } +} + +/// Stores a collection of primitive group values which may have nulls +#[derive(Debug)] pub struct PrimitiveGroupValueBuilder { group_values: Vec, - /// Null state (when None, input is guaranteed not to have nulls) - nulls: Option, + nulls: MaybeNullBufferBuilder, } impl PrimitiveGroupValueBuilder where T: ArrowPrimitiveType, { - /// Create a new [`PrimitiveGroupValueBuilder`] - /// - /// If `nullable` is false, it means the input will never have nulls - pub fn new(nullable: bool) -> Self { - let nulls = if nullable { - Some(MaybeNullBufferBuilder::new()) - } else { - None - }; - + pub fn new() -> Self { Self { group_values: vec![], - nulls, + nulls: MaybeNullBufferBuilder::new(), } } } impl GroupColumn for PrimitiveGroupValueBuilder { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { - // fast path when input has no nulls - match self.nulls.as_ref() { - None => { - self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) - } - Some(nulls) => { - // slower path if the input could have nulls - nulls.is_null(lhs_row) == array.is_null(rhs_row) - && self.group_values[lhs_row] - == array.as_primitive::().value(rhs_row) - } - } + self.nulls.is_null(lhs_row) == array.is_null(rhs_row) + && self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) } fn append_val(&mut self, array: &ArrayRef, row: usize) { - match self.nulls.as_mut() { - // input can't possibly have nulls, so don't worry about them - None => self.group_values.push(array.as_primitive::().value(row)), - Some(nulls) => { - if array.is_null(row) { - nulls.append(true); - self.group_values.push(T::default_value()); - } else { - nulls.append(false); - self.group_values.push(array.as_primitive::().value(row)); - } - } + if array.is_null(row) { + self.nulls.append(true); + self.group_values.push(T::default_value()); + } else { + self.nulls.append(false); + self.group_values.push(array.as_primitive::().value(row)); } } @@ -126,13 +160,7 @@ impl GroupColumn for PrimitiveGroupValueBuilder { } fn size(&self) -> usize { - let nulls_size = self - .nulls - .as_ref() - .map(|nulls| nulls.allocated_size()) - .unwrap_or(0); - - self.group_values.allocated_size() + nulls_size + self.group_values.allocated_size() + self.nulls.allocated_size() } fn build(self: Box) -> ArrayRef { @@ -141,7 +169,7 @@ impl GroupColumn for PrimitiveGroupValueBuilder { nulls, } = *self; - let nulls = nulls.and_then(|nulls| nulls.build()); + let nulls = nulls.build(); Arc::new(PrimitiveArray::::new( ScalarBuffer::from(group_values), @@ -151,7 +179,7 @@ impl GroupColumn for PrimitiveGroupValueBuilder { fn take_n(&mut self, n: usize) -> ArrayRef { let first_n = self.group_values.drain(0..n).collect::>(); - let first_n_nulls = self.nulls.as_mut().and_then(|nulls| nulls.take_n(n)); + let first_n_nulls = self.nulls.take_n(n); Arc::new(PrimitiveArray::::new( ScalarBuffer::from(first_n),