From 93765600d73b697a5898e2519db701079d50752b Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 9 Sep 2024 21:49:34 +0200 Subject: [PATCH] Update to latest Timely (#519) * Update to latest Timely Bring in changes related to abomination and reference-counted addresses. Signed-off-by: Moritz Hoffmann * Serde for batch Signed-off-by: Moritz Hoffmann --------- Signed-off-by: Moritz Hoffmann --- Cargo.toml | 2 - dogsdogsdogs/Cargo.toml | 2 - dogsdogsdogs/src/altneu.rs | 3 +- dogsdogsdogs/src/operators/half_join.rs | 4 +- examples/arrange.rs | 2 +- examples/monoid-bfs.rs | 3 +- examples/multitemporal.rs | 6 +- experiments/Cargo.toml | 2 - experiments/src/bin/multitemporal.rs | 6 +- server/dataflows/random_graph/src/lib.rs | 2 +- src/capture.rs | 19 ++-- src/difference.rs | 3 +- src/dynamic/pointstamp.rs | 9 +- src/logging.rs | 16 +-- src/operators/arrange/agent.rs | 8 +- src/operators/arrange/arrangement.rs | 2 +- src/operators/arrange/upsert.rs | 2 +- src/operators/join.rs | 2 +- src/operators/reduce.rs | 2 +- src/trace/description.rs | 3 +- src/trace/implementations/mod.rs | 8 +- src/trace/implementations/ord_neu.rs | 25 +++-- src/trace/implementations/rhh.rs | 21 ++-- src/trace/mod.rs | 122 ----------------------- tests/trace.rs | 2 +- 25 files changed, 76 insertions(+), 200 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0cca0a4bd..0c23ce087 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,8 +42,6 @@ graph_map = "0.1" [dependencies] serde = { version = "1.0", features = ["derive"] } -abomonation = "0.7" -abomonation_derive = "0.5" fnv="1.0.2" timely = {workspace = true} diff --git a/dogsdogsdogs/Cargo.toml b/dogsdogsdogs/Cargo.toml index 49dc52f52..1d21ea7a9 100644 --- a/dogsdogsdogs/Cargo.toml +++ b/dogsdogsdogs/Cargo.toml @@ -6,8 +6,6 @@ license = "MIT" edition = "2021" [dependencies] -abomonation = "0.7" -abomonation_derive = "0.5" timely = { workspace = true } differential-dataflow = { path = "../", default-features = false } serde = "1" diff --git a/dogsdogsdogs/src/altneu.rs b/dogsdogsdogs/src/altneu.rs index d1cf8cd05..c025f9f83 100644 --- a/dogsdogsdogs/src/altneu.rs +++ b/dogsdogsdogs/src/altneu.rs @@ -11,11 +11,10 @@ //! element of the second lattice, if neither first element equals //! the join. -use abomonation_derive::Abomonation; use serde_derive::{Deserialize, Serialize}; /// A pair of timestamps, partially ordered by the product order. -#[derive(Debug, Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Serialize, Deserialize)] +#[derive(Debug, Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub struct AltNeu { pub time: T, pub neu: bool, // alt < neu in timestamp comparisons. diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index f0b50ad4e..5eb5a872e 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -161,9 +161,7 @@ where stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| { // Acquire an activator to reschedule the operator when it has unfinished work. - use timely::scheduling::Activator; - let activations = stream.scope().activations(); - let activator = Activator::new(&info.address[..], activations); + let activator = stream.scope().activator_for(info.address); move |input1, input2, output| { diff --git a/examples/arrange.rs b/examples/arrange.rs index 69e39b739..afe412440 100644 --- a/examples/arrange.rs +++ b/examples/arrange.rs @@ -36,7 +36,7 @@ fn main() { // create a source operator which will produce random edges and delete them. timely::dataflow::operators::generic::source(scope, "RandomGraph", |mut capability, info| { - let activator = scope.activator_for(&info.address[..]); + let activator = scope.activator_for(info.address); let seed: &[_] = &[1, 2, 3, index]; let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions diff --git a/examples/monoid-bfs.rs b/examples/monoid-bfs.rs index f61ae44ea..9b1b367d2 100644 --- a/examples/monoid-bfs.rs +++ b/examples/monoid-bfs.rs @@ -1,4 +1,3 @@ -use abomonation_derive::Abomonation; use rand::{Rng, SeedableRng, StdRng}; use serde::{Deserialize, Serialize}; @@ -13,7 +12,7 @@ use differential_dataflow::lattice::Lattice; type Node = u32; type Edge = (Node, Node); -#[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)] +#[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)] pub struct MinSum { value: u32, } diff --git a/examples/multitemporal.rs b/examples/multitemporal.rs index 0a3506a77..b0515c207 100644 --- a/examples/multitemporal.rs +++ b/examples/multitemporal.rs @@ -136,7 +136,7 @@ fn main() { mod pair { /// A pair of timestamps, partially ordered by the product order. - #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Serialize, Deserialize)] + #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub struct Pair { pub first: S, pub second: T, @@ -203,7 +203,6 @@ mod pair { } use std::fmt::{Formatter, Error, Debug}; - use abomonation_derive::Abomonation; use serde::{Deserialize, Serialize}; /// Debug implementation to avoid seeing fully qualified path names. @@ -221,11 +220,10 @@ mod pair { /// from the rest of the library other than the traits it needs to implement. With this /// type and its implementations, you can use it as a timestamp type. mod vector { - use abomonation_derive::Abomonation; use serde::{Deserialize, Serialize}; /// A pair of timestamps, partially ordered by the product order. - #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Debug, Serialize, Deserialize)] + #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)] pub struct Vector { pub vector: Vec, } diff --git a/experiments/Cargo.toml b/experiments/Cargo.toml index 4905a644a..1f9db3b9a 100644 --- a/experiments/Cargo.toml +++ b/experiments/Cargo.toml @@ -7,8 +7,6 @@ edition = "2021" [dependencies] core_affinity = "0.5.9" rand="0.3.13" -abomonation = "0.7" -abomonation_derive = "0.5" #timely = "0.7" timely = { workspace = true } differential-dataflow = { path = "../" } diff --git a/experiments/src/bin/multitemporal.rs b/experiments/src/bin/multitemporal.rs index 9d64e87c0..3a98cab43 100644 --- a/experiments/src/bin/multitemporal.rs +++ b/experiments/src/bin/multitemporal.rs @@ -164,7 +164,7 @@ fn main() { mod pair { /// A pair of timestamps, partially ordered by the product order. - #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Serialize, Deserialize)] + #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub struct Pair { pub first: S, pub second: T, @@ -231,7 +231,6 @@ mod pair { } use std::fmt::{Formatter, Error, Debug}; - use abomonation_derive::Abomonation; use serde::{Deserialize, Serialize}; /// Debug implementation to avoid seeing fully qualified path names. @@ -249,11 +248,10 @@ mod pair { /// from the rest of the library other than the traits it needs to implement. With this /// type and its implementations, you can use it as a timestamp type. mod vector { - use abomonation_derive::Abomonation; use serde::{Deserialize, Serialize}; /// A pair of timestamps, partially ordered by the product order. - #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Debug, Serialize, Deserialize)] + #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)] pub struct Vector { pub vector: Vec, } diff --git a/server/dataflows/random_graph/src/lib.rs b/server/dataflows/random_graph/src/lib.rs index 60261a1dc..1ea57f7da 100644 --- a/server/dataflows/random_graph/src/lib.rs +++ b/server/dataflows/random_graph/src/lib.rs @@ -66,7 +66,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(), let mut trace = source(dataflow, "RandomGraph", |cap, info| { - let activator = dataflow.activator_for(&info.address[..]); + let activator = dataflow.activator_for(info.address); let mut hist = hdrhist::HDRHist::new(); let probe2 = probe.clone(); diff --git a/src/capture.rs b/src/capture.rs index c08f3c05b..d546ad4a2 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -10,11 +10,10 @@ //! this file. use std::time::Duration; -use abomonation_derive::Abomonation; use serde::{Deserialize, Serialize}; /// A message in the CDC V2 protocol. -#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)] +#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] pub enum Message { /// A batch of updates that are certain to occur. /// @@ -32,7 +31,7 @@ pub enum Message { /// Each element of `counts` is an irrevocable statement about the exact number of /// distinct updates that occur at that time. /// Times not present in `counts` have a count of zero. -#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)] +#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] pub struct Progress { /// The lower bound of times contained in this statement. pub lower: Vec, @@ -310,9 +309,9 @@ pub mod source { // Step 1: The MESSAGES operator. let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone()); let address = messages_op.operator_info().address; - let activator = scope.sync_activator_for(&address); - let activator2 = scope.activator_for(&address); - let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(&address)) }; + let activator = scope.sync_activator_for(address.to_vec()); + let activator2 = scope.activator_for(Rc::clone(&address)); + let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) }; let mut source = source_builder(activator); let (mut updates_out, updates) = messages_op.new_output(); let (mut progress_out, progress) = messages_op.new_output(); @@ -582,13 +581,13 @@ pub mod sink { // We can simply record all updates, under the presumption that the have been consolidated // and so any record we see is in fact guaranteed to happen. let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope()); - let reactivator = stream.scope().activator_for(&builder.operator_info().address); + let reactivator = stream.scope().activator_for(builder.operator_info().address); let mut input = builder.new_input(stream, Pipeline); let (mut updates_out, updates) = builder.new_output(); builder.build_reschedule( move |_capability| { - let mut timestamps = ChangeBatch::new(); + let mut timestamps = >::new(); let mut send_queue = std::collections::VecDeque::new(); move |_frontiers| { let mut output = updates_out.activate(); @@ -636,7 +635,7 @@ pub mod sink { // We use a lower-level builder here to get access to the operator address, for rescheduling. let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope()); - let reactivator = stream.scope().activator_for(&builder.operator_info().address); + let reactivator = stream.scope().activator_for(builder.operator_info().address); let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash)); let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers(); @@ -644,7 +643,7 @@ pub mod sink { // Track the advancing frontier, to know when to produce utterances. let mut frontier = Antichain::from_elem(T::minimum()); // Track accumulated counts for timestamps. - let mut timestamps = ChangeBatch::new(); + let mut timestamps = >::new(); // Stash for serialized data yet to send. let mut send_queue = std::collections::VecDeque::new(); let mut retain = Vec::new(); diff --git a/src/difference.rs b/src/difference.rs index 71c724905..569ba42aa 100644 --- a/src/difference.rs +++ b/src/difference.rs @@ -157,7 +157,6 @@ wrapping_implementation!(std::num::Wrapping); pub use self::present::Present; mod present { - use abomonation_derive::Abomonation; use serde::{Deserialize, Serialize}; /// A zero-sized difference that indicates the presence of a record. @@ -168,7 +167,7 @@ mod present { /// The primary feature of this type is that it has zero size, which reduces the overhead /// of differential dataflow's representations for settings where collections either do /// not change, or for which records are only added (for example, derived facts in Datalog). - #[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)] + #[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)] pub struct Present; impl super::Multiply for Present { diff --git a/src/dynamic/pointstamp.rs b/src/dynamic/pointstamp.rs index 14a256e33..0199fe85d 100644 --- a/src/dynamic/pointstamp.rs +++ b/src/dynamic/pointstamp.rs @@ -11,16 +11,13 @@ //! (as iteration within a scope requires leaving contained scopes), and then to any number of appended //! default coordinates (which is effectively just *setting* the coordinate). -use abomonation_derive::Abomonation; use serde::{Deserialize, Serialize}; /// A sequence of timestamps, partially ordered by the product order. /// /// Sequences of different lengths are compared as if extended indefinitely by `T::minimum()`. /// Sequences are guaranteed to be "minimal", and may not end with `T::minimum()` entries. -#[derive( - Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation, -)] +#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)] pub struct PointStamp { /// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes. vector: Vec, @@ -118,9 +115,7 @@ impl Refines<()> for PointStamp { use timely::progress::PathSummary; /// Describes an action on a `PointStamp`: truncation to `length` followed by `actions`. -#[derive( - Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation -)] +#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)] pub struct PointStampSummary { /// Number of leading coordinates to retain. /// diff --git a/src/logging.rs b/src/logging.rs index e42be4ee1..493c4e4e1 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -1,6 +1,6 @@ //! Loggers and logging events for differential dataflow. -use abomonation_derive::Abomonation; +use serde::{Deserialize, Serialize}; /// Logger for differential dataflow events. pub type Logger = ::timely::logging::Logger; @@ -19,7 +19,7 @@ where } /// Possible different differential events. -#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)] +#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)] pub enum DifferentialEvent { /// Batch creation. Batch(BatchEvent), @@ -36,7 +36,7 @@ pub enum DifferentialEvent { } /// Either the start or end of a merge event. -#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)] +#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)] pub struct BatchEvent { /// Operator identifier. pub operator: usize, @@ -48,7 +48,7 @@ impl From for DifferentialEvent { fn from(e: BatchEvent) -> Self { D /// Either the start or end of a merge event. -#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)] +#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)] pub struct BatcherEvent { /// Operator identifier. pub operator: usize, @@ -65,7 +65,7 @@ pub struct BatcherEvent { impl From for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } } /// Either the start or end of a merge event. -#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)] +#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)] pub struct DropEvent { /// Operator identifier. pub operator: usize, @@ -76,7 +76,7 @@ pub struct DropEvent { impl From for DifferentialEvent { fn from(e: DropEvent) -> Self { DifferentialEvent::Drop(e) } } /// Either the start or end of a merge event. -#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)] +#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)] pub struct MergeEvent { /// Operator identifier. pub operator: usize, @@ -93,7 +93,7 @@ pub struct MergeEvent { impl From for DifferentialEvent { fn from(e: MergeEvent) -> Self { DifferentialEvent::Merge(e) } } /// A merge failed to complete in time. -#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)] +#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)] pub struct MergeShortfall { /// Operator identifer. pub operator: usize, @@ -106,7 +106,7 @@ pub struct MergeShortfall { impl From for DifferentialEvent { fn from(e: MergeShortfall) -> Self { DifferentialEvent::MergeShortfall(e) } } /// Either the start or end of a merge event. -#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)] +#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)] pub struct TraceShare { /// Operator identifier. pub operator: usize, diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 5357dc931..bc74e82cd 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -302,10 +302,10 @@ where let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new()))); - let activator = scope.activator_for(&info.address[..]); + let activator = scope.activator_for(Rc::clone(&info.address)); let queue = self.new_listener(activator); - let activator = scope.activator_for(&info.address[..]); + let activator = scope.activator_for(info.address); *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator)); capabilities.borrow_mut().as_mut().unwrap().insert(capability); @@ -439,10 +439,10 @@ where let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new()))); - let activator = scope.activator_for(&info.address[..]); + let activator = scope.activator_for(Rc::clone(&info.address)); let queue = self.new_listener(activator); - let activator = scope.activator_for(&info.address[..]); + let activator = scope.activator_for(info.address); *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator)); capabilities.borrow_mut().as_mut().unwrap().insert(capability); diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index ef14f2b92..ecebb2068 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -439,7 +439,7 @@ where // Capabilities for the lower envelope of updates in `batcher`. let mut capabilities = Antichain::>::new(); - let activator = Some(scope.activator_for(&info.address[..])); + let activator = Some(scope.activator_for(info.address.clone())); let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); // If there is default exertion logic set, install it. if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 3583f4473..5d687856f 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -163,7 +163,7 @@ where let mut capabilities = Antichain::>::new(); let mut buffer = Vec::new(); // Form the trace we will both use internally and publish. - let activator = Some(stream.scope().activator_for(&info.address[..])); + let activator = Some(stream.scope().activator_for(info.address.clone())); let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); if let Some(exert_logic) = stream.scope().config().get::("differential/default_exert_logic").cloned() { diff --git a/src/operators/join.rs b/src/operators/join.rs index cfa7cbb94..e5772d1c6 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -381,7 +381,7 @@ where // Acquire an activator to reschedule the operator when it has unfinished work. use timely::scheduling::Activator; let activations = arranged1.stream.scope().activations().clone(); - let activator = Activator::new(&info.address[..], activations); + let activator = Activator::new(info.address, activations); // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound. // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index ca40ce285..495a77a61 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -338,7 +338,7 @@ where register.get::("differential/arrange") }; - let activator = Some(trace.stream.scope().activator_for(&operator_info.address[..])); + let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone())); let mut empty = T2::new(operator_info.clone(), logger.clone(), activator); // If there is default exert logic set, install it. if let Some(exert_logic) = trace.stream.scope().config().get::("differential/default_exert_logic").cloned() { diff --git a/src/trace/description.rs b/src/trace/description.rs index 99c4e6344..80dae4c5d 100644 --- a/src/trace/description.rs +++ b/src/trace/description.rs @@ -55,7 +55,6 @@ //! will often be a logic bug, as `since` does not advance without a corresponding advance in //! times at which data may possibly be sent. -use abomonation_derive::Abomonation; use timely::{PartialOrder, progress::Antichain}; use serde::{Serialize, Deserialize}; @@ -66,7 +65,7 @@ use serde::{Serialize, Deserialize}; /// frontier indicates a moment at which the times were observed. If `since` is strictly in /// advance of `lower`, the contained times may be "advanced" to times which appear equivalent to /// any time after `since`. -#[derive(Clone, Debug, Abomonation, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Description