Skip to content

Commit

Permalink
Minor async fix
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Jul 25, 2024
1 parent a4736ca commit 586c35f
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
5 changes: 2 additions & 3 deletions src/network/tokio/demultiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use tokio::net::{TcpListener, TcpStream};
#[cfg(feature = "tokio")]
use tokio::task::JoinHandle;

use anyhow::anyhow;
use std::collections::HashMap;
use std::net::ToSocketAddrs;

Expand Down Expand Up @@ -82,12 +81,12 @@ async fn bind_remotes<In: ExchangeData>(
let listener = TcpListener::bind(&*address)
.await
.map_err(|e| {
anyhow!(
panic!(
"Failed to bind socket for {} at {:?}: {:?}",
coord,
address,
e
)
) // TODO
})
.unwrap();
let address = listener
Expand Down
2 changes: 1 addition & 1 deletion src/operator/map_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ where
}

let el = self.prev.next();
let kind = el.take();
let kind = el.variant();

if let Some(b) = self.batcher.enqueue(el) {
self.schedule_batch(b);
Expand Down
19 changes: 11 additions & 8 deletions src/operator/source/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use flume::{bounded, Receiver, RecvError, Sender, TryRecvError};

use crate::block::{BlockStructure, OperatorKind, OperatorStructure, Replication};
use crate::operator::source::Source;
use crate::operator::{Operator, StreamElement};
use crate::operator::{replication, Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;

const MAX_RETRY: u8 = 8;
const MAX_RETRY: u8 = 16;

/// Source that consumes an iterator and emits all its elements into the stream.
///
Expand All @@ -20,6 +20,7 @@ pub struct ChannelSource<Out: Send + 'static> {
rx: Receiver<Out>,
terminated: bool,
retry_count: u8,
replication: Replication,
}

impl<Out: Send> Display for ChannelSource<Out> {
Expand All @@ -31,10 +32,11 @@ impl<Out: Send> Display for ChannelSource<Out> {
impl<Out: Send + 'static> ChannelSource<Out> {
/// Create a new source that reads the items from the iterator provided as input.
///
/// **Note**: this source is **not parallel**, the iterator will be consumed only on a single
/// replica, on all the others no item will be read from the iterator. If you want to achieve
/// parallelism you need to add an operator that shuffles the data (e.g.
/// [`Stream::shuffle`](crate::Stream::shuffle)).
/// **Note**: the replication of this source is determined by the `replication` parameter
/// The channel is an MPMC channel so items will be captured by one and only one of the replicas
/// with no specified order. Developers must take into account this replication when sending items
/// to the channel in order for messages to be delivered to renoir (eg. if replication is One,
/// only the host with id 0 should send messages to the channel)
///
/// ## Example
///
Expand All @@ -47,12 +49,13 @@ impl<Out: Send + 'static> ChannelSource<Out> {
/// tx_channel.send(1);
/// tx_channel.send(2);
/// ```
pub fn new(channel_size: usize) -> (Sender<Out>, Self) {
pub fn new(channel_size: usize, replication: Replication) -> (Sender<Out>, Self) {
let (tx, rx) = bounded(channel_size);
let s = Self {
rx,
terminated: false,
retry_count: 0,
replication,
};

(tx, s)
Expand All @@ -61,7 +64,7 @@ impl<Out: Send + 'static> ChannelSource<Out> {
// TODO: remove Debug requirement
impl<Out: Send + core::fmt::Debug> Source for ChannelSource<Out> {
fn replication(&self) -> Replication {
Replication::One
self.replication
}
}

Expand Down

0 comments on commit 586c35f

Please sign in to comment.