Skip to content

Commit

Permalink
Minor fixes, formatting, dependency bump
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed May 27, 2024
1 parent 33a4cba commit 9cc749d
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 170 deletions.
268 changes: 142 additions & 126 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ nanorand = "0.7.0"
derivative = "2.2.0"

# serialization
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.116"
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
bincode = "1.3.3"
toml = "0.8.12"
toml = "0.8.13"

thiserror = "1.0.58"
thiserror = "1.0.61"

# handy iterators functions

Expand All @@ -52,7 +52,7 @@ whoami = { version = "1.5.1", optional = true }
shell-escape = { version = "0.1.5", optional = true }
clap = { version = "4.5.4", features = ["derive"], optional = true }
sha2 = { version = "0.10.8", optional = true }
base64 = { version = "0.22.0", optional = true }
base64 = { version = "0.22.1", optional = true }

# channel implementation
flume = "0.11.0"
Expand All @@ -69,7 +69,7 @@ coarsetime = "0.1.34"
tokio = { version = "1.37.0", features = ["rt"], default-features = false, optional = true }
futures = { version = "0.3.30", optional = true }

parking_lot = "0.12.1"
parking_lot = "0.12.3"

wyhash = "0.5.0"
fxhash = "0.2.1"
Expand All @@ -88,7 +88,7 @@ rand = { version = "0.8.5", features = ["small_rng"] }
tempfile = "3.10.1"
criterion = { version = "0.5.1", features = ["html_reports"] }
fake = "2.9.2"
mimalloc = { version = "0.1.39", default-features = false }
mimalloc = { version = "0.1.42", default-features = false }
tracing-subscriber = "0.3.18"
itertools = "0.12.1"

Expand Down
20 changes: 14 additions & 6 deletions examples/avro_rw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use serde::{Deserialize, Serialize};

#[derive(Debug, Parser)]
struct Options {
#[clap(short,long)]
#[clap(short, long)]
input: Option<PathBuf>,

#[clap(short,long)]
#[clap(short, long)]
output: PathBuf,
}

Expand All @@ -30,12 +30,20 @@ fn main() {
let source = if let Some(input) = opts.input {
ctx.stream_avro(input).into_boxed()
} else {
ctx.stream_iter((0..100).map(|i| InputType{ s: format!("{i:o}"), num: i })).into_boxed()
ctx.stream_iter((0..100).map(|i| InputType {
s: format!("{i:o}"),
num: i,
}))
.into_boxed()
};

source.inspect(|e| eprintln!("{e:?}"))
.map(|mut e| { e.num *= 2; e })
source
.inspect(|e| eprintln!("{e:?}"))
.map(|mut e| {
e.num *= 2;
e
})
.write_avro(opts.output);

ctx.execute_blocking();
}
}
10 changes: 4 additions & 6 deletions src/operator/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ where
}

pub struct BoxedOperator<O> {
pub(crate) op: Box<dyn DynOperator<Out=O> + 'static + Send>,
pub(crate) op: Box<dyn DynOperator<Out = O> + 'static + Send>,
}

impl<T> Clone for BoxedOperator<T> {
Expand All @@ -66,10 +66,8 @@ impl<T> Display for BoxedOperator<T> {
}

impl<O: Data> BoxedOperator<O> {
pub fn new<Op: Operator<Out=O> + 'static>(op: Op) -> Self {
Self {
op: Box::new(op),
}
pub fn new<Op: Operator<Out = O> + 'static>(op: Op) -> Self {
Self { op: Box::new(op) }
}
}

Expand All @@ -95,7 +93,7 @@ where
Op::Out: Clone + Send + 'static,
{
/// Erase operator type using dynamic dispatching.
///
///
/// Use only when strictly necessary as it is decrimental for performance.
pub fn into_boxed(self) -> Stream<BoxedOperator<Op::Out>> {
self.add_operator(|prev| BoxedOperator::new(prev))
Expand Down
4 changes: 1 addition & 3 deletions src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use std::fmt::Display;
use std::hash::Hash;
use std::ops::{AddAssign, Div};
use std::path::PathBuf;

use flume::{unbounded, Receiver};
#[cfg(feature = "tokio")]
Expand All @@ -26,7 +25,6 @@ use crate::{BatchMode, KeyedStream, Stream};
#[cfg(feature = "tokio")]
use self::map_async::MapAsync;
use self::map_memo::MapMemo;
use self::sink::avro::AvroSink;
use self::sink::collect::Collect;
use self::sink::collect_channel::CollectChannelSink;
use self::sink::collect_count::CollectCountSink;
Expand Down Expand Up @@ -60,6 +58,7 @@ use self::{
#[cfg(feature = "timestamp")]
mod add_timestamps;
mod batch_mode;
mod boxed;
pub(crate) mod end;
mod filter;
mod filter_map;
Expand Down Expand Up @@ -88,7 +87,6 @@ pub mod source;
mod start;
pub mod window;
mod zip;
mod boxed;

/// Marker trait that all the types inside a stream should implement.
pub trait Data: Clone + Send + 'static {}
Expand Down
15 changes: 6 additions & 9 deletions src/operator/sink/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ use serde::Serialize;
use std::fmt::Display;
use std::fs::File;
use std::io::BufWriter;
use std::marker::PhantomData;
use std::path::PathBuf;

use crate::block::{BlockStructure, OperatorKind, OperatorStructure};
use crate::operator::sink::StreamOutputRef;
use crate::operator::{ExchangeData, Operator, StreamElement};
use crate::operator::{Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;
use crate::Stream;

Expand Down Expand Up @@ -67,6 +65,7 @@ where
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&self.path)
.unwrap_or_else(|err| {
panic!(
Expand All @@ -75,7 +74,6 @@ where
)
});


let buf_writer = BufWriter::new(file);
self.writer = Some(buf_writer);
}
Expand Down Expand Up @@ -113,11 +111,11 @@ where
}
}

impl<Op: Operator> Stream<Op> where
impl<Op: Operator> Stream<Op>
where
Op: 'static,
Op::Out: AvroSchema + Serialize
Op::Out: AvroSchema + Serialize,
{

/// Apply the given function to all the elements of the stream, consuming the stream.
///
/// ## Example
Expand All @@ -131,8 +129,7 @@ impl<Op: Operator> Stream<Op> where
///
/// env.execute_blocking();
/// ```
pub fn write_avro<P: Into<PathBuf>>(self, path: P)
{
pub fn write_avro<P: Into<PathBuf>>(self, path: P) {
self.add_operator(|prev| AvroSink::new(prev, path))
.finalize_block();
}
Expand Down
2 changes: 1 addition & 1 deletion src/operator/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

use std::sync::{Arc, Mutex};

pub(super) mod avro;
pub(super) mod collect;
pub(super) mod collect_channel;
pub(super) mod collect_count;
pub(super) mod collect_vec;
pub(super) mod for_each;
pub(super) mod avro;

pub(crate) type StreamOutputRef<Out> = Arc<Mutex<Option<Out>>>;

Expand Down
22 changes: 12 additions & 10 deletions src/operator/source/avro.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::fmt::Display;
use std::fs::File;
use std::io;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::io::BufReader;
use std::marker::PhantomData;
use std::path::PathBuf;

Expand Down Expand Up @@ -85,7 +84,7 @@ impl<Out: Data + for<'a> Deserialize<'a>> Source for AvroSource<Out> {
impl<Out: Data + for<'a> Deserialize<'a>> Operator for AvroSource<Out> {
type Out = Out;

fn setup(&mut self, metadata: &mut ExecutionMetadata) {
fn setup(&mut self, _metadata: &mut ExecutionMetadata) {
// let global_id = metadata.global_id;
// let instances = metadata.replicas.len();

Expand Down Expand Up @@ -118,7 +117,10 @@ impl<Out: Data + for<'a> Deserialize<'a>> Operator for AvroSource<Out> {
match reader.next() {
Some(Ok(el)) => {
tracing::trace!("avro Value: {el:?}");
StreamElement::Item(apache_avro::from_value(&el).expect("could not deserialize from avro Value to specified type"))
StreamElement::Item(
apache_avro::from_value(&el)
.expect("could not deserialize from avro Value to specified type"),
)
}
Some(Err(e)) => panic!("Error while reading Aveo file: {:?}", e),
None => {
Expand Down Expand Up @@ -163,14 +165,14 @@ impl crate::StreamContext {

#[cfg(test)]
mod tests {
use std::io::Write;
// use std::io::Write;

use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tempfile::NamedTempFile;
// use itertools::Itertools;
// use serde::{Deserialize, Serialize};
// use tempfile::NamedTempFile;

use crate::config::RuntimeConfig;
use crate::environment::StreamContext;
// use crate::config::RuntimeConfig;
// use crate::environment::StreamContext;
// use crate::operator::source::AvroSource;

// #[test]
Expand Down
4 changes: 2 additions & 2 deletions src/operator/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@
pub use self::csv::*;
#[cfg(feature = "tokio")]
pub use async_stream::*;
pub use avro::*;
pub use channel::*;
pub use file::*;
pub use iterator::*;
pub use parallel_iterator::*;
pub use avro::*;

use crate::{block::Replication, operator::Operator};

#[cfg(feature = "tokio")]
mod async_stream;
mod avro;
mod channel;
mod csv;
mod file;
mod iterator;
mod parallel_iterator;
mod avro;

/// This trait marks all the operators that can be used as sinks.
pub trait Source: Operator {
Expand Down
Loading

0 comments on commit 9cc749d

Please sign in to comment.