Skip to content

Commit

Permalink
Update docs and doctests
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Jul 4, 2023
1 parent c5835a3 commit 0319561
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 14 deletions.
3 changes: 1 addition & 2 deletions src/block/structure.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//! This `mod` contains the types and function that model the structure of the blocks and operators
//! for debugging purposes only.
//! Types that describe the structure of an execution graph. For debugging purposes

use std::fmt::{Display, Formatter};

Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! The types for constructing the configuration of the environment.
//! Configuration types used to initialize the [`StreamEnvironment`](crate::StreamEnvironment).
//!
//! See the documentation of [`EnvironmentConfig`] for more details.

Expand Down
2 changes: 1 addition & 1 deletion src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub(crate) struct StreamEnvironmentInner {
/// If you want to use a distributed environment (i.e. using remote workers) you have to spawn them
/// using [`spawn_remote_workers`](StreamEnvironment::spawn_remote_workers) before asking for some stream.
///
/// When all the stream have been registered you have to call [`execute`](StreamEnvironment::execute) that will consume the
/// When all the stream have been registered you have to call [`execute`](StreamEnvironment::execute_blocking) that will consume the
/// environment and start the computation. This function will return when the computation ends.
///
/// TODO: example usage
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ pub(crate) mod worker;

pub type CoordUInt = u64;

/// Re-export of commonly used structs and traits
pub mod prelude {
pub use super::operator::sink::StreamOutput;
pub use super::operator::source::*;
Expand Down
2 changes: 2 additions & 0 deletions src/operator/iteration/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Utilities for iteration operators

use std::cell::UnsafeCell;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Condvar, Mutex};
Expand Down
5 changes: 4 additions & 1 deletion src/operator/map_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ where
tokio::spawn(async move {
while let Ok(b) = i_rx.recv_async().await {
let v: Vec<_> = futures::stream::iter(b.into_iter())
.then(|el| el.map_async(&ff))
.then(|el| async {
micrometer::span!(el.map_async(&ff).await, "map_async_call")
})
.collect()
.await;

Expand Down Expand Up @@ -196,6 +198,7 @@ where

#[inline]
fn next(&mut self) -> StreamElement<O> {
micrometer::span!(map_async_next);
loop {
if let Some(el) = self.buffer.as_mut().and_then(Iterator::next) {
return el;
Expand Down
117 changes: 110 additions & 7 deletions src/operator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
//! Operators that can be applied to a stream.
//!
//! The actual operator list can be found from the implemented methods of [`Stream`](crate::Stream),
//! [`KeyedStream`](crate::KeyedStream), [`WindowedStream`](crate::WindowedStream) and
//! [`WindowedStream`](crate::WindowedStream).
//! [`KeyedStream`](crate::KeyedStream), [`WindowedStream`](crate::WindowedStream)

use std::fmt::Display;
use std::hash::Hash;
Expand Down Expand Up @@ -69,7 +68,7 @@ mod fold;
mod inspect;
#[cfg(feature = "timestamp")]
mod interval_join;
pub(crate) mod iteration;
pub mod iteration;
pub mod join;
mod key_by;
mod keyed_fold;
Expand Down Expand Up @@ -407,8 +406,10 @@ where
self.add_operator(|prev| Filter::new(prev, predicate))
}

/// # TODO
/// Reorder timestamped items
///
/// # Example
/// ### TODO
pub fn reorder(self) -> Stream<I, impl Operator<I>> {
self.add_operator(|prev| Reorder::new(prev))
}
Expand Down Expand Up @@ -547,6 +548,32 @@ where
self.add_operator(|prev| Map::new(prev, f))
}

/// Map the elements of the stream into new elements by evaluating a future for each one.
/// Use memoization to cache outputs for previously seen inputs.
///
/// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
/// The maximum number of elements to be cached is passed as the `capacity` parameter.
///
/// The outputs are cached according to the key produced by the `fk` function.
///
/// ## Example
///
/// ```
/// # use noir::{StreamEnvironment, EnvironmentConfig};
/// # use noir::operator::source::IteratorSource;
/// # tokio::runtime::Runtime::new()
/// # .unwrap()
/// # .block_on(base());
/// # async fn base() {
/// # let mut env = StreamEnvironment::new(EnvironmentConfig::local(1));
/// let s = env.stream_iter(5..15);
/// let res = s.map_async_memo_by(
/// |n| async move {(n * n) % 7}, |n| n % 7, 5
/// ).collect_vec();
/// env.execute().await;
/// assert_eq!(res.get().unwrap(), vec![4, 1, 0, 1, 4, 2, 2, 4, 1, 0]);
/// # }
/// ```
#[cfg(feature = "async-tokio")]
pub fn map_async_memo_by<O, K, F, Fk, Fut>(
self,
Expand Down Expand Up @@ -596,6 +623,24 @@ where
})
}

/// Map the elements of the stream into new elements by evaluating a future for each one.
///
/// ## Example
///
/// ```
/// # use noir::{StreamEnvironment, EnvironmentConfig};
/// # use noir::operator::source::IteratorSource;
/// # tokio::runtime::Runtime::new()
/// # .unwrap()
/// # .block_on(base());
/// # async fn base() {
/// # let mut env = StreamEnvironment::new(EnvironmentConfig::local(1));
/// let s = env.stream_iter(5..15);
/// let res = s.map_async(|n| async move {(n * n) % 7}).collect_vec();
/// env.execute().await;
/// assert_eq!(res.get().unwrap(), vec![4, 1, 0, 1, 4, 2, 2, 4, 1, 0]);
/// # }
/// ```
#[cfg(feature = "async-tokio")]
pub fn map_async<O: Data, F, Fut>(self, f: F) -> Stream<O, impl Operator<O>>
where
Expand All @@ -605,7 +650,27 @@ where
self.add_operator(|prev| MapAsync::new(prev, f, 0))
}

/// # TODO
/// Map the elements of the stream into new elements. Use memoization
/// to cache outputs for previously seen inputs.
///
/// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
/// The maximum number of elements to be cached is passed as the `capacity` parameter.
///
/// The outputs are cached according to the key produced by the `fk` function.
///
/// ## Example
///
/// ```
/// # use noir::{StreamEnvironment, EnvironmentConfig};
/// # use noir::operator::source::IteratorSource;
/// # let mut env = StreamEnvironment::new(EnvironmentConfig::local(1));
/// let s = env.stream(IteratorSource::new((5..15)));
/// let res = s.map_memo_by(|n| (n * n) % 7, |n| n % 7, 5).collect_vec();
///
/// env.execute_blocking();
///
/// assert_eq!(res.get().unwrap(), vec![4, 1, 0, 1, 4, 2, 2, 4, 1, 0]);
/// ```
pub fn map_memo_by<K: DataKey + Sync, O: Data + Sync, F, Fk>(
self,
f: F,
Expand Down Expand Up @@ -1819,8 +1884,28 @@ where
I: Data + Hash + Eq + Sync,
Op: Operator<I> + 'static,
{
/// # TODO
/// Map the elements of the stream into new elements by evaluating a future for each one.
/// Use memoization to cache outputs for previously seen inputs.
///
/// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
/// The maximum number of elements to be cached is passed as the `capacity` parameter.
///
/// ## Example
///
/// ```
/// # use noir::{StreamEnvironment, EnvironmentConfig};
/// # use noir::operator::source::IteratorSource;
/// # tokio::runtime::Runtime::new()
/// # .unwrap()
/// # .block_on(base());
/// # async fn base() {
/// # let mut env = StreamEnvironment::new(EnvironmentConfig::local(1));
/// let s = env.stream_iter((0..4).cycle().take(10));
/// let res = s.map_async_memo(|n| async move {n * n}, 100).collect_vec();
/// env.execute().await;
/// assert_eq!(res.get().unwrap(), vec![0, 1, 4, 9, 0, 1, 4, 9, 0, 1]);
/// # }
/// ```
#[cfg(feature = "async-tokio")]
pub fn map_async_memo<O: Data + Sync, F, Fut>(
self,
Expand All @@ -1840,8 +1925,26 @@ where
I: Data + Hash + Eq + Sync,
Op: Operator<I> + 'static,
{
/// # TODO
/// Map the elements of the stream into new elements. Use memoization
/// to cache outputs for previously seen inputs.
///
/// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
/// The maximum number of elements to be cached is passed as the `capacity` parameter.
///
/// ## Example
///
/// ```
/// # use noir::{StreamEnvironment, EnvironmentConfig};
/// # use noir::operator::source::IteratorSource;
/// # let mut env = StreamEnvironment::new(EnvironmentConfig::local(1));
/// let s = env.stream_iter((0..4).cycle().take(10));
/// let res = s.map_memo(|n| n * n, 5).collect_vec();
///
/// env.execute_blocking();
///
/// assert_eq!(res.get().unwrap(), vec![0, 1, 4, 9, 0, 1, 4, 9, 0, 1]);
/// ```

pub fn map_memo<O: Data + Sync, F>(self, f: F, capacity: usize) -> Stream<O, impl Operator<O>>
where
F: Fn(I) -> O + Send + Clone + 'static,
Expand Down
4 changes: 2 additions & 2 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ pub type ReplicaId = CoordUInt;
type BlockInitFn =
Box<dyn FnOnce(&mut ExecutionMetadata) -> (JoinHandle<()>, BlockStructure) + Send>;

/// Metadata associated to a block in the execution graph.
/// Metadata used to initialize a block at the start of an execution
#[derive(Debug)]
pub struct ExecutionMetadata<'a> {
/// The coordinate of the block (it's id, replica id, ...).
/// The coordinate of the block
pub coord: Coord,
/// The list of replicas of this block.
pub replicas: Vec<Coord>,
Expand Down

0 comments on commit 0319561

Please sign in to comment.