Skip to content

Commit

Permalink
First draft of PBFT view change protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Apr 9, 2024
1 parent ab3c6f7 commit 9a98cae
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 35 deletions.
16 changes: 7 additions & 9 deletions src/event/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,14 @@ impl Timer {

pub fn events(&self) -> impl Iterator<Item = TimerId> + '_ {
let mut prev_period = None;
self.events
.iter()
.take_while(move |(_, period)| {
if let Some(prev_period) = prev_period.replace(*period) {
*period < prev_period
} else {
true
self.events.iter().map_while(move |(id, period)| {
if let Some(prev_period) = prev_period.replace(*period) {
if *period >= prev_period {
return None;
}
})
.map(|(id, _)| TimerId(*id))
}
Some(TimerId(*id))
})
}

fn unset(&mut self, TimerId(id): &TimerId) -> anyhow::Result<(u32, Duration)> {
Expand Down
245 changes: 231 additions & 14 deletions src/pbft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct ViewChange {
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct NewView {
view_num: u32,
logs: Quorum<ViewChange>,
view_changes: Quorum<ViewChange>,
pre_prepares: Vec<Verifiable<PrePrepare>>,
}

Expand Down Expand Up @@ -211,6 +211,10 @@ pub trait SendCryptoEvent<A>:
+ SendEvent<Verified<Prepare>>
+ SendEvent<Signed<Commit>>
+ SendEvent<Verified<Commit>>
+ SendEvent<Signed<ViewChange>>
+ SendEvent<Verified<ViewChange>>
+ SendEvent<Signed<NewView>>
+ SendEvent<Verified<NewView>>
{
}
impl<
Expand All @@ -219,7 +223,11 @@ impl<
+ SendEvent<Signed<Prepare>>
+ SendEvent<Verified<Prepare>>
+ SendEvent<Signed<Commit>>
+ SendEvent<Verified<Commit>>,
+ SendEvent<Verified<Commit>>
+ SendEvent<Signed<ViewChange>>
+ SendEvent<Verified<ViewChange>>
+ SendEvent<Signed<NewView>>
+ SendEvent<Verified<NewView>>,
A,
> SendCryptoEvent<A> for T
{
Expand Down Expand Up @@ -289,7 +297,6 @@ struct LogEntry<A> {
requests: Vec<Request<A>>,
prepares: Quorum<Prepare>,
commits: Quorum<Commit>,

timer_id: Option<TimerId>,
}

Expand Down Expand Up @@ -335,6 +342,15 @@ impl<N, CN, CW, S, A, M> Replica<N, CN, CW, S, A, M> {
(self.view_num as usize % self.num_replica) == self.id as usize
}

fn view_change(&self) -> bool {
self.view_num != 0
&& self
.view_changes
.get(&self.view_num)
.map(|quorum| quorum.len() < self.num_replica - self.num_faulty)
.unwrap_or(true)
}

const NUM_CONCURRENT_PRE_PREPARE: u32 = 1;
}

Expand Down Expand Up @@ -369,6 +385,9 @@ impl<M: ReplicaCommon> OnEvent<Recv<Request<M::A>>> for Replica<M::N, M::CN, M::
Recv(request): Recv<Request<M::A>>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()> {
if self.view_change() {
return Ok(());
}
match self.replies.get(&request.client_id) {
Some((seq, _)) if *seq > request.seq => return Ok(()),
Some((seq, reply)) if *seq == request.seq => {
Expand Down Expand Up @@ -484,7 +503,7 @@ impl<M: ReplicaCommon> OnEvent<Recv<(Verifiable<PrePrepare>, Vec<Request<M::A>>)
Recv((pre_prepare, requests)): Recv<(Verifiable<PrePrepare>, Vec<Request<M::A>>)>,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()> {
if pre_prepare.view_num != self.view_num {
if pre_prepare.view_num != self.view_num || self.view_change() {
if pre_prepare.view_num > self.view_num {
todo!("state transfer to enter view")
}
Expand Down Expand Up @@ -603,7 +622,7 @@ impl<M: ReplicaCommon> OnEvent<Recv<Verifiable<Prepare>>>

impl<M: ReplicaCommon> Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
fn submit_prepare(&mut self, prepare: Verifiable<Prepare>) -> anyhow::Result<bool> {
if prepare.view_num != self.view_num {
if prepare.view_num != self.view_num || self.view_change() {
if prepare.view_num > self.view_num {
todo!("state transfer to enter view")
}
Expand Down Expand Up @@ -731,7 +750,7 @@ impl<M: ReplicaCommon> OnEvent<Recv<Verifiable<Commit>>>

impl<M: ReplicaCommon> Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
fn submit_commit(&mut self, commit: Verifiable<Commit>) -> anyhow::Result<bool> {
if commit.view_num != self.view_num {
if commit.view_num != self.view_num || self.view_change() {
if commit.view_num > self.view_num {
todo!("state transfer to enter view")
}
Expand Down Expand Up @@ -869,16 +888,210 @@ impl<M: ReplicaCommon> OnEvent<DoViewChange> for Replica<M::N, M::CN, M::CW, M::
fn on_event(
&mut self,
DoViewChange(view_num): DoViewChange,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
let log = self
.log
.iter()
.filter_map(|entry| {
if entry.prepares.is_empty() {
None
} else {
Some((entry.pre_prepare.clone()?, entry.prepares.clone()))
}
})
.collect();
let view_change = ViewChange {
view_num,
log,
replica_id: self.id,
};
self.crypto_worker.submit(Box::new(|crypto, sender| {
sender.send(Signed(crypto.sign(view_change)))
}))
}
}

impl<M: ReplicaCommon> OnEvent<Signed<ViewChange>> for Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
fn on_event(
&mut self,
Signed(view_change): Signed<ViewChange>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
self.net.send(All, view_change.clone())?;
self.insert_view_change(view_change, timer)
}
}

fn verify_view_change(
crypto: &Crypto,
view_change: &Verifiable<ViewChange>,
num_replica: usize,
num_faulty: usize,
) -> anyhow::Result<()> {
crypto.verify(view_change.replica_id, view_change)?;
for (pre_prepare, prepares) in &view_change.log {
anyhow::ensure!(prepares.len() >= num_replica - num_faulty);
crypto.verify(pre_prepare.view_num as usize % num_replica, pre_prepare)?;
for prepare in prepares.values() {
anyhow::ensure!(prepare.digest == pre_prepare.digest);
crypto.verify(prepare.replica_id, prepare)?
}
}
Ok(())
}

impl<M: ReplicaCommon> OnEvent<Recv<Verifiable<ViewChange>>>
for Replica<M::N, M::CN, M::CW, M::S, M::A, M>
{
fn on_event(
&mut self,
Recv(view_change): Recv<Verifiable<ViewChange>>,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
if view_change.view_num <= self.view_num {
return Ok(());
}
let num_replica = self.num_replica;
let num_faulty = self.num_faulty;
self.crypto_worker.submit(Box::new(move |crypto, sender| {
if verify_view_change(crypto, &view_change, num_replica, num_faulty).is_ok() {
sender.send(Verified(view_change))
} else {
Ok(())
}
}))
}
}

impl<M: ReplicaCommon> OnEvent<Verified<ViewChange>>
for Replica<M::N, M::CN, M::CW, M::S, M::A, M>
{
fn on_event(
&mut self,
Verified(view_change): Verified<ViewChange>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
timer.unset(
self.do_view_change_timer
.take()
.ok_or(anyhow::anyhow!("missing view change timer"))?,
)?;
if view_change.view_num > self.view_num {
self.insert_view_change(view_change, timer)?
}
Ok(())
}
}

impl<M: ReplicaCommon> Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
fn insert_view_change(
&mut self,
view_change: Verifiable<ViewChange>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()> {
let quorum = self.view_changes.entry(view_change.view_num).or_default();
quorum.insert(view_change.replica_id, view_change.clone());
if quorum.len() >= self.num_replica - self.num_faulty {
assert!(self.view_num < view_change.view_num);
self.view_num = view_change.view_num;
if let Some(timer_id) = self.do_view_change_timer.take() {
timer.unset(timer_id)?
}
let view_changes = quorum.clone();
if self.is_primary() {
let new_view = NewView {
view_num: self.view_num,
view_changes,
pre_prepares: Default::default(),
};
self.crypto_worker.submit(Box::new(move |crypto, sender| {
// TODO add PrePrepares
sender.send(Signed(crypto.sign(new_view)))
}))?
} else {
// TODO set timer
}
}
Ok(())
}
}

impl<M: ReplicaCommon> OnEvent<Signed<NewView>> for Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
fn on_event(
&mut self,
Signed(new_view): Signed<NewView>,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
self.net.send(All, new_view)
// TODO do enter view
}
}

impl<M: ReplicaCommon> OnEvent<Recv<Verifiable<NewView>>>
for Replica<M::N, M::CN, M::CW, M::S, M::A, M>
{
fn on_event(
&mut self,
Recv(new_view): Recv<Verifiable<NewView>>,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
if self.view_num > new_view.view_num
|| self.view_num == new_view.view_num && !self.view_change()
{
return Ok(());
}
let num_replica = self.num_replica;
let num_faulty = self.num_faulty;
self.crypto_worker.submit(Box::new(move |crypto, sender| {
let do_verify = || {
anyhow::ensure!(new_view.view_changes.len() >= num_replica - num_faulty);
for view_change in new_view.view_changes.values() {
verify_view_change(crypto, view_change, num_replica, num_faulty)?
}
// TODO PrePrepare semantic
anyhow::Result::<_>::Ok(())
};
if do_verify().is_ok() {
sender.send(Verified(new_view))
} else {
Ok(())
}
}))
}
}

impl<M: ReplicaCommon> OnEvent<Verified<NewView>> for Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
fn on_event(
&mut self,
Verified(new_view): Verified<NewView>,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
if self.view_num > new_view.view_num
|| self.view_num == new_view.view_num && !self.view_change()
{
return Ok(());
}
let new_view = new_view.into_inner();
self.view_changes
.insert(new_view.view_num, new_view.view_changes);
// TODO enter new view
Ok(())
}
}
Expand Down Expand Up @@ -909,13 +1122,17 @@ pub trait SendReplicaRecvEvent<A>:
+ SendEvent<Recv<(Verifiable<PrePrepare>, Vec<Request<A>>)>>
+ SendEvent<Recv<Verifiable<Prepare>>>
+ SendEvent<Recv<Verifiable<Commit>>>
+ SendEvent<Recv<Verifiable<ViewChange>>>
+ SendEvent<Recv<Verifiable<NewView>>>
{
}
impl<
T: SendEvent<Recv<Request<A>>>
+ SendEvent<Recv<(Verifiable<PrePrepare>, Vec<Request<A>>)>>
+ SendEvent<Recv<Verifiable<Prepare>>>
+ SendEvent<Recv<Verifiable<Commit>>>,
+ SendEvent<Recv<Verifiable<Commit>>>
+ SendEvent<Recv<Verifiable<ViewChange>>>
+ SendEvent<Recv<Verifiable<NewView>>>,
A,
> SendReplicaRecvEvent<A> for T
{
Expand All @@ -930,8 +1147,8 @@ pub fn to_replica_on_buf<A: Addr>(
ToReplica::PrePrepare(message, requests) => sender.send(Recv((message, requests))),
ToReplica::Prepare(message) => sender.send(Recv(message)),
ToReplica::Commit(message) => sender.send(Recv(message)),
ToReplica::ViewChange(message) => todo!(),
ToReplica::NewView(message) => todo!(),
ToReplica::ViewChange(message) => sender.send(Recv(message)),
ToReplica::NewView(message) => sender.send(Recv(message)),
}
}

Expand Down
Loading

0 comments on commit 9a98cae

Please sign in to comment.