Changes to track timely's #597 (#538)
* Changes to track timely's #597

* Update dogs3

* Update interactive
frankmcsherry authored Nov 11, 2024
1 parent 73772db commit 35f5180
Showing 16 changed files with 39 additions and 116 deletions.
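
Every file below follows the same mechanical migration: timely's #597 removed the `RefOrMut` wrapper from input handles, so `for_each` now hands operators a `&mut` container directly. The operator-local scratch buffers and `data.swap(&mut buffer)` calls become unnecessary, and containers are drained (or handed off) in place. A minimal sketch of the before/after pattern, using plain `Vec`s as stand-ins for timely's container and handle types (the function names here are illustrative, not timely's API):

```rust
// Before: `for_each` yielded data behind `RefOrMut`, so operators swapped the
// contents into an operator-local buffer before draining it.
fn before<T>(data: &mut Vec<T>, buffer: &mut Vec<T>, stash: &mut Vec<T>) {
    std::mem::swap(data, buffer); // stand-in for `data.swap(&mut buffer)`
    stash.extend(buffer.drain(..));
}

// After: `for_each` yields `&mut` to the container itself, so the scratch
// buffer disappears and the container is drained in place.
fn after<T>(data: &mut Vec<T>, stash: &mut Vec<T>) {
    stash.extend(data.drain(..));
}

fn main() {
    let (mut stash, mut buffer) = (Vec::new(), Vec::new());
    before(&mut vec![1, 2], &mut buffer, &mut stash);
    after(&mut vec![3, 4], &mut stash);
    assert_eq!(stash, vec![1, 2, 3, 4]);
}
```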
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -46,7 +46,7 @@ fnv="1.0.2"
 timely = {workspace = true}
 
 [workspace.dependencies]
-timely = { version = "0.13", default-features = false }
+timely = { version = "0.14", default-features = false }
 #timely = { path = "../timely-dataflow/timely/", default-features = false }
 
 [features]
4 changes: 1 addition & 3 deletions dogsdogsdogs/src/operators/half_join.rs
@@ -151,7 +151,6 @@ where
         let arrangement_stream = arrangement.stream;
 
         let mut stash = HashMap::new();
-        let mut buffer = Vec::new();
 
         let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());
 
@@ -167,10 +166,9 @@
 
             // drain the first input, stashing requests.
             input1.for_each(|capability, data| {
-                data.swap(&mut buffer);
                 stash.entry(capability.retain())
                      .or_insert(Vec::new())
-                     .extend(buffer.drain(..))
+                     .extend(data.drain(..))
             });
 
             // Drain input batches; although we do not observe them, we want access to the input
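
This operator (and `lookup_map.rs` below) stashes incoming records under their retained capability before processing. The grouping idiom, isolated with `u64` keys standing in for capabilities (a sketch, not the operators' actual types):

```rust
use std::collections::HashMap;

fn main() {
    // Group records under a per-capability key, creating the Vec on first use.
    let mut stash: HashMap<u64, Vec<&str>> = HashMap::new();
    for (cap, datum) in [(1u64, "a"), (2, "b"), (1, "c")] {
        stash.entry(cap).or_insert(Vec::new()).push(datum);
    }
    assert_eq!(stash[&1], vec!["a", "c"]);
    assert_eq!(stash[&2], vec!["b"]);
}
```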
5 changes: 1 addition & 4 deletions dogsdogsdogs/src/operators/lookup_map.rs
@@ -49,8 +49,6 @@ where
         let mut logic1 = key_selector.clone();
         let mut logic2 = key_selector.clone();
 
-        let mut buffer = Vec::new();
-
         let mut key: K = supplied_key0;
         let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
             logic1(&update.0, &mut key);
@@ -64,10 +62,9 @@
 
             // drain the first input, stashing requests.
             input1.for_each(|capability, data| {
-                data.swap(&mut buffer);
                 stash.entry(capability.retain())
                      .or_insert(Vec::new())
-                     .extend(buffer.drain(..))
+                     .extend(data.drain(..))
             });
 
             // Drain input batches; although we do not observe them, we want access to the input
11 changes: 2 additions & 9 deletions interactive/src/logging.rs
@@ -55,8 +55,6 @@ where
         let (mut park_out, park) = demux.new_output();
         let (mut text_out, text) = demux.new_output();
 
-        let mut demux_buffer = Vec::new();
-
         demux.build(move |_capability| {
 
             move |_frontiers| {
@@ -71,8 +69,6 @@
 
                 input.for_each(|time, data| {
 
-                    data.swap(&mut demux_buffer);
-
                     let mut operates_session = operates.session(&time);
                     let mut channels_session = channels.session(&time);
                     let mut schedule_session = schedule.session(&time);
@@ -81,7 +77,7 @@
                     let mut park_session = park.session(&time);
                     let mut text_session = text.session(&time);
 
-                    for (time, _worker, datum) in demux_buffer.drain(..) {
+                    for (time, _worker, datum) in data.drain(..) {
 
                         // Round time up to next multiple of `granularity_ns`.
                         let time_ns = (((time.as_nanos() as u64) / granularity_ns) + 1) * granularity_ns;
@@ -235,8 +231,6 @@ where
         let (mut batch_out, batch) = demux.new_output();
         let (mut merge_out, merge) = demux.new_output();
 
-        let mut demux_buffer = Vec::new();
-
         demux.build(move |_capability| {
 
             move |_frontiers| {
@@ -246,11 +240,10 @@
 
                 input.for_each(|time, data| {
 
-                    data.swap(&mut demux_buffer);
                     let mut batch_session = batch.session(&time);
                     let mut merge_session = merge.session(&time);
 
-                    for (time, _worker, datum) in demux_buffer.drain(..) {
+                    for (time, _worker, datum) in data.drain(..) {
 
                         // Round time up to next multiple of `granularity_ns`.
                         let time_ns = (((time.as_nanos() as u64) / granularity_ns) + 1) * granularity_ns;
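
Both demux operators round event times with the same expression. Note that it advances exact multiples of `granularity_ns` by a full step (it computes `(floor(t/g) + 1) * g`, not a ceiling), so the rounded time is always strictly greater than the raw time. A standalone check of the formula as it appears in the diff:

```rust
// The rounding used in the logging demux operators.
fn round_up(time_ns: u64, granularity_ns: u64) -> u64 {
    ((time_ns / granularity_ns) + 1) * granularity_ns
}

fn main() {
    assert_eq!(round_up(999, 1_000), 1_000);
    assert_eq!(round_up(1_000, 1_000), 2_000); // exact multiples advance too
    assert_eq!(round_up(1_001, 1_000), 2_000);
}
```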
2 changes: 1 addition & 1 deletion src/capture.rs
@@ -600,7 +600,7 @@ pub mod sink {
                     }
 
                     // Now record the update to the writer.
-                    send_queue.push_back(Message::Updates(updates.replace(Vec::new())));
+                    send_queue.push_back(Message::Updates(std::mem::take(updates)));
 
                     // Transmit timestamp counts downstream.
                     output
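
The `capture.rs` change is a pure idiom swap: with a `&mut Vec` in hand, `std::mem::take` moves the contents out and leaves `Default::default()` (an empty vector) behind, the same effect the old `replace(Vec::new())` call spelled out. For example:

```rust
fn main() {
    let mut updates = vec![("key", 7u64, 1i64)];
    // Moves the Vec out and leaves an empty one in its place.
    let taken = std::mem::take(&mut updates);
    assert_eq!(taken.len(), 1);
    assert!(updates.is_empty());
}
```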
6 changes: 2 additions & 4 deletions src/dynamic/mod.rs
@@ -45,22 +45,20 @@ where
         let (mut output, stream) = builder.new_output();
         let mut input = builder.new_input_connection(&self.inner, Pipeline, vec![Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } })]);
 
-        let mut vector = Default::default();
         builder.build(move |_capability| move |_frontier| {
             let mut output = output.activate();
             input.for_each(|cap, data| {
-                data.swap(&mut vector);
                 let mut new_time = cap.time().clone();
                 let mut vec = std::mem::take(&mut new_time.inner).into_vec();
                 vec.truncate(level - 1);
                 new_time.inner = PointStamp::new(vec);
                 let new_cap = cap.delayed(&new_time);
-                for (_data, time, _diff) in vector.iter_mut() {
+                for (_data, time, _diff) in data.iter_mut() {
                     let mut vec = std::mem::take(&mut time.inner).into_vec();
                     vec.truncate(level - 1);
                     time.inner = PointStamp::new(vec);
                 }
-                output.session(&new_cap).give_container(&mut vector);
+                output.session(&new_cap).give_container(data);
             });
         });
 
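
With the container available as `&mut`, this operator rewrites each record's timestamp in place and then hands the whole container downstream with `give_container(data)`, instead of echoing into a swapped local. The per-timestamp truncation, sketched with `PointStamp` modeled as a thin wrapper over `Vec<usize>` (a hypothetical stand-in for timely's actual type):

```rust
// Stand-in for timely's PointStamp; `Default` mirrors what `std::mem::take` needs.
#[derive(Default, Debug, PartialEq)]
struct PointStamp(Vec<usize>);

impl PointStamp {
    fn new(v: Vec<usize>) -> Self { PointStamp(v) }
    fn into_vec(self) -> Vec<usize> { self.0 }
}

// Keep only the first `level - 1` coordinates, as the diff does.
fn truncate(stamp: &mut PointStamp, level: usize) {
    let mut vec = std::mem::take(stamp).into_vec();
    vec.truncate(level - 1);
    *stamp = PointStamp::new(vec);
}

fn main() {
    let mut t = PointStamp::new(vec![4, 7, 9]);
    truncate(&mut t, 3);
    assert_eq!(t, PointStamp::new(vec![4, 7]));
}
```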
4 changes: 1 addition & 3 deletions src/operators/arrange/upsert.rs
@@ -161,7 +161,6 @@ where
 
         // Tracks the lower envelope of times in `priority_queue`.
         let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
-        let mut buffer = Vec::new();
         // Form the trace we will both use internally and publish.
         let activator = Some(stream.scope().activator_for(info.address.clone()));
         let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
@@ -186,8 +185,7 @@
             // Stash capabilities and associated data (ordered by time).
             input.for_each(|cap, data| {
                 capabilities.insert(cap.retain());
-                data.swap(&mut buffer);
-                for (key, val, time) in buffer.drain(..) {
+                for (key, val, time) in data.drain(..) {
                     priority_queue.push(std::cmp::Reverse((time, key, val)))
                 }
             });
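
The upsert operator now drains incoming `(key, val, time)` triples straight into its priority queue. Assuming the queue is a std `BinaryHeap` (as the `Reverse` wrapper suggests), reversing the comparison and putting the time first in the tuple yields a min-heap keyed on time, so pending updates pop in time order:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    let mut priority_queue = BinaryHeap::new();
    for (time, key, val) in [(3u64, "k", "a"), (1, "k", "b"), (2, "k", "c")] {
        priority_queue.push(Reverse((time, key, val)));
    }
    // `Reverse` flips BinaryHeap's max-heap order: smallest time surfaces first.
    let Reverse((time, _key, _val)) = priority_queue.pop().unwrap();
    assert_eq!(time, 1);
}
```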
4 changes: 1 addition & 3 deletions src/operators/consolidate.rs
@@ -97,11 +97,9 @@ where
         self.inner
             .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {
 
-                let mut vector = Vec::new();
                 move |input, output| {
                     input.for_each(|time, data| {
-                        data.swap(&mut vector);
-                        output.session_with_builder(&time).give_iterator(vector.drain(..));
+                        output.session_with_builder(&time).give_iterator(data.drain(..));
                     })
                 }
             })
4 changes: 1 addition & 3 deletions src/operators/count.rs
@@ -65,7 +65,6 @@ where
     fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> Collection<G, (K, T1::Diff), R2> {
 
         let mut trace = self.trace.clone();
-        let mut buffer = Vec::new();
 
         self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {
 
@@ -87,8 +86,7 @@
                     if cap.is_none() { // NB: Assumes batches are in-order
                         cap = Some(capability.retain());
                     }
-                    batches.swap(&mut buffer);
-                    for batch in buffer.drain(..) {
+                    for batch in batches.drain(..) {
                         upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
                         batch_cursors.push(batch.cursor());
                         batch_storage.push(batch);
10 changes: 2 additions & 8 deletions src/operators/join.rs
@@ -444,10 +444,6 @@ where
         let mut trace1_option = Some(trace1);
         let mut trace2_option = Some(trace2);
 
-        // Swappable buffers for input extraction.
-        let mut input1_buffer = Vec::new();
-        let mut input2_buffer = Vec::new();
-
         move |input1, input2, output| {
 
             // 1. Consuming input.
@@ -468,8 +464,7 @@
                 // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                 if let Some(ref mut trace2) = trace2_option {
                     let capability = capability.retain();
-                    data.swap(&mut input1_buffer);
-                    for batch1 in input1_buffer.drain(..) {
+                    for batch1 in data.drain(..) {
                         // Ignore any pre-loaded data.
                         if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                             if !batch1.is_empty() {
@@ -496,8 +491,7 @@
                 // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                 if let Some(ref mut trace1) = trace1_option {
                     let capability = capability.retain();
-                    data.swap(&mut input2_buffer);
-                    for batch2 in input2_buffer.drain(..) {
+                    for batch2 in data.drain(..) {
                         // Ignore any pre-loaded data.
                         if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                             if !batch2.is_empty() {
5 changes: 1 addition & 4 deletions src/operators/reduce.rs
@@ -373,8 +373,6 @@ where
         let mut output_upper = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
         let mut output_lower = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
 
-        let mut input_buffer = Vec::new();
-
         let id = trace.stream.scope().index();
 
         move |input, output| {
@@ -409,8 +407,7 @@
             // times in the batch.
             input.for_each(|capability, batches| {
 
-                batches.swap(&mut input_buffer);
-                for batch in input_buffer.drain(..) {
+                for batch in batches.drain(..) {
                     upper_limit.clone_from(batch.upper());
                     batch_cursors.push(batch.cursor());
                     batch_storage.push(batch);
4 changes: 1 addition & 3 deletions src/operators/threshold.rs
@@ -112,7 +112,6 @@ where
     {
 
         let mut trace = self.trace.clone();
-        let mut buffer = Vec::new();
 
         self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
 
@@ -134,8 +133,7 @@
                     if cap.is_none() { // NB: Assumes batches are in-order
                         cap = Some(capability.retain());
                     }
-                    batches.swap(&mut buffer);
-                    for batch in buffer.drain(..) {
+                    for batch in batches.drain(..) {
                         upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
                         batch_cursors.push(batch.cursor());
                         batch_storage.push(batch);
81 changes: 19 additions & 62 deletions src/trace/implementations/chunker.rs
@@ -1,7 +1,6 @@
 //! Organize streams of data into sorted chunks.
 
 use std::collections::VecDeque;
-use timely::communication::message::RefOrMut;
 use timely::Container;
 use timely::container::columnation::{Columnation, TimelyStack};
 use timely::container::{ContainerBuilder, PushInto, SizableContainer};
@@ -64,14 +63,14 @@
     }
 }
 
-impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for VecChunker<((K, V), T, R)>
+impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
 where
     K: Ord + Clone,
     V: Ord + Clone,
     T: Ord + Clone,
     R: Semigroup + Clone,
 {
-    fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
+    fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
         // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
         // because we don't write more than capacity elements into the buffer.
         // Important: Consolidation requires `pending` to have twice the chunk capacity to
@@ -80,27 +79,11 @@
             self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
         }
 
-        // `container` is either a shared reference or an owned allocations.
-        match container {
-            RefOrMut::Ref(vec) => {
-                let mut slice = &vec[..];
-                while !slice.is_empty() {
-                    let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
-                    slice = tail;
-                    self.pending.extend_from_slice(head);
-                    if self.pending.len() == self.pending.capacity() {
-                        self.form_chunk();
-                    }
-                }
-            }
-            RefOrMut::Mut(vec) => {
-                let mut drain = vec.drain(..).peekable();
-                while drain.peek().is_some() {
-                    self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
-                    if self.pending.len() == self.pending.capacity() {
-                        self.form_chunk();
-                    }
-                }
+        let mut drain = container.drain(..).peekable();
+        while drain.peek().is_some() {
+            self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
+            if self.pending.len() == self.pending.capacity() {
+                self.form_chunk();
             }
         }
     }
@@ -196,41 +179,25 @@
     }
 }
 
-impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for ColumnationChunker<((K, V), T, R)>
+impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for ColumnationChunker<((K, V), T, R)>
 where
     K: Columnation + Ord + Clone,
     V: Columnation + Ord + Clone,
     T: Columnation + Ord + Clone,
     R: Columnation + Semigroup + Clone,
 {
-    fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
+    fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
         // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
         // because we don't write more than capacity elements into the buffer.
         if self.pending.capacity() < Self::chunk_capacity() * 2 {
             self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
         }
 
-        // `container` is either a shared reference or an owned allocations.
-        match container {
-            RefOrMut::Ref(vec) => {
-                let mut slice = &vec[..];
-                while !slice.is_empty() {
-                    let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
-                    slice = tail;
-                    self.pending.extend_from_slice(head);
-                    if self.pending.len() == self.pending.capacity() {
-                        self.form_chunk();
-                    }
-                }
-            }
-            RefOrMut::Mut(vec) => {
-                let mut drain = vec.drain(..).peekable();
-                while drain.peek().is_some() {
-                    self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
-                    if self.pending.len() == self.pending.capacity() {
-                        self.form_chunk();
-                    }
-                }
+        let mut drain = container.drain(..).peekable();
+        while drain.peek().is_some() {
+            self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
+            if self.pending.len() == self.pending.capacity() {
+                self.form_chunk();
             }
         }
     }
@@ -288,15 +255,15 @@
     }
 }
 
-impl<'a, Input, Output> PushInto<RefOrMut<'a, Input>> for ContainerChunker<Output>
+impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
 where
     Input: Container,
     Output: SizableContainer
         + ConsolidateLayout
        + PushInto<Input::Item<'a>>
        + PushInto<Input::ItemRef<'a>>,
 {
-    fn push_into(&mut self, container: RefOrMut<'a, Input>) {
+    fn push_into(&mut self, container: &'a mut Input) {
         if self.pending.capacity() < Output::preferred_capacity() {
             self.pending.reserve(Output::preferred_capacity() - self.pending.len());
         }
@@ -313,19 +280,9 @@
                 }
             }
         };
-        match container {
-            RefOrMut::Ref(container) => {
-                for item in container.iter() {
-                    self.pending.push(item);
-                    form_batch(self);
-                }
-            }
-            RefOrMut::Mut(container) => {
-                for item in container.drain() {
-                    self.pending.push(item);
-                    form_batch(self);
-                }
-            }
+        for item in container.drain() {
+            self.pending.push(item);
+            form_batch(self);
         }
     }
 }
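
With `RefOrMut` gone, each chunker keeps only its owned-data path: drain the input into `pending` in capacity-sized steps, sealing a chunk whenever `pending` fills. The surviving loop, isolated below over plain `Vec`s (a sketch, not the chunkers' real types), with sealing modeled as pushing a snapshot into `sealed` in place of `form_chunk`'s sort-and-consolidate step:

```rust
fn chunk_by_capacity<T>(container: &mut Vec<T>, pending: &mut Vec<T>, sealed: &mut Vec<Vec<T>>) {
    assert!(pending.capacity() > 0, "pending must be pre-sized");
    let mut drain = container.drain(..).peekable();
    while drain.peek().is_some() {
        // Take only as many items as `pending` has room for.
        let room = pending.capacity() - pending.len();
        pending.extend((&mut drain).take(room));
        if pending.len() == pending.capacity() {
            // Seal a chunk; draining preserves `pending`'s capacity.
            sealed.push(pending.drain(..).collect());
        }
    }
}

fn main() {
    let mut pending = Vec::with_capacity(2);
    let mut sealed = Vec::new();
    let mut input = vec![1, 2, 3, 4, 5];
    chunk_by_capacity(&mut input, &mut pending, &mut sealed);
    assert_eq!(sealed, vec![vec![1, 2], vec![3, 4]]);
    assert_eq!(pending, vec![5]); // remainder waits for the next push
}
```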
(The remaining three file diffs were not loaded on this page.)