Skip to content

Commit

Permalink
Update Dispatch to impl OnEventRichTimer
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Apr 13, 2024
1 parent 36728d6 commit 94c73ab
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 47 deletions.
11 changes: 6 additions & 5 deletions crates/entropy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use augustus::{
DigestHash as _, H256,
},
event::{
self,
erased::{
session::{Buffered, Sender},
Blanket, Session,
Expand All @@ -39,7 +40,6 @@ use axum::{
routing::{get, post},
Json, Router,
};

use entropy::{
BulkService, CodecWorker, Get, GetOk, MessageNet, Net, Peer, Put, PutOk, SendCodecEvent,
SendFsEvent,
Expand Down Expand Up @@ -296,8 +296,9 @@ async fn start_peer(
// it done)
kademlia::Peer::<_, _, _, BlackHole, _>::new(
buckets,
Box::new(MessageNet::new(dispatch::Net::from(tcp_control_session.sender())))
as Box<dyn kademlia::Net<SocketAddr> + Send + Sync>,
Box::new(MessageNet::new(dispatch::Net::from(
tcp_control_session.sender(),
))) as Box<dyn kademlia::Net<SocketAddr> + Send + Sync>,
// MessageNet::new(DispatchNet(Sender::from(quic_control_session.sender()))),
Sender::from(kademlia_control_session.sender()),
Box::new(kademlia::CryptoWorker::from(Worker::Inline(
Expand Down Expand Up @@ -333,7 +334,7 @@ async fn start_peer(
))) as _,
Box::new(fs_sender) as _,
)));
let mut tcp_control = Unify(Dispatch::new(
let mut tcp_control = Unify(event::Buffered::from(Dispatch::new(
augustus::net::session::Tcp::new(addr)?,
{
// let mut quic_control = Blanket(Unify(Dispatch::new(quic.clone(), {
Expand All @@ -349,7 +350,7 @@ async fn start_peer(
}
},
Once(tcp_control_session.sender()),
)?);
)?));

let socket_session =
augustus::net::session::tcp::accept_session(listener, tcp_control_session.sender());
Expand Down
19 changes: 10 additions & 9 deletions examples/bench-unreplicated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use augustus::{
blocking,
erased::{self, events::Init, Blanket},
ordered::Timer,
BlackHole, Inline, OnTimer, Once, SendEvent as _, Session, Unify, UnreachableTimer,
BlackHole, Buffered, Inline, OnTimer, Once, SendEvent as _, Session, Unify,
UnreachableTimer,
},
net::{
dispatch::Net,
Expand Down Expand Up @@ -153,11 +154,11 @@ async fn main() -> anyhow::Result<()> {
erased::session::Sender::from(close_loop_session.sender()),
));
let mut state_sender = state_session.sender();
let mut tcp_control = Unify(Dispatch::new(
let mut tcp_control = Unify(Buffered::from(Dispatch::new(
Tcp::new(listener.local_addr()?)?,
move |buf: &_| to_client_on_buf(buf, &mut state_sender),
Once(tcp_session.sender()),
)?);
)?));

let tcp_sender = tcp_session.sender();
sessions
Expand Down Expand Up @@ -186,11 +187,11 @@ async fn main() -> anyhow::Result<()> {
erased::session::Sender::from(close_loop_session.sender()),
));
let mut state_sender = state_session.sender();
let mut quic_control = Unify(Dispatch::new(
let mut quic_control = Unify(Buffered::from(Dispatch::new(
quic.clone(),
move |buf: &_| to_client_on_buf(buf, &mut state_sender),
Once(quic_session.sender()),
)?);
)?));

let quic_sender = quic_session.sender();
sessions.spawn_on(quic::accept_session(quic, quic_sender), runtime.handle());
Expand Down Expand Up @@ -334,11 +335,11 @@ async fn main() -> anyhow::Result<()> {
let mut state = Unify(Replica::new(Null, ToClientMessageNet::new(raw_net)));
let mut state_session = Session::new();
let mut state_sender = state_session.sender();
let mut tcp_control = Unify(Dispatch::new(
let mut tcp_control = Unify(Buffered::from(Dispatch::new(
Tcp::new(listener.local_addr()?)?,
move |buf: &_| to_replica_on_buf::<SocketAddr>(buf, &mut state_sender),
Once(tcp_session.sender()),
)?);
)?));

let accept_session = tcp::accept_session(listener, tcp_session.sender());
let tcp_session = tcp_session.run(&mut tcp_control);
Expand All @@ -364,11 +365,11 @@ async fn main() -> anyhow::Result<()> {
let mut state_session = Session::new();
let mut state_sender = state_session.sender();
let quic = Quic::new(replica_addr)?;
let mut quic_control = Unify(Dispatch::new(
let mut quic_control = Unify(Buffered::from(Dispatch::new(
quic.clone(),
move |buf: &_| to_replica_on_buf::<SocketAddr>(buf, &mut state_sender),
Once(quic_session.sender()),
)?);
)?));

let accept_session = quic::accept_session(quic, quic_session.sender());
let quic_session = quic_session.run(&mut quic_control);
Expand Down
6 changes: 3 additions & 3 deletions examples/stress-connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

use augustus::{
event::{Once, Session, Unify},
event::{Buffered, Once, Session, Unify},
net::{dispatch::Net, Dispatch, SendMessage},
};

Expand Down Expand Up @@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
let quic = augustus::net::session::Quic::new(SocketAddr::from(([0, 0, 0, 0], 3000 + i)))?;

let mut control_session = Session::new();
let mut control = Unify(Dispatch::new(
let mut control = Unify(Buffered::from(Dispatch::new(
// augustus::net::session::Tcp::new(None)?,
quic.clone(),
{
Expand All @@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> {
}
},
Once(control_session.sender()),
)?);
)?));

// sessions.spawn(augustus::net::session::tcp_accept_session(
// listener,
Expand Down
16 changes: 13 additions & 3 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ pub trait OnTimer {

pub struct Inline<'a, S, T>(pub &'a mut S, pub &'a mut T);

impl<S: OnEventUniversal<T>, T: Timer> SendEvent<S::Event> for Inline<'_, S, T> {
fn send(&mut self, event: S::Event) -> anyhow::Result<()> {
self.0.on_event(event, self.1)
impl<S: OnEventUniversal<T>, T: Timer, M: Into<S::Event>> SendEvent<M> for Inline<'_, S, T> {
fn send(&mut self, event: M) -> anyhow::Result<()> {
self.0.on_event(event.into(), self.1)
}
}

Expand Down Expand Up @@ -164,6 +164,15 @@ pub struct Buffered<S, M> {
attached: HashMap<TimerId, Box<dyn FnMut() -> M + Send + Sync>>,
}

impl<S, M> From<S> for Buffered<S, M> {
fn from(value: S) -> Self {
Self {
inner: value,
attached: Default::default(),
}
}
}

struct BufferedTimer<'a, T, M> {
inner: &'a mut T,
attached: &'a mut HashMap<TimerId, Box<dyn FnMut() -> M + Send + Sync>>,
Expand Down Expand Up @@ -268,6 +277,7 @@ pub mod erased {

pub struct Inline<'a, S, T>(pub &'a mut S, pub &'a mut T);

// we probably cannot have `impl SendEvent<impl Into<M>>`
impl<S: OnEvent<M>, M, T: Timer> SendEvent<M> for Inline<'_, S, T> {
fn send(&mut self, event: M) -> anyhow::Result<()> {
self.0.on_event(event, self.1)
Expand Down
35 changes: 18 additions & 17 deletions src/net/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use derive_where::derive_where;

use tracing::{debug, warn};

use crate::event::{erased, OnEvent, OnTimer, SendEvent, SendEventOnce, Timer};
use crate::event::{
erased, OnEventRichTimer as OnEvent, RichTimer as Timer, SendEvent, SendEventOnce,
UnreachableTimer,
};

use super::{Addr, Buf, IterAddr, SendMessage};

Expand Down Expand Up @@ -65,8 +68,8 @@ impl<E, P: Protocol<A, B>, A, B, F> Dispatch<E, P, A, B, F> {
}
}

// go with typed event for this state machine because it takes a sender that
// sends to itself and must be `Clone` at the same time
// go with typed event for this state machine because it (1) owns a sender that
// (2) sends to itself and (3) must be `Clone` at the same time
// i.e. "the horror" of type erasure
#[derive(derive_more::From)]
pub enum Event<P: Protocol<A, B>, A, B> {
Expand Down Expand Up @@ -146,9 +149,9 @@ impl<

fn on_event(&mut self, event: Self::Event, timer: &mut impl Timer) -> anyhow::Result<()> {
match event {
Event::Outgoing(event) => erased::OnEvent::on_event(self, event, timer),
Event::Incoming(event) => erased::OnEvent::on_event(self, event, timer),
Event::Closed(event) => erased::OnEvent::on_event(self, event, timer),
Event::Outgoing(event) => self.handle_outgoing(event, timer),
Event::Incoming(event) => erased::OnEvent::on_event(self, event, &mut UnreachableTimer),
Event::Closed(event) => self.handle_closed(event, timer),
}
}
}
Expand All @@ -159,9 +162,9 @@ impl<
A: Addr,
B: Buf,
F: FnMut(&[u8]) -> anyhow::Result<()> + Clone + Send + 'static,
> erased::OnEvent<Outgoing<A, B>> for Dispatch<E, P, A, B, F>
> Dispatch<E, P, A, B, F>
{
fn on_event(
fn handle_outgoing(
&mut self,
Outgoing(remote, buf): Outgoing<A, B>,
_: &mut impl Timer,
Expand Down Expand Up @@ -217,7 +220,7 @@ impl<
fn on_event(
&mut self,
Incoming(event): Incoming<P::Incoming>,
_: &mut impl Timer,
_: &mut impl crate::event::Timer,
) -> anyhow::Result<()> {
self.seq += 1;
let close_guard = CloseGuard(self.close_sender.clone(), None, self.seq);
Expand All @@ -239,8 +242,12 @@ impl<
}
}

impl<E, P: Protocol<A, B>, A: Addr, B, F> erased::OnEvent<Closed<A>> for Dispatch<E, P, A, B, F> {
fn on_event(&mut self, Closed(addr, seq): Closed<A>, _: &mut impl Timer) -> anyhow::Result<()> {
impl<E, P: Protocol<A, B>, A: Addr, B, F> Dispatch<E, P, A, B, F> {
fn handle_closed(
&mut self,
Closed(addr, seq): Closed<A>,
_: &mut impl Timer,
) -> anyhow::Result<()> {
if let Some(connection) = self.connections.get(&addr) {
if connection.seq == seq {
debug!(">>> {addr:?} outgoing connection closed");
Expand All @@ -250,9 +257,3 @@ impl<E, P: Protocol<A, B>, A: Addr, B, F> erased::OnEvent<Closed<A>> for Dispatc
Ok(())
}
}

impl<E, P: Protocol<A, B>, A, B, F> OnTimer for Dispatch<E, P, A, B, F> {
fn on_timer(&mut self, _: crate::event::TimerId, _: &mut impl Timer) -> anyhow::Result<()> {
unreachable!()
}
}
13 changes: 3 additions & 10 deletions src/net/session/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,7 @@ impl Quic {
let mut stream = match connection.accept_uni().await {
Ok(stream) => stream,
// TODO
Err(
quinn::ConnectionError::ConnectionClosed(_)
| quinn::ConnectionError::LocallyClosed
| quinn::ConnectionError::TimedOut,
) => break,
// Err(quinn::ConnectionError::ConnectionClosed(_)) => break,
Err(err) => {
warn!("<<< {remote_addr} {err}");
break;
Expand Down Expand Up @@ -148,11 +144,8 @@ impl<B: Buf> Protocol<SocketAddr, B> for Quic {
endpoint.connect(remote, "neatworks.quic")
}?;
drop(span.exit());
anyhow::Ok(
connecting
.instrument(tracing::debug_span!("connect", local = ?endpoint.local_addr(), remote = ?remote))
.await?,
)
let span = tracing::debug_span!("connect", local = ?endpoint.local_addr(), remote = ?remote);
anyhow::Ok(connecting.instrument(span).await?)
};
let connection = match task.await {
Ok(connection) => connection,
Expand Down

0 comments on commit 94c73ab

Please sign in to comment.