Skip to content

Commit

Permalink
Removed earlier attempt at making an action plan
Browse files Browse the repository at this point in the history
  • Loading branch information
josephg committed Nov 29, 2023
1 parent 630e490 commit f4a431f
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 223 deletions.
14 changes: 1 addition & 13 deletions src/listmerge/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,19 +664,7 @@ impl<'a> TransformedOpsIter2<'a> {
pub(crate) fn new(subgraph: &'a Graph, aa: &'a AgentAssignment, op_ctx: &'a ListOperationCtx,
ops: &'a RleVec<KVPair<ListOpMetrics>>,
from_frontier: &[LV], merge_frontier: &[LV]) -> Self {
// let (plan, common) = subgraph.make_m1_plan(from_frontier, merge_frontier);
let (plan, common) = subgraph.make_m1_plan_2(Some(ops), from_frontier, merge_frontier);
// let (plan, common) = subgraph.make_m1_plan_2(None, from_frontier, merge_frontier);
// println!("=====\n");
// plan.dbg_print();
// println!();
// println!("common {:?} from {:?} + {:?}", common.as_ref(), from_frontier, merge_frontier);
// plan.dbg_check(common.as_ref(), from_frontier, merge_frontier, subgraph);
// println!("=====\n");


// println!("From: {:?} / merge: {:?} common {:?}", from_frontier, merge_frontier, common);
// plan.dbg_print();
let (plan, common) = subgraph.make_m1_plan(Some(ops), from_frontier, merge_frontier);

Self {
subgraph,
Expand Down
2 changes: 1 addition & 1 deletion src/listmerge/txn_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ mod test {

// -----

let (plan, _) = cg.graph.make_m1_plan(&[], cg.version.as_ref());
let (plan, _) = cg.graph.make_m1_plan(Some(&o.operations), &[], cg.version.as_ref());

let mut cost_estimate = 0;
let mut clears = 0;
Expand Down
216 changes: 7 additions & 209 deletions src/listmerge2/merge1plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ impl ConflictSubgraph<M1EntryState> {
}



// This function does a BFS through the graph, setting the state appropriately.
// fn prepare(&mut self) -> SubgraphChildren {
// This function does a BFS through the graph, setting critical_path.
fn prepare(&mut self) {
// if self.0.is_empty() { return SubgraphChildren(vec![]); }
if self.entries.is_empty() { return; }
Expand Down Expand Up @@ -278,7 +276,7 @@ impl ConflictSubgraph<M1EntryState> {
}
}

pub(super) fn m1_plan_2(&mut self, metrics: Option<&Metrics>) -> M1Plan {
pub(super) fn make_m1_plan(&mut self, metrics: Option<&Metrics>) -> M1Plan {
let mut actions = vec![];
if self.entries.is_empty() {
return M1Plan(actions);
Expand Down Expand Up @@ -484,158 +482,20 @@ impl ConflictSubgraph<M1EntryState> {

}


M1Plan(actions)
}


pub(super) fn make_m1_plan(&mut self) -> M1Plan {
let mut actions = vec![];

// The flag for b_root will only be OnlyB if we're adding something to the graph.
// if self.entries.is_empty() || self.entries[self.b_root].flag != DiffFlag::OnlyB {
if self.entries.is_empty() {
return M1Plan(actions);
}
self.prepare();

let mut nonempty_spans_remaining = self.entries.iter()
.filter(|e| !e.span.is_empty())
.count();

let mut last_processed_after: bool = false;
let mut last_processed_idx: usize = self.entries.len() - 1; // Might be cleaner to start this at None or something.

// Basically, process a_root then b_root.
let mut current_idx = self.a_root;
let mut stack: Vec<usize> = vec![];

let mut done_b = false;

let mut dirty = false;

'outer: loop {
// println!("{current_idx} / {:?}", stack);

// Borrowing immutably to please the borrow checker.
let e = &self.entries[current_idx];

assert_eq!(e.state.visited, false);

// There's two things we could do here:
// 1. Go up to one of our parents
// 2. Visit this item and go down.

let parents_len = e.parents.len();
// Go to the next unvisited parent.
let mut e_next = e.state.next;
while e_next < parents_len {
let p = e.parents[e_next];
if self.entries[p].state.visited { // But it might have already been visited.
// g[current_idx].state.next += 1;
e_next += 1;
} else {
// Go up and process this child.
self.entries[current_idx].state.next = e_next + 1;
stack.push(current_idx);
current_idx = p;
// println!("Bumping to parent {current_idx}");
continue 'outer;
}
}

// Ok, process this element.
let e = &mut self.entries[current_idx];
e.state.next = e_next;
// debug_assert_eq!(e.state.next, e.parents.len());
// println!("Processing {current_idx} {:?}", e.span);
e.state.visited = true;
let e = &self.entries[current_idx];

// Process this span.
if !e.span.is_empty() {
if e.state.critical_path {
if dirty {
actions.push(M1PlanAction::Clear);
dirty = false;
}
actions.push_rle(M1PlanAction::FF(e.span));
} else {
// Note we only advance & retreat if the item is not on the critical path.
// If we're on the critical path, the clear operation will flush everything
// anyway.
let mut advances: SmallVec<[DTRange; 2]> = smallvec![];
let mut retreats: SmallVec<[DTRange; 2]> = smallvec![];
self.diff_trace(last_processed_idx, last_processed_after, current_idx, |idx, flag| {
let list = match flag {
DiffFlag::OnlyA => &mut retreats,
DiffFlag::OnlyB => &mut advances,
DiffFlag::Shared => { return; }
};
let span = self.entries[idx].span;
if !span.is_empty() {
list.push(span);
}
});

if !retreats.is_empty() {
actions.extend_rle(retreats.into_iter().map(M1PlanAction::Retreat));
}
if !advances.is_empty() {
// .rev() here because diff visits everything in reverse order.
actions.extend_rle(advances.into_iter().rev().map(M1PlanAction::Advance));
}

dirty = true;
actions.push_rle(M1PlanAction::Apply(e.span));
}

// We can stop as soon as we've processed all the spans.
nonempty_spans_remaining -= 1;
if nonempty_spans_remaining == 0 { break 'outer; } // break;

last_processed_after = true;
last_processed_idx = current_idx;
}

// Then go down again.
if let Some(next_idx) = stack.pop() {
current_idx = next_idx;
} else if !done_b {
// println!("DOING B");
current_idx = self.b_root;
actions.push(M1PlanAction::BeginOutput);
done_b = true;
} else {
panic!("Should have stopped");
// break;
}
}

M1Plan(actions)
}
}

impl Graph {
pub(crate) fn make_m1_plan(&self, a: &[LV], b: &[LV]) -> (M1Plan, Frontier) {
if self.frontier_contains_frontier(a, b) {
// Nothing to merge. Do nothing.
return (M1Plan(vec![]), a.into());
}

let mut sg = self.make_conflict_graph_between(a, b);
(sg.make_m1_plan(), sg.base_version)
}

pub(crate) fn make_m1_plan_2(&self, metrics: Option<&Metrics>, a: &[LV], b: &[LV]) -> (M1Plan, Frontier) {
pub(crate) fn make_m1_plan(&self, metrics: Option<&Metrics>, a: &[LV], b: &[LV]) -> (M1Plan, Frontier) {
if self.frontier_contains_frontier(a, b) {
// Nothing to merge. Do nothing.
return (M1Plan(vec![]), a.into());
}

let mut sg = self.make_conflict_graph_between(a, b);
// sg.dbg_print();
(sg.m1_plan_2(metrics), sg.base_version)
(sg.make_m1_plan(metrics), sg.base_version)
}
}

Expand Down Expand Up @@ -802,13 +662,7 @@ mod test {
// g.dbg_print();
g.dbg_check();

g.prepare();
let critical_path: Vec<_> = g.entries.iter()
.map(|e| e.state.critical_path)
.collect();
assert_eq!(&critical_path, &[true, false, false, true, true]);

let plan = g.make_m1_plan();
let plan = g.make_m1_plan(None);
// dbg!(&plan);
plan.dbg_check(g.base_version.as_ref(), &[], &[1, 2], &graph);
}
Expand All @@ -825,71 +679,15 @@ mod test {

let mut g = graph.make_conflict_graph_between(&[], &[3]);
// g.dbg_print();

g.dbg_check();

g.prepare();
let critical_path: Vec<_> = g.entries.iter()
.map(|e| e.state.critical_path)
.collect();

// dbg!(critical_path);
assert_eq!(&critical_path, &[true, true, false, false, true, true]);

let plan = g.make_m1_plan();
let plan = g.make_m1_plan(None);
// plan.dbg_print();
plan.dbg_check(g.base_version.as_ref(), &[], &[3], &graph);
}

#[test]
fn test_simple_graph_2_2() {
// Same as above, but this time with an extra entry after the concurrent zone.
let graph = Graph::from_simple_items(&[
GraphEntrySimple { span: 0.into(), parents: Frontier::root() },
GraphEntrySimple { span: 1.into(), parents: Frontier::new_1(0) },
GraphEntrySimple { span: (2..4).into(), parents: Frontier::new_1(0) },
GraphEntrySimple { span: 4.into(), parents: Frontier::from_sorted(&[1, 3]) },
]);

// let mut g = graph.make_conflict_graph_between(&[], &[3]);
let mut g = graph.make_conflict_graph_between(&[1], &[4]);
let plan = g.m1_plan_2(None);
plan.dbg_print();
}

#[test]
fn fuzz_m1_plans() {
with_random_cgs(3232, (100, 10), |(_i, _k), cg, frontiers| {
// Iterate through the frontiers, and [root -> cg.version].
for (_j, fs) in std::iter::once([Frontier::root(), cg.version.clone()].as_slice())
.chain(frontiers.windows(2))
.enumerate()
{
// println!("{_i} {_k} {_j}");

let (a, b) = (fs[0].as_ref(), fs[1].as_ref());

// println!("\n\n");
// dbg!(&cg.graph);
// println!("f: {:?} + {:?}", a, b);

// Alternatively:
// let plan = cg.graph.make_m1_plan(a, b);
let mut subgraph = cg.graph.make_conflict_graph_between(a, b);
subgraph.dbg_check();
subgraph.dbg_check_conflicting(&cg.graph, a, b);

let plan = subgraph.make_m1_plan();
// subgraph.dbg_print();
// plan.dbg_print();
// dbg!(&plan);
plan.dbg_check(subgraph.base_version.as_ref(), a, b, &cg.graph);
}
});
}

#[test]
fn fuzz_m1_plans_2() {
with_random_cgs(3232, (100, 10), |(_i, _k), cg, frontiers| {
// with_random_cgs(2231, (100, 3), |(_i, _k), cg, frontiers| {
// Iterate through the frontiers, and [root -> cg.version].
Expand Down Expand Up @@ -919,7 +717,7 @@ mod test {
subgraph.dbg_check_conflicting(&cg.graph, a, b);

// subgraph.dbg_print();
let plan = subgraph.m1_plan_2(None);
let plan = subgraph.make_m1_plan(None);
// plan.dbg_print();
// dbg!(&plan);
plan.dbg_check(subgraph.base_version.as_ref(), a, b, &cg.graph);
Expand Down

0 comments on commit f4a431f

Please sign in to comment.