Skip to content

Commit

Permalink
Cursor repivoting (#435)
Browse files Browse the repository at this point in the history
* Make Storage an associated type, again

* Reorient reduce::perkeycompute to depend on cursors

* Pivot history replay to be based on cursors

* Rename Cursor::R associated type to Diff

* Encourage idiomatic cursor use

* Update dogs3
  • Loading branch information
frankmcsherry authored Nov 28, 2023
1 parent 0b68301 commit 5a23ad6
Show file tree
Hide file tree
Showing 32 changed files with 512 additions and 445 deletions.
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub fn count<G, Tr, R, F, P>(
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Val=(), Time=G::Timestamp, R=isize>+Clone+'static,
Tr: TraceReader<Val=(), Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
Expand Down
20 changes: 10 additions & 10 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,27 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates};
/// once out of the "delta flow region", the updates will be `delay`d to the
/// times specified in the payloads.
pub fn half_join<G, V, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (Tr::Key, V, G::Timestamp), Tr::R>,
stream: &Collection<G, (Tr::Key, V, G::Timestamp), Tr::Diff>,
arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
mut output_func: S,
) -> Collection<G, (DOut, G::Timestamp), Tr::R>
) -> Collection<G, (DOut, G::Timestamp), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
V: ExchangeData,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+ExchangeData,
Tr::Val: Clone,
Tr::R: Monoid+ExchangeData,
Tr::Diff: Monoid+ExchangeData,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
Tr::R: std::ops::Mul<Tr::R, Output=Tr::R>,
Tr::Diff: std::ops::Mul<Tr::Diff, Output=Tr::Diff>,
S: FnMut(&Tr::Key, &V, &Tr::Val)->DOut+'static,
{
let output_func = move |k: &Tr::Key, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &Tr::R, diff2: &Tr::R| {
let output_func = move |k: &Tr::Key, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &Tr::Diff, diff2: &Tr::Diff| {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
Expand Down Expand Up @@ -121,7 +121,7 @@ where
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
pub fn half_join_internal_unsafe<G, V, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (Tr::Key, V, G::Timestamp), Tr::R>,
stream: &Collection<G, (Tr::Key, V, G::Timestamp), Tr::Diff>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
Expand All @@ -135,14 +135,14 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+ExchangeData,
Tr::Val: Clone,
Tr::R: Monoid+ExchangeData,
Tr::Diff: Monoid+ExchangeData,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
ROut: Monoid,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::R, &Tr::R)-> I + 'static,
S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::Diff, &Tr::Diff)-> I + 'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -152,7 +152,7 @@ where
let mut stash = HashMap::new();
let mut buffer = Vec::new();

let exchange = Exchange::new(move |update: &((Tr::Key, V, G::Timestamp),G::Timestamp,Tr::R)| (update.0).0.hashed().into());
let exchange = Exchange::new(move |update: &((Tr::Key, V, G::Timestamp),G::Timestamp,Tr::Diff)| (update.0).0.hashed().into());

// Stash for (time, diff) accumulation.
let mut output_buffer = Vec::new();
Expand Down Expand Up @@ -229,7 +229,7 @@ where
}
cursor.rewind_vals(&storage);
}
*diff1 = Tr::R::zero();
*diff1 = Tr::Diff::zero();
}
}

Expand Down
6 changes: 3 additions & 3 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ where
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Sized,
Tr::Val: Clone,
Tr::R: Monoid+ExchangeData,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::Key)+Clone+'static,
D: ExchangeData,
R: ExchangeData+Monoid,
DOut: Clone+'static,
ROut: Monoid,
S: FnMut(&D, &R, &Tr::Val, &Tr::R)->(DOut, ROut)+'static,
S: FnMut(&D, &R, &Tr::Val, &Tr::Diff)->(DOut, ROut)+'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand Down Expand Up @@ -99,7 +99,7 @@ where
cursor.seek_key(&storage, &key1);
if cursor.get_key(&storage) == Some(&key1) {
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::R::zero();
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
if t.less_equal(time) { count.plus_equals(d); }
});
Expand Down
12 changes: 6 additions & 6 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ use differential_dataflow::trace::TraceReader;
/// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case
/// of delta queries.
pub fn propose<G, Tr, F, P>(
prefixes: &Collection<G, P, Tr::R>,
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::Val), Tr::R>
) -> Collection<G, (P, Tr::Val), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
P: ExchangeData,
{
Expand All @@ -46,17 +46,17 @@ where
/// prefixes by the number of matches in `arrangement`. This can be useful to
/// avoid the need to prepare an arrangement of distinct extensions.
pub fn propose_distinct<G, Tr, F, P>(
prefixes: &Collection<G, P, Tr::R>,
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::Val), Tr::R>
) -> Collection<G, (P, Tr::Val), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
P: ExchangeData,
{
Expand Down
6 changes: 3 additions & 3 deletions dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ use differential_dataflow::trace::TraceReader;
/// key with `key_selector` and then proposes all pairs of the prefix
/// and values associated with the key in `arrangement`.
pub fn validate<G, K, V, Tr, F, P>(
extensions: &Collection<G, (P, V), Tr::R>,
extensions: &Collection<G, (P, V), Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, V), Tr::R>
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Key=(K,V), Val=(), Time=G::Timestamp>+Clone+'static,
K: Ord+Hash+Clone+Default,
V: ExchangeData+Hash+Default,
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
{
Expand Down
2 changes: 1 addition & 1 deletion examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ where
Tr::Key: Debug + Clone,
Tr::Val: Debug + Clone,
Tr::Time: Debug + Clone,
Tr::R: Debug + Clone,
Tr::Diff: Debug + Clone,
{
let (mut cursor, storage) = trace.cursor();
for ((k, v), diffs) in cursor.to_vec(&storage).iter() {
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=isize>+Clone+'static,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=isize>+Clone+'static,
{
forward
.stream
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=R>+Clone+'static,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=R>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
type Key = Tr::Key;
type Val = Tr::Val;
type Time = Tr::Time;
type R = Tr::R;
type Diff = Tr::Diff;

type Batch = Tr::Batch;
type Storage = Tr::Storage;
Expand Down
28 changes: 14 additions & 14 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ where
where
Tr::Key: 'static,
Tr::Val: 'static,
Tr::R: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
{
Expand All @@ -110,7 +110,7 @@ where
where
Tr::Key: 'static,
Tr::Val: 'static,
Tr::R: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
{
Arranged {
Expand All @@ -129,7 +129,7 @@ where
where
Tr::Key: 'static,
Tr::Val: 'static,
Tr::R: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
F: FnMut(&Tr::Key, &Tr::Val, &G::Timestamp)->TInner+Clone+'static,
Expand Down Expand Up @@ -179,7 +179,7 @@ where
where
Tr::Key: 'static,
Tr::Val: 'static,
Tr::R: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
F: FnMut(&Tr::Key, &Tr::Val)->bool+Clone+'static,
{
Expand All @@ -195,9 +195,9 @@ where
/// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,
/// and this method should only be used when the data need to be transformed or exchanged, rather than
/// supplied as arguments to an operator using the same key-value structure.
pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::R>
pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
where
Tr::R: Semigroup,
Tr::Diff: Semigroup,
L: FnMut(&Tr::Key, &Tr::Val) -> D+'static,
{
self.flat_map_ref(move |key, val| Some(logic(key,val)))
Expand All @@ -207,9 +207,9 @@ where
///
/// The supplied logic may produce an iterator over output values, allowing either
/// filtering or flat mapping as part of the extraction.
pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::R>
pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
where
Tr::R: Semigroup,
Tr::Diff: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(&Tr::Key, &Tr::Val) -> I+'static,
Expand All @@ -224,9 +224,9 @@ where
///
/// This method exists for streams of batches without the corresponding arrangement.
/// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::R>
pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::Diff>
where
Tr::R: Semigroup,
Tr::Diff: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(&Tr::Key, &Tr::Val) -> I+'static,
Expand Down Expand Up @@ -258,12 +258,12 @@ where
///
/// This method consumes a stream of (key, time) queries and reports the corresponding stream of
/// (key, value, time, diff) accumulations in the `self` trace.
pub fn lookup(&self, queries: &Stream<G, (Tr::Key, G::Timestamp)>) -> Stream<G, (Tr::Key, Tr::Val, G::Timestamp, Tr::R)>
pub fn lookup(&self, queries: &Stream<G, (Tr::Key, G::Timestamp)>) -> Stream<G, (Tr::Key, Tr::Val, G::Timestamp, Tr::Diff)>
where
G::Timestamp: Data+Lattice+Ord+TotalOrder,
Tr::Key: ExchangeData+Hashable,
Tr::Val: ExchangeData,
Tr::R: ExchangeData+Semigroup,
Tr::Diff: ExchangeData+Semigroup,
Tr: 'static,
{
// while the arrangement is already correctly distributed, the query stream may not be.
Expand All @@ -280,8 +280,8 @@ where
let mut active = Vec::new();
let mut retain = Vec::new();

let mut working: Vec<(G::Timestamp, Tr::Val, Tr::R)> = Vec::new();
let mut working2: Vec<(Tr::Val, Tr::R)> = Vec::new();
let mut working: Vec<(G::Timestamp, Tr::Val, Tr::Diff)> = Vec::new();
let mut working2: Vec<(Tr::Val, Tr::Diff)> = Vec::new();

move |input1, input2, output| {

Expand Down
4 changes: 2 additions & 2 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ where
G::Timestamp: Lattice+Ord+TotalOrder+ExchangeData,
Tr::Key: ExchangeData+Hashable+std::hash::Hash,
Tr::Val: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp,R=isize>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp,Diff=isize>+'static,
Tr::Batch: Batch,
Tr::Builder: Builder<Item = ((Tr::Key, Tr::Val), Tr::Time, Tr::R)>,
Tr::Builder: Builder<Item = ((Tr::Key, Tr::Val), Tr::Time, Tr::Diff)>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down
2 changes: 1 addition & 1 deletion src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
/// As `consolidate` but with the ability to name the operator and specify the trace type.
pub fn consolidate_named<Tr>(&self, name: &str) -> Self
where
Tr: crate::trace::Trace+crate::trace::TraceReader<Key=D,Val=(),Time=G::Timestamp,R=R>+'static,
Tr: crate::trace::Trace+crate::trace::TraceReader<Key=D,Val=(),Time=G::Timestamp,Diff=R>+'static,
Tr::Batch: crate::trace::Batch,
Tr::Batcher: Batcher<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
Expand Down
12 changes: 5 additions & 7 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ where G::Timestamp: TotalOrder+Lattice+Ord {
}
}

impl<G: Scope, T1> CountTotal<G, T1::Key, T1::R> for Arranged<G, T1>
impl<G: Scope, T1> CountTotal<G, T1::Key, T1::Diff> for Arranged<G, T1>
where
G::Timestamp: TotalOrder+Lattice+Ord,
T1: TraceReader<Val=(), Time=G::Timestamp>+Clone+'static,
T1::Key: ExchangeData,
T1::R: ExchangeData+Semigroup,
T1::Diff: ExchangeData+Semigroup,
{
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::Key, T1::R), R2> {
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::Key, T1::Diff), R2> {

let mut trace = self.trace.clone();
let mut buffer = Vec::new();
Expand All @@ -82,10 +82,8 @@ where
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
upper_limit.clone_from(batch.upper());

while batch_cursor.key_valid(&batch) {

let key = batch_cursor.key(&batch);
let mut count: Option<T1::R> = None;
while let Some(key) = batch_cursor.get_key(&batch) {
let mut count: Option<T1::Diff> = None;

trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
Expand Down
Loading

0 comments on commit 5a23ad6

Please sign in to comment.