Skip to content

Commit

Permalink
cleanup gj somewhat
Browse files Browse the repository at this point in the history
  • Loading branch information
oflatt committed Sep 8, 2023
1 parent f35fca2 commit ece10b3
Showing 1 changed file with 81 additions and 74 deletions.
155 changes: 81 additions & 74 deletions src/gj.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,78 @@ impl EGraph {
}
}

/// Runs a single generic-join evaluation of `cq` under the given per-atom
/// timestamp ranges.
///
/// * `atom_i` - index into `cq.query.atoms`; within this method it only
///   selects the atom reported as "New atom" in the debug log. Panics if it
///   is out of bounds.
/// * `timestamp_ranges` - timestamp ranges paired positionally with the
///   query's atoms; each one is passed to `column_index` for its atom.
/// * `cq` - the compiled query to evaluate.
/// * `f` - callback forwarded to `Context::eval`
///   (presumably invoked once per match; confirm against `Context::eval`).
///
/// Does nothing when `Context::new` returns `None`. An `Err` returned by
/// `Context::eval` is discarded. Logs per-stage cost totals at debug level
/// and emits a warning when the evaluation takes longer than 1000 ms.
fn gj_for_atom<F>(
    &self,
    atom_i: usize,
    timestamp_ranges: &[Range<u32>],
    cq: &CompiledQuery,
    mut f: F,
) where
    F: FnMut(&[Value]) -> Result,
{
    // Captured by name in the debug message below.
    let atom = &cq.query.atoms[atom_i];
    // do the gj
    if let Some((mut ctx, program, cols)) = Context::new(self, cq, timestamp_ranges) {
        let start = Instant::now();
        log::debug!(
            "Query:\n{q}\nNew atom: {atom}\nTuple: {tuple}\nJoin order: {order}\nProgram\n{program}",
            q = cq.query,
            order = ListDisplay(&ctx.join_var_ordering, " "),
            tuple = ListDisplay(cq.vars.keys(), " "),
        );

        // One trie per atom: seeded from a column index when the context
        // picked a target column and the function can provide an index for
        // it in this timestamp range, otherwise an empty default trie.
        let mut tries = Vec::with_capacity(cq.query.atoms.len());
        for ((atom, ts), col) in cq
            .query
            .atoms
            .iter()
            .zip(timestamp_ranges.iter())
            .zip(cols.iter())
        {
            let index = col
                .as_ref()
                .and_then(|target| self.functions[&atom.head].column_index(*target, ts));
            tries.push(match index {
                Some(index) => LazyTrie::from_column_index(index),
                None => LazyTrie::default(),
            });
        }

        let mut trie_refs = tries.iter().collect::<Vec<_>>();
        let mut measurements = HashMap::<usize, Vec<usize>>::default();
        let stages = InputSizes {
            stage_sizes: &mut measurements,
            cur_stage: 0,
        };
        // The evaluation result is ignored here, exactly as before.
        ctx.eval(&mut trie_refs, &program.0, stages, &mut f)
            .unwrap_or(());

        // The per-stage totals are only ever logged, so skip building and
        // sorting them unless debug logging is actually enabled.
        if log_enabled!(log::Level::Debug) {
            let mut sums = measurements
                .iter()
                .map(|(stage, sizes)| (*stage, sizes.iter().copied().sum::<usize>()))
                .collect::<Vec<_>>();
            // Map iteration order is unspecified; report stages in order.
            sums.sort_by_key(|(stage, _sum)| *stage);
            for (i, sum) in sums {
                log::debug!("stage {i} total cost {sum}");
            }
        }

        let duration = start.elapsed();
        log::debug!("Matched {} times (took {:?})", ctx.matches, duration,);
        if duration.as_millis() > 1000 {
            // The iteration counter is only needed for this warning.
            let iteration = self
                .ruleset_iteration
                .get::<Symbol>(&"".into())
                .unwrap_or(&0);
            log::warn!(
                "Query took a long time at iter {iteration} : {:?}",
                duration
            );
        }
    }
}

pub(crate) fn run_query<F>(&self, cq: &CompiledQuery, timestamp: u32, mut f: F)
where
F: FnMut(&[Value]) -> Result,
Expand All @@ -642,82 +714,17 @@ impl EGraph {
let do_seminaive = self.seminaive && !global_updated;
// for the later atoms, we consider everything
let mut timestamp_ranges = vec![0..u32::MAX; cq.query.atoms.len()];
for (atom_i, atom) in cq.query.atoms.iter().enumerate() {
// this time, we only consider "new stuff" for this atom
if do_seminaive {
if do_seminaive {
for (atom_i, _atom) in cq.query.atoms.iter().enumerate() {
timestamp_ranges[atom_i] = timestamp..u32::MAX;
self.gj_for_atom(atom_i, &timestamp_ranges, cq, &mut f);
// now we can fix this atom to be "old stuff" only
// range is half-open; timestamp is excluded
timestamp_ranges[atom_i] = 0..timestamp;
}

// do the gj

if let Some((mut ctx, program, cols)) = Context::new(self, cq, &timestamp_ranges) {
let start = Instant::now();
log::debug!(
"Query:\n{q}\nNew atom: {atom}\nTuple: {tuple}\nJoin order: {order}\nProgram\n{program}",
q = cq.query,
order = ListDisplay(&ctx.join_var_ordering, " "),
tuple = ListDisplay(cq.vars.keys(), " "),
);
let mut tries = Vec::with_capacity(cq.query.atoms.len());
for ((atom, ts), col) in cq
.query
.atoms
.iter()
.zip(timestamp_ranges.iter())
.zip(cols.iter())
{
// tries.push(LazyTrie::default());
if let Some(target) = col {
if let Some(col) = self.functions[&atom.head].column_index(*target, ts)
{
tries.push(LazyTrie::from_column_index(col))
} else {
tries.push(LazyTrie::default());
}
} else {
tries.push(LazyTrie::default());
}
}
let mut trie_refs = tries.iter().collect::<Vec<_>>();
let mut meausrements = HashMap::<usize, Vec<usize>>::default();
let stages = InputSizes {
stage_sizes: &mut meausrements,
cur_stage: 0,
};
ctx.eval(&mut trie_refs, &program.0, stages, &mut f)
.unwrap_or(());
let mut sums = Vec::from_iter(
meausrements
.iter()
.map(|(x, y)| (*x, y.iter().copied().sum::<usize>())),
);
sums.sort_by_key(|(i, _sum)| *i);
if log_enabled!(log::Level::Debug) {
for (i, sum) in sums {
log::debug!("stage {i} total cost {sum}");
}
}
let duration = start.elapsed();
log::debug!("Matched {} times (took {:?})", ctx.matches, duration,);
let iteration = self
.ruleset_iteration
.get::<Symbol>(&"".into())
.unwrap_or(&0);
if duration.as_millis() > 1000 {
log::warn!(
"Query took a long time at iter {iteration} : {:?}",
duration
);
}
}

if !do_seminaive {
break;
}

// now we can fix this atom to be "old stuff" only
// range is half-open; timestamp is excluded
timestamp_ranges[atom_i] = 0..timestamp;
} else {
let atom_i = 0;
self.gj_for_atom(atom_i, &timestamp_ranges, cq, &mut f);
}
} else if let Some((mut ctx, program, _)) = Context::new(self, cq, &[]) {
let mut meausrements = HashMap::<usize, Vec<usize>>::default();
Expand Down

0 comments on commit ece10b3

Please sign in to comment.