diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index 9e2f6104e8..4e5a553951 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -57,9 +57,8 @@ impl SingleHeightConsensus { context: &mut ContextT, ) -> Result>, ConsensusError> { info!("Starting consensus with validators {:?}", self.validators); - let leader_fn = |_round: Round| -> ValidatorId { - context.proposer(&self.validators.clone(), self.height) - }; + let leader_fn = + |_round: Round| -> ValidatorId { context.proposer(&self.validators, self.height) }; let events = self.state_machine.start(&leader_fn); self.handle_state_machine_events(context, events).await } @@ -121,7 +120,9 @@ impl SingleHeightConsensus { let sm_proposal = StateMachineEvent::Proposal(Some(block.id()), ROUND_ZERO); // TODO(matan): Handle multiple rounds. self.proposals.insert(ROUND_ZERO, block); - let sm_events = self.state_machine.handle_event(sm_proposal); + let leader_fn = + |_round: Round| -> ValidatorId { context.proposer(&self.validators, self.height) }; + let sm_events = self.state_machine.handle_event(sm_proposal, &leader_fn); self.handle_state_machine_events(context, sm_events).await } @@ -169,7 +170,9 @@ impl SingleHeightConsensus { } votes.insert((ROUND_ZERO, vote.voter), vote); - let sm_events = self.state_machine.handle_event(sm_vote); + let leader_fn = + |_round: Round| -> ValidatorId { context.proposer(&self.validators, self.height) }; + let sm_events = self.state_machine.handle_event(sm_vote, &leader_fn); self.handle_state_machine_events(context, sm_events).await } @@ -242,8 +245,9 @@ impl SingleHeightConsensus { fin_sender.send(id).expect("Failed to send ProposalFin to Peering."); let old = self.proposals.insert(round, block); assert!(old.is_none(), "There should be no entry for this round."); - - self.state_machine.handle_event(StateMachineEvent::GetProposal(Some(id), round)) + let leader_fn = + |_round: Round| -> ValidatorId { context.proposer(&self.validators, self.height) }; + self.state_machine.handle_event(StateMachineEvent::GetProposal(Some(id), round), &leader_fn) } #[instrument(skip_all)] diff --git a/crates/sequencing/papyrus_consensus/src/state_machine.rs b/crates/sequencing/papyrus_consensus/src/state_machine.rs index 7d362571e0..81a2b4bf80 100644 --- a/crates/sequencing/papyrus_consensus/src/state_machine.rs +++ b/crates/sequencing/papyrus_consensus/src/state_machine.rs @@ -83,16 +83,11 @@ impl StateMachine { /// Starts the state machine, effectively calling `StartRound(0)` from the paper. This is /// needed to trigger the first leader to propose. /// See [`GetProposal`](StateMachineEvent::GetProposal) - pub fn start( - &mut self, - leader_fn: &impl Fn(Round) -> ValidatorId, - ) -> VecDeque { - if self.id == leader_fn(self.round) { - self.awaiting_get_proposal = true; - // TODO(matan): Support re-proposing validValue. - return VecDeque::from([StateMachineEvent::GetProposal(None, self.round)]); - } - VecDeque::from([]) + pub fn start(&mut self, leader_fn: &LeaderFn) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { + self.advance_to_round(0, leader_fn) } /// Process the incoming event. @@ -104,7 +99,14 @@ impl StateMachine { /// events back to the state machine, as it makes sure to handle them before returning. // This means that the StateMachine handles events the same regardless of whether it was sent by // self or a peer. This is in line with the Algorithm 1 in the paper and keeps the code simpler. - pub fn handle_event(&mut self, event: StateMachineEvent) -> VecDeque { + pub fn handle_event( + &mut self, + event: StateMachineEvent, + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { trace!("Handling event: {:?}", event); // Mimic LOC 18 in the paper; the state machine doesn't // handle any events until `getValue` completes. @@ -124,18 +126,22 @@ impl StateMachine { // The events queue only maintains state while we are waiting for a proposal. let events_queue = std::mem::take(&mut self.events_queue); - self.handle_enqueued_events(events_queue) + self.handle_enqueued_events(events_queue, leader_fn) } - fn handle_enqueued_events( + fn handle_enqueued_events( &mut self, mut events_queue: VecDeque, - ) -> VecDeque { + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { let mut output_events = VecDeque::new(); while let Some(event) = events_queue.pop_front() { // Handle a specific event and then decide which of the output events should also be // sent to self. - for e in self.handle_event_internal(event) { + for e in self.handle_event_internal(event, leader_fn) { match e { StateMachineEvent::Proposal(_, _) | StateMachineEvent::Prevote(_, _) @@ -154,17 +160,26 @@ impl StateMachine { output_events } - fn handle_event_internal(&mut self, event: StateMachineEvent) -> VecDeque { + fn handle_event_internal( + &mut self, + event: StateMachineEvent, + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { match event { StateMachineEvent::GetProposal(block_hash, round) => { self.handle_get_proposal(block_hash, round) } StateMachineEvent::Proposal(block_hash, round) => { - self.handle_proposal(block_hash, round) + self.handle_proposal(block_hash, round, leader_fn) + } + StateMachineEvent::Prevote(block_hash, round) => { + self.handle_prevote(block_hash, round, leader_fn) } - StateMachineEvent::Prevote(block_hash, round) => self.handle_prevote(block_hash, round), StateMachineEvent::Precommit(block_hash, round) => { - self.handle_precommit(block_hash, round) + self.handle_precommit(block_hash, round, leader_fn) } StateMachineEvent::Decision(_, _) => { unimplemented!( @@ -188,28 +203,48 @@ impl StateMachine { } // A proposal from a peer (or self) node. - fn handle_proposal( + fn handle_proposal( &mut self, block_hash: Option, round: u32, - ) -> VecDeque { + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { let old = self.proposals.insert(round, block_hash); assert!(old.is_none(), "SHC should handle conflicts & replays"); + self.process_proposal(block_hash, round, leader_fn) + } + + fn process_proposal( + &mut self, + block_hash: Option, + round: u32, + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { if self.step != Step::Propose { return VecDeque::new(); } let mut output = VecDeque::from([StateMachineEvent::Prevote(block_hash, round)]); - output.append(&mut self.advance_to_step(Step::Prevote)); + output.append(&mut self.advance_to_step(Step::Prevote, leader_fn)); output } // A prevote from a peer (or self) node. - fn handle_prevote( + fn handle_prevote( &mut self, block_hash: Option, round: u32, - ) -> VecDeque { + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { let prevote_count = self.prevotes.entry(round).or_default().entry(block_hash).or_insert(0); // TODO(matan): Use variable weight. *prevote_count += 1; @@ -217,85 +252,138 @@ impl StateMachine { if self.step != Step::Prevote || round != self.round { return VecDeque::new(); } - self.check_prevote_quorum(round) + self.check_prevote_quorum(round, leader_fn) } // A precommit from a peer (or self) node. - fn handle_precommit( + fn handle_precommit( &mut self, block_hash: Option, round: u32, - ) -> VecDeque { + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { let precommit_count = self.precommits.entry(round).or_default().entry(block_hash).or_insert(0); // TODO(matan): Use variable weight. *precommit_count += 1; - self.check_precommit_quorum(round) + self.check_precommit_quorum(round, leader_fn) } - fn advance_to_step(&mut self, step: Step) -> VecDeque { + fn advance_to_step( + &mut self, + step: Step, + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { self.step = step; // Check for an existing quorum in case messages arrived out of order. match self.step { - Step::Propose => { - unimplemented!("Handled by `advance_round`") - } - Step::Prevote => self.check_prevote_quorum(self.round), - Step::Precommit => self.check_precommit_quorum(self.round), + Step::Propose => unreachable!("Advancing to Propose is done by advancing rounds"), + Step::Prevote => self.check_prevote_quorum(self.round, leader_fn), + Step::Precommit => self.check_precommit_quorum(self.round, leader_fn), } } - fn check_prevote_quorum(&mut self, round: u32) -> VecDeque { + fn check_prevote_quorum( + &mut self, + round: u32, + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { + assert_eq!(round, self.round, "check_prevote_quorum is only called for the current round"); let Some((block_hash, count)) = leading_vote(&self.prevotes, round) else { return VecDeque::new(); }; + if *count < self.quorum { + return VecDeque::new(); + } + if block_hash.is_none() { + return self.send_precommit(*block_hash, round, leader_fn); + } let Some(proposed_value) = self.proposals.get(&round) else { return VecDeque::new(); }; - // TODO(matan): Handle this due to malicious proposer. - assert_eq!(proposed_value, block_hash, "Proposal should match quorum."); - - if *count < self.quorum { - return VecDeque::new(); + if proposed_value != block_hash { + // TODO(matan): This can be caused by a malicious leader double proposing. + panic!("Proposal does not match quorum."); } - self.send_precommit(*block_hash, round) + + self.send_precommit(*block_hash, round, leader_fn) } - fn check_precommit_quorum(&mut self, round: u32) -> VecDeque { + fn check_precommit_quorum( + &mut self, + round: u32, + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { let Some((block_hash, count)) = leading_vote(&self.precommits, round) else { return VecDeque::new(); }; + if *count < self.quorum { + return VecDeque::new(); + } + if block_hash.is_none() && round == self.round { + return self.advance_to_round(round + 1, leader_fn); + } let Some(proposed_value) = self.proposals.get(&round) else { return VecDeque::new(); }; - // TODO(matan): Handle this due to malicious proposer. - assert_eq!(proposed_value, block_hash, "Proposal should match quorum."); - - if *count < self.quorum { - return VecDeque::new(); + if proposed_value != block_hash { + // TODO(matan): This can be caused by a malicious leader double proposing. + panic!("Proposal does not match quorum."); } if let Some(block_hash) = block_hash { VecDeque::from([StateMachineEvent::Decision(*block_hash, round)]) - } else if round == self.round { - self.advance_to_round(round + 1) } else { + // NIL quorum reached on a different round. VecDeque::new() } } - fn send_precommit( + fn send_precommit( &mut self, block_hash: Option, round: u32, - ) -> VecDeque { + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { let mut output = VecDeque::from([StateMachineEvent::Precommit(block_hash, round)]); - output.append(&mut self.advance_to_step(Step::Precommit)); + output.append(&mut self.advance_to_step(Step::Precommit, leader_fn)); output } - fn advance_to_round(&mut self, _round: Round) -> VecDeque { - todo!() + fn advance_to_round( + &mut self, + round: u32, + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { + self.round = round; + self.step = Step::Propose; + if self.id == leader_fn(self.round) { + self.awaiting_get_proposal = true; + // TODO(matan): Support re-proposing validValue. + return VecDeque::from([StateMachineEvent::GetProposal(None, self.round)]); + } + let Some(proposal) = self.proposals.get(&round) else { + return VecDeque::new(); + }; + self.process_proposal(*proposal, round, leader_fn) } } diff --git a/crates/sequencing/papyrus_consensus/src/state_machine_test.rs b/crates/sequencing/papyrus_consensus/src/state_machine_test.rs index 090a11f941..d17b464f4b 100644 --- a/crates/sequencing/papyrus_consensus/src/state_machine_test.rs +++ b/crates/sequencing/papyrus_consensus/src/state_machine_test.rs @@ -24,26 +24,30 @@ fn events_arrive_in_ideal_order(is_proposer: bool) { let mut events = state_machine.start(&leader_fn); if is_proposer { assert_eq!(events.pop_front().unwrap(), StateMachineEvent::GetProposal(None, ROUND)); - events = state_machine.handle_event(StateMachineEvent::GetProposal(BLOCK_HASH, ROUND)); + events = state_machine + .handle_event(StateMachineEvent::GetProposal(BLOCK_HASH, ROUND), &leader_fn); assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Proposal(BLOCK_HASH, ROUND)); } else { assert!(events.is_empty(), "{:?}", events); - events = state_machine.handle_event(StateMachineEvent::Proposal(BLOCK_HASH, ROUND)); + events = + state_machine.handle_event(StateMachineEvent::Proposal(BLOCK_HASH, ROUND), &leader_fn); } assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Prevote(BLOCK_HASH, ROUND)); assert!(events.is_empty(), "{:?}", events); - events = state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND)); + events = state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND), &leader_fn); assert!(events.is_empty(), "{:?}", events); - events = state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND)); + events = state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND), &leader_fn); assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Precommit(BLOCK_HASH, ROUND)); assert!(events.is_empty(), "{:?}", events); - events = state_machine.handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND)); + events = + state_machine.handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND), &leader_fn); assert!(events.is_empty(), "{:?}", events); - events = state_machine.handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND)); + events = + state_machine.handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND), &leader_fn); assert_eq!( events.pop_front().unwrap(), StateMachineEvent::Decision(BLOCK_HASH.unwrap(), ROUND) @@ -60,16 +64,31 @@ fn validator_receives_votes_first() { assert!(events.is_empty(), "{:?}", events); // Receives votes from all the other nodes first (more than minimum for a quorum). - events.append(&mut state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND))); - events.append(&mut state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND))); - events.append(&mut state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND))); - events.append(&mut state_machine.handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND))); - events.append(&mut state_machine.handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND))); - events.append(&mut state_machine.handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND))); + events.append( + &mut state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND), &leader_fn), + ); + events.append( + &mut state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND), &leader_fn), + ); + events.append( + &mut state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND), &leader_fn), + ); + events.append( + &mut state_machine + .handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND), &leader_fn), + ); + events.append( + &mut state_machine + .handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND), &leader_fn), + ); + events.append( + &mut state_machine + .handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND), &leader_fn), + ); assert!(events.is_empty(), "{:?}", events); // Finally the proposal arrives. - events = state_machine.handle_event(StateMachineEvent::Proposal(BLOCK_HASH, ROUND)); + events = state_machine.handle_event(StateMachineEvent::Proposal(BLOCK_HASH, ROUND), &leader_fn); assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Prevote(BLOCK_HASH, ROUND)); assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Precommit(BLOCK_HASH, ROUND)); assert_eq!(