Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow custom exertion logic #411

Merged
merged 3 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,17 @@ impl Config {
pub fn configure(config: &mut timely::WorkerConfig, options: &Config) {
if let Some(effort) = options.idle_merge_effort {
config.set("differential/idle_merge_effort".to_string(), effort);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we're referencing idle_merge_effort anymore, so we should probably remove it here, and extend options to also take a default_excert_logic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm up for waiting just a bit to see how this looks before locking things in. I wrote a bunch of code before realizing that it all needs to cross thread boundaries, and if it turns out that it needs to be SerDe also .. what a mess. :D

config.set::<trace::ExertionLogic>(
"differential/default_exert_logic".to_string(),
std::sync::Arc::new(move |batches| {
let mut non_empty = 0;
for (_index, count, length) in batches {
if count > 1 { return Some(effort as usize); }
if length > 0 { non_empty += 1; }
if non_empty > 1 { return Some(effort as usize); }
}
None
}),
);
}
}
20 changes: 8 additions & 12 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use timely::dataflow::operators::Capability;
use ::{Data, ExchangeData, Collection, AsCollection, Hashable};
use ::difference::Semigroup;
use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::implementations::ord::OrdValSpine as DefaultValTrace;
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;

use trace::wrappers::enter::{TraceEnter, BatchEnter};
use trace::wrappers::enter::{TraceEnter, BatchEnter,};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
use trace::wrappers::filter::{TraceFilter, BatchFilter};
Expand Down Expand Up @@ -563,15 +563,13 @@ where
// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

let (activator, effort) =
if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.scope().activator_for(&info.address[..])), Some(effort))
let activator = Some(self.scope().activator_for(&info.address[..]));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
// If there is default exertion logic set, install it.
if let Some(exert_logic) = self.inner.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
empty_trace.set_exert_logic(exert_logic);
}
else {
(None, None)
};

let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

*reader = Some(reader_local);
Expand Down Expand Up @@ -672,9 +670,7 @@ where
prev_frontier.extend(input.frontier().frontier().iter().cloned());
}

if let Some(mut fuel) = effort.clone() {
writer.exert(&mut fuel);
}
writer.exert();
}
})
};
Expand Down
23 changes: 9 additions & 14 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ use timely::dataflow::operators::Capability;

use ::{ExchangeData, Hashable};
use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, Cursor};
use trace::{self, Trace, TraceReader, Batch, Cursor};

use trace::Builder;

Expand Down Expand Up @@ -165,20 +165,17 @@ where
register.get::<::logging::DifferentialEvent>("differential/arrange")
};

// Establish compaction effort to apply even without updates.
let (activator, effort) =
if let Some(effort) = stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(stream.scope().activator_for(&info.address[..])), Some(effort))
}
else {
(None, None)
};

// Tracks the lower envelope of times in `priority_queue`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let mut buffer = Vec::new();
// Form the trace we will both use internally and publish.
let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
let activator = Some(stream.scope().activator_for(&info.address[..]));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);

if let Some(exert_logic) = stream.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
empty_trace.set_exert_logic(exert_logic);
}

let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
// Capture the reader outside the builder scope.
*reader = Some(reader_local.clone());
Expand Down Expand Up @@ -334,9 +331,7 @@ where
reader_local.set_physical_compaction(prev_frontier.borrow());
}

if let Some(mut fuel) = effort.clone() {
writer.exert(&mut fuel);
}
writer.exert();
}
})
};
Expand Down
4 changes: 2 additions & 2 deletions src/operators/arrange/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ where
}

/// Exerts merge effort, even without additional updates.
pub fn exert(&mut self, fuel: &mut isize) {
pub fn exert(&mut self) {
if let Some(trace) = self.trace.upgrade() {
trace.borrow_mut().trace.exert(fuel);
trace.borrow_mut().trace.exert();
}
}

Expand Down
21 changes: 8 additions & 13 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use timely::dataflow::operators::Capability;

use operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent};
use lattice::Lattice;
use trace::{Batch, BatchReader, Cursor, Trace, Builder};
use trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic};
use trace::cursor::CursorList;
use trace::implementations::ord::OrdValSpine as DefaultValTrace;
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
Expand Down Expand Up @@ -351,17 +351,14 @@ where
register.get::<::logging::DifferentialEvent>("differential/arrange")
};

// Determine if we should regularly exert the trace maintenance machinery,
// and with what amount of effort each time.
let (activator, effort) =
if let Some(effort) = self.stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.stream.scope().activator_for(&operator_info.address[..])), Some(effort))
let activator = Some(self.stream.scope().activator_for(&operator_info.address[..]));
let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
// If there is default exert logic set, install it.
if let Some(exert_logic) = self.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
empty.set_exert_logic(exert_logic);
}
else {
(None, None)
};

let empty = T2::new(operator_info.clone(), logger.clone(), activator);

let mut source_trace = self.trace.clone();

let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);
Expand Down Expand Up @@ -629,9 +626,7 @@ where
}

// Exert trace maintenance if we have been so requested.
if let Some(mut fuel) = effort.clone() {
output_writer.exert(&mut fuel);
}
output_writer.exert();
}
}
)
Expand Down
49 changes: 28 additions & 21 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ use std::fmt::Debug;
use ::logging::Logger;
use ::difference::Semigroup;
use lattice::Lattice;
use trace::{Batch, BatchReader, Trace, TraceReader};
use trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic};
use trace::cursor::{Cursor, CursorList};
use trace::Merger;

Expand All @@ -97,6 +97,8 @@ pub struct Spine<B: Batch> where B::Time: Lattice+Ord, B::R: Semigroup {
upper: Antichain<B::Time>,
effort: usize,
activator: Option<timely::scheduling::activate::Activator>,
/// Logic to indicate whether and how many records we should introduce in the absence of actual updates.
exert_logic: ExertionLogic,
}

impl<B> TraceReader for Spine<B>
Expand Down Expand Up @@ -264,22 +266,21 @@ where

/// Apply some amount of effort to trace maintenance.
///
/// The units of effort are updates, and the method should be
/// thought of as analogous to inserting as many empty updates,
/// where the trace is permitted to perform proportionate work.
fn exert(&mut self, effort: &mut isize) {
/// Whether and how much effort to apply is determined by `self.exert_logic`, a closure the user can set.
fn exert(&mut self) {
// If there is work to be done, ...
self.tidy_layers();
if !self.reduced() {
// Determine whether we should apply effort independent of updates.
if let Some(effort) = self.exert_effort() {

// If any merges exist, we can directly call `apply_fuel`.
if self.merging.iter().any(|b| b.is_double()) {
self.apply_fuel(effort);
self.apply_fuel(&mut (effort as isize));
}
// Otherwise, we'll need to introduce fake updates to move merges along.
else {
// Introduce an empty batch with roughly *effort number of virtual updates.
let level = (*effort as usize).next_power_of_two().trailing_zeros() as usize;
let level = effort.next_power_of_two().trailing_zeros() as usize;
self.introduce_batch(None, level);
}
// We were not in reduced form, so let's check again in the future.
Expand All @@ -289,6 +290,10 @@ where
}
}

fn set_exert_logic(&mut self, logic: ExertionLogic) {
self.exert_logic = logic;
}

// Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin
// merging the batch. This means it is a good time to perform amortized work proportional
// to the size of batch.
Expand Down Expand Up @@ -388,19 +393,20 @@ where
B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
B::R: Semigroup,
{
/// True iff there is at most one non-empty batch in `self.merging`.
/// Determine the amount of effort we should exert in the absence of updates.
///
/// When true, there is no maintenance work to perform in the trace, other than compaction.
/// We do not yet have logic in place to determine if compaction would improve a trace, so
/// for now we are ignoring that.
fn reduced(&self) -> bool {
let mut non_empty = 0;
for index in 0 .. self.merging.len() {
if self.merging[index].is_double() { return false; }
if self.merging[index].len() > 0 { non_empty += 1; }
if non_empty > 1 { return false; }
}
true
/// This method prepares an iterator over batches, including the level, count, and length of each layer.
/// It supplies this to `self.exert_logic`, who produces the response of the amount of exertion to apply.
fn exert_effort(&self) -> Option<usize> {
(self.exert_logic)(
Box::new(self.merging.iter().enumerate().rev().map(|(index, batch)| {
match batch {
MergeState::Vacant => (index, 0, 0),
MergeState::Single(_) => (index, 1, batch.len()),
MergeState::Double(_) => (index, 2, batch.len()),
}
}))
)
}

/// Describes the merge progress of layers in the trace.
Expand Down Expand Up @@ -443,6 +449,7 @@ where
upper: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
effort,
activator,
exert_logic: std::sync::Arc::new(|_batches| None),
}
}

Expand Down Expand Up @@ -483,7 +490,7 @@ where
}

// Having performed all of our work, if more than one batch remains reschedule ourself.
if !self.reduced() {
if !self.exert_effort().is_some() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if !self.exert_effort().is_some() {
if self.exert_effort().is_none() {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh, I think this is wrong both ways, actually. We want to reschedule if there is exert effort to apply. Oops. Will fix.

if let Some(activator) = &self.activator {
activator.activate();
}
Expand Down
14 changes: 12 additions & 2 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use timely::progress::Timestamp;
pub use self::cursor::Cursor;
pub use self::description::Description;

/// A type used to express how much effort a trace should exert even in the absence of updates.
pub type ExertionLogic = std::sync::Arc<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>+Send+Sync>;

// The traces and batch and cursors want the flexibility to appear as if they manage certain types of keys and
// values and such, while perhaps using other representations, I'm thinking mostly of wrappers around the keys
// and vals that change the `Ord` implementation, or stash hash codes, or the like.
Expand Down Expand Up @@ -208,8 +211,15 @@ where <Self as TraceReader>::Batch: Batch {
activator: Option<timely::scheduling::activate::Activator>,
) -> Self;

/// Exert merge effort, even without updates.
fn exert(&mut self, effort: &mut isize);
/// Exert merge effort, even without updates.
fn exert(&mut self);

/// Sets the logic for exertion in the absence of updates.
///
/// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`,
/// indicating the level, the number of batches, and their total length in updates. It should return a number of
/// updates to perform, or `None` if no work is required.
fn set_exert_logic(&mut self, logic: ExertionLogic);

/// Introduces a batch of updates to the trace.
///
Expand Down