diff --git a/src/compute/src/render/join/mz_join_core.rs b/src/compute/src/render/join/mz_join_core.rs index 206b5ccb12cca..be27d996a68e3 100644 --- a/src/compute/src/render/join/mz_join_core.rs +++ b/src/compute/src/render/join/mz_join_core.rs @@ -198,10 +198,10 @@ where trace2.cursor_through(acknowledged2.borrow()).unwrap(); let batch1_cursor = batch1.cursor(); todo1.push_back(Deferred::new( - trace2_cursor, - trace2_storage, batch1_cursor, batch1.clone(), + trace2_cursor, + trace2_storage, capability.clone(), )); } @@ -273,27 +273,13 @@ where // which results in unintentionally quadratic processing time (each batch of either // input must scan all batches from the other input). - let mut work_result = |k: &Tr1::Key, - v1: &Tr1::Val, - v2: &Tr2::Val, - t: &G::Timestamp, - r1: &Tr1::R, - r2: &Tr2::R| { - let t = t.clone(); - let r = r1.clone().multiply(r2); - result(k, v1, v2) - .into_iter() - .map(move |d| (d, t.clone(), r.clone())) - }; - // Perform some amount of outstanding work. let mut fuel = 1_000_000; while !todo1.is_empty() && fuel > 0 { - todo1.front_mut().unwrap().work( - output, - |k, v2, v1, t, r2, r1| work_result(k, v1, v2, t, r1, r2), - &mut fuel, - ); + todo1 + .front_mut() + .unwrap() + .work(output, &mut result, &mut fuel); if !todo1.front().unwrap().work_remains() { todo1.pop_front(); } @@ -305,7 +291,7 @@ where todo2 .front_mut() .unwrap() - .work(output, &mut work_result, &mut fuel); + .work(output, &mut result, &mut fuel); if !todo2.front().unwrap().work_remains() { todo2.pop_front(); } @@ -373,10 +359,10 @@ where C1: Cursor, C2: Cursor, { - trace: C1, - trace_storage: C1::Storage, - batch: C2, - batch_storage: C2::Storage, + cursor1: C1, + storage1: C1::Storage, + cursor2: C2, + storage2: C2::Storage, capability: Capability, done: bool, temp: Vec<(D, T, Diff)>, @@ -390,17 +376,17 @@ where D: Data, { fn new( - trace: C1, - trace_storage: C1::Storage, - batch: C2, - batch_storage: C2::Storage, + cursor1: C1, + storage1: C1::Storage, + cursor2: C2, + storage2: C2::Storage, capability: Capability, ) -> Self { Deferred { - trace, - trace_storage, - batch, - batch_storage, + cursor1, + storage1, + cursor2, + storage2, capability, done: false, temp: Vec::new(), @@ -415,46 +401,50 @@ where fn work( &mut self, output: &mut OutputHandle>, - mut logic: L, + mut result: L, fuel: &mut usize, ) where - I: IntoIterator, - L: FnMut(&C1::Key, &C1::Val, &C2::Val, &T, &C1::R, &C2::R) -> I, + I: IntoIterator, + L: FnMut(&C1::Key, &C1::Val, &C2::Val) -> I, { let meet = self.capability.time(); let mut session = output.session(&self.capability); - let trace_storage = &self.trace_storage; - let batch_storage = &self.batch_storage; + let storage1 = &self.storage1; + let storage2 = &self.storage2; - let trace = &mut self.trace; - let batch = &mut self.batch; + let cursor1 = &mut self.cursor1; + let cursor2 = &mut self.cursor2; let temp = &mut self.temp; - while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) { - match trace.key(trace_storage).cmp(batch.key(batch_storage)) { - Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)), - Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)), + while cursor1.key_valid(storage1) && cursor2.key_valid(storage2) { + match cursor1.key(storage1).cmp(cursor2.key(storage2)) { + Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)), + Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)), Ordering::Equal => { assert_eq!(temp.len(), 0); // Populate `temp` with the results, as long as fuel remains. - let key = batch.key(batch_storage); - while let Some(val1) = trace.get_val(trace_storage) { - while let Some(val2) = batch.get_val(batch_storage) { - trace.map_times(trace_storage, |time1, diff1| { + let key = cursor2.key(storage2); + while let Some(val1) = cursor1.get_val(storage1) { + while let Some(val2) = cursor2.get_val(storage2) { + cursor1.map_times(storage1, |time1, diff1| { let time1 = time1.join(meet); - batch.map_times(batch_storage, |time2, diff2| { + cursor2.map_times(storage2, |time2, diff2| { let time = time1.join(time2); - temp.extend(logic(key, val1, val2, &time, diff1, diff2)) + let diff = diff1.multiply(diff2); + let results = result(key, val1, val2) + .into_iter() + .map(|d| (d, time.clone(), diff.clone())); + temp.extend(results); }); }); - batch.step_val(batch_storage); + cursor2.step_val(storage2); } - batch.rewind_vals(batch_storage); - trace.step_val(trace_storage); + cursor1.step_val(storage1); + cursor2.rewind_vals(storage2); // TODO: This consolidation is optional, and it may not be very // helpful. We might try harder to understand whether we @@ -472,8 +462,8 @@ where } } - batch.step_key(batch_storage); - trace.step_key(trace_storage); + cursor1.step_key(storage1); + cursor2.step_key(storage2); } } }