From bc1d1025da4eaa7a3d65e28ae316c539c352ecde Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 6 May 2024 09:40:24 -0400 Subject: [PATCH] Consolidator on batcher Signed-off-by: Moritz Hoffmann --- src/consolidation.rs | 97 ++++++++++++++++++- src/trace/implementations/merge_batcher.rs | 8 +- .../implementations/merge_batcher_col.rs | 4 +- 3 files changed, 102 insertions(+), 7 deletions(-) diff --git a/src/consolidation.rs b/src/consolidation.rs index 086c86a9c..6cc00c6df 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -211,6 +211,61 @@ where } } +/// Behavior to sort containers. +pub trait ContainerSorter { + /// Sort `container`, possibly replacing the contents by a different object. + fn sort(&mut self, container: &mut C); +} + +/// A generic container sorter for containers where the item implements [`ConsolidateLayout`]. +pub struct ExternalContainerSorter { + /// Storage to permute item. + permutation: Vec>, + /// Empty container to write results at. + empty: C, +} + +impl ContainerSorter for ExternalContainerSorter +where + for<'a> C: Container + PushInto>, + for<'a> C::Item<'a>: ConsolidateLayout, +{ + fn sort(&mut self, container: &mut C) { + // SAFETY: `Permutation` is empty, types are equal but have a different lifetime + let mut permutation: Vec> = unsafe { std::mem::transmute::>, Vec>>(std::mem::take(&mut self.permutation)) }; + + permutation.extend(container.drain()); + permutation.sort_by(|a, b| a.key().cmp(&b.key())); + + for item in permutation.drain(..) { + self.empty.push(item); + } + + // SAFETY: `Permutation` is empty, types are equal but have a different lifetime + self.permutation = unsafe { std::mem::transmute::>, Vec>>(permutation) }; + std::mem::swap(container, &mut self.empty); + self.empty.clear(); + } +} + +/// Sort containers in-place, with specific implementations. +#[derive(Default, Debug)] +pub struct InPlaceSorter(); + +impl ContainerSorter> for InPlaceSorter { + #[inline] + fn sort(&mut self, container: &mut Vec<(T, R)>) { + container.sort_by(|(a, _), (b, _)| a.cmp(b)); + } +} + +impl ContainerSorter> for InPlaceSorter { + #[inline] + fn sort(&mut self, container: &mut Vec<(D, T, R)>) { + container.sort_by(|(d1, t1, _), (d2, t2, _)| (d1, t1).cmp(&(d2, t2))); + } +} + /// Layout of data to be consolidated. Generic over containers to enable `push`. pub trait ConsolidateLayout { /// Key to consolidate and sort. @@ -257,11 +312,43 @@ impl ConsolidateLayout> for (D, R) { } } -trait ConsolidateContainer{ - fn consolidate_container(container: &mut Self, target: &mut Self); +impl ConsolidateLayout> for (D, T, R) { + type Key<'a> = (&'a D, &'a T) where Self: 'a; + type DiffOwned = R; + + #[inline] + fn key(&self) -> Self::Key<'_> { + (&self.0, &self.1) + } + + #[inline] + fn diff(&self) -> Self::DiffOwned { + self.2.clone() + } + + #[inline] + fn diff_plus_equals(&self, target: &mut Self::DiffOwned) { + target.plus_equals(&self.2); + } + + #[inline] + fn push(self, diff: Self::DiffOwned, target: &mut Vec<(D, T, R)>) { + target.push((self.0, self.1, diff)); + } +} + +/// Behavior for copying consolidation. +pub trait ConsolidateContainer { + /// Consolidate the contents of `container` and write the result to `target`. + fn consolidate_container(container: &mut C, target: &mut C); +} + +/// Container consolidator that requires the container's item to implement [`ConsolidateLayout`]. +#[derive(Default, Debug)] +pub struct ContainerConsolidator{ } -impl ConsolidateContainer for C +impl ConsolidateContainer for ContainerConsolidator where C: Container, for<'a> C::Item<'a>: ConsolidateLayout, @@ -416,7 +503,7 @@ mod tests { let mut data = vec![(1,1), (2, 1), (1, -1)]; let mut target = Vec::default(); data.sort(); - ConsolidateContainer::consolidate_container(&mut data, &mut target); + ContainerConsolidator::consolidate_container(&mut data, &mut target); assert_eq!(target, [(2,1)]); } @@ -444,7 +531,7 @@ mod tests { data2.extend((0..LEN).map(|i| (i/4, -2isize + ((i % 4) as isize)))); data.sort_by(|x,y| x.0.cmp(&y.0)); let start = std::time::Instant::now(); - ConsolidateContainer::consolidate_container(&mut data, &mut target); + ContainerConsolidator::consolidate_container(&mut data, &mut target); duration += start.elapsed(); consolidate(&mut data2); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index bb13cf650..91703e686 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -9,7 +9,7 @@ use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; use timely::{Container, PartialOrder}; -use crate::consolidation::consolidate_updates; +use crate::consolidation::{consolidate_updates, ConsolidateContainer, ContainerConsolidator, ContainerSorter, InPlaceSorter}; use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; use crate::trace::{Batcher, Builder}; @@ -189,6 +189,10 @@ impl Drop for MergeBatcher { pub trait Merger: Default { /// The type of update containers received from inputs. type Input; + /// TODO + type Sorter: ContainerSorter; + /// TODO + type Consolidator: ConsolidateContainer; /// The internal representation of chunks of data. type Chunk: Container; /// The output type @@ -283,6 +287,8 @@ where type Input = Vec<((K, V), T, R)>; type Chunk = Vec<((K, V), T, R)>; type Output = Vec<((K, V), T, R)>; + type Sorter = InPlaceSorter; + type Consolidator = ContainerConsolidator; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 265f2e649..b286ae4e0 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -1,6 +1,6 @@ //! A general purpose `Batcher` implementation based on radix sort for TimelyStack. -use crate::consolidation::consolidate_updates; +use crate::consolidation::{consolidate_updates, ContainerConsolidator, InPlaceSorter}; use std::cmp::Ordering; use timely::communication::message::RefOrMut; use timely::container::columnation::{Columnation, TimelyStack}; @@ -68,6 +68,8 @@ where type Input = Vec<((K, V), T, R)>; type Chunk = TimelyStack<((K, V), T, R)>; type Output = TimelyStack<((K, V), T, R)>; + type Sorter = InPlaceSorter; + type Consolidator = ContainerConsolidator; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity