Skip to content

Commit

Permalink
Revise context design
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Jul 7, 2024
1 parent 99b2163 commit 2c9cf6f
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 127 deletions.
8 changes: 5 additions & 3 deletions src/bin/workload/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{future::Future, net::SocketAddr, sync::Arc};
use bytes::Bytes;
use neatworks::{
event::{
task::{run_with_schedule, ContextOf, ScheduleState},
task::{run_with_schedule, Context, ScheduleState},
Erase, SendEvent, Untyped,
},
net::{
Expand Down Expand Up @@ -43,13 +43,14 @@ pub async fn unreplicated(invoke_task: impl InvokeTask) -> anyhow::Result<()> {
unreplicated::codec::client_decode(Erase::new(sender.clone())),
);

let mut context = unreplicated::context::Client::<ContextOf<_>, _, _> {
let mut context = unreplicated::context::Client::<Context, _, _, _> {
net: unreplicated::codec::client_encode(Forward(
([127, 0, 0, 1], 3000).into(),
socket.clone(),
)),
upcall: upcall_sender,
schedule: Erase::new(ScheduleState::new(schedule_sender)),
_m: Default::default(),
};
let client_task = run_with_schedule(
Untyped::new(unreplicated::ClientState::new(random(), addr)),
Expand Down Expand Up @@ -87,14 +88,15 @@ pub async fn pbft(
pbft::messages::codec::to_client_decode(Erase::new(sender.clone())),
);

let mut context = pbft::client::context::Context::<ContextOf<_>, _, _> {
let mut context = pbft::client::context::Context::<Context, _, _, _> {
net: pbft::messages::codec::to_replica_encode(IndexNet::new(
replica_addrs,
None,
socket.clone(),
)),
upcall: upcall_sender,
schedule: Erase::new(ScheduleState::new(schedule_sender)),
_m: Default::default(),
};
let client_task = run_with_schedule(
Untyped::new(pbft::client::State::new(random(), addr, config)),
Expand Down
4 changes: 2 additions & 2 deletions src/bin/workload/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{net::SocketAddr, sync::Arc};
use neatworks::{
crypto::{Crypto, CryptoFlavor},
event::{
task::{run, run_with_schedule, run_worker, ContextOf, ScheduleState},
task::{run, run_with_schedule, run_worker, Context, ScheduleState},
Erase, Untyped,
},
net::{combinators::IndexNet, task::udp},
Expand Down Expand Up @@ -46,7 +46,7 @@ pub async fn pbft(
let (schedule_sender, mut schedule_receiver) = unbounded_channel();
let (sender, mut receiver) = unbounded_channel();

let mut context = pbft::replica::context::Context::<ContextOf<_>, _, _, _> {
let mut context = pbft::replica::context::Context::<Context, _, _, _> {
peer_net: pbft::messages::codec::to_replica_encode(IndexNet::new(
addrs,
index,
Expand Down
8 changes: 4 additions & 4 deletions src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};

use crate::{
event::SendEvent,
net::events::Send,
net::events::Cast,
workload::{events::InvokeOk, Typed},
};

pub struct Encode<M, T>(fn(&M) -> anyhow::Result<Bytes>, pub T);

impl<M: Into<L>, L, N: SendEvent<Send<A, Bytes>>, A> SendEvent<Send<A, M>> for Encode<L, N> {
fn send(&mut self, Send(remote, message): Send<A, M>) -> anyhow::Result<()> {
impl<M: Into<L>, L, N: SendEvent<Cast<A, Bytes>>, A> SendEvent<Cast<A, M>> for Encode<L, N> {
fn send(&mut self, Cast(remote, message): Cast<A, M>) -> anyhow::Result<()> {
let encoded = (self.0)(&message.into())?;
self.1.send(Send(remote, encoded))
self.1.send(Cast(remote, encoded))
}
}

Expand Down
14 changes: 14 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ pub trait ScheduleEvent<M> {
fn unset(&mut self, id: TimerId) -> anyhow::Result<()>;
}

impl<T: ScheduleEvent<M>, M> ScheduleEvent<M> for &mut T {
fn set(
&mut self,
period: Duration,
event: impl FnMut() -> M + Send + 'static,
) -> anyhow::Result<TimerId> {
T::set(self, period, event)
}

fn unset(&mut self, id: TimerId) -> anyhow::Result<()> {
T::unset(self, id)
}
}

#[derive_where(Debug; S)]
#[derive(Deref, DerefMut)]
pub struct Untyped<C, S>(
Expand Down
18 changes: 15 additions & 3 deletions src/event/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@ pub mod erase {
// a "stub", or marker type that indicates the task based context is being used
// in a context type that refers to itself (on type level, not memory reference)
//
// the private PhantomData<_> prevents it from being constructed anywhere
// the private PhantomData<()> prevents it from being constructed anywhere
// outside. and it indeed should not be ever constructed; it only shows up in
// type annotations, as a "placeholder" to take place for the actual generics
// that would refer the context's own type and cannot be written out directly
// anywhere outside the context definition
//
// the marker type seems to have no implementation. ideally it should have
// several implementation "blanket over" the generic state e.g. for schedule
// several implementation "blanket over" a generic state e.g. for schedule
//
// struct ContextOf<S>(PhantomData<S>) // the desired form of `Context`
//
// trait On<Context> {
// type Out;
// }
//
// impl<State, Context> On<Context> for ContextOf<State>
// where
// /* whatever bounds the state and context */
Expand All @@ -55,7 +62,12 @@ pub mod erase {
// as the result, the `impl`s of `ContextOf<_>` all lives in the use sites and
// are for some specialization. but in sprite those `impl`s are together
// recovering the necessary part of the blanket above
pub struct ContextOf<S>(PhantomData<S>);
//
// since the `impl`s are becoming specialized, it is unnecessary for this marker
// to be generic over state (or anything). the (becoming unnecessary)
// PhantomData<_> is saved to continue preventing `Context` value being
// constructed, which is still desired despiting the workaround
pub struct Context(PhantomData<()>);

impl<M: Into<N>, N> SendEvent<M> for UnboundedSender<N> {
fn send(&mut self, event: M) -> anyhow::Result<()> {
Expand Down
87 changes: 50 additions & 37 deletions src/model.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::{collections::BTreeMap, fmt::Debug, time::Duration};

use derive_where::derive_where;

use crate::{
event::{SendEvent, TimerId},
net::events::Send,
event::{ScheduleEvent, SendEvent, TimerId},
net::events::Cast,
};

pub trait State {
Expand All @@ -17,55 +19,66 @@ pub trait State {
}
}

#[derive(Debug, PartialEq, Eq, Hash, Default)]
pub struct Network<A, M, T> {
pub messages: BTreeMap<A, Vec<M>>,
pub timers: BTreeMap<A, Vec<(u32, Duration, T)>>, // TODO
pub timer_count: u32,
pub now: Duration,
#[derive(Debug, PartialEq, Eq, Hash)]
#[derive_where(Default)]
pub struct TimerState<M> {
envelops: Vec<TimerEnvelop<M>>,
count: u32,
}

#[derive(Debug)]
pub struct NetworkContext<'a, A, M, T> {
state: &'a mut Network<A, M, T>,
pub addr: A,
#[derive_where(Debug, PartialEq, Eq, Hash; M)]
struct TimerEnvelop<M> {
id: u32,
#[derive_where(skip)]
generate: Box<dyn FnMut() -> M + Send>,
period: Duration,
event: M,
}

impl<A, M, T> Network<A, M, T> {
pub fn context(&mut self, addr: A) -> NetworkContext<'_, A, M, T> {
NetworkContext { state: self, addr }
impl<M> TimerState<M> {
pub fn new() -> Self {
Self::default()
}
}

impl<A: Ord + Debug, M: Into<N>, N, T> SendEvent<Send<A, M>> for NetworkContext<'_, A, N, T> {
fn send(&mut self, Send(remote, message): Send<A, M>) -> anyhow::Result<()> {
let Some(inbox) = self.state.messages.get_mut(&remote) else {
anyhow::bail!("missing inbox for addr {remote:?}")
impl<M: Into<N>, N> ScheduleEvent<M> for TimerState<N> {
fn set(
&mut self,
period: Duration,
mut event: impl FnMut() -> M + Send + 'static,
) -> anyhow::Result<TimerId> {
self.count += 1;
let id = self.count;
let envelop = TimerEnvelop {
id,
event: event().into(),
generate: Box::new(move || event().into()),
period,
};
inbox.push(message.into());
Ok(())
self.envelops.push(envelop);
Ok(TimerId(id))
}
}

impl<A: Ord + Debug, M, T> NetworkContext<'_, A, M, T> {
pub fn set(&mut self, period: Duration, event: T) -> anyhow::Result<TimerId> {
let Some(inbox) = self.state.timers.get_mut(&self.addr) else {
anyhow::bail!("missing inbox for addr {:?}", self.addr)
fn unset(&mut self, TimerId(id): TimerId) -> anyhow::Result<()> {
let Some(pos) = self.envelops.iter().position(|envelop| envelop.id == id) else {
anyhow::bail!("missing timer of {:?}", TimerId(id))
};
self.state.timer_count += 1;
let id = self.state.timer_count;
inbox.push((id, self.state.now + period, event));
Ok(TimerId(id))
self.envelops.remove(pos);
Ok(())
}
}

pub fn unset(&mut self, TimerId(id): TimerId) -> anyhow::Result<()> {
let Some(inbox) = self.state.timers.get_mut(&self.addr) else {
anyhow::bail!("missing inbox for addr {:?}", self.addr)
};
let Some(pos) = inbox.iter().position(|(other_id, _, _)| *other_id == id) else {
anyhow::bail!("missing timer {:?}", TimerId(id))
#[derive(Debug, PartialEq, Eq, Hash, Default)]
pub struct NetworkState<A, M> {
pub messages: BTreeMap<A, Vec<M>>,
}

impl<A: Ord + Debug, M: Into<N>, N> SendEvent<Cast<A, M>> for NetworkState<A, N> {
fn send(&mut self, Cast(remote, message): Cast<A, M>) -> anyhow::Result<()> {
let Some(inbox) = self.messages.get_mut(&remote) else {
anyhow::bail!("missing inbox for addr {remote:?}")
};
inbox.remove(pos);
inbox.push(message.into());
Ok(())
}
}
8 changes: 5 additions & 3 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ pub mod task {
}

pub mod events {
pub struct Send<A, M>(pub A, pub M);
// probably called `Send` in any sane codebase, but that terribly conflicts with
// std::marker::Send
pub struct Cast<A, M>(pub A, pub M);

pub struct Recv<M>(pub M);
}
Expand All @@ -20,9 +22,9 @@ pub trait SendMessage<A, M> {
fn send(&mut self, remote: A, message: M) -> anyhow::Result<()>;
}

impl<E: SendEvent<events::Send<A, M>>, A, M> SendMessage<A, M> for E {
impl<E: SendEvent<events::Cast<A, M>>, A, M> SendMessage<A, M> for E {
fn send(&mut self, remote: A, message: M) -> anyhow::Result<()> {
SendEvent::send(self, events::Send(remote, message))
SendEvent::send(self, events::Cast(remote, message))
}
}

Expand Down
20 changes: 10 additions & 10 deletions src/net/combinators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use bytes::Bytes;

use crate::event::SendEvent;

use super::{events::Send, Addr};
use super::{events::Cast, Addr};

#[derive(Debug)]
pub struct Forward<A, N>(pub A, pub N);

impl<A: Addr, N: SendEvent<Send<A, M>>, M> SendEvent<Send<(), M>> for Forward<A, N> {
fn send(&mut self, Send((), message): Send<(), M>) -> anyhow::Result<()> {
self.1.send(Send(self.0.clone(), message))
impl<A: Addr, N: SendEvent<Cast<A, M>>, M> SendEvent<Cast<(), M>> for Forward<A, N> {
fn send(&mut self, Cast((), message): Cast<(), M>) -> anyhow::Result<()> {
self.1.send(Cast(self.0.clone(), message))
}
}

Expand All @@ -33,26 +33,26 @@ impl<A, N> IndexNet<A, N> {
}
}

impl<A: Addr, N: SendEvent<Send<A, M>>, M, I: Into<usize>> SendEvent<Send<I, M>>
impl<A: Addr, N: SendEvent<Cast<A, M>>, M, I: Into<usize>> SendEvent<Cast<I, M>>
for IndexNet<A, N>
{
fn send(&mut self, Send(index, message): Send<I, M>) -> anyhow::Result<()> {
fn send(&mut self, Cast(index, message): Cast<I, M>) -> anyhow::Result<()> {
let index = index.into();
let addr = self
.addrs
.get(index)
.ok_or(anyhow::format_err!("missing address of index {index}"))?;
self.inner.send(Send(addr.clone(), message))
self.inner.send(Cast(addr.clone(), message))
}
}

impl<A: Addr, N: SendEvent<Send<A, Bytes>>> SendEvent<Send<All, Bytes>> for IndexNet<A, N> {
fn send(&mut self, Send(All, message): Send<All, Bytes>) -> anyhow::Result<()> {
impl<A: Addr, N: SendEvent<Cast<A, Bytes>>> SendEvent<Cast<All, Bytes>> for IndexNet<A, N> {
fn send(&mut self, Cast(All, message): Cast<All, Bytes>) -> anyhow::Result<()> {
for (index, addr) in self.addrs.iter().enumerate() {
if Some(index) == self.all_except {
continue;
}
self.inner.send(Send(addr.clone(), message.clone()))?
self.inner.send(Cast(addr.clone(), message.clone()))?
}
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions src/net/task/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::{net::SocketAddr, sync::Arc};
use bytes::Bytes;
use tokio::{net::UdpSocket, spawn};

use crate::{event::SendEvent, net::events::Send};
use crate::{event::SendEvent, net::events::Cast};

impl SendEvent<Send<SocketAddr, Bytes>> for Arc<UdpSocket> {
fn send(&mut self, Send(remote, message): Send<SocketAddr, Bytes>) -> anyhow::Result<()> {
impl SendEvent<Cast<SocketAddr, Bytes>> for Arc<UdpSocket> {
fn send(&mut self, Cast(remote, message): Cast<SocketAddr, Bytes>) -> anyhow::Result<()> {
let socket = self.clone();
spawn(async move {
if socket.send_to(&message, remote).await.is_err() {
Expand Down
13 changes: 8 additions & 5 deletions src/pbft/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,22 @@ impl<A: Addr> State<A> {
}

pub mod context {
use std::marker::PhantomData;

use super::*;

pub struct Context<O: On<Self>, N, U> {
pub struct Context<O: On<Self>, N, U, A> {
pub net: N,
pub upcall: U,
pub schedule: O::Schedule,
pub _m: PhantomData<A>,
}

pub trait On<C> {
type Schedule: ScheduleEvent<events::Resend>;
}

impl<O: On<Self>, N, U, A> super::Context<A> for Context<O, N, U>
impl<O: On<Self>, N, U, A> super::Context<A> for Context<O, N, U, A>
where
N: SendMessage<u8, Request<A>> + SendMessage<All, Request<A>>,
U: SendEvent<InvokeOk<Bytes>>,
Expand All @@ -163,16 +166,16 @@ pub mod context {
}

mod task {
use crate::event::task::{erase::ScheduleState, ContextOf};
use crate::event::task::{erase::ScheduleState, Context as Task};

use super::*;

impl<N, U, A: Addr> On<Context<Self, N, U>> for ContextOf<State<A>>
impl<N, U, A: Addr> On<Context<Self, N, U, A>> for Task
where
N: SendMessage<u8, Request<A>> + SendMessage<All, Request<A>>,
U: SendEvent<InvokeOk<Bytes>>,
{
type Schedule = ScheduleState<State<A>, Context<Self, N, U>>;
type Schedule = ScheduleState<State<A>, Context<Self, N, U, A>>;
}
}
}
Loading

0 comments on commit 2c9cf6f

Please sign in to comment.