Skip to content

Commit

Permalink
Split ExecutionEngine::execute into smaller functions (#549)
Browse files Browse the repository at this point in the history
note: based on #548

Rule execution is now performed in `ExecutionEngine::step`, which
returns a `Vec` of the changed predicates. De fragmentation of tables is
performed separately in `ExecutionEngine::defrag`.

Also `ExectionEngine::initialize` takes ownership of its `Program`
parameter instead of taking a reference and then cloning it, which is
just bad practice.

Finally the odd `ChaseProgram` parameter, which was handed down to the
tracing functions is removed, by being a bit more conscious about where
and what to clone. Note that this will likely change later, as I'm
cleaning up more of the tracing code.
  • Loading branch information
matzemathics authored Nov 12, 2024
2 parents 781e806 + e0a97d3 commit 8a7b9c7
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 65 deletions.
3 changes: 2 additions & 1 deletion nemo-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ fn run(mut cli: CliApp) -> Result<(), CliError> {
cli.import_directory.clone(),
));

let mut engine: DefaultExecutionEngine = ExecutionEngine::initialize(&program, import_manager)?;
let mut engine: DefaultExecutionEngine =
ExecutionEngine::initialize(program.clone(), import_manager)?;

for (predicate, handler) in engine.exports() {
export_manager.validate(&predicate, &*handler)?;
Expand Down
2 changes: 1 addition & 1 deletion nemo-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl NemoEngine {
fn py_new(program: NemoProgram) -> PyResult<Self> {
TimedCode::instance().reset();
let import_manager = ImportManager::new(ResourceProviders::default());
let engine = ExecutionEngine::initialize(&program.0, import_manager).py_res()?;
let engine = ExecutionEngine::initialize(program.0.clone(), import_manager).py_res()?;
Ok(NemoEngine { program, engine })
}

Expand Down
2 changes: 1 addition & 1 deletion nemo-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl NemoEngine {
};
let import_manager = ImportManager::new(resource_providers);

let engine = ExecutionEngine::initialize(&program.0, import_manager)
let engine = ExecutionEngine::initialize(program.0.clone(), import_manager)
.map_err(WasmOrInternalNemoError::Nemo)
.map_err(NemoError)?;

Expand Down
2 changes: 1 addition & 1 deletion nemo/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub fn load_string(input: String) -> Result<Engine, Error> {
.translate(&program_ast)
.map_err(|_| Error::ProgramParseError)?;

ExecutionEngine::initialize(&program, ImportManager::new(ResourceProviders::default()))
ExecutionEngine::initialize(program, ImportManager::new(ResourceProviders::default()))
}

/// Executes the reasoning process of the [Engine].
Expand Down
120 changes: 59 additions & 61 deletions nemo/src/execution/execution_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ pub struct ExecutionEngine<RuleSelectionStrategy> {

impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
/// Initialize [ExecutionEngine].
pub fn initialize(program: &Program, input_manager: ImportManager) -> Result<Self, Error> {
let chase_program = ProgramChaseTranslation::new().translate(program.clone());
pub fn initialize(program: Program, input_manager: ImportManager) -> Result<Self, Error> {
let chase_program = ProgramChaseTranslation::new().translate(program);
let analysis = chase_program.analyze();

let mut table_manager = TableManager::new();
Expand Down Expand Up @@ -175,76 +175,77 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
Ok(())
}

/// Executes the program.
pub fn execute(&mut self) -> Result<(), Error> {
TimedCode::instance().sub("Reasoning/Rules").start();
TimedCode::instance().sub("Reasoning/Execution").start();
fn step(&mut self, rule_index: usize, execution: &RuleExecution) -> Result<Vec<Tag>, Error> {
let timing_string = format!("Reasoning/Rules/Rule {rule_index}");

let rule_execution: Vec<RuleExecution> = self
.program
.rules()
.iter()
.zip(self.analysis.rule_analysis.iter())
.map(|(r, a)| RuleExecution::initialize(r, a))
.collect();
TimedCode::instance().sub(&timing_string).start();
log::info!("<<< {0}: APPLYING RULE {rule_index} >>>", self.current_step);

let mut new_derivations: Option<bool> = None;
self.rule_history.push(rule_index);

while let Some(current_rule_index) = self.rule_strategy.next_rule(new_derivations) {
let timing_string = format!("Reasoning/Rules/Rule {current_rule_index}");
let current_info = &mut self.rule_infos[rule_index];

TimedCode::instance().sub(&timing_string).start();
log::info!(
"<<< {0}: APPLYING RULE {current_rule_index} >>>",
self.current_step
);
let updated_predicates =
execution.execute(&mut self.table_manager, current_info, self.current_step)?;

self.rule_history.push(current_rule_index);
current_info.step_last_applied = self.current_step;

let current_info = &mut self.rule_infos[current_rule_index];
let current_execution = &rule_execution[current_rule_index];
let rule_duration = TimedCode::instance().sub(&timing_string).stop();
log::info!("Rule duration: {} ms", rule_duration.as_millis());

let updated_predicates = current_execution.execute(
&mut self.table_manager,
current_info,
self.current_step,
)?;
self.current_step += 1;
Ok(updated_predicates)
}

new_derivations = Some(!updated_predicates.is_empty());
fn defrag(&mut self, updated_predicates: Vec<Tag>) -> Result<(), Error> {
for updated_pred in updated_predicates {
let counter = self
.predicate_fragmentation
.entry(updated_pred.clone())
.or_insert(0);
*counter += 1;

current_info.step_last_applied = self.current_step;
if *counter == MAX_FRAGMENTATION {
let start = if let Some(last_union) = self.predicate_last_union.get(&updated_pred) {
last_union + 1
} else {
0
};

let rule_duration = TimedCode::instance().sub(&timing_string).stop();
log::info!("Rule duration: {} ms", rule_duration.as_millis());
let range = start..(self.current_step + 1);

// We prevent fragmentation by periodically collecting single-step tables into larger ones
for updated_pred in updated_predicates {
let counter = self
.predicate_fragmentation
.entry(updated_pred.clone())
.or_insert(0);
*counter += 1;
self.table_manager.combine_tables(&updated_pred, range)?;

self.predicate_last_union
.insert(updated_pred, self.current_step);

*counter = 0;
}
}

if *counter == MAX_FRAGMENTATION {
let start =
if let Some(last_union) = self.predicate_last_union.get(&updated_pred) {
last_union + 1
} else {
0
};
Ok(())
}

let range = start..(self.current_step + 1);
/// Executes the program.
pub fn execute(&mut self) -> Result<(), Error> {
TimedCode::instance().sub("Reasoning/Rules").start();
TimedCode::instance().sub("Reasoning/Execution").start();

self.table_manager.combine_tables(&updated_pred, range)?;
let rule_execution: Vec<RuleExecution> = self
.program
.rules()
.iter()
.zip(self.analysis.rule_analysis.iter())
.map(|(r, a)| RuleExecution::initialize(r, a))
.collect();

self.predicate_last_union
.insert(updated_pred, self.current_step);
let mut new_derivations: Option<bool> = None;

*counter = 0;
}
}
while let Some(index) = self.rule_strategy.next_rule(new_derivations) {
let updated_predicates = self.step(index, &rule_execution[index])?;
new_derivations = Some(!updated_predicates.is_empty());

self.current_step += 1;
self.defrag(updated_predicates)?;
}

TimedCode::instance().sub("Reasoning/Rules").stop();
Expand Down Expand Up @@ -314,7 +315,6 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {

fn trace_recursive(
&mut self,
program: &ChaseProgram,
trace: &mut ExecutionTrace,
fact: GroundAtom,
) -> Result<TraceFactHandle, TracingError> {
Expand Down Expand Up @@ -346,7 +346,7 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {

// Rule index of the rule that was applied to derive the given fact
let rule_index = self.rule_history[step];
let rule = &program.rules()[rule_index];
let rule = self.program.rules()[rule_index].clone();

// Iterate over all head atoms which could have derived the given fact
for (head_index, head_atom) in rule.head().iter().enumerate() {
Expand Down Expand Up @@ -436,7 +436,7 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {

let next_fact = GroundAtom::new(next_fact_predicate, next_fact_terms);

let next_handle = self.trace_recursive(program, trace, next_fact)?;
let next_handle = self.trace_recursive(trace, next_fact)?;

if trace.status(next_handle).is_success() {
subtraces.push(next_handle);
Expand Down Expand Up @@ -481,14 +481,12 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
facts: Vec<Fact>,
) -> Result<(ExecutionTrace, Vec<TraceFactHandle>), Error> {
let mut trace = ExecutionTrace::new(program);
let chase_program = self.program.clone();

let mut handles = Vec::new();

for fact in facts {
let chase_fact = ProgramChaseTranslation::new().build_fact(&fact);

handles.push(self.trace_recursive(&chase_program, &mut trace, chase_fact)?);
handles.push(self.trace_recursive(&mut trace, chase_fact)?);
}

Ok((trace, handles))
Expand Down

0 comments on commit 8a7b9c7

Please sign in to comment.