Skip to content

Commit

Permalink
Prepare for PBFT view change implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Apr 9, 2024
1 parent 5b7b244 commit ab3c6f7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 26 deletions.
33 changes: 15 additions & 18 deletions src/pbft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ pub struct Reply {
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ViewChange {
view_num: u32,
log: Vec<(Verifiable<PrePrepare>, Vec<(u8, Verifiable<Prepare>)>)>,
log: Vec<(Verifiable<PrePrepare>, Quorum<Prepare>)>,
replica_id: u8,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct NewView {
view_num: u32,
logs: Vec<(u8, Verifiable<ViewChange>)>,
logs: Quorum<ViewChange>,
pre_prepares: Vec<Verifiable<PrePrepare>>,
}

Expand Down Expand Up @@ -243,6 +243,9 @@ impl<W: Submit<S, E>, S: 'static, E: SendCryptoEvent<A> + 'static, A: Addr>
}
}

type Quorum<M> = BTreeMap<u8, Verifiable<M>>;
type Quorums<K, M> = BTreeMap<K, Quorum<M>>;

#[derive(Clone)]
#[derive_where(Debug, PartialEq, Eq, Hash; S, A)]
pub struct Replica<N, CN, CW, S, A, M = (N, CN, CW, S, A)> {
Expand All @@ -255,11 +258,14 @@ pub struct Replica<N, CN, CW, S, A, M = (N, CN, CW, S, A)> {
view_num: u32,
op_num: u32,
log: Vec<LogEntry<A>>,
prepare_quorums: BTreeMap<u32, BTreeMap<u8, Verifiable<Prepare>>>,
commit_quorums: BTreeMap<u32, BTreeMap<u8, Verifiable<Commit>>>,
prepare_quorums: Quorums<u32, Prepare>, // `K` = op number
commit_quorums: Quorums<u32, Commit>,
commit_num: u32,
app: S,

do_view_change_timer: Option<TimerId>,
view_changes: Quorums<u32, ViewChange>, // `K` = view number

// any op num presents in this maps -> there's ongoing verification submitted
// entry presents but empty list -> no pending but one is verifying
// no entry present -> no pending and not verifying
Expand All @@ -281,8 +287,8 @@ pub struct Replica<N, CN, CW, S, A, M = (N, CN, CW, S, A)> {
struct LogEntry<A> {
pre_prepare: Option<Verifiable<PrePrepare>>,
requests: Vec<Request<A>>,
prepares: Vec<(u8, Verifiable<Prepare>)>,
commits: Vec<(u8, Verifiable<Commit>)>,
prepares: Quorum<Prepare>,
commits: Quorum<Commit>,

timer_id: Option<TimerId>,
}
Expand Down Expand Up @@ -315,6 +321,7 @@ impl<N, CN, CW, S, A> Replica<N, CN, CW, S, A> {
commit_quorums: Default::default(),
commit_num: 0,
do_view_change_timer: Default::default(),
view_changes: Default::default(),
pending_prepares: Default::default(),
pending_commits: Default::default(),

Expand Down Expand Up @@ -670,12 +677,7 @@ impl<M: ReplicaCommon> Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
return Ok(());
};
assert!(entry.prepares.is_empty());
entry.prepares = self
.prepare_quorums
.remove(&prepare.op_num)
.unwrap()
.into_iter()
.collect();
entry.prepares = self.prepare_quorums.remove(&prepare.op_num).unwrap();

let commit = Commit {
view_num: self.view_num,
Expand Down Expand Up @@ -809,12 +811,7 @@ impl<M: ReplicaCommon> Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
if entry.prepares.is_empty() {
return Ok(());
}
entry.commits = self
.commit_quorums
.remove(&commit.op_num)
.unwrap()
.into_iter()
.collect();
entry.commits = self.commit_quorums.remove(&commit.op_num).unwrap();

let is_primary = self.is_primary();
while let Some(entry) = self.log.get_mut(self.commit_num as usize + 1) {
Expand Down
32 changes: 24 additions & 8 deletions src/pbft/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use crate::{
};

use super::{
Commit, CryptoWorker, NewView, PrePrepare, Prepare, Progress, Reply, Request, Resend,
ViewChange,
Commit, CryptoWorker, DoViewChange, NewView, PrePrepare, Prepare, Progress, Reply, Request,
Resend, ViewChange,
};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
Expand Down Expand Up @@ -96,18 +96,29 @@ struct TimerEvent {
enum TimerData {
Resend,
Progress(Progress),
DoViewChange(DoViewChange),
}

impl DowncastEvent for TimerData {
fn try_from<M: 'static>(event: M) -> anyhow::Result<Self> {
if (&event as &dyn Any).is::<Resend>() {
return Ok(Self::Resend);
}
let event = Box::new(event) as Box<dyn Any>;
if let Ok(event) = event.downcast::<Progress>() {
return Ok(Self::Progress(*event));

fn downcast_or<T: 'static>(
wrap: impl Fn(T) -> TimerData,
default: impl Fn(Box<dyn Any>) -> anyhow::Result<TimerData>,
) -> impl Fn(Box<dyn Any>) -> anyhow::Result<TimerData> {
move |event| match event.downcast::<T>() {
Ok(event) => Ok(wrap(*event)),
Err(event) => default(event),
}
}
Err(anyhow::anyhow!("unknown timer data"))

let downcast = |_| Err(anyhow::anyhow!("unknown timer data"));
let downcast = downcast_or::<DoViewChange>(TimerData::DoViewChange, downcast);
let downcast = downcast_or::<Progress>(TimerData::Progress, downcast);
downcast(Box::new(event) as Box<dyn Any>)
}
}

Expand Down Expand Up @@ -337,8 +348,13 @@ impl<W: Workload, F: Filter + Clone, const CHECK: bool> crate::search::State
(Addr::Client(index), TimerData::Resend) => {
self.clients[index as usize].state.on_event(Resend, timer)?
}
(Addr::Replica(index), TimerData::Progress(event)) => {
self.replicas[index as usize].on_event(event, timer)?
(Addr::Replica(index), event) => {
let replica = &mut self.replicas[index as usize];
match event {
TimerData::Progress(event) => replica.on_event(event, timer)?,
TimerData::DoViewChange(event) => replica.on_event(event, timer)?,
_ => anyhow::bail!("unimplemented"),
}
}
_ => anyhow::bail!("unimplemented"),
}
Expand Down

0 comments on commit ab3c6f7

Please sign in to comment.