Skip to content

Commit

Permalink
Add pbft client context
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Jul 6, 2024
1 parent b93f6ca commit 136a691
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 32 deletions.
29 changes: 27 additions & 2 deletions src/bin/workload-standalone.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::{env::args, time::Duration};
use std::{
env::args,
time::{Duration, Instant},
};

use neatworks::workload::events::Invoke;
use tokio::{select, time::sleep};

pub mod workload {
Expand All @@ -8,6 +12,27 @@ pub mod workload {
mod util;
}

struct InvokeTask;

impl workload::clients::InvokeTask for InvokeTask {
async fn run(
self,
mut sender: impl neatworks::event::SendEvent<neatworks::workload::events::Invoke<bytes::Bytes>>,
mut receiver: tokio::sync::mpsc::UnboundedReceiver<
neatworks::workload::events::InvokeOk<bytes::Bytes>,
>,
) -> anyhow::Result<()> {
for _ in 0..10 {
let start = Instant::now();
sender.send(Invoke(Default::default()))?;
let recv = receiver.recv().await;
anyhow::ensure!(recv.is_some());
println!("{:?}", start.elapsed())
}
anyhow::Ok(())
}
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let mode = args().nth(1);
Expand All @@ -17,7 +42,7 @@ async fn main() -> anyhow::Result<()> {
let server_task = workload::servers::unreplicated();
let client_task = async {
sleep(Duration::from_millis(100)).await;
workload::clients::unreplicated().await
workload::clients::unreplicated(InvokeTask).await
};
select! {
result = client_task => break 'client result?,
Expand Down
89 changes: 68 additions & 21 deletions src/bin/workload/clients.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,40 @@
use std::sync::Arc;
use std::{future::Future, sync::Arc};

use bytes::Bytes;
use neatworks::{
event::{
task::{erase::Of, run_with_schedule, ScheduleState},
Erase, SendEvent, Untyped,
},
net::{combinators::Forward, task::udp},
net::{
combinators::{Forward, IndexNet},
task::udp,
},
pbft::{self, PublicParameters},
unreplicated,
workload::events::{Invoke, InvokeOk},
};
use rand::random;
use tokio::{net::UdpSocket, select, sync::mpsc::unbounded_channel, time::Instant};
use tokio::{
net::UdpSocket,
select,
sync::mpsc::{unbounded_channel, UnboundedReceiver},
};

use super::util::run_until;

pub async fn unreplicated() -> anyhow::Result<()> {
pub trait InvokeTask {
fn run(
self,
sender: impl SendEvent<Invoke<Bytes>>,
receiver: UnboundedReceiver<InvokeOk<Bytes>>,
) -> impl Future<Output = anyhow::Result<()>>;
}

pub async fn unreplicated(invoke_task: impl InvokeTask) -> anyhow::Result<()> {
let socket = Arc::new(UdpSocket::bind("localhost:0").await?);
let addr = socket.local_addr()?;
let (upcall_sender, mut upcall_receiver) = unbounded_channel::<InvokeOk<_>>();
let (upcall_sender, upcall_receiver) = unbounded_channel::<InvokeOk<_>>();
let (schedule_sender, mut schedule_receiver) = unbounded_channel();
let (sender, mut receiver) = unbounded_channel();

Expand All @@ -42,22 +59,52 @@ pub async fn unreplicated() -> anyhow::Result<()> {
|context| &mut *context.schedule,
);

let invoke_task = async {
let mut sender = Erase::new(sender);
for _ in 0..10 {
let start = Instant::now();
sender.send(Invoke(Default::default()))?;
let recv = upcall_receiver.recv().await;
anyhow::ensure!(recv.is_some());
println!("{:?}", start.elapsed())
}
anyhow::Ok(())
run_until(
invoke_task.run(Erase::new(sender), upcall_receiver),
async {
select! {
result = net_task => result,
result = client_task => result,
}
},
)
.await
}

pub async fn pbft(invoke_task: impl InvokeTask, config: PublicParameters) -> anyhow::Result<()> {
let socket = Arc::new(UdpSocket::bind("localhost:0").await?);
let addr = socket.local_addr()?;
let (upcall_sender, upcall_receiver) = unbounded_channel::<InvokeOk<_>>();
let (schedule_sender, mut schedule_receiver) = unbounded_channel();
let (sender, mut receiver) = unbounded_channel();

let net_task = udp::run(
&socket,
pbft::messages::codec::to_client_decode(Erase::new(sender.clone())),
);

let mut context = pbft::client::context::Context::<_, _, Of<_>> {
// TODO
net: pbft::messages::codec::to_replica_encode(IndexNet::new(vec![], None, socket.clone())),
upcall: upcall_sender,
schedule: Erase::new(ScheduleState::new(schedule_sender)),
};
run_until(invoke_task, async {
select! {
result = net_task => result,
result = client_task => result,
}
})
let client_task = run_with_schedule(
Untyped::new(pbft::client::State::new(random(), addr, config)),
&mut context,
&mut receiver,
&mut schedule_receiver,
|context| &mut *context.schedule,
);

run_until(
invoke_task.run(Erase::new(sender), upcall_receiver),
async {
select! {
result = net_task => result,
result = client_task => result,
}
},
)
.await
}
67 changes: 58 additions & 9 deletions src/pbft/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::BTreeMap;

use bytes::Bytes;

use crate::{
codec::Payload,
event::{OnErasedEvent, ScheduleEvent, SendEvent, TimerId},
Expand Down Expand Up @@ -51,22 +53,21 @@ pub mod events {

pub trait Context<A> {
type Net: SendMessage<u8, Request<A>> + SendMessage<All, Request<A>>;
type Upcall: SendEvent<InvokeOk<Payload>>;
type Upcall: SendEvent<InvokeOk<Bytes>>;
type Schedule: ScheduleEvent<events::Resend>;
fn net(&mut self) -> &mut Self::Net;
fn upcall(&mut self) -> &mut Self::Upcall;
fn schedule(&mut self) -> &mut Self::Schedule;
}

impl<A: Addr, C: Context<A>> OnErasedEvent<Invoke<Payload>, C> for State<A> {
fn on_event(&mut self, Invoke(op): Invoke<Payload>, context: &mut C) -> anyhow::Result<()> {
impl<A: Addr, C: Context<A>> OnErasedEvent<Invoke<Bytes>, C> for State<A> {
fn on_event(&mut self, Invoke(op): Invoke<Bytes>, context: &mut C) -> anyhow::Result<()> {
self.seq += 1;
let timer = context
.schedule()
.set(self.config.client_resend_interval, || events::Resend)?;
let replaced = self.outstanding.replace(Outstanding {
op,
timer,
op: Payload(op),
timer: context
.schedule()
.set(self.config.client_resend_interval, || events::Resend)?,
replies: Default::default(),
});
anyhow::ensure!(replaced.is_none());
Expand Down Expand Up @@ -109,7 +110,8 @@ impl<A, C: Context<A>> OnErasedEvent<Recv<Reply>, C> for State<A> {
context
.schedule()
.unset(self.outstanding.take().unwrap().timer)?;
context.upcall().send(InvokeOk(reply.result))
let Payload(result) = reply.result;
context.upcall().send(InvokeOk(result))
}
}

Expand All @@ -127,3 +129,50 @@ impl<A: Addr> State<A> {
context.net().send(dest, request)
}
}

pub mod context {
use super::*;

pub struct Context<N, U, O: On<Self>> {
pub net: N,
pub upcall: U,
pub schedule: O::Schedule,
}

pub trait On<C> {
type Schedule: ScheduleEvent<events::Resend>;
}

impl<N, U, O: On<Self>, A> super::Context<A> for Context<N, U, O>
where
N: SendMessage<u8, Request<A>> + SendMessage<All, Request<A>>,
U: SendEvent<InvokeOk<Bytes>>,
{
type Net = N;
type Upcall = U;
type Schedule = O::Schedule;
fn net(&mut self) -> &mut Self::Net {
&mut self.net
}
fn upcall(&mut self) -> &mut Self::Upcall {
&mut self.upcall
}
fn schedule(&mut self) -> &mut Self::Schedule {
&mut self.schedule
}
}

mod task {
use crate::event::task::erase::{Of, ScheduleState};

use super::*;

impl<N, U, A: Addr> On<Context<N, U, Self>> for Of<State<A>>
where
N: SendMessage<u8, Request<A>> + SendMessage<All, Request<A>>,
U: SendEvent<InvokeOk<Bytes>>,
{
type Schedule = ScheduleState<State<A>, Context<N, U, Self>>;
}
}
}

0 comments on commit 136a691

Please sign in to comment.