From bd12951f98374c595abbb5bca2b99cb754aa9e47 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 6 Nov 2023 19:15:13 -0500 Subject: [PATCH 1/3] Allow custom exertion logic --- src/operators/arrange/arrangement.rs | 23 ++++++----- src/operators/arrange/upsert.rs | 29 +++++++------- src/operators/arrange/writer.rs | 4 +- src/operators/reduce.rs | 24 ++++++------ src/trace/implementations/spine_fueled.rs | 47 +++++++++++++---------- src/trace/mod.rs | 11 +++++- 6 files changed, 81 insertions(+), 57 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index a2dd3e1b0..0633c8c4d 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -563,15 +563,22 @@ where // Capabilities for the lower envelope of updates in `batcher`. let mut capabilities = Antichain::>::new(); - let (activator, effort) = + let activator = Some(self.scope().activator_for(&info.address[..])); + let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); + + // If idle merge effort exists, configure aggressive idle merging logic. if let Some(effort) = self.inner.scope().config().get::("differential/idle_merge_effort").cloned() { - (Some(self.scope().activator_for(&info.address[..])), Some(effort)) + empty_trace.set_exert_logic(Some(Box::new(move |batches| { + let mut non_empty = 0; + for (_index, count, length) in batches { + if count > 1 { return Some(effort as usize); } + if length > 0 { non_empty += 1; } + if non_empty > 1 { return Some(effort as usize); } + } + None + }))); } - else { - (None, None) - }; - let empty_trace = Tr::new(info.clone(), logger.clone(), activator); let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); *reader = Some(reader_local); @@ -672,9 +679,7 @@ where prev_frontier.extend(input.frontier().frontier().iter().cloned()); } - if let Some(mut fuel) = effort.clone() { - writer.exert(&mut fuel); - } + writer.exert(); } }) }; diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 468398ae6..dbcd38c8e 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -165,20 +165,25 @@ where register.get::<::logging::DifferentialEvent>("differential/arrange") }; - // Establish compaction effort to apply even without updates. - let (activator, effort) = - if let Some(effort) = stream.scope().config().get::("differential/idle_merge_effort").cloned() { - (Some(stream.scope().activator_for(&info.address[..])), Some(effort)) - } - else { - (None, None) - }; - // Tracks the lower envelope of times in `priority_queue`. let mut capabilities = Antichain::>::new(); let mut buffer = Vec::new(); // Form the trace we will both use internally and publish. - let empty_trace = Tr::new(info.clone(), logger.clone(), activator); + let activator = Some(stream.scope().activator_for(&info.address[..])); + let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); + // If idle merge effort exists, configure aggressive idle merging logic. + if let Some(effort) = stream.scope().config().get::("differential/idle_merge_effort").cloned() { + empty_trace.set_exert_logic(Some(Box::new(move |batches| { + let mut non_empty = 0; + for (_index, count, length) in batches { + if count > 1 { return Some(effort as usize); } + if length > 0 { non_empty += 1; } + if non_empty > 1 { return Some(effort as usize); } + } + None + }))); + } + let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); // Capture the reader outside the builder scope. *reader = Some(reader_local.clone()); @@ -334,9 +339,7 @@ where reader_local.set_physical_compaction(prev_frontier.borrow()); } - if let Some(mut fuel) = effort.clone() { - writer.exert(&mut fuel); - } + writer.exert(); } }) }; diff --git a/src/operators/arrange/writer.rs b/src/operators/arrange/writer.rs index a42924b75..b53f298f1 100644 --- a/src/operators/arrange/writer.rs +++ b/src/operators/arrange/writer.rs @@ -52,9 +52,9 @@ where } /// Exerts merge effort, even without additional updates. - pub fn exert(&mut self, fuel: &mut isize) { + pub fn exert(&mut self) { if let Some(trace) = self.trace.upgrade() { - trace.borrow_mut().trace.exert(fuel); + trace.borrow_mut().trace.exert(); } } diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 45ad92460..712843e1e 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -351,17 +351,21 @@ where register.get::<::logging::DifferentialEvent>("differential/arrange") }; - // Determine if we should regularly exert the trace maintenance machinery, - // and with what amount of effort each time. - let (activator, effort) = + let activator = Some(self.stream.scope().activator_for(&operator_info.address[..])); + let mut empty = T2::new(operator_info.clone(), logger.clone(), activator); + // If idle merge effort exists, configure aggressive idle merging logic. if let Some(effort) = self.stream.scope().config().get::("differential/idle_merge_effort").cloned() { - (Some(self.stream.scope().activator_for(&operator_info.address[..])), Some(effort)) + empty.set_exert_logic(Some(Box::new(move |batches| { + let mut non_empty = 0; + for (_index, count, length) in batches { + if count > 1 { return Some(effort as usize); } + if length > 0 { non_empty += 1; } + if non_empty > 1 { return Some(effort as usize); } + } + None + }))); } - else { - (None, None) - }; - let empty = T2::new(operator_info.clone(), logger.clone(), activator); let mut source_trace = self.trace.clone(); let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); @@ -629,9 +633,7 @@ where } // Exert trace maintenance if we have been so requested. - if let Some(mut fuel) = effort.clone() { - output_writer.exert(&mut fuel); - } + output_writer.exert(); } } ) diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index f3b61ec94..4425b46c5 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -97,6 +97,8 @@ pub struct Spine where B::Time: Lattice+Ord, B::R: Semigroup { upper: Antichain, effort: usize, activator: Option, + /// Logic to indicate whether and how many records we should introduce in the absence of actual updates. + exert_logic: Option Fn(Box+'a>)->Option>>, } impl TraceReader for Spine @@ -264,22 +266,21 @@ where /// Apply some amount of effort to trace maintenance. /// - /// The units of effort are updates, and the method should be - /// thought of as analogous to inserting as many empty updates, - /// where the trace is permitted to perform proportionate work. - fn exert(&mut self, effort: &mut isize) { + /// Whether and how much effort to apply is determined by `self.exert_logic`, a closure the user can set. + fn exert(&mut self) { // If there is work to be done, ... self.tidy_layers(); - if !self.reduced() { + // Determine whether we should apply effort independent of updates. + if let Some(effort) = self.exert_effort() { // If any merges exist, we can directly call `apply_fuel`. if self.merging.iter().any(|b| b.is_double()) { - self.apply_fuel(effort); + self.apply_fuel(&mut (effort as isize)); } // Otherwise, we'll need to introduce fake updates to move merges along. else { // Introduce an empty batch with roughly *effort number of virtual updates. - let level = (*effort as usize).next_power_of_two().trailing_zeros() as usize; + let level = effort.next_power_of_two().trailing_zeros() as usize; self.introduce_batch(None, level); } // We were not in reduced form, so let's check again in the future. @@ -289,6 +290,10 @@ where } } + fn set_exert_logic(&mut self, logic: Option Fn(Box+'a>)->Option>>) { + self.exert_logic = logic; + } + // Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin // merging the batch. This means it is a good time to perform amortized work proportional // to the size of batch. @@ -388,19 +393,20 @@ where B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, B::R: Semigroup, { - /// True iff there is at most one non-empty batch in `self.merging`. + /// Determine the amount of effort we should exert in the absence of updates. /// - /// When true, there is no maintenance work to perform in the trace, other than compaction. - /// We do not yet have logic in place to determine if compaction would improve a trace, so - /// for now we are ignoring that. - fn reduced(&self) -> bool { - let mut non_empty = 0; - for index in 0 .. self.merging.len() { - if self.merging[index].is_double() { return false; } - if self.merging[index].len() > 0 { non_empty += 1; } - if non_empty > 1 { return false; } - } - true + /// This method prepares an iterator over batches, including the level, count, and length of each layer. + /// It supplies this to `self.exert_logic`, who produces the response of the amount of exertion to apply. + fn exert_effort(&self) -> Option { + self.exert_logic.as_ref().and_then(|l| (**l)( + Box::new(self.merging.iter().enumerate().rev().map(|(index, batch)| { + match batch { + MergeState::Vacant => (index, 0, 0), + MergeState::Single(_) => (index, 1, batch.len()), + MergeState::Double(_) => (index, 2, batch.len()), + } + })) + )) } /// Describes the merge progress of layers in the trace. @@ -443,6 +449,7 @@ where upper: Antichain::from_elem(::minimum()), effort, activator, + exert_logic: None, } } @@ -483,7 +490,7 @@ where } // Having performed all of our work, if more than one batch remains reschedule ourself. - if !self.reduced() { + if !self.exert_effort().is_some() { if let Some(activator) = &self.activator { activator.activate(); } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 6e411475e..4593629e6 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -208,8 +208,15 @@ where ::Batch: Batch { activator: Option, ) -> Self; - /// Exert merge effort, even without updates. - fn exert(&mut self, effort: &mut isize); + /// Exert merge effort, even without updates. + fn exert(&mut self); + + /// Sets the logic for exertion in the absence of updates. + /// + /// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`, + /// indicating the level, the number of batches, and their total length in updates. It should return a number of + /// updates to perform, or `None` if no work is required. + fn set_exert_logic(&mut self, logic: Option Fn(Box+'a>)->Option>>); /// Introduces a batch of updates to the trace. /// From 612fc91b443eca6f17876b304c0c22a346b036b8 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 7 Nov 2023 07:19:31 -0500 Subject: [PATCH 2/3] Extract logic to config parameter --- src/lib.rs | 12 ++++++++++++ src/operators/arrange/arrangement.rs | 19 +++++-------------- src/operators/arrange/upsert.rs | 16 ++++------------ src/operators/reduce.rs | 17 +++++------------ src/trace/implementations/spine_fueled.rs | 12 ++++++------ src/trace/mod.rs | 5 ++++- 6 files changed, 36 insertions(+), 45 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c6b10904a..2b18bd796 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -140,5 +140,17 @@ impl Config { pub fn configure(config: &mut timely::WorkerConfig, options: &Config) { if let Some(effort) = options.idle_merge_effort { config.set("differential/idle_merge_effort".to_string(), effort); + config.set::( + "differential/default_exert_logic".to_string(), + std::sync::Arc::new(move |batches| { + let mut non_empty = 0; + for (_index, count, length) in batches { + if count > 1 { return Some(effort as usize); } + if length > 0 { non_empty += 1; } + if non_empty > 1 { return Some(effort as usize); } + } + None + }), + ); } } diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 0633c8c4d..1e94ed130 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -29,11 +29,11 @@ use timely::dataflow::operators::Capability; use ::{Data, ExchangeData, Collection, AsCollection, Hashable}; use ::difference::Semigroup; use lattice::Lattice; -use trace::{Trace, TraceReader, Batch, BatchReader, Batcher, Cursor}; +use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor}; use trace::implementations::ord::OrdValSpine as DefaultValTrace; use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; -use trace::wrappers::enter::{TraceEnter, BatchEnter}; +use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; use trace::wrappers::enter_at::BatchEnter as BatchEnterAt; use trace::wrappers::filter::{TraceFilter, BatchFilter}; @@ -565,18 +565,9 @@ where let activator = Some(self.scope().activator_for(&info.address[..])); let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - - // If idle merge effort exists, configure aggressive idle merging logic. - if let Some(effort) = self.inner.scope().config().get::("differential/idle_merge_effort").cloned() { - empty_trace.set_exert_logic(Some(Box::new(move |batches| { - let mut non_empty = 0; - for (_index, count, length) in batches { - if count > 1 { return Some(effort as usize); } - if length > 0 { non_empty += 1; } - if non_empty > 1 { return Some(effort as usize); } - } - None - }))); + // If there is default exertion logic set, install it. + if let Some(exert_logic) = self.inner.scope().config().get::("differential/default_exert_logic").cloned() { + empty_trace.set_exert_logic(exert_logic); } let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index dbcd38c8e..869947ec3 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -117,7 +117,7 @@ use timely::dataflow::operators::Capability; use ::{ExchangeData, Hashable}; use lattice::Lattice; -use trace::{Trace, TraceReader, Batch, Cursor}; +use trace::{self, Trace, TraceReader, Batch, Cursor}; use trace::Builder; @@ -171,17 +171,9 @@ where // Form the trace we will both use internally and publish. let activator = Some(stream.scope().activator_for(&info.address[..])); let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - // If idle merge effort exists, configure aggressive idle merging logic. - if let Some(effort) = stream.scope().config().get::("differential/idle_merge_effort").cloned() { - empty_trace.set_exert_logic(Some(Box::new(move |batches| { - let mut non_empty = 0; - for (_index, count, length) in batches { - if count > 1 { return Some(effort as usize); } - if length > 0 { non_empty += 1; } - if non_empty > 1 { return Some(effort as usize); } - } - None - }))); + + if let Some(exert_logic) = stream.scope().config().get::("differential/default_exert_logic").cloned() { + empty_trace.set_exert_logic(exert_logic); } let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 712843e1e..1a0356a80 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -19,7 +19,7 @@ use timely::dataflow::operators::Capability; use operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent}; use lattice::Lattice; -use trace::{Batch, BatchReader, Cursor, Trace, Builder}; +use trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic}; use trace::cursor::CursorList; use trace::implementations::ord::OrdValSpine as DefaultValTrace; use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; @@ -353,19 +353,12 @@ where let activator = Some(self.stream.scope().activator_for(&operator_info.address[..])); let mut empty = T2::new(operator_info.clone(), logger.clone(), activator); - // If idle merge effort exists, configure aggressive idle merging logic. - if let Some(effort) = self.stream.scope().config().get::("differential/idle_merge_effort").cloned() { - empty.set_exert_logic(Some(Box::new(move |batches| { - let mut non_empty = 0; - for (_index, count, length) in batches { - if count > 1 { return Some(effort as usize); } - if length > 0 { non_empty += 1; } - if non_empty > 1 { return Some(effort as usize); } - } - None - }))); + // If there is default exert logic set, install it. + if let Some(exert_logic) = self.stream.scope().config().get::("differential/default_exert_logic").cloned() { + empty.set_exert_logic(exert_logic); } + let mut source_trace = self.trace.clone(); let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 4425b46c5..8acfda51f 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -74,7 +74,7 @@ use std::fmt::Debug; use ::logging::Logger; use ::difference::Semigroup; use lattice::Lattice; -use trace::{Batch, BatchReader, Trace, TraceReader}; +use trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic}; use trace::cursor::{Cursor, CursorList}; use trace::Merger; @@ -98,7 +98,7 @@ pub struct Spine where B::Time: Lattice+Ord, B::R: Semigroup { effort: usize, activator: Option, /// Logic to indicate whether and how many records we should introduce in the absence of actual updates. - exert_logic: Option Fn(Box+'a>)->Option>>, + exert_logic: ExertionLogic, } impl TraceReader for Spine @@ -290,7 +290,7 @@ where } } - fn set_exert_logic(&mut self, logic: Option Fn(Box+'a>)->Option>>) { + fn set_exert_logic(&mut self, logic: ExertionLogic) { self.exert_logic = logic; } @@ -398,7 +398,7 @@ where /// This method prepares an iterator over batches, including the level, count, and length of each layer. /// It supplies this to `self.exert_logic`, who produces the response of the amount of exertion to apply. fn exert_effort(&self) -> Option { - self.exert_logic.as_ref().and_then(|l| (**l)( + (self.exert_logic)( Box::new(self.merging.iter().enumerate().rev().map(|(index, batch)| { match batch { MergeState::Vacant => (index, 0, 0), @@ -406,7 +406,7 @@ where MergeState::Double(_) => (index, 2, batch.len()), } })) - )) + ) } /// Describes the merge progress of layers in the trace. @@ -449,7 +449,7 @@ where upper: Antichain::from_elem(::minimum()), effort, activator, - exert_logic: None, + exert_logic: std::sync::Arc::new(|_batches| None), } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 4593629e6..d9c5e3ca3 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -21,6 +21,9 @@ use timely::progress::Timestamp; pub use self::cursor::Cursor; pub use self::description::Description; +/// A type used to express how much effort a trace should exert even in the absence of updates. +pub type ExertionLogic = std::sync::Arc Fn(Box+'a>)->Option+Send+Sync>; + // The traces and batch and cursors want the flexibility to appear as if they manage certain types of keys and // values and such, while perhaps using other representations, I'm thinking mostly of wrappers around the keys // and vals that change the `Ord` implementation, or stash hash codes, or the like. @@ -216,7 +219,7 @@ where ::Batch: Batch { /// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`, /// indicating the level, the number of batches, and their total length in updates. It should return a number of /// updates to perform, or `None` if no work is required. - fn set_exert_logic(&mut self, logic: Option Fn(Box+'a>)->Option>>); + fn set_exert_logic(&mut self, logic: ExertionLogic); /// Introduces a batch of updates to the trace. /// From 6cab205a96e963accb7c10344393bcfd65e1d462 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 7 Nov 2023 17:54:44 -0500 Subject: [PATCH 3/3] Correct rescheduling logic --- src/trace/implementations/spine_fueled.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 8acfda51f..33edd4d79 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -489,8 +489,8 @@ where } } - // Having performed all of our work, if more than one batch remains reschedule ourself. - if !self.exert_effort().is_some() { + // Having performed all of our work, if we should perform more work reschedule ourselves. + if self.exert_effort().is_some() { if let Some(activator) = &self.activator { activator.activate(); }