diff --git a/src/accumulator/accumulated_map.rs b/src/accumulator/accumulated_map.rs index bda989a7..5c9e9254 100644 --- a/src/accumulator/accumulated_map.rs +++ b/src/accumulator/accumulated_map.rs @@ -7,6 +7,10 @@ use super::{accumulated::Accumulated, Accumulator, AnyAccumulated}; #[derive(Default, Debug)] pub struct AccumulatedMap { map: FxHashMap>, + + /// [`InputAccumulatedValues::Empty`] if any input read during the query's execution + /// has any direct or indirect accumulated values. + inputs: InputAccumulatedValues, } impl AccumulatedMap { @@ -17,6 +21,21 @@ impl AccumulatedMap { .accumulate(value); } + /// Adds the accumulated state of an input to this accumulated map. + pub(crate) fn add_input(&mut self, input: InputAccumulatedValues) { + if input.is_any() { + self.inputs = InputAccumulatedValues::Any; + } + } + + /// Returns whether an input of the associated query has any accumulated values. + /// + /// Note: Use [`InputAccumulatedValues::from_map`] to check if the associated query itself + /// or any of its inputs has accumulated values. + pub(crate) fn inputs(&self) -> InputAccumulatedValues { + self.inputs + } + pub fn extend_with_accumulated( &self, index: IngredientIndex, @@ -41,6 +60,39 @@ impl Clone for AccumulatedMap { .iter() .map(|(&key, value)| (key, value.cloned())) .collect(), + inputs: self.inputs, } } } + +/// Tracks whether any input read during a query's execution has any accumulated values. +/// +/// Knowning whether any input has accumulated values makes aggregating the accumulated values +/// cheaper because we can skip over entire subtrees. +#[derive(Copy, Clone, Debug, Default)] +pub(crate) enum InputAccumulatedValues { + /// The query nor any of its inputs have any accumulated values. + #[default] + Empty, + + /// The query or any of its inputs have at least one accumulated value. + Any, +} + +impl InputAccumulatedValues { + pub(crate) fn from_map(accumulated: &AccumulatedMap) -> Self { + if accumulated.map.is_empty() { + accumulated.inputs + } else { + Self::Any + } + } + + pub(crate) const fn is_any(self) -> bool { + matches!(self, Self::Any) + } + + pub(crate) const fn is_empty(self) -> bool { + matches!(self, Self::Empty) + } +} diff --git a/src/active_query.rs b/src/active_query.rs index aadcf196..752756c9 100644 --- a/src/active_query.rs +++ b/src/active_query.rs @@ -3,7 +3,7 @@ use rustc_hash::FxHashMap; use super::zalsa_local::{EdgeKind, QueryEdges, QueryOrigin, QueryRevisions}; use crate::tracked_struct::IdentityHash; use crate::{ - accumulator::accumulated_map::AccumulatedMap, + accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues}, durability::Durability, hash::FxIndexSet, key::{DatabaseKeyIndex, DependencyIndex}, @@ -76,10 +76,12 @@ impl ActiveQuery { input: DependencyIndex, durability: Durability, revision: Revision, + accumulated: InputAccumulatedValues, ) { self.input_outputs.insert((EdgeKind::Input, input)); self.durability = self.durability.min(durability); self.changed_at = self.changed_at.max(revision); + self.accumulated.add_input(accumulated); } pub(super) fn add_untracked_read(&mut self, changed_at: Revision) { diff --git a/src/function/accumulated.rs b/src/function/accumulated.rs index 21e017a5..83379a91 100644 --- a/src/function/accumulated.rs +++ b/src/function/accumulated.rs @@ -56,6 +56,11 @@ where // Extend `output` with any values accumulated by `k`. if let Some(accumulated_map) = k.accumulated(db) { accumulated_map.extend_with_accumulated(accumulator.index(), &mut output); + + // Skip over the inputs because we know that the entire sub-graph has no accumulated values + if accumulated_map.inputs().is_empty() { + continue; + } } // Find the inputs of `k` and push them onto the stack. diff --git a/src/function/fetch.rs b/src/function/fetch.rs index 07a08d96..f6d495df 100644 --- a/src/function/fetch.rs +++ b/src/function/fetch.rs @@ -1,6 +1,6 @@ -use crate::{runtime::StampedValue, zalsa::ZalsaDatabase, AsDynDatabase as _, Id}; - use super::{memo::Memo, Configuration, IngredientImpl}; +use crate::accumulator::accumulated_map::InputAccumulatedValues; +use crate::{runtime::StampedValue, zalsa::ZalsaDatabase, AsDynDatabase as _, Id}; impl IngredientImpl where @@ -21,7 +21,12 @@ where self.evict_value_from_memo_for(zalsa, evicted); } - zalsa_local.report_tracked_read(self.database_key_index(id).into(), durability, changed_at); + zalsa_local.report_tracked_read( + self.database_key_index(id).into(), + durability, + changed_at, + InputAccumulatedValues::from_map(&memo.revisions.accumulated), + ); value } diff --git a/src/input.rs b/src/input.rs index fdad27ac..ee08e959 100644 --- a/src/input.rs +++ b/src/input.rs @@ -8,6 +8,7 @@ use input_field::FieldIngredientImpl; use parking_lot::Mutex; use crate::{ + accumulator::accumulated_map::InputAccumulatedValues, cycle::CycleRecoveryStrategy, id::{AsId, FromId}, ingredient::{fmt_index, Ingredient}, @@ -188,6 +189,7 @@ impl IngredientImpl { }, stamp.durability, stamp.changed_at, + InputAccumulatedValues::Empty, ); &value.fields } diff --git a/src/interned.rs b/src/interned.rs index 0c6d32cd..ba58b512 100644 --- a/src/interned.rs +++ b/src/interned.rs @@ -1,3 +1,4 @@ +use crate::accumulator::accumulated_map::InputAccumulatedValues; use crate::durability::Durability; use crate::id::AsId; use crate::ingredient::fmt_index; @@ -133,6 +134,7 @@ where DependencyIndex::for_table(self.ingredient_index), Durability::MAX, self.reset_at, + InputAccumulatedValues::Empty, ); // Optimisation to only get read lock on the map if the data has already diff --git a/src/tracked_struct.rs b/src/tracked_struct.rs index 540bb765..367cc36c 100644 --- a/src/tracked_struct.rs +++ b/src/tracked_struct.rs @@ -4,6 +4,7 @@ use crossbeam::{atomic::AtomicCell, queue::SegQueue}; use tracked_field::FieldIngredientImpl; use crate::{ + accumulator::accumulated_map::InputAccumulatedValues, cycle::CycleRecoveryStrategy, ingredient::{fmt_index, Ingredient, Jar, JarAux}, key::{DatabaseKeyIndex, DependencyIndex}, @@ -561,6 +562,7 @@ where }, data.durability, field_changed_at, + InputAccumulatedValues::Empty, ); unsafe { self.to_self_ref(&data.fields) } diff --git a/src/zalsa_local.rs b/src/zalsa_local.rs index 9076ffa9..95671361 100644 --- a/src/zalsa_local.rs +++ b/src/zalsa_local.rs @@ -1,7 +1,7 @@ use rustc_hash::FxHashMap; use tracing::debug; -use crate::accumulator::accumulated_map::AccumulatedMap; +use crate::accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues}; use crate::active_query::ActiveQuery; use crate::durability::Durability; use crate::key::DatabaseKeyIndex; @@ -170,6 +170,7 @@ impl ZalsaLocal { input: DependencyIndex, durability: Durability, changed_at: Revision, + accumulated: InputAccumulatedValues, ) { debug!( "report_tracked_read(input={:?}, durability={:?}, changed_at={:?})", @@ -177,7 +178,7 @@ impl ZalsaLocal { ); self.with_query_stack(|stack| { if let Some(top_query) = stack.last_mut() { - top_query.add_read(input, durability, changed_at); + top_query.add_read(input, durability, changed_at, accumulated); // We are a cycle participant: //