Skip to content

Commit

Permalink
Allow custom exertion logic
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 7, 2023
1 parent 2b9ac68 commit 718015a
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 55 deletions.
23 changes: 14 additions & 9 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,15 +563,22 @@ where
// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

let (activator, effort) =
let activator = Some(self.scope().activator_for(&info.address[..]));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);

// If idle merge effort exists, configure aggressive idle merging logic.
if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.scope().activator_for(&info.address[..])), Some(effort))
empty_trace.set_exert_logic(Some(Box::new(move |batches| {
let mut non_empty = 0;
for (_index, count, length) in batches {
if count > 1 { return Some(effort as usize); }
if length > 0 { non_empty += 1; }
if non_empty > 1 { return Some(effort as usize); }
}
None
})));
}
else {
(None, None)
};

let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

*reader = Some(reader_local);
Expand Down Expand Up @@ -672,9 +679,7 @@ where
prev_frontier.extend(input.frontier().frontier().iter().cloned());
}

if let Some(mut fuel) = effort.clone() {
writer.exert(&mut fuel);
}
writer.exert();
}
})
};
Expand Down
29 changes: 16 additions & 13 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,20 +165,25 @@ where
register.get::<::logging::DifferentialEvent>("differential/arrange")
};

// Establish compaction effort to apply even without updates.
let (activator, effort) =
if let Some(effort) = stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(stream.scope().activator_for(&info.address[..])), Some(effort))
}
else {
(None, None)
};

// Tracks the lower envelope of times in `priority_queue`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let mut buffer = Vec::new();
// Form the trace we will both use internally and publish.
let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
let activator = Some(stream.scope().activator_for(&info.address[..]));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
// If idle merge effort exists, configure aggressive idle merging logic.
if let Some(effort) = stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
empty_trace.set_exert_logic(Some(Box::new(move |batches| {
let mut non_empty = 0;
for (_index, count, length) in batches {
if count > 1 { return Some(effort as usize); }
if length > 0 { non_empty += 1; }
if non_empty > 1 { return Some(effort as usize); }
}
None
})));
}

let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
// Capture the reader outside the builder scope.
*reader = Some(reader_local.clone());
Expand Down Expand Up @@ -334,9 +339,7 @@ where
reader_local.set_physical_compaction(prev_frontier.borrow());
}

if let Some(mut fuel) = effort.clone() {
writer.exert(&mut fuel);
}
writer.exert();
}
})
};
Expand Down
4 changes: 2 additions & 2 deletions src/operators/arrange/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ where
}

/// Exerts merge effort, even without additional updates.
pub fn exert(&mut self, fuel: &mut isize) {
pub fn exert(&mut self) {
if let Some(trace) = self.trace.upgrade() {
trace.borrow_mut().trace.exert(fuel);
trace.borrow_mut().trace.exert();
}
}

Expand Down
24 changes: 13 additions & 11 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,17 +351,21 @@ where
register.get::<::logging::DifferentialEvent>("differential/arrange")
};

// Determine if we should regularly exert the trace maintenance machinery,
// and with what amount of effort each time.
let (activator, effort) =
let activator = Some(self.stream.scope().activator_for(&operator_info.address[..]));
let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
// If idle merge effort exists, configure aggressive idle merging logic.
if let Some(effort) = self.stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.stream.scope().activator_for(&operator_info.address[..])), Some(effort))
empty.set_exert_logic(Some(Box::new(move |batches| {
let mut non_empty = 0;
for (_index, count, length) in batches {
if count > 1 { return Some(effort as usize); }
if length > 0 { non_empty += 1; }
if non_empty > 1 { return Some(effort as usize); }
}
None
})));
}
else {
(None, None)
};

let empty = T2::new(operator_info.clone(), logger.clone(), activator);
let mut source_trace = self.trace.clone();

let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);
Expand Down Expand Up @@ -629,9 +633,7 @@ where
}

// Exert trace maintenance if we have been so requested.
if let Some(mut fuel) = effort.clone() {
output_writer.exert(&mut fuel);
}
output_writer.exert();
}
}
)
Expand Down
57 changes: 39 additions & 18 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub struct Spine<B: Batch> where B::Time: Lattice+Ord, B::R: Semigroup {
upper: Antichain<B::Time>,
effort: usize,
activator: Option<timely::scheduling::activate::Activator>,
/// Logic to indicate whether and how many records we should introduce in the absence of actual updates.
exert_logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>,
}

impl<B> TraceReader for Spine<B>
Expand Down Expand Up @@ -267,19 +269,20 @@ where
/// The units of effort are updates, and the method should be
/// thought of as analogous to inserting as many empty updates,
/// where the trace is permitted to perform proportionate work.
fn exert(&mut self, effort: &mut isize) {
fn exert(&mut self) {
// If there is work to be done, ...
self.tidy_layers();
if !self.reduced() {
// Determine whether we should apply effort independent of updates.
if let Some(effort) = self.exert_effort() {

// If any merges exist, we can directly call `apply_fuel`.
if self.merging.iter().any(|b| b.is_double()) {
self.apply_fuel(effort);
self.apply_fuel(&mut (effort as isize));
}
// Otherwise, we'll need to introduce fake updates to move merges along.
else {
// Introduce an empty batch with roughly *effort number of virtual updates.
let level = (*effort as usize).next_power_of_two().trailing_zeros() as usize;
let level = effort.next_power_of_two().trailing_zeros() as usize;
self.introduce_batch(None, level);
}
// We were not in reduced form, so let's check again in the future.
Expand All @@ -289,6 +292,10 @@ where
}
}

fn set_exert_logic(&mut self, logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>) {
self.exert_logic = logic;
}

// Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin
// merging the batch. This means it is a good time to perform amortized work proportional
// to the size of batch.
Expand Down Expand Up @@ -388,21 +395,34 @@ where
B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
B::R: Semigroup,
{
/// True iff there is at most one non-empty batch in `self.merging`.
///
/// When true, there is no maintenance work to perform in the trace, other than compaction.
/// We do not yet have logic in place to determine if compaction would improve a trace, so
/// for now we are ignoring that.
fn reduced(&self) -> bool {
let mut non_empty = 0;
for index in 0 .. self.merging.len() {
if self.merging[index].is_double() { return false; }
if self.merging[index].len() > 0 { non_empty += 1; }
if non_empty > 1 { return false; }
}
true
/// The amount of effort we should exert in the absence of updates.
fn exert_effort(&self) -> Option<usize> {
self.exert_logic.as_ref().and_then(|l| (**l)(
Box::new(self.merging.iter().enumerate().rev().map(|(index, batch)| {
match batch {
MergeState::Vacant => (index, 0, 0),
MergeState::Single(_) => (index, 1, batch.len()),
MergeState::Double(_) => (index, 2, batch.len()),
}
}))
))
}

// /// True iff there is at most one non-empty batch in `self.merging`.
// ///
// /// When true, there is no maintenance work to perform in the trace, other than compaction.
// /// We do not yet have logic in place to determine if compaction would improve a trace, so
// /// for now we are ignoring that.
// fn reduced(&self) -> bool {
// let mut non_empty = 0;
// for index in 0 .. self.merging.len() {
// if self.merging[index].is_double() { return false; }
// if self.merging[index].len() > 0 { non_empty += 1; }
// if non_empty > 1 { return false; }
// }
// true
// }

/// Describes the merge progress of layers in the trace.
///
/// Intended for diagnostics rather than public consumption.
Expand Down Expand Up @@ -443,6 +463,7 @@ where
upper: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
effort,
activator,
exert_logic: None,
}
}

Expand Down Expand Up @@ -483,7 +504,7 @@ where
}

// Having performed all of our work, if more than one batch remains reschedule ourself.
if !self.reduced() {
if !self.exert_effort().is_some() {
if let Some(activator) = &self.activator {
activator.activate();
}
Expand Down
11 changes: 9 additions & 2 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,15 @@ where <Self as TraceReader>::Batch: Batch {
activator: Option<timely::scheduling::activate::Activator>,
) -> Self;

/// Exert merge effort, even without updates.
fn exert(&mut self, effort: &mut isize);
/// Exert merge effort, even without updates.
fn exert(&mut self);

/// Sets the logic for exertion in the absence of updates.
///
/// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`,
/// indicating the level, the number of batches, and their total length in updates. It should return a number of
/// updates to perform, or `None` if no work is required.
fn set_exert_logic(&mut self, logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>);

/// Introduces a batch of updates to the trace.
///
Expand Down

0 comments on commit 718015a

Please sign in to comment.