Skip to content

Commit

Permalink
feat(consensus): add advance_to_round
Browse files Browse the repository at this point in the history
  • Loading branch information
asmaastarkware committed Jul 28, 2024
1 parent 4851640 commit 1e92acd
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 74 deletions.
18 changes: 11 additions & 7 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
context: &mut ContextT,
) -> Result<Option<Decision<BlockT>>, ConsensusError> {
info!("Starting consensus with validators {:?}", self.validators);
let leader_fn = |_round: Round| -> ValidatorId {
context.proposer(&self.validators.clone(), self.height)
};
let leader_fn =
|_round: Round| -> ValidatorId { context.proposer(&self.validators, self.height) };
let events = self.state_machine.start(&leader_fn);
self.handle_state_machine_events(context, events).await
}
Expand Down Expand Up @@ -121,7 +120,9 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
let sm_proposal = StateMachineEvent::Proposal(Some(block.id()), ROUND_ZERO);
// TODO(matan): Handle multiple rounds.
self.proposals.insert(ROUND_ZERO, block);
let sm_events = self.state_machine.handle_event(sm_proposal);
let leader_fn =
|_round: Round| -> ValidatorId { context.proposer(&self.validators, self.height) };
let sm_events = self.state_machine.handle_event(sm_proposal, &leader_fn);
self.handle_state_machine_events(context, sm_events).await
}

Expand Down Expand Up @@ -169,7 +170,9 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
}

votes.insert((ROUND_ZERO, vote.voter), vote);
let sm_events = self.state_machine.handle_event(sm_vote);
let leader_fn =
|_round: Round| -> ValidatorId { context.proposer(&self.validators, self.height) };
let sm_events = self.state_machine.handle_event(sm_vote, &leader_fn);
self.handle_state_machine_events(context, sm_events).await
}

Expand Down Expand Up @@ -242,8 +245,9 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
fin_sender.send(id).expect("Failed to send ProposalFin to Peering.");
let old = self.proposals.insert(round, block);
assert!(old.is_none(), "There should be no entry for this round.");

self.state_machine.handle_event(StateMachineEvent::GetProposal(Some(id), round))
let leader_fn =
|_round: Round| -> ValidatorId { context.proposer(&self.validators, self.height) };
self.state_machine.handle_event(StateMachineEvent::GetProposal(Some(id), round), &leader_fn)
}

#[instrument(skip_all)]
Expand Down
196 changes: 142 additions & 54 deletions crates/sequencing/papyrus_consensus/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,11 @@ impl StateMachine {
/// Starts the state machine, effectively calling `StartRound(0)` from the paper. This is
/// needed to trigger the first leader to propose.
/// See [`GetProposal`](StateMachineEvent::GetProposal)
pub fn start(
&mut self,
leader_fn: &impl Fn(Round) -> ValidatorId,
) -> VecDeque<StateMachineEvent> {
if self.id == leader_fn(self.round) {
self.awaiting_get_proposal = true;
// TODO(matan): Support re-proposing validValue.
return VecDeque::from([StateMachineEvent::GetProposal(None, self.round)]);
}
VecDeque::from([])
pub fn start<LeaderFn>(&mut self, leader_fn: &LeaderFn) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
self.advance_to_round(0, leader_fn)
}

/// Process the incoming event.
Expand All @@ -104,7 +99,14 @@ impl StateMachine {
/// events back to the state machine, as it makes sure to handle them before returning.
// This means that the StateMachine handles events the same regardless of whether it was sent by
// self or a peer. This is in line with the Algorithm 1 in the paper and keeps the code simpler.
pub fn handle_event(&mut self, event: StateMachineEvent) -> VecDeque<StateMachineEvent> {
pub fn handle_event<LeaderFn>(
&mut self,
event: StateMachineEvent,
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
trace!("Handling event: {:?}", event);
// Mimic LOC 18 in the paper; the state machine doesn't
// handle any events until `getValue` completes.
Expand All @@ -124,18 +126,22 @@ impl StateMachine {

// The events queue only maintains state while we are waiting for a proposal.
let events_queue = std::mem::take(&mut self.events_queue);
self.handle_enqueued_events(events_queue)
self.handle_enqueued_events(events_queue, leader_fn)
}

fn handle_enqueued_events(
fn handle_enqueued_events<LeaderFn>(
&mut self,
mut events_queue: VecDeque<StateMachineEvent>,
) -> VecDeque<StateMachineEvent> {
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
let mut output_events = VecDeque::new();
while let Some(event) = events_queue.pop_front() {
// Handle a specific event and then decide which of the output events should also be
// sent to self.
for e in self.handle_event_internal(event) {
for e in self.handle_event_internal(event, leader_fn) {
match e {
StateMachineEvent::Proposal(_, _)
| StateMachineEvent::Prevote(_, _)
Expand All @@ -154,17 +160,26 @@ impl StateMachine {
output_events
}

fn handle_event_internal(&mut self, event: StateMachineEvent) -> VecDeque<StateMachineEvent> {
fn handle_event_internal<LeaderFn>(
&mut self,
event: StateMachineEvent,
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
match event {
StateMachineEvent::GetProposal(block_hash, round) => {
self.handle_get_proposal(block_hash, round)
}
StateMachineEvent::Proposal(block_hash, round) => {
self.handle_proposal(block_hash, round)
self.handle_proposal(block_hash, round, leader_fn)
}
StateMachineEvent::Prevote(block_hash, round) => {
self.handle_prevote(block_hash, round, leader_fn)
}
StateMachineEvent::Prevote(block_hash, round) => self.handle_prevote(block_hash, round),
StateMachineEvent::Precommit(block_hash, round) => {
self.handle_precommit(block_hash, round)
self.handle_precommit(block_hash, round, leader_fn)
}
StateMachineEvent::Decision(_, _) => {
unimplemented!(
Expand All @@ -188,114 +203,187 @@ impl StateMachine {
}

// A proposal from a peer (or self) node.
fn handle_proposal(
fn handle_proposal<LeaderFn>(
&mut self,
block_hash: Option<BlockHash>,
round: u32,
) -> VecDeque<StateMachineEvent> {
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
let old = self.proposals.insert(round, block_hash);
assert!(old.is_none(), "SHC should handle conflicts & replays");
self.process_proposal(block_hash, round, leader_fn)
}

fn process_proposal<LeaderFn>(
&mut self,
block_hash: Option<BlockHash>,
round: u32,
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
if self.step != Step::Propose {
return VecDeque::new();
}

let mut output = VecDeque::from([StateMachineEvent::Prevote(block_hash, round)]);
output.append(&mut self.advance_to_step(Step::Prevote));
output.append(&mut self.advance_to_step(Step::Prevote, leader_fn));
output
}

// A prevote from a peer (or self) node.
fn handle_prevote(
fn handle_prevote<LeaderFn>(
&mut self,
block_hash: Option<BlockHash>,
round: u32,
) -> VecDeque<StateMachineEvent> {
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
let prevote_count = self.prevotes.entry(round).or_default().entry(block_hash).or_insert(0);
// TODO(matan): Use variable weight.
*prevote_count += 1;

if self.step != Step::Prevote || round != self.round {
return VecDeque::new();
}
self.check_prevote_quorum(round)
self.check_prevote_quorum(round, leader_fn)
}

// A precommit from a peer (or self) node.
fn handle_precommit(
fn handle_precommit<LeaderFn>(
&mut self,
block_hash: Option<BlockHash>,
round: u32,
) -> VecDeque<StateMachineEvent> {
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
let precommit_count =
self.precommits.entry(round).or_default().entry(block_hash).or_insert(0);
// TODO(matan): Use variable weight.
*precommit_count += 1;

self.check_precommit_quorum(round)
self.check_precommit_quorum(round, leader_fn)
}

fn advance_to_step(&mut self, step: Step) -> VecDeque<StateMachineEvent> {
fn advance_to_step<LeaderFn>(
&mut self,
step: Step,
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
self.step = step;
// Check for an existing quorum in case messages arrived out of order.
match self.step {
Step::Propose => {
unimplemented!("Handled by `advance_round`")
}
Step::Prevote => self.check_prevote_quorum(self.round),
Step::Precommit => self.check_precommit_quorum(self.round),
Step::Propose => unreachable!("Advancing to Propose is done by advancing rounds"),
Step::Prevote => self.check_prevote_quorum(self.round, leader_fn),
Step::Precommit => self.check_precommit_quorum(self.round, leader_fn),
}
}

fn check_prevote_quorum(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
fn check_prevote_quorum<LeaderFn>(
&mut self,
round: u32,
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
assert_eq!(round, self.round, "check_prevote_quorum is only called for the current round");
let Some((block_hash, count)) = leading_vote(&self.prevotes, round) else {
return VecDeque::new();
};
if *count < self.quorum {
return VecDeque::new();
}
if block_hash.is_none() {
return self.send_precommit(*block_hash, round, leader_fn);
}
let Some(proposed_value) = self.proposals.get(&round) else {
return VecDeque::new();
};
// TODO(matan): Handle this due to malicious proposer.
assert_eq!(proposed_value, block_hash, "Proposal should match quorum.");

if *count < self.quorum {
return VecDeque::new();
if proposed_value != block_hash {
// TODO(matan): This can be caused by a malicious leader double proposing.
panic!("Proposal does not match quorum.");
}
self.send_precommit(*block_hash, round)

self.send_precommit(*block_hash, round, leader_fn)
}

fn check_precommit_quorum(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
fn check_precommit_quorum<LeaderFn>(
&mut self,
round: u32,
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
let Some((block_hash, count)) = leading_vote(&self.precommits, round) else {
return VecDeque::new();
};
if *count < self.quorum {
return VecDeque::new();
}
if block_hash.is_none() && round == self.round {
return self.advance_to_round(round + 1, leader_fn);
}
let Some(proposed_value) = self.proposals.get(&round) else {
return VecDeque::new();
};
// TODO(matan): Handle this due to malicious proposer.
assert_eq!(proposed_value, block_hash, "Proposal should match quorum.");

if *count < self.quorum {
return VecDeque::new();
if proposed_value != block_hash {
// TODO(matan): This can be caused by a malicious leader double proposing.
panic!("Proposal does not match quorum.");
}
if let Some(block_hash) = block_hash {
VecDeque::from([StateMachineEvent::Decision(*block_hash, round)])
} else if round == self.round {
self.advance_to_round(round + 1)
} else {
// NIL quorum reached on a different round.
VecDeque::new()
}
}

fn send_precommit(
fn send_precommit<LeaderFn>(
&mut self,
block_hash: Option<BlockHash>,
round: u32,
) -> VecDeque<StateMachineEvent> {
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
let mut output = VecDeque::from([StateMachineEvent::Precommit(block_hash, round)]);
output.append(&mut self.advance_to_step(Step::Precommit));
output.append(&mut self.advance_to_step(Step::Precommit, leader_fn));
output
}

fn advance_to_round(&mut self, _round: Round) -> VecDeque<StateMachineEvent> {
todo!()
fn advance_to_round<LeaderFn>(
&mut self,
round: u32,
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
self.round = round;
self.step = Step::Propose;
if self.id == leader_fn(self.round) {
self.awaiting_get_proposal = true;
// TODO(matan): Support re-proposing validValue.
return VecDeque::from([StateMachineEvent::GetProposal(None, self.round)]);
}
let Some(proposal) = self.proposals.get(&round) else {
return VecDeque::new();
};
self.process_proposal(*proposal, round, leader_fn)
}
}

Expand Down
Loading

0 comments on commit 1e92acd

Please sign in to comment.