Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ColumnStatistics::Sum #14074

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 73 additions & 12 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::fmt::{self, Debug, Display};

use crate::{Result, ScalarValue};

use arrow_schema::{Schema, SchemaRef};
use arrow_schema::{DataType, Schema, SchemaRef};

/// Represents a value with a degree of certainty. `Precision` is used to
/// propagate information the precision of statistical values.
Expand Down Expand Up @@ -170,24 +170,63 @@ impl Precision<ScalarValue> {
pub fn add(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
match (self, other) {
(Precision::Exact(a), Precision::Exact(b)) => {
if let Ok(result) = a.add(b) {
Precision::Exact(result)
} else {
Precision::Absent
}
a.add(b).map(Precision::Exact).unwrap_or(Precision::Absent)
}
(Precision::Inexact(a), Precision::Exact(b))
| (Precision::Exact(a), Precision::Inexact(b))
| (Precision::Inexact(a), Precision::Inexact(b)) => {
if let Ok(result) = a.add(b) {
Precision::Inexact(result)
} else {
Precision::Absent
}
| (Precision::Inexact(a), Precision::Inexact(b)) => a
.add(b)
.map(Precision::Inexact)
.unwrap_or(Precision::Absent),
(_, _) => Precision::Absent,
}
}

/// Calculates the difference of two (possibly inexact) [`ScalarValue`] values,
/// conservatively propagating exactness information. If one of the input
/// values is [`Precision::Absent`], the result is `Absent` too.
pub fn sub(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
match (self, other) {
(Precision::Exact(a), Precision::Exact(b)) => {
a.add(b).map(Precision::Exact).unwrap_or(Precision::Absent)
}
(Precision::Inexact(a), Precision::Exact(b))
| (Precision::Exact(a), Precision::Inexact(b))
| (Precision::Inexact(a), Precision::Inexact(b)) => a
.add(b)
.map(Precision::Inexact)
.unwrap_or(Precision::Absent),
(_, _) => Precision::Absent,
}
}

/// Calculates the multiplication of two (possibly inexact) [`ScalarValue`] values,
/// conservatively propagating exactness information. If one of the input
/// values is [`Precision::Absent`], the result is `Absent` too.
pub fn multiply(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
match (self, other) {
(Precision::Exact(a), Precision::Exact(b)) => a
.mul_checked(b)
.map(Precision::Exact)
.unwrap_or(Precision::Absent),
(Precision::Inexact(a), Precision::Exact(b))
| (Precision::Exact(a), Precision::Inexact(b))
| (Precision::Inexact(a), Precision::Inexact(b)) => a
.mul_checked(b)
.map(Precision::Inexact)
.unwrap_or(Precision::Absent),
(_, _) => Precision::Absent,
}
}

/// Casts the value to the given data type, propagating exactness information.
pub fn cast_to(&self, data_type: &DataType) -> Result<Precision<ScalarValue>> {
Copy link
Author

@gatesn gatesn Jan 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb one question I have is whether this should return a Result, or we should assume that a failed cast implies overflow and therefore return Precision::Absent?

The caller (currently in cross-join) unwraps to Absent, I just didn't know whether to internalize that here.

Edit: I decided it was better to propagate the error and allow the caller to decide. It was more useful in a couple of places.

match self {
Precision::Exact(value) => value.cast_to(data_type).map(Precision::Exact),
Precision::Inexact(value) => value.cast_to(data_type).map(Precision::Inexact),
Precision::Absent => Ok(Precision::Absent),
}
}
gatesn marked this conversation as resolved.
Show resolved Hide resolved
}

impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Debug for Precision<T> {
Expand All @@ -210,6 +249,18 @@ impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Display for Precision<T> {
}
}

impl From<Precision<usize>> for Precision<ScalarValue> {
fn from(value: Precision<usize>) -> Self {
match value {
Precision::Exact(v) => Precision::Exact(ScalarValue::UInt64(Some(v as u64))),
Precision::Inexact(v) => {
Precision::Inexact(ScalarValue::UInt64(Some(v as u64)))
}
Precision::Absent => Precision::Absent,
}
}
}

/// Statistics for a relation
/// Fields are optional and can be inexact because the sources
/// sometimes provide approximate estimates for performance reasons
Expand Down Expand Up @@ -401,6 +452,11 @@ impl Display for Statistics {
} else {
s
};
let s = if cs.sum_value != Precision::Absent {
format!("{} Sum={}", s, cs.sum_value)
} else {
s
};
let s = if cs.null_count != Precision::Absent {
format!("{} Null={}", s, cs.null_count)
} else {
Expand Down Expand Up @@ -436,6 +492,8 @@ pub struct ColumnStatistics {
pub max_value: Precision<ScalarValue>,
/// Minimum value of column
pub min_value: Precision<ScalarValue>,
/// Sum value of a column
pub sum_value: Precision<ScalarValue>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I think we mentioned in #13736 my only real concern with this addition is that it will make ColumnStatistics even bigger (each ScalarValue is quite large already and ColumnStatistics are copied a bunch

However, I think the "right" fix for that is to move to using a different statistics representation (e.g. Arc::ColumnStatistics) so I don't see this as a blocker

/// Number of distinct values
pub distinct_count: Precision<usize>,
}
Expand All @@ -458,6 +516,7 @@ impl ColumnStatistics {
null_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
}
}
Expand All @@ -469,6 +528,7 @@ impl ColumnStatistics {
self.null_count = self.null_count.to_inexact();
self.max_value = self.max_value.to_inexact();
self.min_value = self.min_value.to_inexact();
self.sum_value = self.sum_value.to_inexact();
self.distinct_count = self.distinct_count.to_inexact();
self
}
Expand Down Expand Up @@ -646,6 +706,7 @@ mod tests {
null_count: Precision::Exact(null_count),
max_value: Precision::Exact(ScalarValue::Int64(Some(42))),
min_value: Precision::Exact(ScalarValue::Int64(Some(64))),
sum_value: Precision::Exact(ScalarValue::Int64(Some(4600))),
distinct_count: Precision::Exact(100),
}
}
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/src/datasource/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub async fn get_statistics_with_limit(
col_stats_set[index].null_count = file_column.null_count;
col_stats_set[index].max_value = file_column.max_value;
col_stats_set[index].min_value = file_column.min_value;
col_stats_set[index].sum_value = file_column.sum_value;
}

// If the number of rows exceeds the limit, we can stop processing
Expand Down Expand Up @@ -113,12 +114,14 @@ pub async fn get_statistics_with_limit(
null_count: file_nc,
max_value: file_max,
min_value: file_min,
sum_value: file_sum,
distinct_count: _,
} = file_col_stats;

col_stats.null_count = add_row_stats(*file_nc, col_stats.null_count);
set_max_if_greater(file_max, &mut col_stats.max_value);
set_min_if_lesser(file_min, &mut col_stats.min_value)
set_min_if_lesser(file_min, &mut col_stats.min_value);
col_stats.sum_value = file_sum.add(&col_stats.sum_value);
}

// If the number of rows exceeds the limit, we can stop processing
Expand Down Expand Up @@ -204,6 +207,7 @@ pub(crate) fn get_col_stats(
null_count: null_counts[i],
max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent),
min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
}
})
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,14 @@ fn fully_defined() -> (Statistics, Schema) {
distinct_count: Precision::Exact(2),
max_value: Precision::Exact(ScalarValue::Int32(Some(1023))),
min_value: Precision::Exact(ScalarValue::Int32(Some(-24))),
sum_value: Precision::Exact(ScalarValue::Int64(Some(10))),
null_count: Precision::Exact(0),
},
ColumnStatistics {
distinct_count: Precision::Exact(13),
max_value: Precision::Exact(ScalarValue::Int64(Some(5486))),
min_value: Precision::Exact(ScalarValue::Int64(Some(-6783))),
sum_value: Precision::Exact(ScalarValue::Int64(Some(10))),
null_count: Precision::Exact(5),
},
],
Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,14 @@ mod tests {
distinct_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
sum_value: Precision::Absent,
null_count: Precision::Exact(0),
},
ColumnStatistics {
distinct_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
sum_value: Precision::Absent,
null_count: Precision::Exact(0),
},
],
Expand Down Expand Up @@ -371,6 +373,7 @@ mod tests {
distinct_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
sum_value: Precision::Absent,
null_count: Precision::Exact(3),
}],
};
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ fn collect_new_statistics(
null_count: input_column_stats[idx].null_count.to_inexact(),
max_value,
min_value,
sum_value: Precision::Absent,
distinct_count: distinct_count.to_inexact(),
}
},
Expand Down Expand Up @@ -1149,6 +1150,7 @@ mod tests {
null_count: Precision::Absent,
min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
}],
};
Expand Down
40 changes: 40 additions & 0 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,34 @@ fn stats_cartesian_product(
distinct_count: s.distinct_count,
min_value: s.min_value,
max_value: s.max_value,
sum_value: s
.sum_value
.get_value()
// Cast the row count into the same type as any existing sum value
.and_then(|v| {
Precision::<ScalarValue>::from(right_row_count)
.cast_to(&v.data_type())
.ok()
})
.map(|row_count| s.sum_value.multiply(&row_count))
.unwrap_or(Precision::Absent),
})
.chain(right_col_stats.into_iter().map(|s| ColumnStatistics {
null_count: s.null_count.multiply(&left_row_count),
distinct_count: s.distinct_count,
min_value: s.min_value,
max_value: s.max_value,
sum_value: s
.sum_value
.get_value()
// Cast the row count into the same type as any existing sum value
.and_then(|v| {
Precision::<ScalarValue>::from(left_row_count)
.cast_to(&v.data_type())
.ok()
})
.map(|row_count| s.sum_value.multiply(&row_count))
.unwrap_or(Precision::Absent),
}))
.collect();

Expand Down Expand Up @@ -604,12 +626,14 @@ mod tests {
distinct_count: Precision::Exact(5),
max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
null_count: Precision::Exact(0),
},
ColumnStatistics {
distinct_count: Precision::Exact(1),
max_value: Precision::Exact(ScalarValue::from("x")),
min_value: Precision::Exact(ScalarValue::from("a")),
sum_value: Precision::Absent,
null_count: Precision::Exact(3),
},
],
Expand All @@ -622,6 +646,7 @@ mod tests {
distinct_count: Precision::Exact(3),
max_value: Precision::Exact(ScalarValue::Int64(Some(12))),
min_value: Precision::Exact(ScalarValue::Int64(Some(0))),
sum_value: Precision::Exact(ScalarValue::Int64(Some(20))),
null_count: Precision::Exact(2),
}],
};
Expand All @@ -636,18 +661,25 @@ mod tests {
distinct_count: Precision::Exact(5),
max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
sum_value: Precision::Exact(ScalarValue::Int64(Some(
42 * right_row_count as i64,
gatesn marked this conversation as resolved.
Show resolved Hide resolved
))),
null_count: Precision::Exact(0),
},
ColumnStatistics {
distinct_count: Precision::Exact(1),
max_value: Precision::Exact(ScalarValue::from("x")),
min_value: Precision::Exact(ScalarValue::from("a")),
sum_value: Precision::Absent,
null_count: Precision::Exact(3 * right_row_count),
},
ColumnStatistics {
distinct_count: Precision::Exact(3),
max_value: Precision::Exact(ScalarValue::Int64(Some(12))),
min_value: Precision::Exact(ScalarValue::Int64(Some(0))),
sum_value: Precision::Exact(ScalarValue::Int64(Some(
20 * left_row_count as i64,
))),
null_count: Precision::Exact(2 * left_row_count),
},
],
Expand All @@ -668,12 +700,14 @@ mod tests {
distinct_count: Precision::Exact(5),
max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
null_count: Precision::Exact(0),
},
ColumnStatistics {
distinct_count: Precision::Exact(1),
max_value: Precision::Exact(ScalarValue::from("x")),
min_value: Precision::Exact(ScalarValue::from("a")),
sum_value: Precision::Absent,
null_count: Precision::Exact(3),
},
],
Expand All @@ -686,6 +720,7 @@ mod tests {
distinct_count: Precision::Exact(3),
max_value: Precision::Exact(ScalarValue::Int64(Some(12))),
min_value: Precision::Exact(ScalarValue::Int64(Some(0))),
sum_value: Precision::Exact(ScalarValue::Int64(Some(20))),
null_count: Precision::Exact(2),
}],
};
Expand All @@ -700,18 +735,23 @@ mod tests {
distinct_count: Precision::Exact(5),
max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
sum_value: Precision::Absent, // we don't know the row count on the right
null_count: Precision::Absent, // we don't know the row count on the right
},
ColumnStatistics {
distinct_count: Precision::Exact(1),
max_value: Precision::Exact(ScalarValue::from("x")),
min_value: Precision::Exact(ScalarValue::from("a")),
sum_value: Precision::Absent,
null_count: Precision::Absent, // we don't know the row count on the right
},
ColumnStatistics {
distinct_count: Precision::Exact(3),
max_value: Precision::Exact(ScalarValue::Int64(Some(12))),
min_value: Precision::Exact(ScalarValue::Int64(Some(0))),
sum_value: Precision::Exact(ScalarValue::Int64(Some(
20 * left_row_count as i64,
))),
null_count: Precision::Exact(2 * left_row_count),
},
],
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1983,6 +1983,7 @@ mod tests {
distinct_count,
min_value: min.map(ScalarValue::from),
max_value: max.map(ScalarValue::from),
sum_value: Absent,
null_count,
}
}
Expand Down
Loading
Loading