diff --git a/src/gj.rs b/src/gj.rs index bdc69f63..8a7b38e6 100644 --- a/src/gj.rs +++ b/src/gj.rs @@ -403,6 +403,8 @@ impl EGraph { self.make_trie_access_for_column(atom, column, timestamp_range) } + // Returns `None` when no program is needed, + // for example when there is nothing in one of the tables. fn compile_program( &self, query: &CompiledQuery, @@ -620,6 +622,84 @@ impl EGraph { } } + fn gj_for_atom( + &self, + // for debugging, the atom seminaive is focusing on + atom_i: Option, + timestamp_ranges: &[Range], + cq: &CompiledQuery, + mut f: F, + ) where + F: FnMut(&[Value]) -> Result, + { + // do the gj + if let Some((mut ctx, program, cols)) = Context::new(self, cq, timestamp_ranges) { + let start = Instant::now(); + let atom_info = if let Some(atom_i) = atom_i { + let atom = &cq.query.atoms[atom_i]; + format!("New atom: {atom}") + } else { + "Seminaive disabled".to_string() + }; + log::debug!( + "Query:\n{q}\n{atom_info}\nTuple: {tuple}\nJoin order: {order}\nProgram\n{program}", + q = cq.query, + order = ListDisplay(&ctx.join_var_ordering, " "), + tuple = ListDisplay(cq.vars.keys(), " "), + ); + let mut tries = Vec::with_capacity(cq.query.atoms.len()); + for ((atom, ts), col) in cq + .query + .atoms + .iter() + .zip(timestamp_ranges.iter()) + .zip(cols.iter()) + { + // tries.push(LazyTrie::default()); + if let Some(target) = col { + if let Some(col) = self.functions[&atom.head].column_index(*target, ts) { + tries.push(LazyTrie::from_column_index(col)) + } else { + tries.push(LazyTrie::default()); + } + } else { + tries.push(LazyTrie::default()); + } + } + let mut trie_refs = tries.iter().collect::>(); + let mut meausrements = HashMap::>::default(); + let stages = InputSizes { + stage_sizes: &mut meausrements, + cur_stage: 0, + }; + ctx.eval(&mut trie_refs, &program.0, stages, &mut f) + .unwrap_or(()); + let mut sums = Vec::from_iter( + meausrements + .iter() + .map(|(x, y)| (*x, y.iter().copied().sum::())), + ); + sums.sort_by_key(|(i, _sum)| *i); + if log_enabled!(log::Level::Debug) { + for (i, sum) in sums { + log::debug!("stage {i} total cost {sum}"); + } + } + let duration = start.elapsed(); + log::debug!("Matched {} times (took {:?})", ctx.matches, duration,); + let iteration = self + .ruleset_iteration + .get::(&"".into()) + .unwrap_or(&0); + if duration.as_millis() > 1000 { + log::warn!( + "Query took a long time at iter {iteration} : {:?}", + duration + ); + } + } + } + pub(crate) fn run_query(&self, cq: &CompiledQuery, timestamp: u32, mut f: F) where F: FnMut(&[Value]) -> Result, @@ -642,82 +722,16 @@ impl EGraph { let do_seminaive = self.seminaive && !global_updated; // for the later atoms, we consider everything let mut timestamp_ranges = vec![0..u32::MAX; cq.query.atoms.len()]; - for (atom_i, atom) in cq.query.atoms.iter().enumerate() { - // this time, we only consider "new stuff" for this atom - if do_seminaive { + if do_seminaive { + for (atom_i, _atom) in cq.query.atoms.iter().enumerate() { timestamp_ranges[atom_i] = timestamp..u32::MAX; + self.gj_for_atom(Some(atom_i), ×tamp_ranges, cq, &mut f); + // now we can fix this atom to be "old stuff" only + // range is half-open; timestamp is excluded + timestamp_ranges[atom_i] = 0..timestamp; } - - // do the gj - - if let Some((mut ctx, program, cols)) = Context::new(self, cq, ×tamp_ranges) { - let start = Instant::now(); - log::debug!( - "Query:\n{q}\nNew atom: {atom}\nTuple: {tuple}\nJoin order: {order}\nProgram\n{program}", - q = cq.query, - order = ListDisplay(&ctx.join_var_ordering, " "), - tuple = ListDisplay(cq.vars.keys(), " "), - ); - let mut tries = Vec::with_capacity(cq.query.atoms.len()); - for ((atom, ts), col) in cq - .query - .atoms - .iter() - .zip(timestamp_ranges.iter()) - .zip(cols.iter()) - { - // tries.push(LazyTrie::default()); - if let Some(target) = col { - if let Some(col) = self.functions[&atom.head].column_index(*target, ts) - { - tries.push(LazyTrie::from_column_index(col)) - } else { - tries.push(LazyTrie::default()); - } - } else { - tries.push(LazyTrie::default()); - } - } - let mut trie_refs = tries.iter().collect::>(); - let mut meausrements = HashMap::>::default(); - let stages = InputSizes { - stage_sizes: &mut meausrements, - cur_stage: 0, - }; - ctx.eval(&mut trie_refs, &program.0, stages, &mut f) - .unwrap_or(()); - let mut sums = Vec::from_iter( - meausrements - .iter() - .map(|(x, y)| (*x, y.iter().copied().sum::())), - ); - sums.sort_by_key(|(i, _sum)| *i); - if log_enabled!(log::Level::Debug) { - for (i, sum) in sums { - log::debug!("stage {i} total cost {sum}"); - } - } - let duration = start.elapsed(); - log::debug!("Matched {} times (took {:?})", ctx.matches, duration,); - let iteration = self - .ruleset_iteration - .get::(&"".into()) - .unwrap_or(&0); - if duration.as_millis() > 1000 { - log::warn!( - "Query took a long time at iter {iteration} : {:?}", - duration - ); - } - } - - if !do_seminaive { - break; - } - - // now we can fix this atom to be "old stuff" only - // range is half-open; timestamp is excluded - timestamp_ranges[atom_i] = 0..timestamp; + } else { + self.gj_for_atom(None, ×tamp_ranges, cq, &mut f); } } else if let Some((mut ctx, program, _)) = Context::new(self, cq, &[]) { let mut meausrements = HashMap::>::default();