diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs
new file mode 100644
index 000000000..16000334a
--- /dev/null
+++ b/src/trace/implementations/merge_batcher_col.rs
@@ -0,0 +1,316 @@
+//! A general purpose `Batcher` implementation based on radix sort for TimelyStack.
+
+use timely::Container;
+use timely::communication::message::RefOrMut;
+use timely::container::columnation::{Columnation, TimelyStack};
+use timely::progress::frontier::Antichain;
+
+use ::difference::Semigroup;
+
+use lattice::Lattice;
+use trace::{Batch, Batcher, Builder};
+
+/// Creates batches from unordered tuples.
+pub struct ColumnatedMergeBatcher<B: Batch>
+where
+    B::Key: Ord+Clone+Columnation,
+    B::Val: Ord+Clone+Columnation,
+    B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation,
+    B::R: Semigroup+Columnation,
+{
+    sorter: MergeSorterColumnation<(B::Key, B::Val), B::Time, B::R>,
+    lower: Antichain<B::Time>,
+    frontier: Antichain<B::Time>,
+    phantom: ::std::marker::PhantomData<B>,
+}
+
+impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
+where
+    B::Key: Ord+Clone+Columnation+'static,
+    B::Val: Ord+Clone+Columnation+'static,
+    B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation+'static,
+    B::R: Semigroup+Columnation+'static,
+{
+    fn new() -> Self {
+        ColumnatedMergeBatcher {
+            sorter: MergeSorterColumnation::new(),
+            frontier: Antichain::new(),
+            lower: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
+            phantom: ::std::marker::PhantomData,
+        }
+    }
+
+    #[inline(never)]
+    fn push_batch(&mut self, batch: RefOrMut<Vec<((B::Key, B::Val), B::Time, B::R)>>) {
+        // `batch` is either a shared reference or an owned allocation.
+        match batch {
+            RefOrMut::Ref(reference) => {
+                // This is a moment at which we could capture the allocations backing
+                // `batch` into a different form of region, rather than just cloning.
+                // let mut owned: TimelyStack<((B::Key, B::Val), B::Time, B::R)> = self.sorter.empty();
+                // owned.clone_from(reference);
+                self.sorter.push(reference);
+            },
+            RefOrMut::Mut(reference) => {
+                self.sorter.push(reference);
+            }
+        }
+    }
+
+    // Sealing a batch means finding those updates with times not greater or equal to any time
+    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
+    // which we call `lower`, by assumption that after sealing a batcher we receive no more
+    // updates with times not greater or equal to `upper`.
+    #[inline(never)]
+    fn seal(&mut self, upper: Antichain<B::Time>) -> B {
+
+        let mut builder = B::Builder::new();
+
+        let mut merged = Default::default();
+        self.sorter.finish_into(&mut merged);
+
+        let mut kept = Vec::new();
+        let mut keep = TimelyStack::default();
+
+        self.frontier.clear();
+
+        // TODO: Re-use buffer, rather than dropping.
+        for buffer in merged.drain(..) {
+            for datum @ ((key, val), time, diff) in &buffer[..] {
+                if upper.less_equal(time) {
+                    self.frontier.insert(time.clone());
+                    if keep.len() == keep.capacity() {
+                        if keep.len() > 0 {
+                            kept.push(keep);
+                            keep = self.sorter.empty();
+                        }
+                    }
+                    keep.copy(datum);
+                }
+                else {
+                    builder.push((key.clone(), val.clone(), time.clone(), diff.clone()));
+                }
+            }
+            // buffer.clear();
+            // Recycling buffer.
+            // self.sorter.push(&mut buffer);
+        }
+
+        // Finish the kept data.
+        if keep.len() > 0 {
+            kept.push(keep);
+        }
+        if kept.len() > 0 {
+            self.sorter.push_list(kept);
+        }
+
+        // Drain buffers (fast reclamation).
+        // TODO : This isn't obviously the best policy, but "safe" wrt footprint.
+        //        In particular, if we are reading serialized input data, we may
+        //        prefer to keep these buffers around to re-fill, if possible.
+        let mut buffer = Default::default();
+        self.sorter.push(&mut buffer);
+        // We recycle buffers with allocations (capacity, and not zero-sized).
+        while buffer.capacity() > 0 && std::mem::size_of::<((B::Key,B::Val),B::Time,B::R)>() > 0 {
+            buffer = Default::default();
+            self.sorter.push(&mut buffer);
+        }
+
+        let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()));
+        self.lower = upper;
+        seal
+    }
+
+    // The frontier of elements remaining after the most recent call to `self.seal`.
+    fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<B::Time> {
+        self.frontier.borrow()
+    }
+}
+
+pub struct TimelyStackQueue<T: Columnation> {
+    list: TimelyStack<T>,
+    head: usize,
+}
+
+impl<T: Columnation> TimelyStackQueue<T> {
+    #[inline]
+    pub fn new() -> Self { TimelyStackQueue::from(Default::default()) }
+    #[inline]
+    pub fn pop(&mut self) -> &T {
+        self.head += 1;
+        &self.list[self.head - 1]
+    }
+    #[inline]
+    pub fn peek(&self) -> &T {
+        &self.list[self.head]
+    }
+    #[inline]
+    pub fn from(list: TimelyStack<T>) -> Self {
+        TimelyStackQueue {
+            list,
+            head: 0,
+        }
+    }
+    #[inline]
+    pub fn done(mut self) -> TimelyStack<T> {
+        self.list.clear();
+        self.list
+    }
+    #[inline]
+    pub fn len(&self) -> usize { self.list.len() - self.head }
+    #[inline]
+    pub fn is_empty(&self) -> bool { self.head == self.list.len() }
+}
+
+pub struct MergeSorterColumnation<D: Ord+Columnation, T: Ord+Columnation, R: Semigroup+Columnation> {
+    queue: Vec<Vec<TimelyStack<(D, T, R)>>>, // each power-of-two length list of allocations.
+    stash: Vec<TimelyStack<(D, T, R)>>,
+}
+
+impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {
+
+    const BUFFER_SIZE_BYTES: usize = 1 << 13;
+
+    fn buffer_size() -> usize {
+        let size = ::std::mem::size_of::<(D, T, R)>();
+        if size == 0 {
+            Self::BUFFER_SIZE_BYTES
+        } else if size <= Self::BUFFER_SIZE_BYTES {
+            Self::BUFFER_SIZE_BYTES / size
+        } else {
+            1
+        }
+    }
+
+    #[inline]
+    pub fn new() -> Self { MergeSorterColumnation { queue: Vec::new(), stash: Vec::new() } }
+
+    #[inline]
+    pub fn empty(&mut self) -> TimelyStack<(D, T, R)> {
+        self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size()))
+    }
+
+    #[inline]
+    pub fn push(&mut self, batch: &Vec<(D, T, R)>) {
+
+        if batch.len() > 0 {
+            let mut batch = batch.clone();
+            crate::consolidation::consolidate_updates(&mut batch);
+            let mut stack = TimelyStack::with_capacity(batch.len());
+            for tuple in batch.iter() {
+                stack.copy(tuple);
+            }
+            self.queue.push(vec![stack]);
+            while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
+                let list1 = self.queue.pop().unwrap();
+                let list2 = self.queue.pop().unwrap();
+                let merged = self.merge_by(list1, list2);
+                self.queue.push(merged);
+            }
+        }
+    }
+
+    // This is awkward, because it isn't a power-of-two length any more, and we don't want
+    // to break it down to be so.
+    pub fn push_list(&mut self, list: Vec<TimelyStack<(D, T, R)>>) {
+        while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
+            let list1 = self.queue.pop().unwrap();
+            let list2 = self.queue.pop().unwrap();
+            let merged = self.merge_by(list1, list2);
+            self.queue.push(merged);
+        }
+        self.queue.push(list);
+    }
+
+    #[inline(never)]
+    pub fn finish_into(&mut self, target: &mut Vec<TimelyStack<(D, T, R)>>) {
+        while self.queue.len() > 1 {
+            let list1 = self.queue.pop().unwrap();
+            let list2 = self.queue.pop().unwrap();
+            let merged = self.merge_by(list1, list2);
+            self.queue.push(merged);
+        }
+
+        if let Some(mut last) = self.queue.pop() {
+            ::std::mem::swap(&mut last, target);
+        }
+    }
+
+    // Merges two sorted input lists into one sorted output list.
+    #[inline(never)]
+    fn merge_by(&mut self, list1: Vec<TimelyStack<(D, T, R)>>, list2: Vec<TimelyStack<(D, T, R)>>) -> Vec<TimelyStack<(D, T, R)>> {
+
+        use std::cmp::Ordering;
+
+        // TODO: `list1` and `list2` get dropped; would be better to reuse?
+        let mut output = Vec::with_capacity(list1.len() + list2.len());
+        let mut result = self.empty();
+
+        let mut list1 = list1.into_iter().peekable();
+        let mut list2 = list2.into_iter().peekable();
+
+        let mut head1 = if list1.peek().is_some() { TimelyStackQueue::from(list1.next().unwrap()) } else { TimelyStackQueue::new() };
+        let mut head2 = if list2.peek().is_some() { TimelyStackQueue::from(list2.next().unwrap()) } else { TimelyStackQueue::new() };
+
+        // While we have valid data in each input, merge.
+        while !head1.is_empty() && !head2.is_empty() {
+
+            while (result.capacity() - result.len()) > 0 && head1.len() > 0 && head2.len() > 0 {
+
+                let cmp = {
+                    let x = head1.peek();
+                    let y = head2.peek();
+                    (&x.0, &x.1).cmp(&(&y.0, &y.1))
+                };
+                match cmp {
+                    Ordering::Less    => { result.copy(head1.pop()); }
+                    Ordering::Greater => { result.copy(head2.pop()); }
+                    Ordering::Equal   => {
+                        let (data1, time1, diff1) = head1.pop();
+                        let (_data2, _time2, diff2) = head2.pop();
+                        let mut diff1 = diff1.clone();
+                        diff1.plus_equals(&diff2);
+                        if !diff1.is_zero() {
+                            result.copy_destructured(data1, time1, &diff1);
+                        }
+                    }
+                }
+            }
+
+            if result.capacity() == result.len() {
+                output.push(result);
+                result = self.empty();
+            }
+
+            if head1.is_empty() {
+                let done1 = head1.done();
+                if done1.capacity() == Self::buffer_size() { self.stash.push(done1); }
+                head1 = if list1.peek().is_some() { TimelyStackQueue::from(list1.next().unwrap()) } else { TimelyStackQueue::new() };
+            }
+            if head2.is_empty() {
+                let done2 = head2.done();
+                if done2.capacity() == Self::buffer_size() { self.stash.push(done2); }
+                head2 = if list2.peek().is_some() { TimelyStackQueue::from(list2.next().unwrap()) } else { TimelyStackQueue::new() };
+            }
+        }
+
+        if result.len() > 0 { output.push(result); }
+        else if result.capacity() > 0 { self.stash.push(result); }
+
+        if !head1.is_empty() {
+            let mut result = self.empty();
+            for _ in 0 .. head1.len() { result.copy(head1.pop()); }
+            output.push(result);
+        }
+        output.extend(list1);
+
+        if !head2.is_empty() {
+            let mut result = self.empty();
+            for _ in 0 .. head2.len() { result.copy(head2.pop()); }
+            output.push(result);
+        }
+        output.extend(list2);
+
+        output
+    }
+}
diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs
index 4eee120de..1fd7e80eb 100644
--- a/src/trace/implementations/mod.rs
+++ b/src/trace/implementations/mod.rs
@@ -40,7 +40,8 @@ pub mod spine_fueled;
 
-mod merge_batcher;
+pub(crate) mod merge_batcher;
+pub(crate) mod merge_batcher_col;
 
 pub use self::merge_batcher::MergeBatcher as Batcher;
diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs
index 5c995e048..9cb566209 100644
--- a/src/trace/implementations/ord.rs
+++ b/src/trace/implementations/ord.rs
@@ -38,10 +38,10 @@ use super::merge_batcher::MergeBatcher;
 use abomonation::abomonated::Abomonated;
 
 /// A trace implementation using a spine of ordered lists.
-pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<K, V, T, R, O>>>;
+pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<K, V, T, R, Vec<((K,V),T,R)>, O>>>;
 
 /// A trace implementation using a spine of abomonated ordered lists.
-pub type OrdValSpineAbom<K, V, T, R, O=usize> = Spine<Rc<Abomonated<OrdValBatch<K, V, T, R, O>, Vec<u8>>>>;
+pub type OrdValSpineAbom<K, V, T, R, O=usize> = Spine<Rc<Abomonated<OrdValBatch<K, V, T, R, Vec<((K,V),T,R)>,O>, Vec<u8>>>>;
 
 /// A trace implementation for empty values using a spine of ordered lists.
 pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<K, T, R, O>>>;
@@ -50,9 +50,9 @@ pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<K, T, R, O>>>;
 pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<Rc<Abomonated<OrdKeyBatch<K, T, R, O>, Vec<u8>>>>;
 
 /// A trace implementation backed by columnar storage.
-pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<K, V, T, R, O, TimelyStack<K>, TimelyStack<V>>>>;
+pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<K, V, T, R, TimelyStack<((K,V),T,R)>, O, TimelyStack<K>, TimelyStack<V>>>>;
 /// A trace implementation backed by columnar storage.
-pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<K, T, R, O, TimelyStack<K>>>>;
+pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<K, T, R, O, TimelyStack<K>>>>;
 
 
 /// A container that can retain/discard from some offset onward.
@@ -87,7 +87,7 @@ impl<T: Columnation> RetainFrom<T> for TimelyStack<T> {
 
 /// An immutable collection of update tuples, from a contiguous interval of logical times.
 #[derive(Debug, Abomonation)]
-pub struct OrdValBatch<K, V, T, R, O=usize, CK=Vec<K>, CV=Vec<V>>
+pub struct OrdValBatch<K, V, T, R, C, O=usize, CK=Vec<K>, CV=Vec<V>>
 where
     K: Ord+Clone,
     V: Ord+Clone,
@@ -101,9 +101,11 @@ where
     pub layer: OrderedLayer<K, OrderedLayer<V, OrderedLeaf<T, R>, O, CV>, O, CK>,
     /// Description of the update times this layer represents.
     pub desc: Description<T>,
+    /// Phantom marker
+    phantom: std::marker::PhantomData<C>
 }
 
-impl<K, V, T, R, O, CK, CV> BatchReader for OrdValBatch<K, V, T, R, O, CK, CV>
+impl<K, V, T, R, C, O, CK, CV> BatchReader for OrdValBatch<K, V, T, R, C, O, CK, CV>
 where
     K: Ord+Clone+'static,
     V: Ord+Clone+'static,
@@ -118,13 +120,13 @@ where
     type Time = T;
     type R = R;
 
-    type Cursor = OrdValCursor<K, V, T, R, O, CK, CV>;
+    type Cursor = OrdValCursor<K, V, T, R, C, O, CK, CV>;
     fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor(), phantom: std::marker::PhantomData } }
     fn len(&self) -> usize { <OrderedLayer<K, OrderedLayer<V, OrderedLeaf<T, R>, O, CV>, O, CK> as Trie>::tuples(&self.layer) }
     fn description(&self) -> &Description<T> { &self.desc }
 }
 
-impl<K, V, T, R, O, CK, CV> Batch for OrdValBatch<K, V, T, R, O, CK, CV>
+impl<K, V, T, R, O, CK, CV> Batch for OrdValBatch<K, V, T, R, Vec<((K,V),T,R)>, O, CK, CV>
 where
     K: Ord+Clone+'static,
     V: Ord+Clone+'static,
@@ -135,15 +137,36 @@ where
     CV: BatchContainer<Item=V>+Deref<Target=[V]>+RetainFrom<V>,
 {
     type Batcher = MergeBatcher<K, V, T, R>;
-    type Builder = OrdValBuilder<K, V, T, R, O, CK, CV>;
-    type Merger = OrdValMerger<K, V, T, R, O, CK, CV>;
+    type Builder = OrdValBuilder<K, V, T, R, Vec<((K,V),T,R)>, O, CK, CV>;
+    type Merger = OrdValMerger<K, V, T, R, Vec<((K,V),T,R)>, O, CK, CV>;
 
     fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<T>>) -> Self::Merger {
         OrdValMerger::new(self, other, compaction_frontier)
     }
 }
 
-impl<K, V, T, R, O, CK, CV> OrdValBatch<K, V, T, R, O, CK, CV>
+use trace::implementations::merge_batcher_col;
+impl<K, V, T, R, O, CK, CV> Batch for OrdValBatch<K, V, T, R, TimelyStack<((K,V),T,R)>, O, CK, CV>
+where
+    K: Ord+Clone+Columnation+'static,
+    V: Ord+Clone+Columnation+'static,
+    T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+Columnation+'static,
+    R: Semigroup+Columnation,
+    O: OrdOffset, <usize as TryInto<O>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
+    CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
+    CV: BatchContainer<Item=V>+Deref<Target=[V]>+RetainFrom<V>,
+{
+    type Batcher = merge_batcher_col::ColumnatedMergeBatcher<Self>;
+    type Builder = OrdValBuilder<K, V, T, R, TimelyStack<((K,V),T,R)>, O, CK, CV>;
+    type Merger = OrdValMerger<K, V, T, R, TimelyStack<((K,V),T,R)>, O, CK, CV>;
+
+    fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<T>>) -> Self::Merger {
+        OrdValMerger::new(self, other, compaction_frontier)
+    }
+}
+
+
+impl<K, V, T, R, C, O, CK, CV> OrdValBatch<K, V, T, R, C, O, CK, CV>
 where
     K: Ord+Clone+'static,
     V: Ord+Clone+'static,
@@ -242,7 +265,7 @@ where
 }
 
 /// State for an in-progress merge.
-pub struct OrdValMerger<K, V, T, R, O=usize, CK=Vec<K>, CV=Vec<V>>
+pub struct OrdValMerger<K, V, T, R, C, O=usize, CK=Vec<K>, CV=Vec<V>>
 where
     K: Ord+Clone+'static,
     V: Ord+Clone+'static,
@@ -262,9 +285,11 @@ where
     result: <OrderedLayer<K, OrderedLayer<V, OrderedLeaf<T, R>, O, CV>, O, CK> as Trie>::MergeBuilder,
     description: Description<T>,
     should_compact: bool,
+    /// Phantom marker
+    phantom: std::marker::PhantomData<C>
 }
 
-impl<K, V, T, R, O, CK, CV> Merger<OrdValBatch<K, V, T, R, O, CK, CV>> for OrdValMerger<K, V, T, R, O, CK, CV>
+impl<K, V, T, R, C, O, CK, CV> Merger<OrdValBatch<K, V, T, R, C, O, CK, CV>> for OrdValMerger<K, V, T, R, C, O, CK, CV>
 where
     K: Ord+Clone+'static,
     V: Ord+Clone+'static,
@@ -273,8 +298,9 @@ where
     O: OrdOffset, <usize as TryInto<O>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
     CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
     CV: BatchContainer<Item=V>+Deref<Target=[V]>+RetainFrom<V>,
+    OrdValBatch<K, V, T, R, C, O, CK, CV>: Batch,
 {
-    fn new(batch1: &OrdValBatch<K, V, T, R, O, CK, CV>, batch2: &OrdValBatch<K, V, T, R, O, CK, CV>, compaction_frontier: Option<AntichainRef<T>>) -> Self {
+    fn new(batch1: &OrdValBatch<K, V, T, R, C, O, CK, CV>, batch2: &OrdValBatch<K, V, T, R, C, O, CK, CV>, compaction_frontier: Option<AntichainRef<T>>) -> Self {
 
         assert!(batch1.upper() == batch2.lower());
 
@@ -293,9 +319,10 @@ where
             result: <<OrderedLayer<K, OrderedLayer<V, OrderedLeaf<T, R>, O, CV>, O, CK> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer),
             description: description,
             should_compact: compaction_frontier.is_some(),
+            phantom: std::marker::PhantomData,
         }
     }
-    fn done(self) -> OrdValBatch<K, V, T, R, O, CK, CV> {
+    fn done(self) -> OrdValBatch<K, V, T, R, C, O, CK, CV> {
 
         assert!(self.lower1 == self.upper1);
         assert!(self.lower2 == self.upper2);
@@ -303,9 +330,10 @@ where
         OrdValBatch {
             layer: self.result.done(),
             desc: self.description,
+            phantom: std::marker::PhantomData,
         }
     }
-    fn work(&mut self, source1: &OrdValBatch<K, V, T, R, O, CK, CV>, source2: &OrdValBatch<K, V, T, R, O, CK, CV>, fuel: &mut isize) {
+    fn work(&mut self, source1: &OrdValBatch<K, V, T, R, C, O, CK, CV>, source2: &OrdValBatch<K, V, T, R, C, O, CK, CV>, fuel: &mut isize) {
 
         let starting_updates = self.result.vals.vals.vals.len();
         let mut effort = 0isize;
@@ -344,7 +372,7 @@ where
 
         // if we are supplied a frontier, we should compact.
         if self.should_compact {
-            OrdValBatch::<K, V, T, R, O, CK, CV>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos);
+            OrdValBatch::<K, V, T, R, C, O, CK, CV>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos);
         }
 
         *fuel -= effort;
@@ -357,7 +385,7 @@ where
 
 /// A cursor for navigating a single layer.
 #[derive(Debug)]
-pub struct OrdValCursor<K, V, T, R, O=usize, CK=Vec<K>, CV=Vec<V>>
+pub struct OrdValCursor<K, V, T, R, C, O=usize, CK=Vec<K>, CV=Vec<V>>
 where
     V: Ord+Clone,
     T: Lattice+Ord+Clone,
@@ -366,11 +394,11 @@ where
     CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
     CV: BatchContainer<Item=V>+Deref<Target=[V]>+RetainFrom<V>,
 {
-    phantom: std::marker::PhantomData<(K, CK, CV)>,
+    phantom: std::marker::PhantomData<(K, CK, CV, C)>,
     cursor: OrderedCursor<OrderedLayer<V, OrderedLeaf<T, R>, O, CV>>,
 }
 
-impl<K, V, T, R, O, CK, CV> Cursor for OrdValCursor<K, V, T, R, O, CK, CV>
+impl<K, V, T, R, C, O, CK, CV> Cursor for OrdValCursor<K, V, T, R, C, O, CK, CV>
 where
     K: Ord+Clone,
     V: Ord+Clone,
@@ -385,7 +413,7 @@ where
     type Time = T;
     type R = R;
 
-    type Storage = OrdValBatch<K, V, T, R, O, CK, CV>;
+    type Storage = OrdValBatch<K, V, T, R, C, O, CK, CV>;
 
     fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) }
     fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { &self.cursor.child.key(&storage.layer.vals) }
@@ -408,7 +436,7 @@ where
 
 /// A builder for creating layers from unsorted update tuples.
-pub struct OrdValBuilder<K, V, T, R, O=usize, CK=Vec<K>, CV=Vec<V>>
+pub struct OrdValBuilder<K, V, T, R, C, O=usize, CK=Vec<K>, CV=Vec<V>>
 where
     K: Ord+Clone,
     V: Ord+Clone,
@@ -419,9 +447,11 @@ where
     CV: BatchContainer<Item=V>+Deref<Target=[V]>+RetainFrom<V>,
 {
     builder: OrderedBuilder<K, OrderedBuilder<V, OrderedLeafBuilder<T, R>, O, CV>, O, CK>,
+    /// Phantom marker
+    phantom: std::marker::PhantomData<C>
 }
 
-impl<K, V, T, R, O, CK, CV> Builder<OrdValBatch<K, V, T, R, O, CK, CV>> for OrdValBuilder<K, V, T, R, O, CK, CV>
+impl<K, V, T, R, C, O, CK, CV> Builder<OrdValBatch<K, V, T, R, C, O, CK, CV>> for OrdValBuilder<K, V, T, R, C, O, CK, CV>
 where
     K: Ord+Clone+'static,
     V: Ord+Clone+'static,
@@ -430,16 +460,19 @@ where
     O: OrdOffset, <usize as TryInto<O>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
     CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
     CV: BatchContainer<Item=V>+Deref<Target=[V]>+RetainFrom<V>,
+    OrdValBatch<K, V, T, R, C, O, CK, CV>: Batch,
 {
 
     fn new() -> Self {
         OrdValBuilder {
-            builder: OrderedBuilder::<K, OrderedBuilder<V, OrderedLeafBuilder<T, R>, O, CV>, O, CK>::new()
+            builder: OrderedBuilder::<K, OrderedBuilder<V, OrderedLeafBuilder<T, R>, O, CV>, O, CK>::new(),
+            phantom: std::marker::PhantomData,
         }
     }
     fn with_capacity(cap: usize) -> Self {
         OrdValBuilder {
-            builder: <OrderedBuilder<K, OrderedBuilder<V, OrderedLeafBuilder<T, R>, O, CV>, O, CK> as TupleBuilder>::with_capacity(cap)
+            builder: <OrderedBuilder<K, OrderedBuilder<V, OrderedLeafBuilder<T, R>, O, CV>, O, CK> as TupleBuilder>::with_capacity(cap),
+            phantom: std::marker::PhantomData,
        }
     }
 
@@ -449,10 +482,11 @@ where
     }
 
     #[inline(never)]
-    fn done(self, lower: Antichain<T>, upper: Antichain<T>, since: Antichain<T>) -> OrdValBatch<K, V, T, R, O, CK, CV> {
+    fn done(self, lower: Antichain<T>, upper: Antichain<T>, since: Antichain<T>) -> OrdValBatch<K, V, T, R, C, O, CK, CV> {
         OrdValBatch {
             layer: self.builder.done(),
-            desc: Description::new(lower, upper, since)
+            desc: Description::new(lower, upper, since),
+            phantom: std::marker::PhantomData,
         }
     }
 }
diff --git a/tests/trace.rs b/tests/trace.rs
index d00c4497e..fde37bd4a 100644
--- a/tests/trace.rs
+++ b/tests/trace.rs
@@ -11,11 +11,11 @@ use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher};
 use differential_dataflow::trace::cursor::Cursor;
 use differential_dataflow::trace::implementations::spine_fueled::Spine;
 
-pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<K, V, T, R>>>;
+pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<K, V, T, R, Vec<((K,V),T,R)>>>>;
 
 type IntegerTrace = OrdValSpine<u64, u64, usize, i64>;
 
-fn get_trace() -> Spine<Rc<OrdValBatch<u64, u64, usize, i64>>> {
+fn get_trace() -> Spine<Rc<OrdValBatch<u64, u64, usize, i64, Vec<((u64,u64),usize,i64)>>>> {
     let op_info = OperatorInfo::new(0, 0, &[]);
     let mut trace = IntegerTrace::new(op_info, None, None);
     {
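Reviewer note, not part of the patch: the core of `MergeSorterColumnation` is an adaptive merge sort. It keeps a queue of sorted runs, and whenever the newest run has grown to at least half the length of the run beneath it, it merges the two, so run lengths stay roughly geometric and total merging work stays O(n log n). The sketch below shows only that policy, using plain `Vec<u64>` runs in place of columnated `TimelyStack` buffers and ordinary sorting in place of `consolidate_updates`; `RunQueue` and `merge` are illustrative names, not items of this crate.

```rust
// Minimal sketch of the geometric run-queue merging strategy (assumed simplification).
struct RunQueue {
    // Sorted runs, oldest first; each run is roughly at least twice as long as the next.
    queue: Vec<Vec<u64>>,
}

impl RunQueue {
    fn new() -> Self { RunQueue { queue: Vec::new() } }

    /// Sorts `batch` into a run, then restores the invariant by merging the two most
    /// recent runs while the newer one has reached half the length of the one beneath it.
    fn push(&mut self, mut batch: Vec<u64>) {
        if batch.is_empty() { return; }
        batch.sort();
        self.queue.push(batch);
        while self.queue.len() > 1
            && self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2
        {
            let newer = self.queue.pop().unwrap();
            let older = self.queue.pop().unwrap();
            let merged = merge(older, newer);
            self.queue.push(merged);
        }
    }

    /// Merges everything down to a single sorted run.
    fn finish(&mut self) -> Vec<u64> {
        while self.queue.len() > 1 {
            let newer = self.queue.pop().unwrap();
            let older = self.queue.pop().unwrap();
            let merged = merge(older, newer);
            self.queue.push(merged);
        }
        self.queue.pop().unwrap_or_default()
    }
}

/// Plain two-way merge of sorted vectors.
fn merge(a: Vec<u64>, b: Vec<u64>) -> Vec<u64> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        if a[i] <= b[j] { out.push(a[i]); i += 1; } else { out.push(b[j]); j += 1; }
    }
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}

fn main() {
    let mut sorter = RunQueue::new();
    sorter.push(vec![5, 3, 9]);
    sorter.push(vec![2, 8]);
    sorter.push(vec![7, 1]);
    assert_eq!(sorter.finish(), vec![1, 2, 3, 5, 7, 8, 9]);
}
```

`push_list` relies on the same invariant when `seal` re-inserts updates retained for a future batch: it merges queued runs until the new, arbitrarily long run can sit at the end without violating the length ordering.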
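Also for orientation only: `TimelyStackQueue` is a read-only cursor that "pops" by advancing a head index and handing out references, so merging never clones or shuffles the stored tuples, and the exhausted allocation can be handed back to the stash afterwards. A minimal stand-in over `Vec<T>` (the name `VecQueue` is hypothetical, not part of the crate) would look like this:

```rust
// Simplified analogue of TimelyStackQueue over Vec<T> (assumed simplification).
struct VecQueue<T> {
    list: Vec<T>,
    head: usize,
}

impl<T> VecQueue<T> {
    fn from(list: Vec<T>) -> Self { VecQueue { list, head: 0 } }
    // Look at the next element without advancing.
    fn peek(&self) -> &T { &self.list[self.head] }
    // "Pop" by advancing the head index; the data itself never moves.
    fn pop(&mut self) -> &T { self.head += 1; &self.list[self.head - 1] }
    fn is_empty(&self) -> bool { self.head == self.list.len() }
    // Surrender the (fully read) allocation so the caller can recycle it.
    fn done(mut self) -> Vec<T> { self.list.clear(); self.list }
}

fn main() {
    let mut q = VecQueue::from(vec![("apple", 1), ("banana", 2)]);
    println!("next up: {}", q.peek().0);
    while !q.is_empty() {
        let (key, diff) = q.pop();
        println!("{}: {}", key, diff);
    }
    // The emptied allocation keeps its capacity and could go back into a stash.
    assert!(q.done().capacity() >= 2);
}
```

In the real implementation the drained buffer is only stashed when its capacity still equals `buffer_size()`, which keeps the stash free of odd-sized allocations.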