Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sequencing): make repropose use the proper channel #2532

Open
wants to merge 5 commits into
base: guyn/streams/remove_consensus_message3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct Vote {
pub voter: ContractAddress,
}

// TODO: remove this once we are sure everything works using just Vote.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ConsensusMessage {
Proposal(Proposal), // To be deprecated
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_protobuf/src/converters/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::consensus::{
use crate::converters::ProtobufConversionError;
use crate::{auto_impl_into_and_try_from_vec_u8, protobuf};

// TODO(guyn): remove this once we integrate ProposalPart everywhere.
impl TryFrom<protobuf::Proposal> for Proposal {
type Error = ProtobufConversionError;

Expand Down
11 changes: 6 additions & 5 deletions crates/papyrus_protobuf/src/converters/consensus_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use starknet_api::transaction::{
};

use crate::consensus::{
ConsensusMessage,
Proposal,
ConsensusMessage, // TODO: remove this
Proposal, // TODO: remove this
ProposalFin,
ProposalInit,
ProposalPart,
Expand Down Expand Up @@ -51,11 +51,11 @@ fn add_gas_values_to_transaction(transactions: &mut [Transaction]) {
fn convert_stream_message_to_vec_u8_and_back() {
let mut rng = get_rng();

// Test that we can convert a StreamMessage with a ConsensusMessage message to bytes and back.
let mut stream_message: StreamMessage<ConsensusMessage> =
// Test that we can convert a StreamMessage with a ProposalPart message to bytes and back.
let mut stream_message: StreamMessage<ProposalPart> =
StreamMessage::get_test_instance(&mut rng);

if let StreamMessageBody::Content(ConsensusMessage::Proposal(proposal)) =
if let StreamMessageBody::Content(ProposalPart::Transactions(proposal)) =
&mut stream_message.message
{
add_gas_values_to_transaction(&mut proposal.transactions);
Expand All @@ -66,6 +66,7 @@ fn convert_stream_message_to_vec_u8_and_back() {
assert_eq!(stream_message, res_data);
}

// TODO(guyn): this can be removed once ConsensusMessage is taken out.
#[test]
fn convert_consensus_message_to_vec_u8_and_back() {
let mut rng = get_rng();
Expand Down
12 changes: 8 additions & 4 deletions crates/papyrus_protobuf/src/converters/test_instances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use starknet_api::core::ContractAddress;
use starknet_api::transaction::{Transaction, TransactionHash};

use crate::consensus::{
ConsensusMessage,
Proposal,
ConsensusMessage, // TODO: remove this
Proposal, // TODO: remove this
ProposalFin,
ProposalInit,
ProposalPart,
Expand All @@ -18,6 +18,7 @@ use crate::consensus::{
};

auto_impl_get_test_instance! {
// TODO(guyn): remove this once we integrate ProposalPart everywhere.
pub enum ConsensusMessage {
Proposal(Proposal) = 0,
Vote(Vote) = 1,
Expand Down Expand Up @@ -64,10 +65,13 @@ auto_impl_get_test_instance! {

// The auto_impl_get_test_instance macro does not work for StreamMessage because it has
// a generic type. TODO(guyn): try to make the macro work with generic types.
impl GetTestInstance for StreamMessage<ConsensusMessage> {
impl GetTestInstance for StreamMessage<ProposalPart> {
fn get_test_instance(rng: &mut rand_chacha::ChaCha8Rng) -> Self {
let message = if rng.gen_bool(0.5) {
StreamMessageBody::Content(ConsensusMessage::Proposal(Proposal::get_test_instance(rng)))
StreamMessageBody::Content(ProposalPart::Transactions(TransactionBatch {
transactions: vec![Transaction::get_test_instance(rng)],
tx_hashes: vec![TransactionHash::get_test_instance(rng)],
}))
} else {
StreamMessageBody::Fin
};
Expand Down
85 changes: 13 additions & 72 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,26 @@ use std::sync::Arc;
use std::time::Duration;
use std::vec;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::SinkExt;
use lazy_static::lazy_static;
use mockall::mock;
use mockall::predicate::eq;
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
MockBroadcastedMessagesSender,
TestSubscriberChannels,
};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::{
ConsensusMessage,
ProposalFin,
ProposalInit,
ProposalPart,
Vote,
};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin};
use papyrus_test_utils::{get_rng, GetTestInstance};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_types_core::felt::Felt;
use tokio::sync::Notify;

use super::{run_consensus, MultiHeightManager, RunHeightRes};
use crate::config::TimeoutsConfig;
use crate::test_utils::{precommit, prevote, proposal_init};
use crate::types::{
ConsensusContext,
ConsensusError,
ProposalContentId,
Round,
ValidatorId,
DEFAULT_VALIDATOR_ID,
};
use crate::test_utils::{precommit, prevote, proposal_init, MockTestContext, TestProposalPart};
use crate::types::{ConsensusError, ValidatorId, DEFAULT_VALIDATOR_ID};

lazy_static! {
static ref PROPOSER_ID: ValidatorId = DEFAULT_VALIDATOR_ID.into();
Expand All @@ -52,59 +37,15 @@ lazy_static! {

const CHANNEL_SIZE: usize = 10;

mock! {
pub TestContext {}

#[async_trait]
impl ConsensusContext for TestContext {
type ProposalPart = ProposalPart;

async fn build_proposal(
&mut self,
init: ProposalInit,
timeout: Duration
) -> oneshot::Receiver<ProposalContentId>;

async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<ProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;

async fn repropose(
&mut self,
id: ProposalContentId,
init: ProposalInit,
);

async fn validators(&self, height: BlockNumber) -> Vec<ValidatorId>;

fn proposer(&self, height: BlockNumber, round: Round) -> ValidatorId;

async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError>;

async fn decision_reached(
&mut self,
block: ProposalContentId,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;

async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);
}
}

async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg: ConsensusMessage) {
let broadcasted_message_metadata =
BroadcastedMessageMetadata::get_test_instance(&mut get_rng());
sender.send((msg, broadcasted_message_metadata)).await.unwrap();
}

async fn send_proposal(
proposal_receiver_sender: &mut mpsc::Sender<mpsc::Receiver<ProposalPart>>,
content: Vec<ProposalPart>,
proposal_receiver_sender: &mut mpsc::Sender<mpsc::Receiver<TestProposalPart>>,
content: Vec<TestProposalPart>,
) {
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
Expand Down Expand Up @@ -150,8 +91,8 @@ async fn manager_multiple_heights_unordered() {
send_proposal(
&mut proposal_receiver_sender,
vec![
ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID)),
ProposalPart::Fin(ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }),
TestProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID)),
TestProposalPart::Fin(ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }),
],
)
.await;
Expand All @@ -161,8 +102,8 @@ async fn manager_multiple_heights_unordered() {
send_proposal(
&mut proposal_receiver_sender,
vec![
ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
ProposalPart::Fin(ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }),
TestProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
TestProposalPart::Fin(ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }),
],
)
.await;
Expand Down Expand Up @@ -232,7 +173,7 @@ async fn run_consensus_sync() {
// Send messages for height 2.
send_proposal(
&mut proposal_receiver_sender,
vec![ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))],
vec![TestProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))],
)
.await;
let TestSubscriberChannels { mock_network, subscriber_channels } =
Expand Down Expand Up @@ -326,7 +267,7 @@ async fn run_consensus_sync_cancellation_safety() {
// Send a proposal for height 1.
send_proposal(
&mut proposal_receiver_sender,
vec![ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))],
vec![TestProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))],
)
.await;
proposal_handled.notified().await;
Expand Down Expand Up @@ -358,7 +299,7 @@ async fn test_timeouts() {

send_proposal(
&mut proposal_receiver_sender,
vec![ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))],
vec![TestProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await;
Expand Down Expand Up @@ -414,7 +355,7 @@ async fn test_timeouts() {
// reach a decision.
send_proposal(
&mut proposal_receiver_sender,
vec![ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))],
vec![TestProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *PROPOSER_ID)).await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use futures::{Stream, StreamExt};
use lru::LruCache;
use papyrus_network::network_manager::BroadcastTopicServer;
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::ConsensusMessage;
use papyrus_protobuf::consensus::Vote;
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::BlockHash;
use starknet_api::core::{ContractAddress, PatriciaKey};
use tracing::{debug, instrument};

Expand All @@ -26,15 +25,15 @@ use tracing::{debug, instrument};
/// opposed to actual drops or corruption.
/// - Tendermint is, to a large extent, unaffected by minor network reorderings. For instance it
/// doesn't matter if prevotes arrive before or after the Proposal they are for.
/// - This struct is therefore also designed not to be overly sensistive to message order. If
/// - This struct is therefore also designed not to be overly sensitive to message order. If
/// message A was dropped by this struct in one run, it should be dropped in the rerun. This
/// is as opposed to using a stateful RNG where the random number is a function of all the
/// previous calls to the RNG.
pub struct NetworkReceiver {
pub broadcasted_messages_receiver: BroadcastTopicServer<ConsensusMessage>,
pub broadcasted_messages_receiver: BroadcastTopicServer<Vote>,
// Cache is used so that repeat sends of a message can be processed differently. For example,
// if a message is dropped resending it should result in a new decision.
pub cache: LruCache<ConsensusMessage, u32>,
pub cache: LruCache<Vote, u32>,
pub seed: u64,
// Probability of dropping a message [0, 1].
pub drop_probability: f64,
Expand All @@ -44,7 +43,7 @@ pub struct NetworkReceiver {

impl NetworkReceiver {
pub fn new(
broadcasted_messages_receiver: BroadcastTopicServer<ConsensusMessage>,
broadcasted_messages_receiver: BroadcastTopicServer<Vote>,
cache_size: usize,
seed: u64,
drop_probability: f64,
Expand All @@ -61,13 +60,13 @@ impl NetworkReceiver {
}
}

/// Determine how to handle a message. If None then the message is silently droppeds. If some,
/// Determine how to handle a message. If None then the message is silently dropped. If some,
/// the returned message is what is sent to the consensus crate.
///
/// Applies `drop_probability` followed by `invalid_probability`. So the probability of an
/// invalid message is `(1- drop_probability) * invalid_probability`.
#[instrument(skip(self), level = "debug")]
pub fn filter_msg(&mut self, msg: ConsensusMessage) -> Option<ConsensusMessage> {
pub fn filter_msg(&mut self, msg: Vote) -> Option<Vote> {
let msg_hash = self.calculate_msg_hash(&msg);

if self.should_drop_msg(msg_hash) {
Expand All @@ -78,7 +77,7 @@ impl NetworkReceiver {
Some(self.maybe_invalidate_msg(msg, msg_hash))
}

fn calculate_msg_hash(&mut self, msg: &ConsensusMessage) -> u64 {
fn calculate_msg_hash(&mut self, msg: &Vote) -> u64 {
let count = if let Some(count) = self.cache.get_mut(msg) {
*count += 1;
*count
Expand All @@ -100,31 +99,20 @@ impl NetworkReceiver {
prob <= self.drop_probability
}

fn maybe_invalidate_msg(
&mut self,
mut msg: ConsensusMessage,
msg_hash: u64,
) -> ConsensusMessage {
fn maybe_invalidate_msg(&mut self, mut msg: Vote, msg_hash: u64) -> Vote {
#[allow(clippy::as_conversions)]
if (msg_hash as f64) / (u64::MAX as f64) > self.invalid_probability {
return msg;
}
debug!("Invalidating message");
// TODO(matan): Allow for invalid votes based on signature.
match msg {
ConsensusMessage::Proposal(ref mut proposal) => {
proposal.block_hash = BlockHash(proposal.block_hash.0 + 1);
}
ConsensusMessage::Vote(ref mut vote) => {
vote.voter = ContractAddress(PatriciaKey::from(msg_hash));
}
}
msg.voter = ContractAddress(PatriciaKey::from(msg_hash));
msg
}
}

impl Stream for NetworkReceiver {
type Item = (Result<ConsensusMessage, ProtobufConversionError>, BroadcastedMessageMetadata);
type Item = (Result<Vote, ProtobufConversionError>, BroadcastedMessageMetadata);

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand Down
Loading
Loading