Skip to content

Commit

Permalink
Add pbft test simulated model
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Aug 22, 2024
1 parent 5c299a7 commit 93609be
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 33 deletions.
5 changes: 4 additions & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ pub trait ScheduleEvent<M> {
self.set_internal(period, move || event.clone())
}

#[allow(unused)]
fn set_internal(
&mut self,
period: Duration,
event: impl FnMut() -> M + Send + 'static,
) -> anyhow::Result<ActiveTimer>;
) -> anyhow::Result<ActiveTimer> {
anyhow::bail!("unimplemented")
}

fn unset(&mut self, id: ActiveTimer) -> anyhow::Result<()>;
}
Expand Down
8 changes: 0 additions & 8 deletions src/model/search/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,6 @@ impl<M: Into<N>, N> ScheduleEvent<M> for Schedule<N> {
Ok(ActiveTimer(id))
}

fn set_internal(
&mut self,
_: Duration,
_: impl FnMut() -> M + Send + 'static,
) -> anyhow::Result<ActiveTimer> {
anyhow::bail!("unimplemented")
}

fn unset(&mut self, ActiveTimer(id): ActiveTimer) -> anyhow::Result<()> {
self.remove(id)?;
Ok(())
Expand Down
23 changes: 0 additions & 23 deletions src/model/simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,6 @@ use crate::{
#[derive(Debug, Display, Error)]
pub struct ProgressExhausted;

pub trait State {
type Event;

fn step(&mut self, temporal: &mut Temporal<Self::Event>) -> anyhow::Result<()>;

fn progress(&mut self, temporal: &mut Temporal<Self::Event>) -> anyhow::Result<()> {
loop {
match self.step(temporal) {
Err(err) if err.is::<ProgressExhausted>() => return Ok(()),
result => result?,
}
}
}
}

pub type TimerId = u32;

#[derive_where(Default)]
Expand Down Expand Up @@ -61,14 +46,6 @@ impl<M> ScheduleEvent<M> for Temporal<M> {
Ok(ActiveTimer(id))
}

fn set_internal(
&mut self,
_: Duration,
_: impl FnMut() -> M + Send + 'static,
) -> anyhow::Result<ActiveTimer> {
anyhow::bail!("unimplemented")
}

fn unset(&mut self, ActiveTimer(id): ActiveTimer) -> anyhow::Result<()> {
let Some(envelop) = self.timers.remove(&id) else {
anyhow::bail!("missing timer envelop")
Expand Down
129 changes: 128 additions & 1 deletion src/pbft/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,133 @@ mod search {
Ok(())
}
}

impl<W: Workload<Op = Bytes, Result = Bytes>> crate::model::search::State for State<W> {
type Event = Event;

fn events(&self) -> impl Iterator<Item = Self::Event> + '_ {
let client_timers =
self.clients
.iter()
.enumerate()
.flat_map(|(index, (_, context))| {
assert!(context.upcall.sender.is_none());
context.schedule.events().map(move |(id, event)| {
Event::Timer(Addr::Client(index as _), id, event)
})
});
let replica_timers =
self.replicas
.iter()
.enumerate()
.flat_map(|(index, (_, context))| {
context.schedule.events().map(move |(id, event)| {
Event::Timer(Addr::Replica(index as _), id, event)
})
});
self.network
.events()
.map(|(addr, message)| Event::Message(addr, message))
.chain(client_timers)
.chain(replica_timers)
}
}
}

mod simulate {}
mod simulate {
use arbtest::arbitrary::Unstructured;
use bytes::Bytes;

use crate::{
event::{combinators::Transient, OnErasedEvent as _, ScheduleEvent},
model::simulate::{NetworkState, ProgressExhausted, Temporal},
workload::Workload,
};

use super::{Addr, Message, Timer};

pub type State<W> =
super::State<ClientContextState<W>, ReplicaContextState, NetworkState<Addr, Message>>;
pub type ClientContextState<W> = super::ClientContextState<W, ()>;
pub type ReplicaContextState = super::ReplicaContextState<()>;

pub type NetworkContext<'a> = super::NetworkContext<'a, NetworkState<Addr, Message>>;
pub type ClientContext<'a, W> = super::ClientContext<'a, NetworkContext<'a>, W, Schedule<'a>>;
pub type ReplicaContext<'a> = super::ReplicaContext<'a, NetworkContext<'a>, Schedule<'a>>;

pub type Event = super::Event<()>;

pub struct Schedule<'a> {
addr: super::Addr,
temporal: &'a mut Temporal<Event>,
}

impl<M: Into<Timer>> ScheduleEvent<M> for Schedule<'_> {
fn set(
&mut self,
period: std::time::Duration,
event: M,
) -> anyhow::Result<crate::event::ActiveTimer>
where
M: Send + Clone + 'static,
{
self.temporal
.set(period, super::Event::Timer(self.addr, (), event.into()))
}

fn unset(&mut self, id: crate::event::ActiveTimer) -> anyhow::Result<()> {
self.temporal.unset(id)
}
}

impl<W: Workload<Op = Bytes, Result = Bytes>> State<W> {
pub fn step(
&mut self,
u: &mut Unstructured,
temporal: &mut Temporal<Event>,
) -> anyhow::Result<()> {
let event = match self.network.choose(u) {
Ok((addr, message)) => Event::Message(addr, message),
Err(err) if err.is::<ProgressExhausted>() => temporal.pop()?,
Err(err) => return Err(err),
};
match event {
Event::Message(addr @ Addr::Client(index), _)
| Event::Timer(addr @ Addr::Client(index), ..) => {
let Some((client, context)) = self.clients.get_mut(index as usize) else {
anyhow::bail!("missing client for index {index}")
};
let mut context = ClientContext {
net: NetworkContext {
state: &mut self.network,
all: (0..self.replicas.len() as u8).map(Addr::Replica).collect(),
},
upcall: &mut context.upcall,
schedule: &mut Schedule { addr, temporal },
};
client.on_event(event, &mut context)
}
Event::Message(addr @ Addr::Replica(index), _)
| Event::Timer(addr @ Addr::Replica(index), ..) => {
let all = (0..self.replicas.len() as u8)
.filter(|id| *id != index)
.map(Addr::Replica)
.collect();
let Some((replica, context)) = self.replicas.get_mut(index as usize) else {
anyhow::bail!("missing replica for index {index}")
};
let mut context = ReplicaContext {
net: NetworkContext {
state: &mut self.network,
all,
},
crypto_worker: Transient::new(),
schedule: &mut Schedule { addr, temporal },
crypto: &mut context.crypto,
};
replica.on_event(event, &mut context)
}
}
}
}
}

0 comments on commit 93609be

Please sign in to comment.