Skip to content

Commit

Permalink
chore(sequencing): remove ConsensusMessage from simulation network
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Dec 17, 2024
1 parent 287dd4d commit 43790e2
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use futures::{Stream, StreamExt};
use lru::LruCache;
use papyrus_network::network_manager::BroadcastTopicServer;
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::ConsensusMessage;
use papyrus_protobuf::consensus::Vote;
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::BlockHash;
use starknet_api::core::{ContractAddress, PatriciaKey};
use tracing::{debug, instrument};

Expand All @@ -26,15 +25,15 @@ use tracing::{debug, instrument};
/// opposed to actual drops or corruption.
/// - Tendermint is, to a large extent, unaffected by minor network reorderings. For instance it
/// doesn't matter if prevotes arrive before or after the Proposal they are for.
/// - This struct is therefore also designed not to be overly sensistive to message order. If
/// - This struct is therefore also designed not to be overly sensitive to message order. If
/// message A was dropped by this struct in one run, it should be dropped in the rerun. This
/// is as opposed to using a stateful RNG where the random number is a function of all the
/// previous calls to the RNG.
pub struct NetworkReceiver {
pub broadcasted_messages_receiver: BroadcastTopicServer<ConsensusMessage>,
pub broadcasted_messages_receiver: BroadcastTopicServer<Vote>,
// Cache is used so that repeat sends of a message can be processed differently. For example,
// if a message is dropped resending it should result in a new decision.
pub cache: LruCache<ConsensusMessage, u32>,
pub cache: LruCache<Vote, u32>,
pub seed: u64,
// Probability of dropping a message [0, 1].
pub drop_probability: f64,
Expand All @@ -44,7 +43,7 @@ pub struct NetworkReceiver {

impl NetworkReceiver {
pub fn new(
broadcasted_messages_receiver: BroadcastTopicServer<ConsensusMessage>,
broadcasted_messages_receiver: BroadcastTopicServer<Vote>,
cache_size: usize,
seed: u64,
drop_probability: f64,
Expand All @@ -61,13 +60,13 @@ impl NetworkReceiver {
}
}

/// Determine how to handle a message. If None then the message is silently droppeds. If some,
/// Determine how to handle a message. If None then the message is silently dropped. If some,
/// the returned message is what is sent to the consensus crate.
///
/// Applies `drop_probability` followed by `invalid_probability`. So the probability of an
/// invalid message is `(1- drop_probability) * invalid_probability`.
#[instrument(skip(self), level = "debug")]
pub fn filter_msg(&mut self, msg: ConsensusMessage) -> Option<ConsensusMessage> {
pub fn filter_msg(&mut self, msg: Vote) -> Option<Vote> {
let msg_hash = self.calculate_msg_hash(&msg);

if self.should_drop_msg(msg_hash) {
Expand All @@ -78,7 +77,7 @@ impl NetworkReceiver {
Some(self.maybe_invalidate_msg(msg, msg_hash))
}

fn calculate_msg_hash(&mut self, msg: &ConsensusMessage) -> u64 {
fn calculate_msg_hash(&mut self, msg: &Vote) -> u64 {
let count = if let Some(count) = self.cache.get_mut(msg) {
*count += 1;
*count
Expand All @@ -100,31 +99,20 @@ impl NetworkReceiver {
prob <= self.drop_probability
}

fn maybe_invalidate_msg(
&mut self,
mut msg: ConsensusMessage,
msg_hash: u64,
) -> ConsensusMessage {
fn maybe_invalidate_msg(&mut self, mut msg: Vote, msg_hash: u64) -> Vote {
#[allow(clippy::as_conversions)]
if (msg_hash as f64) / (u64::MAX as f64) > self.invalid_probability {
return msg;
}
debug!("Invalidating message");
// TODO(matan): Allow for invalid votes based on signature.
match msg {
ConsensusMessage::Proposal(ref mut proposal) => {
proposal.block_hash = BlockHash(proposal.block_hash.0 + 1);
}
ConsensusMessage::Vote(ref mut vote) => {
vote.voter = ContractAddress(PatriciaKey::from(msg_hash));
}
}
msg.voter = ContractAddress(PatriciaKey::from(msg_hash));
msg
}
}

impl Stream for NetworkReceiver {
type Item = (Result<ConsensusMessage, ProtobufConversionError>, BroadcastedMessageMetadata);
type Item = (Result<Vote, ProtobufConversionError>, BroadcastedMessageMetadata);

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use papyrus_network::network_manager::test_utils::{
TestSubscriberChannels,
};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::ConsensusMessage;
use papyrus_protobuf::consensus::Vote;
use papyrus_test_utils::{get_rng, GetTestInstance};
use test_case::test_case;

Expand All @@ -15,12 +15,10 @@ const SEED: u64 = 123;
const DROP_PROBABILITY: f64 = 0.5;
const INVALID_PROBABILITY: f64 = 0.5;

#[test_case(true, true; "distinct_vote")]
#[test_case(false, true; "repeat_vote")]
#[test_case(true, false; "distinct_proposal")]
#[test_case(false, false; "repeat_proposal")]
#[test_case(true; "distinct_vote")]
#[test_case(false; "repeat_vote")]
#[tokio::test]
async fn test_invalid(distinct_messages: bool, is_vote: bool) {
async fn test_invalid(distinct_messages: bool) {
let TestSubscriberChannels { subscriber_channels, mut mock_network } =
mock_register_broadcast_topic().unwrap();
let mut receiver = NetworkReceiver::new(
Expand All @@ -33,7 +31,7 @@ async fn test_invalid(distinct_messages: bool, is_vote: bool) {
let mut invalid_messages = 0;

for height in 0..1000 {
let msg = create_consensus_msg(if distinct_messages { height } else { 0 }, is_vote);
let msg = create_vote_message(if distinct_messages { height } else { 0 });
let broadcasted_message_metadata =
BroadcastedMessageMetadata::get_test_instance(&mut get_rng());
mock_network
Expand All @@ -48,12 +46,10 @@ async fn test_invalid(distinct_messages: bool, is_vote: bool) {
assert!((400..=600).contains(&invalid_messages), "num_invalid={invalid_messages}");
}

#[test_case(true, true; "distinct_vote")]
#[test_case(false, true; "repeat_vote")]
#[test_case(true, false; "distinct_proposal")]
#[test_case(false, false; "repeat_proposal")]
#[test_case(true; "distinct_vote")]
#[test_case(false; "repeat_vote")]
#[tokio::test]
async fn test_drops(distinct_messages: bool, is_vote: bool) {
async fn test_drops(distinct_messages: bool) {
let TestSubscriberChannels { subscriber_channels, mut mock_network } =
mock_register_broadcast_topic().unwrap();
let mut receiver = NetworkReceiver::new(
Expand All @@ -66,7 +62,7 @@ async fn test_drops(distinct_messages: bool, is_vote: bool) {
let mut num_received = 0;

for height in 0..1000 {
let msg = create_consensus_msg(if distinct_messages { height } else { 0 }, is_vote);
let msg = create_vote_message(if distinct_messages { height } else { 0 });
let broadcasted_message_metadata =
BroadcastedMessageMetadata::get_test_instance(&mut get_rng());
mock_network
Expand All @@ -83,13 +79,6 @@ async fn test_drops(distinct_messages: bool, is_vote: bool) {
assert!((400..=600).contains(&num_received), "num_received={num_received}");
}

fn create_consensus_msg(height: u64, is_vote: bool) -> ConsensusMessage {
if is_vote {
ConsensusMessage::Vote(papyrus_protobuf::consensus::Vote { height, ..Default::default() })
} else {
ConsensusMessage::Proposal(papyrus_protobuf::consensus::Proposal {
height,
..Default::default()
})
}
fn create_vote_message(height: u64) -> Vote {
papyrus_protobuf::consensus::Vote { height, ..Default::default() }
}

0 comments on commit 43790e2

Please sign in to comment.