Skip to content

Commit

Permalink
Use consolidating builder in consolidate_stream (#488)
Browse files Browse the repository at this point in the history
This switches from consolidating each input individually to
consolidating at the output. The benefits are that it can yield better
consolidation performance because it can consolidate across input
containers instead of only being able to consolidate each individual
input container.

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru authored May 15, 2024
1 parent b215b9a commit 7063c6f
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use timely::dataflow::Scope;

use crate::{Collection, ExchangeData, Hashable};
use crate::consolidation::ConsolidatingContainerBuilder;
use crate::difference::Semigroup;

use crate::Data;
Expand Down Expand Up @@ -92,14 +93,13 @@ where
use crate::collection::AsCollection;

self.inner
.unary(Pipeline, "ConsolidateStream", |_cap, _info| {
.unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {

let mut vector = Vec::new();
move |input, output| {
input.for_each(|time, data| {
data.swap(&mut vector);
crate::consolidation::consolidate_updates(&mut vector);
output.session(&time).give_container(&mut vector);
output.session_with_builder(&time).give_container(&mut vector);
})
}
})
Expand Down

0 comments on commit 7063c6f

Please sign in to comment.