Skip over queries without accumulated values
MichaReiser committed Dec 3, 2024
1 parent 297dd2b commit 76079d2
Showing 8 changed files with 77 additions and 6 deletions.
52 changes: 52 additions & 0 deletions src/accumulator/accumulated_map.rs
@@ -7,6 +7,10 @@ use super::{accumulated::Accumulated, Accumulator, AnyAccumulated};
#[derive(Default, Debug)]
pub struct AccumulatedMap {
map: FxHashMap<IngredientIndex, Box<dyn AnyAccumulated>>,

/// [`InputAccumulatedValues::Any`] if any input read during the query's execution
/// has any direct or indirect accumulated values.
inputs: InputAccumulatedValues,
}

impl AccumulatedMap {
@@ -17,6 +21,21 @@ impl AccumulatedMap {
.accumulate(value);
}

/// Adds the accumulated state of an input to this accumulated map.
pub(crate) fn add_input(&mut self, input: InputAccumulatedValues) {
if input.is_any() {
self.inputs = InputAccumulatedValues::Any;
}
}

/// Returns whether an input of the associated query has any accumulated values.
///
/// Note: Use [`InputAccumulatedValues::from_map`] to check if the associated query itself
/// or any of its inputs has accumulated values.
pub(crate) fn inputs(&self) -> InputAccumulatedValues {
self.inputs
}

pub fn extend_with_accumulated<A: Accumulator>(
&self,
index: IngredientIndex,
@@ -41,6 +60,39 @@ impl Clone for AccumulatedMap {
.iter()
.map(|(&key, value)| (key, value.cloned()))
.collect(),
inputs: self.inputs,
}
}
}

/// Tracks whether any input read during a query's execution has any accumulated values.
///
/// Knowing whether any input has accumulated values makes aggregating the accumulated values
/// cheaper because we can skip over entire subtrees.
#[derive(Copy, Clone, Debug, Default)]
pub(crate) enum InputAccumulatedValues {
/// Neither the query nor any of its inputs have any accumulated values.
#[default]
Empty,

/// The query or any of its inputs have at least one accumulated value.
Any,
}

impl InputAccumulatedValues {
pub(crate) fn from_map(accumulated: &AccumulatedMap) -> Self {
if accumulated.map.is_empty() {
accumulated.inputs
} else {
Self::Any
}
}

pub(crate) const fn is_any(self) -> bool {
matches!(self, Self::Any)
}

pub(crate) const fn is_empty(self) -> bool {
matches!(self, Self::Empty)
}
}
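
For context, here is a minimal, self-contained sketch of the bookkeeping this file introduces. The `u32` ingredient index, `String` values, and `std::collections::HashMap` are stand-ins, not salsa's real types; only the shape of `add_input` and `from_map` mirrors the diff above.

use std::collections::HashMap;

#[derive(Copy, Clone, Debug, Default, PartialEq)]
enum InputAccumulatedValues {
    /// Neither the query nor any of its inputs have accumulated values.
    #[default]
    Empty,
    /// The query or one of its inputs has at least one accumulated value.
    Any,
}

#[derive(Default)]
struct AccumulatedMap {
    /// Values accumulated directly by the query (keyed by a stand-in index type).
    map: HashMap<u32, Vec<String>>,
    /// `Any` if some input read by the query has accumulated values.
    inputs: InputAccumulatedValues,
}

impl AccumulatedMap {
    /// Folds an input's summary into this query's map.
    fn add_input(&mut self, input: InputAccumulatedValues) {
        if matches!(input, InputAccumulatedValues::Any) {
            self.inputs = InputAccumulatedValues::Any;
        }
    }
}

impl InputAccumulatedValues {
    /// `Any` if the query itself accumulated something or one of its inputs did.
    fn from_map(accumulated: &AccumulatedMap) -> Self {
        if accumulated.map.is_empty() {
            accumulated.inputs
        } else {
            Self::Any
        }
    }
}

fn main() {
    let mut leaf = AccumulatedMap::default();
    assert_eq!(InputAccumulatedValues::from_map(&leaf), InputAccumulatedValues::Empty);

    // The leaf query accumulates a value directly.
    leaf.map.entry(0).or_default().push("diagnostic".to_string());
    assert_eq!(InputAccumulatedValues::from_map(&leaf), InputAccumulatedValues::Any);

    // A parent query that read `leaf` inherits the `Any` summary without
    // accumulating anything itself.
    let mut parent = AccumulatedMap::default();
    parent.add_input(InputAccumulatedValues::from_map(&leaf));
    assert_eq!(InputAccumulatedValues::from_map(&parent), InputAccumulatedValues::Any);
}

Because each query folds its inputs' summaries in as it reads them, `from_map` can answer "does this query or anything it read accumulate values?" without walking the dependency graph.
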
4 changes: 3 additions & 1 deletion src/active_query.rs
@@ -3,7 +3,7 @@ use rustc_hash::FxHashMap;
use super::zalsa_local::{EdgeKind, QueryEdges, QueryOrigin, QueryRevisions};
use crate::tracked_struct::IdentityHash;
use crate::{
accumulator::accumulated_map::AccumulatedMap,
accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues},
durability::Durability,
hash::FxIndexSet,
key::{DatabaseKeyIndex, DependencyIndex},
@@ -76,10 +76,12 @@ impl ActiveQuery {
input: DependencyIndex,
durability: Durability,
revision: Revision,
accumulated: InputAccumulatedValues,
) {
self.input_outputs.insert((EdgeKind::Input, input));
self.durability = self.durability.min(durability);
self.changed_at = self.changed_at.max(revision);
self.accumulated.add_input(accumulated);
}

pub(super) fn add_untracked_read(&mut self, changed_at: Revision) {
5 changes: 5 additions & 0 deletions src/function/accumulated.rs
@@ -56,6 +56,11 @@ where
// Extend `output` with any values accumulated by `k`.
if let Some(accumulated_map) = k.accumulated(db) {
accumulated_map.extend_with_accumulated(accumulator.index(), &mut output);

// Skip over the inputs because we know that the entire sub-graph has no accumulated values
if accumulated_map.inputs().is_empty() {
continue;
}
}

// Find the inputs of `k` and push them onto the stack.
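
The effect of the new `continue` is easier to see on a toy dependency graph. The sketch below is illustrative only: `Node`, the string keys, and `collect` are assumptions standing in for salsa's memo table and `accumulated` entry point, but the skip mirrors the loop above.

use std::collections::HashMap;

struct Node {
    /// Values accumulated directly by this query.
    accumulated: Vec<String>,
    /// Summary flag: does anything in this query's input subtree accumulate values?
    inputs_have_accumulated: bool,
    /// The queries this query read.
    inputs: Vec<&'static str>,
}

fn collect(graph: &HashMap<&'static str, Node>, root: &'static str) -> Vec<String> {
    let mut output = Vec::new();
    let mut stack = vec![root];
    while let Some(k) = stack.pop() {
        let node = &graph[k];
        // Extend `output` with any values accumulated by `k`.
        output.extend(node.accumulated.iter().cloned());
        // Skip over the inputs: the entire sub-graph has no accumulated values.
        if !node.inputs_have_accumulated {
            continue;
        }
        // Otherwise, find the inputs of `k` and push them onto the stack.
        stack.extend(node.inputs.iter().copied());
    }
    output
}

fn main() {
    let mut graph = HashMap::new();
    graph.insert(
        "root",
        Node { accumulated: vec![], inputs_have_accumulated: true, inputs: vec!["a", "b"] },
    );
    graph.insert(
        "a",
        // `a1` and `a2` are never visited (they are not even in the map):
        // the flag on `a` says nothing below it accumulated anything.
        Node {
            accumulated: vec!["warning from a".to_string()],
            inputs_have_accumulated: false,
            inputs: vec!["a1", "a2"],
        },
    );
    graph.insert(
        "b",
        Node { accumulated: vec![], inputs_have_accumulated: false, inputs: vec![] },
    );

    assert_eq!(collect(&graph, "root"), vec!["warning from a".to_string()]);
}

This is the payoff of the flag: subtrees without accumulated values cost nothing during aggregation, no matter how large they are.
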
11 changes: 8 additions & 3 deletions src/function/fetch.rs
@@ -1,6 +1,6 @@
use crate::{runtime::StampedValue, zalsa::ZalsaDatabase, AsDynDatabase as _, Id};

use super::{memo::Memo, Configuration, IngredientImpl};
use crate::accumulator::accumulated_map::InputAccumulatedValues;
use crate::{runtime::StampedValue, zalsa::ZalsaDatabase, AsDynDatabase as _, Id};

impl<C> IngredientImpl<C>
where
@@ -21,7 +21,12 @@ where
self.evict_value_from_memo_for(zalsa, evicted);
}

zalsa_local.report_tracked_read(self.database_key_index(id).into(), durability, changed_at);
zalsa_local.report_tracked_read(
self.database_key_index(id).into(),
durability,
changed_at,
InputAccumulatedValues::from_map(&memo.revisions.accumulated),
);

value
}
2 changes: 2 additions & 0 deletions src/input.rs
@@ -8,6 +8,7 @@ use input_field::FieldIngredientImpl;
use parking_lot::Mutex;

use crate::{
accumulator::accumulated_map::InputAccumulatedValues,
cycle::CycleRecoveryStrategy,
id::{AsId, FromId},
ingredient::{fmt_index, Ingredient},
@@ -188,6 +189,7 @@ impl<C: Configuration> IngredientImpl<C> {
},
stamp.durability,
stamp.changed_at,
InputAccumulatedValues::Empty,
);
&value.fields
}
2 changes: 2 additions & 0 deletions src/interned.rs
@@ -1,3 +1,4 @@
use crate::accumulator::accumulated_map::InputAccumulatedValues;
use crate::durability::Durability;
use crate::id::AsId;
use crate::ingredient::fmt_index;
@@ -133,6 +134,7 @@
DependencyIndex::for_table(self.ingredient_index),
Durability::MAX,
self.reset_at,
InputAccumulatedValues::Empty,
);

// Optimisation to only get read lock on the map if the data has already
2 changes: 2 additions & 0 deletions src/tracked_struct.rs
@@ -4,6 +4,7 @@ use crossbeam::{atomic::AtomicCell, queue::SegQueue};
use tracked_field::FieldIngredientImpl;

use crate::{
accumulator::accumulated_map::InputAccumulatedValues,
cycle::CycleRecoveryStrategy,
ingredient::{fmt_index, Ingredient, Jar, JarAux},
key::{DatabaseKeyIndex, DependencyIndex},
@@ -561,6 +562,7 @@
},
data.durability,
field_changed_at,
InputAccumulatedValues::Empty,
);

unsafe { self.to_self_ref(&data.fields) }
5 changes: 3 additions & 2 deletions src/zalsa_local.rs
@@ -1,7 +1,7 @@
use rustc_hash::FxHashMap;
use tracing::debug;

use crate::accumulator::accumulated_map::AccumulatedMap;
use crate::accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues};
use crate::active_query::ActiveQuery;
use crate::durability::Durability;
use crate::key::DatabaseKeyIndex;
@@ -170,14 +170,15 @@ impl ZalsaLocal {
input: DependencyIndex,
durability: Durability,
changed_at: Revision,
accumulated: InputAccumulatedValues,
) {
debug!(
"report_tracked_read(input={:?}, durability={:?}, changed_at={:?})",
input, durability, changed_at
);
self.with_query_stack(|stack| {
if let Some(top_query) = stack.last_mut() {
top_query.add_read(input, durability, changed_at);
top_query.add_read(input, durability, changed_at, accumulated);

// We are a cycle participant:
//
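
To tie the plumbing together, here is a simplified model of the new `add_read` parameter. The `u8` durability, `u64` revisions, and the `ActiveQuery` shown here are stand-ins rather than salsa's actual types; the fold over `accumulated` matches the change in `src/active_query.rs`, and the two calls in `main` correspond to the call sites touched by this commit (memoized reads pass `from_map(&memo.revisions.accumulated)`, while inputs, interned values, and tracked-struct fields pass `Empty`).

/// Stand-in for salsa's `InputAccumulatedValues`.
#[derive(Copy, Clone, Debug, PartialEq)]
enum InputAccumulatedValues {
    Empty,
    Any,
}

/// Stand-in for the query currently being executed.
struct ActiveQuery {
    /// Stand-in for `Durability` (lower = less durable).
    durability: u8,
    /// Stand-in for `Revision`.
    changed_at: u64,
    /// `Any` once any read input reports accumulated values.
    accumulated_inputs: InputAccumulatedValues,
}

impl ActiveQuery {
    fn add_read(&mut self, durability: u8, revision: u64, accumulated: InputAccumulatedValues) {
        self.durability = self.durability.min(durability);
        self.changed_at = self.changed_at.max(revision);
        if accumulated == InputAccumulatedValues::Any {
            self.accumulated_inputs = InputAccumulatedValues::Any;
        }
    }
}

fn main() {
    let mut query = ActiveQuery {
        durability: u8::MAX,
        changed_at: 0,
        accumulated_inputs: InputAccumulatedValues::Empty,
    };

    // Reading a plain input, interned value, or tracked-struct field: those can
    // never carry accumulated values, so the caller reports `Empty`.
    query.add_read(3, 7, InputAccumulatedValues::Empty);

    // Reading a memoized query whose subtree accumulated something: the caller
    // reports the memo's summary, here `Any`.
    query.add_read(1, 9, InputAccumulatedValues::Any);

    assert_eq!(query.durability, 1);
    assert_eq!(query.changed_at, 9);
    assert_eq!(query.accumulated_inputs, InputAccumulatedValues::Any);
}
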
