Add basic aggregate support #365

Merged
merged 6 commits into from
Oct 3, 2023
4 changes: 4 additions & 0 deletions nemo-cli/src/cli.rs
@@ -29,6 +29,10 @@ impl LoggingArgs {
/// * `Warn` otherwise
pub fn initialize_logging(&self) {
let mut builder = env_logger::Builder::new();

// Default log level
builder.filter_level(log::LevelFilter::Warn);

builder.parse_env("NMO_LOG");
if let Some(ref level) = self.log_level {
builder.parse_filters(level);
7 changes: 7 additions & 0 deletions nemo-cli/src/main.rs
@@ -117,6 +117,13 @@ fn run(mut cli: CliApp) -> Result<(), Error> {
log::info!("Rules parsed");
log::trace!("{:?}", program);

for atom in program.rules().iter().flat_map(|rule| rule.head()) {
if atom.aggregates().next().is_some() {
log::warn!("Program is using the experimental aggregates feature and currently depends on the internally chosen variable orders for predicates.",);
break;
}
}

let parsed_fact = cli.trace_fact.map(parse_fact).transpose()?;

if cli.write_all_idb_predicates {
8 changes: 8 additions & 0 deletions nemo-physical/src/aggregates.rs
@@ -0,0 +1,8 @@
//! Physical layer aggregates, which have no understanding of the logical meaning of the aggregated values.
//!
//! For example, this allows aggregating [`crate::datatypes::StorageTypeName::U64`] values, even though this may not make sense on the logical layer,
//! where these values may correspond to dictionary entries in arbitrary order.
//! Thus, any users of this module have to ensure they use the aggregate operations in a logically meaningful way.

pub mod operation;
pub mod processors;
45 changes: 45 additions & 0 deletions nemo-physical/src/aggregates/operation.rs
@@ -0,0 +1,45 @@
//! Exposes the supported aggregate operations and allows creating the associated processors

use crate::datatypes::DataTypeName;

use super::processors::{
aggregate::Aggregate, count_aggregate::CountAggregateProcessor,
max_aggregate::MaxAggregateProcessor, min_aggregate::MinAggregateProcessor,
processor::AggregateProcessor, sum_aggregate::SumAggregateProcessor,
};

#[derive(Clone, Copy, Debug, PartialEq)]
/// Aggregate operations supported by the physical layer
pub enum AggregateOperation {
/// Minimum value
Min,
/// Maximum value
Max,
/// Sum of all values
Sum,
/// Count of values
Count,
}

impl AggregateOperation {
/// Creates a new aggregate processor for the given aggregate operation.
/// TODO: This is currently implemented using dynamic dispatch, but this may change in the future.
pub fn create_processor<A: Aggregate>(&self) -> Box<dyn AggregateProcessor<A>> {
match self {
AggregateOperation::Count => Box::new(CountAggregateProcessor::new()),
AggregateOperation::Max => Box::new(MaxAggregateProcessor::new()),
AggregateOperation::Min => Box::new(MinAggregateProcessor::new()),
AggregateOperation::Sum => Box::new(SumAggregateProcessor::new()),
}
}

/// Returns the static output type of the aggregate operation, if it has one.
/// If [`Some`] is returned, the aggregate output column always has this type, independent of the input type.
/// If [`None`] is returned, the aggregate output column always has the same type as the input column.
pub fn static_output_type(&self) -> Option<DataTypeName> {
match self {
AggregateOperation::Count => Some(DataTypeName::I64),
_ => None,
}
}
}
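Review note: a minimal sketch of how a caller might combine `static_output_type` with the type of the aggregated input column. `DataTypeName` is reduced here to a three-variant stand-in, and `output_type` is a hypothetical helper that is not part of this PR.

```rust
// Stand-in for the real `DataTypeName`; only a few variants, for illustration.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DataTypeName {
    I64,
    U64,
    Double,
}

// Mirrors the enum added in this PR.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AggregateOperation {
    Min,
    Max,
    Sum,
    Count,
}

impl AggregateOperation {
    // Same logic as in the diff: only `Count` has a fixed output type.
    pub fn static_output_type(&self) -> Option<DataTypeName> {
        match self {
            AggregateOperation::Count => Some(DataTypeName::I64),
            _ => None,
        }
    }
}

// Hypothetical helper: fall back to the input type when there is no static output type.
pub fn output_type(op: AggregateOperation, input: DataTypeName) -> DataTypeName {
    op.static_output_type().unwrap_or(input)
}
```

With this shape, `Count` over a `Double` column resolves to `I64`, while `Min`, `Max`, and `Sum` keep the input type.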
8 changes: 8 additions & 0 deletions nemo-physical/src/aggregates/processors.rs
@@ -0,0 +1,8 @@
//! This module contains the built-in aggregate operators, which determine how to aggregate multiple values in a group into a single output value.

pub mod aggregate;
pub mod count_aggregate;
pub mod max_aggregate;
pub mod min_aggregate;
pub mod processor;
pub mod sum_aggregate;
32 changes: 32 additions & 0 deletions nemo-physical/src/aggregates/processors/aggregate.rs
@@ -0,0 +1,32 @@
//! Aggregate value type

use std::fmt::Debug;

use num::CheckedAdd;

use crate::datatypes::StorageValueT;

/// A value that can be aggregated using any of the supported aggregates.
///
/// [`CheckedAdd`] is required for sum aggregates.
/// [`Clone`] and [`Into<StorageValueT>`] are required to return a storage value from the `finish` function in min/max/sum aggregates.
/// [`Debug`] is required for debugging.
/// [`Default`] is required to initialize the aggregator in sum aggregates.
/// [`PartialOrd`] is required for min/max aggregates.
/// `'static` is required to store the value e.g. in min/max aggregates.
pub trait Aggregate:
CheckedAdd + Clone + Debug + Default + Into<StorageValueT> + PartialEq + PartialOrd + 'static
{
}

impl<T> Aggregate for T where
T: CheckedAdd
+ Clone
+ Debug
+ Default
+ Into<StorageValueT>
+ PartialEq
+ PartialOrd
+ 'static
{
}
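Review note: the empty trait plus blanket `impl` above acts as a trait alias, so any type meeting the bounds is an `Aggregate` without an explicit `impl`. The sketch below shows the same pattern with standard-library bounds only. `AggregateSketch` and `largest` are hypothetical names, and the `CheckedAdd` and `Into<StorageValueT>` bounds are left out to keep it self-contained.

```rust
use std::fmt::Debug;

// Stand-in for `Aggregate`: a marker trait that bundles several bounds.
pub trait AggregateSketch: Clone + Debug + Default + PartialOrd + 'static {}

// Blanket impl: every type that satisfies the bounds gets the trait automatically.
impl<T> AggregateSketch for T where T: Clone + Debug + Default + PartialOrd + 'static {}

// Hypothetical generic function that only needs to name the single bundled bound.
pub fn largest<A: AggregateSketch>(values: &[A]) -> Option<A> {
    let mut best: Option<A> = None;
    for value in values {
        // Replace when there is no value yet or the new one compares greater.
        let replace = match &best {
            Some(current) => value > current,
            None => true,
        };
        if replace {
            best = Some(value.clone());
        }
    }
    best
}
```

Here `i64`, `u32`, and `f64` can all be passed to `largest` without any per-type `impl`.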
62 changes: 62 additions & 0 deletions nemo-physical/src/aggregates/processors/count_aggregate.rs
@@ -0,0 +1,62 @@
//! Count the input values. Always returns an [i64], independent of the input value type.

use std::marker::PhantomData;

use crate::datatypes::StorageValueT;

use super::{
aggregate::Aggregate,
processor::{AggregateGroupProcessor, AggregateProcessor},
};

pub(crate) struct CountAggregateProcessor<A>
where
A: Aggregate,
{
phantom_data: PhantomData<A>,
}

impl<A: Aggregate> CountAggregateProcessor<A> {
pub fn new() -> Self {
Self {
phantom_data: PhantomData,
}
}
}

impl<A: Aggregate> AggregateProcessor<A> for CountAggregateProcessor<A> {
fn idempotent(&self) -> bool {
false
}

fn group(&self) -> Box<dyn AggregateGroupProcessor<A>> {
Box::new(CountAggregateGroupProcessor::new())
}
}

pub(crate) struct CountAggregateGroupProcessor<A>
where
A: Aggregate,
{
current_count: i64,
phantom_data: PhantomData<A>,
}

impl<A: Aggregate> CountAggregateGroupProcessor<A> {
pub fn new() -> Self {
Self {
current_count: 0,
phantom_data: PhantomData,
}
}
}

impl<A: Aggregate> AggregateGroupProcessor<A> for CountAggregateGroupProcessor<A> {
fn write_aggregate_input_value(&mut self, _value: A) {
self.current_count += 1;
}

fn finish(&self) -> Option<StorageValueT> {
Some(self.current_count.into())
}
}
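Review note: a reduced sketch of the count group processor, to show why `PhantomData` is needed (the processor is generic over the input type but never stores a value of it) and that an empty group yields `Some(0)` rather than `None`. `CountSketch` and `count_of` are hypothetical names, and `finish` returns a plain `i64` instead of `StorageValueT`.

```rust
use std::marker::PhantomData;

// Generic over `A` without holding an `A`, hence the `PhantomData` field.
pub struct CountSketch<A> {
    current_count: i64,
    phantom_data: PhantomData<A>,
}

impl<A> CountSketch<A> {
    pub fn new() -> Self {
        Self {
            current_count: 0,
            phantom_data: PhantomData,
        }
    }

    // The value itself is ignored; only the number of calls matters.
    pub fn write_aggregate_input_value(&mut self, _value: A) {
        self.current_count += 1;
    }

    // Always `Some`, even when no value was written.
    pub fn finish(&self) -> Option<i64> {
        Some(self.current_count)
    }
}

// Hypothetical driver: feed all values of one group and return the count.
pub fn count_of<A>(values: Vec<A>) -> Option<i64> {
    let mut processor = CountSketch::new();
    for value in values {
        processor.write_aggregate_input_value(value);
    }
    processor.finish()
}
```

Duplicates are counted individually, which matches `idempotent()` returning `false` for this processor.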
69 changes: 69 additions & 0 deletions nemo-physical/src/aggregates/processors/max_aggregate.rs
@@ -0,0 +1,69 @@
//! Computes the maximum of all input values.

use std::marker::PhantomData;

use crate::datatypes::StorageValueT;

use super::{
aggregate::Aggregate,
processor::{AggregateGroupProcessor, AggregateProcessor},
};

pub(crate) struct MaxAggregateProcessor<A>
where
A: PartialEq + PartialOrd + 'static,
{
phantom_data: PhantomData<A>,
}

impl<A: Aggregate> MaxAggregateProcessor<A> {
pub fn new() -> Self {
Self {
phantom_data: PhantomData,
}
}
}

impl<A: Aggregate> AggregateProcessor<A> for MaxAggregateProcessor<A> {
fn idempotent(&self) -> bool {
true
}

fn group(&self) -> Box<dyn AggregateGroupProcessor<A>> {
Box::new(MaxAggregateGroupProcessor::new())
}
}

pub(crate) struct MaxAggregateGroupProcessor<A>
where
A: Aggregate,
{
current_max_value: Option<A>,
}

impl<A: Aggregate> MaxAggregateGroupProcessor<A> {
pub fn new() -> Self {
Self {
current_max_value: None,
}
}
}

impl<A: Aggregate> AggregateGroupProcessor<A> for MaxAggregateGroupProcessor<A> {
fn write_aggregate_input_value(&mut self, value: A) {
match &self.current_max_value {
Some(current_max_value) => {
if value > *current_max_value {
self.current_max_value = Some(value);
}
}
None => self.current_max_value = Some(value),
}
}

fn finish(&self) -> Option<StorageValueT> {
self.current_max_value
.as_ref()
.map(|value| value.clone().into())
}
}
69 changes: 69 additions & 0 deletions nemo-physical/src/aggregates/processors/min_aggregate.rs
@@ -0,0 +1,69 @@
//! Computes the minimum of all input values.

use std::marker::PhantomData;

use crate::datatypes::StorageValueT;

use super::{
aggregate::Aggregate,
processor::{AggregateGroupProcessor, AggregateProcessor},
};

pub(crate) struct MinAggregateProcessor<A>
where
A: Aggregate,
{
phantom_data: PhantomData<A>,
}

impl<A: Aggregate> MinAggregateProcessor<A> {
pub fn new() -> Self {
Self {
phantom_data: PhantomData,
}
}
}

impl<A: Aggregate> AggregateProcessor<A> for MinAggregateProcessor<A> {
fn idempotent(&self) -> bool {
true
}

fn group(&self) -> Box<dyn AggregateGroupProcessor<A>> {
Box::new(MinAggregateGroupProcessor::new())
}
}

pub(crate) struct MinAggregateGroupProcessor<A>
where
A: Aggregate,
{
current_min_value: Option<A>,
}

impl<A: Aggregate> MinAggregateGroupProcessor<A> {
pub fn new() -> Self {
Self {
current_min_value: None,
}
}
}

impl<A: Aggregate> AggregateGroupProcessor<A> for MinAggregateGroupProcessor<A> {
fn write_aggregate_input_value(&mut self, value: A) {
match &self.current_min_value {
Some(current_min_value) => {
if value < *current_min_value {
self.current_min_value = Some(value);
}
}
None => self.current_min_value = Some(value),
}
}

fn finish(&self) -> Option<StorageValueT> {
self.current_min_value
.as_ref()
.map(|value| value.clone().into())
}
}
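Review note: the min and max processors compare with `PartialOrd`, so the result for values that do not compare (for example a floating-point NaN, if such a type is ever used as `A`) depends on input order. The sketch below reproduces only the comparison logic of the min processor. `min_like_pr` is a hypothetical free function, and whether NaN can reach this code in practice is an assumption that this diff does not settle.

```rust
// Same update rule as `MinAggregateGroupProcessor::write_aggregate_input_value`,
// written as a free function over a slice.
pub fn min_like_pr<A: PartialOrd + Clone>(values: &[A]) -> Option<A> {
    let mut current: Option<A> = None;
    for value in values {
        match &current {
            Some(current_min) => {
                // Only replace on a strict "less than"; incomparable values never win.
                if value < current_min {
                    current = Some(value.clone());
                }
            }
            // The first value is always accepted, whatever it is.
            None => current = Some(value.clone()),
        }
    }
    current
}
```

A NaN in first position is kept because nothing compares less than it, while a NaN later in the input is ignored.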
29 changes: 29 additions & 0 deletions nemo-physical/src/aggregates/processors/processor.rs
@@ -0,0 +1,29 @@
//! Traits for implementing new aggregate operations

use crate::datatypes::StorageValueT;

use super::aggregate::Aggregate;

/// Allows for aggregation of a column, by providing an [`AggregateGroupProcessor`] for every group in the input trie scan.
pub trait AggregateProcessor<A: Aggregate> {
/// Returns whether the aggregate processor is invariant to being called with the same aggregated value multiple times in a row.
/// This function has to return the same value independent of the aggregated value type.
///
/// If `true` is returned this allows for additional optimizations when creating the execution plan (e.g. not needing to reorder if the distinct variables are in the wrong variable order).
fn idempotent(&self) -> bool;

/// Creates an [`AggregateGroupProcessor`] for aggregating values with the same values in group-by columns.
fn group(&self) -> Box<dyn AggregateGroupProcessor<A>>;
}

/// Allows aggregation of multiple rows (all with the same group-by values) to produce a single aggregate value.
pub trait AggregateGroupProcessor<A>
where
A: Aggregate,
{
/// Processes a row of the aggregated input column and updates the internal state.
fn write_aggregate_input_value(&mut self, value: A);

/// Returns the resulting aggregated value of all the processed input values.
fn finish(&self) -> Option<StorageValueT>;
}
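Review note: a minimal sketch of how a consumer might drive one group processor per group over rows sorted by group key, and what `idempotent` means in practice. All names (`GroupProcessorSketch`, `MaxSketch`, `CountSketch`, `aggregate_sorted`) are hypothetical, values are fixed to `i64`, and the real trie-scan integration in this PR is not shown here.

```rust
// Reduced stand-in for `AggregateGroupProcessor`, fixed to `i64` values.
pub trait GroupProcessorSketch {
    fn write(&mut self, value: i64);
    fn finish(&self) -> Option<i64>;
}

pub struct MaxSketch(Option<i64>);
impl GroupProcessorSketch for MaxSketch {
    fn write(&mut self, value: i64) {
        // Keep the larger of the stored value and the new one.
        self.0 = Some(self.0.map_or(value, |current| current.max(value)));
    }
    fn finish(&self) -> Option<i64> {
        self.0
    }
}

pub struct CountSketch(i64);
impl GroupProcessorSketch for CountSketch {
    fn write(&mut self, _value: i64) {
        self.0 += 1;
    }
    fn finish(&self) -> Option<i64> {
        Some(self.0)
    }
}

// Factory functions, playing the role of `AggregateProcessor::group`.
pub fn max_group() -> Box<dyn GroupProcessorSketch> {
    Box::new(MaxSketch(None))
}
pub fn count_group() -> Box<dyn GroupProcessorSketch> {
    Box::new(CountSketch(0))
}

// Rows are `(group_key, value)` pairs, assumed to be sorted by `group_key`.
pub fn aggregate_sorted<F>(rows: &[(u32, i64)], new_group: F) -> Vec<(u32, i64)>
where
    F: Fn() -> Box<dyn GroupProcessorSketch>,
{
    let mut out = Vec::new();
    let mut current: Option<(u32, Box<dyn GroupProcessorSketch>)> = None;
    for &(key, value) in rows {
        let same_group = matches!(&current, Some((k, _)) if *k == key);
        if !same_group {
            // Key changed: emit the finished group and start a fresh processor.
            if let Some((k, processor)) = current.take() {
                if let Some(result) = processor.finish() {
                    out.push((k, result));
                }
            }
            current = Some((key, new_group()));
        }
        if let Some((_, processor)) = current.as_mut() {
            processor.write(value);
        }
    }
    // Emit the last group, if any.
    if let Some((k, processor)) = current {
        if let Some(result) = processor.finish() {
            out.push((k, result));
        }
    }
    out
}
```

Feeding the same value twice in a row leaves the max result unchanged but changes the count, which is the property `idempotent` reports to the planner.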