From 136a691c2dddca6f256cf01de68fb99108c470ed Mon Sep 17 00:00:00 2001 From: sgdxbc Date: Sat, 6 Jul 2024 16:14:18 +0800 Subject: [PATCH] Add pbft client context --- src/bin/workload-standalone.rs | 29 ++++++++++- src/bin/workload/clients.rs | 89 ++++++++++++++++++++++++++-------- src/pbft/client.rs | 67 +++++++++++++++++++++---- 3 files changed, 153 insertions(+), 32 deletions(-) diff --git a/src/bin/workload-standalone.rs b/src/bin/workload-standalone.rs index 4ceeffdb..33045ebc 100644 --- a/src/bin/workload-standalone.rs +++ b/src/bin/workload-standalone.rs @@ -1,5 +1,9 @@ -use std::{env::args, time::Duration}; +use std::{ + env::args, + time::{Duration, Instant}, +}; +use neatworks::workload::events::Invoke; use tokio::{select, time::sleep}; pub mod workload { @@ -8,6 +12,27 @@ pub mod workload { mod util; } +struct InvokeTask; + +impl workload::clients::InvokeTask for InvokeTask { + async fn run( + self, + mut sender: impl neatworks::event::SendEvent>, + mut receiver: tokio::sync::mpsc::UnboundedReceiver< + neatworks::workload::events::InvokeOk, + >, + ) -> anyhow::Result<()> { + for _ in 0..10 { + let start = Instant::now(); + sender.send(Invoke(Default::default()))?; + let recv = receiver.recv().await; + anyhow::ensure!(recv.is_some()); + println!("{:?}", start.elapsed()) + } + anyhow::Ok(()) + } +} + #[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { let mode = args().nth(1); @@ -17,7 +42,7 @@ async fn main() -> anyhow::Result<()> { let server_task = workload::servers::unreplicated(); let client_task = async { sleep(Duration::from_millis(100)).await; - workload::clients::unreplicated().await + workload::clients::unreplicated(InvokeTask).await }; select! { result = client_task => break 'client result?, diff --git a/src/bin/workload/clients.rs b/src/bin/workload/clients.rs index 8daddb7c..d138245e 100644 --- a/src/bin/workload/clients.rs +++ b/src/bin/workload/clients.rs @@ -1,23 +1,40 @@ -use std::sync::Arc; +use std::{future::Future, sync::Arc}; +use bytes::Bytes; use neatworks::{ event::{ task::{erase::Of, run_with_schedule, ScheduleState}, Erase, SendEvent, Untyped, }, - net::{combinators::Forward, task::udp}, + net::{ + combinators::{Forward, IndexNet}, + task::udp, + }, + pbft::{self, PublicParameters}, unreplicated, workload::events::{Invoke, InvokeOk}, }; use rand::random; -use tokio::{net::UdpSocket, select, sync::mpsc::unbounded_channel, time::Instant}; +use tokio::{ + net::UdpSocket, + select, + sync::mpsc::{unbounded_channel, UnboundedReceiver}, +}; use super::util::run_until; -pub async fn unreplicated() -> anyhow::Result<()> { +pub trait InvokeTask { + fn run( + self, + sender: impl SendEvent>, + receiver: UnboundedReceiver>, + ) -> impl Future>; +} + +pub async fn unreplicated(invoke_task: impl InvokeTask) -> anyhow::Result<()> { let socket = Arc::new(UdpSocket::bind("localhost:0").await?); let addr = socket.local_addr()?; - let (upcall_sender, mut upcall_receiver) = unbounded_channel::>(); + let (upcall_sender, upcall_receiver) = unbounded_channel::>(); let (schedule_sender, mut schedule_receiver) = unbounded_channel(); let (sender, mut receiver) = unbounded_channel(); @@ -42,22 +59,52 @@ pub async fn unreplicated() -> anyhow::Result<()> { |context| &mut *context.schedule, ); - let invoke_task = async { - let mut sender = Erase::new(sender); - for _ in 0..10 { - let start = Instant::now(); - sender.send(Invoke(Default::default()))?; - let recv = upcall_receiver.recv().await; - anyhow::ensure!(recv.is_some()); - println!("{:?}", start.elapsed()) - } - anyhow::Ok(()) + run_until( + invoke_task.run(Erase::new(sender), upcall_receiver), + async { + select! { + result = net_task => result, + result = client_task => result, + } + }, + ) + .await +} + +pub async fn pbft(invoke_task: impl InvokeTask, config: PublicParameters) -> anyhow::Result<()> { + let socket = Arc::new(UdpSocket::bind("localhost:0").await?); + let addr = socket.local_addr()?; + let (upcall_sender, upcall_receiver) = unbounded_channel::>(); + let (schedule_sender, mut schedule_receiver) = unbounded_channel(); + let (sender, mut receiver) = unbounded_channel(); + + let net_task = udp::run( + &socket, + pbft::messages::codec::to_client_decode(Erase::new(sender.clone())), + ); + + let mut context = pbft::client::context::Context::<_, _, Of<_>> { + // TODO + net: pbft::messages::codec::to_replica_encode(IndexNet::new(vec![], None, socket.clone())), + upcall: upcall_sender, + schedule: Erase::new(ScheduleState::new(schedule_sender)), }; - run_until(invoke_task, async { - select! { - result = net_task => result, - result = client_task => result, - } - }) + let client_task = run_with_schedule( + Untyped::new(pbft::client::State::new(random(), addr, config)), + &mut context, + &mut receiver, + &mut schedule_receiver, + |context| &mut *context.schedule, + ); + + run_until( + invoke_task.run(Erase::new(sender), upcall_receiver), + async { + select! { + result = net_task => result, + result = client_task => result, + } + }, + ) .await } diff --git a/src/pbft/client.rs b/src/pbft/client.rs index 4ddc5321..481457f4 100644 --- a/src/pbft/client.rs +++ b/src/pbft/client.rs @@ -1,5 +1,7 @@ use std::collections::BTreeMap; +use bytes::Bytes; + use crate::{ codec::Payload, event::{OnErasedEvent, ScheduleEvent, SendEvent, TimerId}, @@ -51,22 +53,21 @@ pub mod events { pub trait Context { type Net: SendMessage> + SendMessage>; - type Upcall: SendEvent>; + type Upcall: SendEvent>; type Schedule: ScheduleEvent; fn net(&mut self) -> &mut Self::Net; fn upcall(&mut self) -> &mut Self::Upcall; fn schedule(&mut self) -> &mut Self::Schedule; } -impl> OnErasedEvent, C> for State { - fn on_event(&mut self, Invoke(op): Invoke, context: &mut C) -> anyhow::Result<()> { +impl> OnErasedEvent, C> for State { + fn on_event(&mut self, Invoke(op): Invoke, context: &mut C) -> anyhow::Result<()> { self.seq += 1; - let timer = context - .schedule() - .set(self.config.client_resend_interval, || events::Resend)?; let replaced = self.outstanding.replace(Outstanding { - op, - timer, + op: Payload(op), + timer: context + .schedule() + .set(self.config.client_resend_interval, || events::Resend)?, replies: Default::default(), }); anyhow::ensure!(replaced.is_none()); @@ -109,7 +110,8 @@ impl> OnErasedEvent, C> for State { context .schedule() .unset(self.outstanding.take().unwrap().timer)?; - context.upcall().send(InvokeOk(reply.result)) + let Payload(result) = reply.result; + context.upcall().send(InvokeOk(result)) } } @@ -127,3 +129,50 @@ impl State { context.net().send(dest, request) } } + +pub mod context { + use super::*; + + pub struct Context> { + pub net: N, + pub upcall: U, + pub schedule: O::Schedule, + } + + pub trait On { + type Schedule: ScheduleEvent; + } + + impl, A> super::Context for Context + where + N: SendMessage> + SendMessage>, + U: SendEvent>, + { + type Net = N; + type Upcall = U; + type Schedule = O::Schedule; + fn net(&mut self) -> &mut Self::Net { + &mut self.net + } + fn upcall(&mut self) -> &mut Self::Upcall { + &mut self.upcall + } + fn schedule(&mut self) -> &mut Self::Schedule { + &mut self.schedule + } + } + + mod task { + use crate::event::task::erase::{Of, ScheduleState}; + + use super::*; + + impl On> for Of> + where + N: SendMessage> + SendMessage>, + U: SendEvent>, + { + type Schedule = ScheduleState, Context>; + } + } +}