
Commit

Bump dependencies and minor test changes
imDema committed Jul 21, 2023
1 parent ff9ea45 commit e6802a8
Showing 11 changed files with 140 additions and 149 deletions.
219 changes: 105 additions & 114 deletions Cargo.lock

(Large diff not shown.)

22 changes: 10 additions & 12 deletions Cargo.toml
@@ -34,14 +34,14 @@ derivative = "2.2.0"

# serialization library used for reading the config file (yaml) and serializing
# the messages on the network
serde = { version = "1.0.164", features = ["derive"] }
serde_yaml = "0.9.22"
serde_json = "1.0.99"
serde = { version = "1.0.174", features = ["derive"] }
serde_yaml = "0.9.25"
serde_json = "1.0.103"
bincode = "1.3.3"

# handy Result type
anyhow = "1.0.71"
thiserror = "1.0.40"
anyhow = "1.0.72"
thiserror = "1.0.44"

# handy iterators functions
itertools = "0.11.0"
@@ -53,7 +53,7 @@ once_cell = "1.18.0"
ssh2 = { version = "0.9.4", optional = true }
whoami = { version = "1.4.1", optional = true }
shell-escape = { version = "0.1.5", optional = true }
clap = { version = "4.3.9", features = ["derive"], optional = true }
clap = { version = "4.3.17", features = ["derive"], optional = true }
sha2 = { version = "0.10.7", optional = true }
base64 = { version = "0.21.2", optional = true }

@@ -70,7 +70,7 @@ lazy-init = "0.5.1"
# Faster monotonic clock using libc's CLOCK_MONOTONIC_COARSE
coarsetime = "0.1.23"

tokio = { version = "1.29.0", features = ["rt"], default-features = false, optional = true }
tokio = { version = "1.29.1", features = ["rt"], default-features = false, optional = true }
futures = { version = "0.3.28", optional = true }

parking_lot = "0.12.1"
@@ -82,22 +82,20 @@ indexmap = "2.0.0"
tracing = { version = "0.1.37", features = ["log"] }
quick_cache = "0.3.0"

micrometer = "0.2.3"

[dev-dependencies]
# for the tests
env_logger = "0.10.0"
rand = { version = "0.8.5", features = ["small_rng"] }
tempfile = "3.6.0"
tempfile = "3.7.0"
criterion = { version = "0.5.1", features = ["html_reports"] }
fake = "2.6.1"
mimalloc = { version = "0.1.37", default-features = false }
tracing-subscriber = "0.3.17"

micrometer = { version = "0.2.3", features = ["enable"]}
micrometer = { version = "0.2.6", features = ["enable"]}

# for the examples
regex = "1.8.4"
regex = "1.9.1"

# used in the benchmarks
crossbeam-channel = "0.5.8"
4 changes: 2 additions & 2 deletions benches/collatz.rs
@@ -9,8 +9,8 @@ use common::*;
fn bench_main(c: &mut Criterion) {
let mut g = c.benchmark_group("wordcount-line");
g.sample_size(SAMPLES);
g.warm_up_time(WARM_UP);
g.measurement_time(DURATION);
// g.warm_up_time(WARM_UP);
// g.measurement_time(DURATION);

for size in [0u32, 1_000, 1_000_000, 100_000_000] {
g.throughput(Throughput::Elements(size as u64));
4 changes: 1 addition & 3 deletions benches/common.rs
@@ -9,9 +9,7 @@ use std::time::{Duration, Instant};

use noir::*;

pub const SAMPLES: usize = 20;
pub const WARM_UP: Duration = Duration::from_secs(3);
pub const DURATION: Duration = Duration::from_secs(10);
pub const SAMPLES: usize = 50;

static NONCE: AtomicU16 = AtomicU16::new(1);
const PORT_BASE: u16 = 9090;
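
Across the benches, the explicit warm-up and measurement-time overrides are commented out and the shared SAMPLES constant goes from 20 to 50, so each group now relies on Criterion's default timing windows. A minimal, self-contained sketch of the resulting configuration (the "noop" benchmark is hypothetical, not from the repository):

use criterion::{criterion_group, criterion_main, Criterion};

const SAMPLES: usize = 50;

fn bench_main(c: &mut Criterion) {
    let mut g = c.benchmark_group("example");
    // Only the sample count is overridden; warm-up and measurement time are
    // left at Criterion's defaults, matching the commented-out calls in the benches.
    g.sample_size(SAMPLES);
    g.bench_function("noop", |b| b.iter(|| std::hint::black_box(1 + 1)));
    g.finish();
}

criterion_group!(benches, bench_main);
criterion_main!(benches);
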
4 changes: 2 additions & 2 deletions benches/connected.rs
@@ -102,8 +102,8 @@ fn connected(input: Stream<(u64, u64), impl Operator<(u64, u64)> + 'static>) {
fn bench_main(c: &mut Criterion) {
let mut g = c.benchmark_group("connected");
g.sample_size(SAMPLES);
g.warm_up_time(WARM_UP);
g.measurement_time(DURATION);
// g.warm_up_time(WARM_UP);
// g.measurement_time(DURATION);

for size in [0, 1_000, 1_000_000, 2_000_000] {
g.throughput(Throughput::Elements(size));
4 changes: 2 additions & 2 deletions benches/nexmark.rs
@@ -358,8 +358,8 @@ fn run_query(env: &mut StreamEnvironment, q: &str, n: usize) {
fn bench_main(c: &mut Criterion) {
let mut g = c.benchmark_group("nexmark");
g.sample_size(SAMPLES);
g.warm_up_time(WARM_UP);
g.measurement_time(DURATION);
// g.warm_up_time(WARM_UP);
// g.measurement_time(DURATION);

macro_rules! bench_query {
($q:expr, $n:expr) => {{
4 changes: 2 additions & 2 deletions benches/wordcount.rs
@@ -45,8 +45,8 @@ fn make_file(lines: usize) -> tempfile::NamedTempFile {
fn wordcount_bench(c: &mut Criterion) {
let mut g = c.benchmark_group("wordcount-line");
g.sample_size(SAMPLES);
g.warm_up_time(WARM_UP);
g.measurement_time(DURATION);
// g.warm_up_time(WARM_UP);
// g.measurement_time(DURATION);

for lines in [0, 100, 10_000, 1_000_000] {
let file = make_file(lines as usize);
2 changes: 0 additions & 2 deletions examples/simple.rs
@@ -2,9 +2,7 @@ use noir::prelude::*;

fn main() {
let (config, _args) = EnvironmentConfig::from_args();

let mut env = StreamEnvironment::new(config);

env.spawn_remote_workers();

let source = IteratorSource::new(0..100);
2 changes: 1 addition & 1 deletion src/operator/join/mod.rs
@@ -280,7 +280,7 @@ where
rhs,
keyer1,
keyer2,
_key: PhantomData::default(),
_key: PhantomData,
}
}
}
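
The change from PhantomData::default() to PhantomData is cosmetic: PhantomData<T> is a unit struct, so the value can be named directly, which is likely the pattern clippy's default_constructed_unit_structs lint nudges toward. A standalone sketch with a hypothetical Tagged type, not taken from the repository:

use std::marker::PhantomData;

// Hypothetical marker-carrying struct: _key ties the type to K without
// storing a value of that type.
struct Tagged<K> {
    _key: PhantomData<K>,
}

impl<K> Tagged<K> {
    fn new() -> Self {
        // PhantomData is a unit struct, so it can be written directly as a value;
        // PhantomData::default() constructs exactly the same thing with more noise.
        Self { _key: PhantomData }
    }
}

fn main() {
    let _t: Tagged<u64> = Tagged::new();
}
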
18 changes: 12 additions & 6 deletions src/operator/map_async.rs
@@ -1,4 +1,5 @@
use std::fmt::Display;
use std::sync::Arc;
use std::vec::IntoIter as VecIter;

use coarsetime::Instant;
@@ -103,7 +104,7 @@ where
PreviousOperators: Clone,
{
fn clone(&self) -> Self {
Self::new(self.prev.clone(), self.f.clone(), 0)
Self::new(self.prev.clone(), self.f.clone(), 4)
}
}

@@ -131,18 +132,23 @@ where
Fut: Future<Output = O> + Send,
PreviousOperators: Operator<I>,
{
pub(super) fn new(prev: PreviousOperators, f: F, _queue: usize) -> Self {
const CH: usize = 4;
pub(super) fn new(prev: PreviousOperators, f: F, buffer: usize) -> Self {
const CH: usize = 2;
let (i_tx, i_rx) = flume::bounded::<Vec<StreamElement<I>>>(CH);
let (o_tx, o_rx) = flume::bounded::<Vec<StreamElement<O>>>(CH);

let ff = f.clone();
let ff = Arc::new(f.clone());
tokio::spawn(async move {
while let Ok(b) = i_rx.recv_async().await {
let v: Vec<_> = futures::stream::iter(b.into_iter())
.then(|el| async {
micrometer::span!(el.map_async(&ff).await, "map_async_call")
.map(|el| {
let ff = ff.clone();
tokio::spawn(async move {
micrometer::span!(el.map_async(ff.as_ref()).await, "map_async_call")
})
})
.buffered(buffer)
.map(Result::unwrap)
.collect()
.await;

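The functional part of this hunk is the switch from awaiting each element sequentially with .then(...) to spawning each mapping as a Tokio task and driving the handles through .buffered(buffer), so up to buffer elements are mapped concurrently while output order is preserved. A self-contained sketch of that pattern outside of noir (hypothetical double function; assumes tokio and futures as dependencies):

use futures::StreamExt;

// Stand-in for the user-supplied async mapping function.
async fn double(x: u32) -> u32 {
    x * 2
}

#[tokio::main]
async fn main() {
    let buffer = 4; // at most `buffer` mapped elements in flight at once

    let out: Vec<u32> = futures::stream::iter(0..8u32)
        // Spawn each mapping so it runs on the runtime's worker threads...
        .map(|el| tokio::spawn(double(el)))
        // ...while `buffered` caps concurrency and yields results in input order.
        .buffered(buffer)
        .map(Result::unwrap) // a JoinError only occurs if a spawned task panics
        .collect()
        .await;

    assert_eq!(out, vec![0, 2, 4, 6, 8, 10, 12, 14]);
}

Using buffered rather than buffer_unordered is what keeps the original element order stable.
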
6 changes: 3 additions & 3 deletions src/operator/mod.rs
@@ -407,7 +407,7 @@ where
}

/// Reorder timestamped items
///
///
/// # Example
/// ### TODO
pub fn reorder(self) -> Stream<I, impl Operator<I>> {
@@ -618,7 +618,7 @@ where
}
}
},
capacity,
4,
)
})
}
@@ -647,7 +647,7 @@ where
F: Fn(I) -> Fut + Send + Sync + 'static + Clone,
Fut: futures::Future<Output = O> + Send + 'static,
{
self.add_operator(|prev| MapAsync::new(prev, f, 0))
self.add_operator(|prev| MapAsync::new(prev, f, 4))
}

/// Map the elements of the stream into new elements. Use memoization
