Skip to content

Commit

Permalink
chore(starknet_consensus_manager): add proposal init into validate pr…
Browse files Browse the repository at this point in the history
…oposal input (#2408)
  • Loading branch information
ArniStarkware authored Dec 19, 2024
1 parent 24503a1 commit 093eb7d
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 155 deletions.
16 changes: 14 additions & 2 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ pub enum StreamMessageBody<T> {
Fin,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct StreamMessage<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> {
pub message: StreamMessageBody<T>,
pub stream_id: u64,
pub message_id: u64,
}

/// This message must be sent first when proposing a new block.
#[derive(Default, Debug, Clone, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ProposalInit {
/// The height of the consensus (block number).
pub height: BlockNumber,
Expand All @@ -72,6 +72,18 @@ pub struct ProposalInit {
pub proposer: ContractAddress,
}

impl Default for ProposalInit {
fn default() -> Self {
ProposalInit {
height: Default::default(),
round: Default::default(),
valid_round: Default::default(),
// TODO(Arni): Use DEFAULT_VALIDATOR_ID instead of 100.
proposer: ContractAddress::from(100_u64),
}
}
}

/// There is one or more batches of transactions in a proposed block.
#[derive(Debug, Clone, PartialEq)]
pub struct TransactionBatch {
Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_protobuf/src/converters/consensus_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ fn convert_proposal_init_to_vec_u8_and_back() {

let proposal_init = ProposalInit::get_test_instance(&mut rng);

let bytes_data: Vec<u8> = proposal_init.clone().into();
let bytes_data: Vec<u8> = proposal_init.into();
let res_data = ProposalInit::try_from(bytes_data).unwrap();
assert_eq!(proposal_init, res_data);
}
Expand Down
8 changes: 3 additions & 5 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ mock! {

async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<ProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand Down Expand Up @@ -116,7 +114,7 @@ async fn send_proposal(
fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt) {
context
.expect_validate_proposal()
.return_once(move |_, _, _, _, _| {
.return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
Expand Down Expand Up @@ -368,7 +366,7 @@ async fn test_timeouts() {

let mut context = MockTestContext::new();
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_validate_proposal().returning(move |_, _, _, _, _| {
context.expect_validate_proposal().returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,7 @@ impl SingleHeightConsensus {
// twice in parallel. This could be caused by a network repeat or a malicious spam attack.
proposal_entry.insert(None);
let block_receiver = context
.validate_proposal(
self.height,
init.round,
init.proposer,
self.timeouts.proposal_timeout,
p2p_messages_receiver,
)
.validate_proposal(init, self.timeouts.proposal_timeout, p2p_messages_receiver)
.await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@ lazy_static! {
static ref VALIDATORS: Vec<ValidatorId> =
vec![*PROPOSER_ID, *VALIDATOR_ID_1, *VALIDATOR_ID_2, *VALIDATOR_ID_3];
static ref BLOCK: TestBlock = TestBlock { content: vec![1, 2, 3], id: BlockHash(Felt::ONE) };
static ref PROPOSAL_INIT: ProposalInit = ProposalInit {
height: BlockNumber(0),
round: 0,
proposer: *PROPOSER_ID,
valid_round: None
};
static ref PROPOSAL_INIT: ProposalInit =
ProposalInit { proposer: *PROPOSER_ID, ..Default::default() };
static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default();
static ref VALIDATE_PROPOSAL_EVENT: ShcEvent = ShcEvent::ValidateProposal(
StateMachineEvent::Proposal(Some(BLOCK.id), PROPOSAL_INIT.round, PROPOSAL_INIT.valid_round,),
Expand Down Expand Up @@ -71,7 +67,7 @@ async fn handle_proposal(
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(MockProposalPart(1)).await.unwrap();

shc.handle_proposal(context, PROPOSAL_INIT.clone(), content_receiver).await.unwrap()
shc.handle_proposal(context, *PROPOSAL_INIT, content_receiver).await.unwrap()
}

#[tokio::test]
Expand Down Expand Up @@ -173,7 +169,7 @@ async fn validator(repeat_proposal: bool) {
);

context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
Expand Down Expand Up @@ -253,7 +249,7 @@ async fn vote_twice(same_vote: bool) {
);

context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
Expand Down
21 changes: 4 additions & 17 deletions crates/sequencing/papyrus_consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,7 @@ use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_types_core::felt::Felt;

use crate::types::{
ConsensusContext,
ConsensusError,
ProposalContentId,
Round,
ValidatorId,
DEFAULT_VALIDATOR_ID,
};
use crate::types::{ConsensusContext, ConsensusError, ProposalContentId, Round, ValidatorId};

/// Define a consensus block which can be used to enable auto mocking Context.
#[derive(Debug, PartialEq, Clone)]
Expand All @@ -36,11 +29,7 @@ impl From<ProposalInit> for MockProposalPart {
impl TryFrom<MockProposalPart> for ProposalInit {
type Error = ProtobufConversionError;
fn try_from(part: MockProposalPart) -> Result<Self, Self::Error> {
Ok(ProposalInit {
height: BlockNumber(part.0),
proposer: DEFAULT_VALIDATOR_ID.into(),
..Default::default()
})
Ok(ProposalInit { height: BlockNumber(part.0), ..Default::default() })
}
}

Expand Down Expand Up @@ -74,9 +63,7 @@ mock! {

async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<MockProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand Down Expand Up @@ -129,5 +116,5 @@ pub fn precommit(
})
}
pub fn proposal_init(height: u64, round: u32, proposer: ValidatorId) -> ProposalInit {
ProposalInit { height: BlockNumber(height), round, proposer, valid_round: None }
ProposalInit { height: BlockNumber(height), round, proposer, ..Default::default() }
}
4 changes: 1 addition & 3 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ pub trait ConsensusContext {
/// by ConsensusContext.
async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl ConsensusContext for PapyrusConsensusContext {
proposal_init: ProposalInit,
_timeout: Duration,
) -> oneshot::Receiver<ProposalContentId> {
let height = proposal_init.height;
let mut proposal_sender_sender = self.network_proposal_sender.clone();
let (fin_sender, fin_receiver) = oneshot::channel();

Expand All @@ -99,40 +100,32 @@ impl ConsensusContext for PapyrusConsensusContext {
// TODO(dvir): consider fix this for the case of reverts. If between the check that
// the block in storage and to getting the transaction was a revert
// this flow will fail.
wait_for_block(&storage_reader, proposal_init.height)
.await
.expect("Failed to wait to block");
wait_for_block(&storage_reader, height).await.expect("Failed to wait to block");

let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn");
let transactions = txn
.get_block_transactions(proposal_init.height)
.get_block_transactions(height)
.expect("Get transactions from storage failed")
.unwrap_or_else(|| {
panic!(
"Block in {} was not found in storage despite waiting for it",
proposal_init.height
)
panic!("Block in {height} was not found in storage despite waiting for it")
});

let block_hash = txn
.get_block_header(proposal_init.height)
.get_block_header(height)
.expect("Get header from storage failed")
.unwrap_or_else(|| {
panic!(
"Block in {} was not found in storage despite waiting for it",
proposal_init.height
)
panic!("Block in {height} was not found in storage despite waiting for it")
})
.block_hash;

let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
let stream_id = proposal_init.height.0;
let stream_id = height.0;
proposal_sender_sender
.send((stream_id, proposal_receiver))
.await
.expect("Failed to send proposal receiver");
proposal_sender
.send(Self::ProposalPart::Init(proposal_init.clone()))
.send(Self::ProposalPart::Init(proposal_init))
.await
.expect("Failed to send proposal init");
proposal_sender
Expand All @@ -150,10 +143,7 @@ impl ConsensusContext for PapyrusConsensusContext {
let mut proposals = valid_proposals
.lock()
.expect("Lock on active proposals was poisoned due to a previous panic");
proposals
.entry(proposal_init.height)
.or_default()
.insert(block_hash, transactions);
proposals.entry(height).or_default().insert(block_hash, transactions);
}
// Done after inserting the proposal into the map to avoid race conditions between
// insertion and calls to `repropose`.
Expand All @@ -167,12 +157,11 @@ impl ConsensusContext for PapyrusConsensusContext {

async fn validate_proposal(
&mut self,
height: BlockNumber,
_round: Round,
_proposer: ValidatorId,
proposal_init: ProposalInit,
_timeout: Duration,
mut content: mpsc::Receiver<ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
let height = proposal_init.height;
let (fin_sender, fin_receiver) = oneshot::channel();

let storage_reader = self.storage_reader.clone();
Expand Down Expand Up @@ -251,18 +240,19 @@ impl ConsensusContext for PapyrusConsensusContext {
}

async fn repropose(&mut self, id: ProposalContentId, init: ProposalInit) {
let height = init.height;
let transactions = self
.valid_proposals
.lock()
.expect("valid_proposals lock was poisoned")
.get(&init.height)
.unwrap_or_else(|| panic!("No proposals found for height {}", init.height))
.get(&height)
.unwrap_or_else(|| panic!("No proposals found for height {height}"))
.get(&id)
.unwrap_or_else(|| panic!("No proposal found for height {} and id {}", init.height, id))
.unwrap_or_else(|| panic!("No proposal found for height {height} and id {id}"))
.clone();

let proposal = Proposal {
height: init.height.0,
height: height.0,
round: init.round,
proposer: init.proposer,
transactions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::{ConsensusContext, ValidatorId, DEFAULT_VALIDATOR_ID};
use papyrus_consensus::types::ConsensusContext;
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
Expand Down Expand Up @@ -36,12 +36,7 @@ const TEST_CHANNEL_SIZE: usize = 10;
async fn build_proposal() {
let (block, mut papyrus_context, _mock_network, _) = test_setup();
let block_number = block.header.block_header_without_hash.block_number;
let proposal_init = ProposalInit {
height: block_number,
round: 0,
proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID),
valid_round: None,
};
let proposal_init = ProposalInit { height: block_number, ..Default::default() };
// TODO(Asmaa): Test proposal content.
let fin_receiver = papyrus_context.build_proposal(proposal_init, Duration::MAX).await;

Expand All @@ -68,9 +63,7 @@ async fn validate_proposal_success() {

let fin = papyrus_context
.validate_proposal(
block_number,
0,
ValidatorId::from(DEFAULT_VALIDATOR_ID),
ProposalInit { height: block_number, ..Default::default() },
Duration::MAX,
validate_receiver,
)
Expand Down Expand Up @@ -99,9 +92,7 @@ async fn validate_proposal_fail() {

let fin = papyrus_context
.validate_proposal(
block_number,
0,
ValidatorId::from(DEFAULT_VALIDATOR_ID),
ProposalInit { height: block_number, ..Default::default() },
Duration::MAX,
validate_receiver,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl ConsensusContext for SequencerConsensusContext {
.await
.expect("Failed to send proposal receiver");
proposal_sender
.send(ProposalPart::Init(proposal_init.clone()))
.send(ProposalPart::Init(proposal_init))
.await
.expect("Failed to send proposal init");
tokio::spawn(
Expand All @@ -228,25 +228,28 @@ impl ConsensusContext for SequencerConsensusContext {
// That part is consumed by the caller, so it can know the height/round.
async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
validator: ValidatorId,
proposal_init: ProposalInit,
timeout: Duration,
content_receiver: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
assert_eq!(Some(height), self.current_height);
assert_eq!(Some(proposal_init.height), self.current_height);
let (fin_sender, fin_receiver) = oneshot::channel();
match round.cmp(&self.current_round) {
match proposal_init.round.cmp(&self.current_round) {
std::cmp::Ordering::Less => fin_receiver,
std::cmp::Ordering::Greater => {
self.queued_proposals
.insert(round, ((height, validator, timeout, content_receiver), fin_sender));
self.queued_proposals.insert(
proposal_init.round,
(
(proposal_init.height, proposal_init.proposer, timeout, content_receiver),
fin_sender,
),
);
fin_receiver
}
std::cmp::Ordering::Equal => {
self.validate_current_round_proposal(
height,
validator,
proposal_init.height,
proposal_init.proposer,
timeout,
content_receiver,
fin_sender,
Expand Down
Loading

0 comments on commit 093eb7d

Please sign in to comment.