diff --git a/examples/accumulate.rs b/examples/accumulate.rs index 4693431e4..0fa792926 100644 --- a/examples/accumulate.rs +++ b/examples/accumulate.rs @@ -16,7 +16,14 @@ fn main() { let mut input = worker.dataflow::<(), _, _>(|scope| { let (input, data) = scope.new_collection::<_, isize>(); - data.consolidate(); + + use timely::dataflow::Scope; + scope.iterative::(|inner| { + data.enter_at(inner, |_| 0) + .consolidate() + .leave() + }); + input }); diff --git a/src/collection.rs b/src/collection.rs index aeb6aad38..2e82c7e62 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -322,20 +322,16 @@ impl Collection where G::Timestamp: Da /// data.assert_eq(&result); /// }); /// ``` - pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, initial: F) -> Collection, D, R> + pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection, D, R> where T: Timestamp+Hash, F: FnMut(&D) -> T + Clone + 'static, G::Timestamp: Hash, { - - let mut initial1 = initial.clone(); - let mut initial2 = initial.clone(); - self.inner - .enter_at(child, move |x| initial1(&x.0)) + .enter(child) .map(move |(data, time, diff)| { - let new_time = Product::new(time, initial2(&data)); + let new_time = Product::new(time, initial(&data)); (data, new_time, diff) }) .as_collection()