Skip to content

Commit

Permalink
chore(consensus): set proposer address in propose block input
Browse files Browse the repository at this point in the history
  • Loading branch information
ArniStarkware committed Nov 28, 2024
1 parent ef924ab commit 8049c64
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 63 deletions.
27 changes: 16 additions & 11 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mock! {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<Transaction>
) -> oneshot::Receiver<ProposalContentId>;
Expand All @@ -71,8 +72,12 @@ mock! {
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;

async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);

async fn set_height_and_round(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId
);
}
}

Expand All @@ -99,15 +104,15 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 1.
context
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
.return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_receiver
})
.times(1);
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context.expect_broadcast().returning(move |_| Ok(()));

let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone());
Expand All @@ -119,7 +124,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 2.
context
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
.return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_receiver
Expand All @@ -136,14 +141,14 @@ async fn run_consensus_sync() {
let mut context = MockTestContext::new();
let (decision_tx, decision_rx) = oneshot::channel();

context.expect_validate_proposal().return_once(move |_, _, _, _| {
context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context.expect_broadcast().returning(move |_| Ok(()));
context.expect_decision_reached().return_once(move |block, votes| {
assert_eq!(block, BlockHash(Felt::TWO));
Expand Down Expand Up @@ -196,14 +201,14 @@ async fn run_consensus_sync_cancellation_safety() {
let (proposal_handled_tx, proposal_handled_rx) = oneshot::channel();
let (decision_tx, decision_rx) = oneshot::channel();

context.expect_validate_proposal().return_once(move |_, _, _, _| {
context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context.expect_broadcast().with(eq(prevote(Some(Felt::ONE), 1, 0, *VALIDATOR_ID))).return_once(
move |_| {
proposal_handled_tx.send(()).unwrap();
Expand Down Expand Up @@ -267,8 +272,8 @@ async fn test_timeouts() {
send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_3)).await;

let mut context = MockTestContext::new();
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_validate_proposal().returning(move |_, _, _, _| {
context.expect_set_height_and_round().returning(move |_, _, _| ());
context.expect_validate_proposal().returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_receiver
Expand Down
25 changes: 20 additions & 5 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,15 @@ impl SingleHeightConsensus {
context: &mut ContextT,
) -> Result<ShcReturn, ConsensusError> {
info!("Starting consensus with validators {:?}", self.validators);
context.set_height_and_round(self.height, self.state_machine.round()).await;
context
.set_height_and_round(self.height, self.state_machine.round(), ValidatorId::default())
.await;
let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) };
let events = self.state_machine.start(&leader_fn);
let ret = self.handle_state_machine_events(context, events).await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
context
.set_height_and_round(self.height, self.state_machine.round(), ValidatorId::default())
.await;
ret
}

Expand Down Expand Up @@ -240,11 +244,14 @@ impl SingleHeightConsensus {
.validate_proposal(
self.height,
init.round,
init.proposer,
self.timeouts.proposal_timeout,
p2p_messages_receiver,
)
.await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
context
.set_height_and_round(self.height, self.state_machine.round(), ValidatorId::default())
.await;
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver, fin_receiver)]))
}

Expand Down Expand Up @@ -272,7 +279,13 @@ impl SingleHeightConsensus {
}
ConsensusMessage::Vote(vote) => {
let ret = self.handle_vote(context, vote).await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
context
.set_height_and_round(
self.height,
self.state_machine.round(),
ValidatorId::default(),
)
.await;
ret
}
}
Expand Down Expand Up @@ -359,7 +372,9 @@ impl SingleHeightConsensus {
}
_ => unimplemented!("Unexpected event: {:?}", event),
};
context.set_height_and_round(self.height, self.state_machine.round()).await;
context
.set_height_and_round(self.height, self.state_machine.round(), ValidatorId::default())
.await;
ret
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async fn proposer() {
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context
.expect_broadcast()
.times(1)
Expand Down Expand Up @@ -174,12 +174,12 @@ async fn validator(repeat_proposal: bool) {
);

context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context
.expect_broadcast()
.times(1)
Expand Down Expand Up @@ -253,12 +253,12 @@ async fn vote_twice(same_vote: bool) {
);

context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context
.expect_broadcast()
.times(1) // Shows the repeat vote is ignored.
Expand Down Expand Up @@ -327,7 +327,7 @@ async fn rebroadcast_votes() {
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context
.expect_broadcast()
.times(1)
Expand Down Expand Up @@ -389,7 +389,7 @@ async fn repropose() {
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context
.expect_broadcast()
.times(1)
Expand Down
8 changes: 7 additions & 1 deletion crates/sequencing/papyrus_consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mock! {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<u32>
) -> oneshot::Receiver<ProposalContentId>;
Expand All @@ -56,7 +57,12 @@ mock! {
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;

async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);
async fn set_height_and_round(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId
);
}
}

Expand Down
8 changes: 7 additions & 1 deletion crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub trait ConsensusContext {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<Self::ProposalChunk>,
) -> oneshot::Receiver<ProposalContentId>;
Expand Down Expand Up @@ -106,7 +107,12 @@ pub trait ConsensusContext {

/// Update the context with the current height and round.
/// Must be called at the beginning of each height.
async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);
async fn set_height_and_round(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
);
}

#[derive(PartialEq)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader};
use starknet_api::block::BlockNumber;
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use tracing::{debug, debug_span, info, warn, Instrument};

Expand Down Expand Up @@ -57,7 +56,7 @@ impl PapyrusConsensusContext {
Self {
storage_reader,
network_broadcast_client,
validators: (0..num_validators).map(ContractAddress::from).collect(),
validators: (0..num_validators).map(ValidatorId::from).collect(),
sync_broadcast_sender,
valid_proposals: Arc::new(Mutex::new(BTreeMap::new())),
}
Expand Down Expand Up @@ -141,6 +140,7 @@ impl ConsensusContext for PapyrusConsensusContext {
&mut self,
height: BlockNumber,
_round: Round,
_proposer: ValidatorId,
_timeout: Duration,
mut content: mpsc::Receiver<Transaction>,
) -> oneshot::Receiver<ProposalContentId> {
Expand Down Expand Up @@ -261,7 +261,12 @@ impl ConsensusContext for PapyrusConsensusContext {
Ok(())
}

async fn set_height_and_round(&mut self, _height: BlockNumber, _round: Round) {
async fn set_height_and_round(
&mut self,
_height: BlockNumber,
_round: Round,
_proposer: ValidatorId,
) {
// No-op
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_consensus::types::ConsensusContext;
use papyrus_consensus::types::{ConsensusContext, ValidatorId};
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
Expand All @@ -13,7 +13,6 @@ use papyrus_storage::header::HeaderStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
use papyrus_test_utils::get_test_block;
use starknet_api::block::{Block, BlockHash};
use starknet_api::core::ContractAddress;

use crate::papyrus_consensus_context::PapyrusConsensusContext;

Expand All @@ -29,7 +28,7 @@ async fn build_proposal() {
let proposal_init = ProposalInit {
height: block_number,
round: 0,
proposer: ContractAddress::default(),
proposer: ValidatorId::default(),
valid_round: None,
};
// TODO(Asmaa): Test proposal content.
Expand All @@ -51,7 +50,13 @@ async fn validate_proposal_success() {
validate_sender.close_channel();

let fin = papyrus_context
.validate_proposal(block_number, 0, Duration::MAX, validate_receiver)
.validate_proposal(
block_number,
0,
ValidatorId::default(),
Duration::MAX,
validate_receiver,
)
.await
.await
.unwrap();
Expand All @@ -72,7 +77,13 @@ async fn validate_proposal_fail() {
validate_sender.close_channel();

let fin = papyrus_context
.validate_proposal(block_number, 0, Duration::MAX, validate_receiver)
.validate_proposal(
block_number,
0,
ValidatorId::default(),
Duration::MAX,
validate_receiver,
)
.await
.await;
assert_eq!(fin, Err(oneshot::Canceled));
Expand Down
Loading

0 comments on commit 8049c64

Please sign in to comment.