Skip to content

Commit

Permalink
feat(mempool_infra): make ClientError cloneable (#173)
Browse files Browse the repository at this point in the history
  • Loading branch information
yair-starkware authored and matan-starkware committed Jul 31, 2024
1 parent 6870856 commit d166e52
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ type ResponsesReceiverForNetwork = GenericReceiver<Option<Bytes>>;
type ResponsesSender<Response> =
GenericSender<Result<Response, <Response as TryFrom<Bytes>>::Error>>;

type ReportSender = oneshot::Sender<()>;
pub type ReportSender = oneshot::Sender<()>;
type ReportReceiver = oneshot::Receiver<()>;

pub struct SqmrClientPayload<Query, Response: TryFrom<Bytes>> {
Expand Down
8 changes: 4 additions & 4 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use starknet_api::block::BlockHash;
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Default, Hash, Clone, Eq, PartialEq)]
pub struct Proposal {
pub height: u64,
pub round: u32,
Expand All @@ -11,13 +11,13 @@ pub struct Proposal {
pub block_hash: BlockHash,
}

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Hash, Clone, Eq, PartialEq)]
pub enum VoteType {
Prevote,
Precommit,
}

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Hash, Clone, Eq, PartialEq)]
pub struct Vote {
pub vote_type: VoteType,
pub height: u64,
Expand All @@ -26,7 +26,7 @@ pub struct Vote {
pub voter: ContractAddress,
}

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ConsensusMessage {
Proposal(Proposal),
Vote(Vote),
Expand Down
1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tokio = { workspace = true, features = ["full"] }
tracing.workspace = true

[dev-dependencies]
lru.workspace = true
mockall.workspace = true
papyrus_network = { path = "../../papyrus_network", version = "0.4.0-dev.2", features = [
"testing",
Expand Down
2 changes: 2 additions & 0 deletions crates/sequencing/papyrus_consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub mod single_height_consensus;
#[allow(missing_docs)]
pub mod state_machine;
#[cfg(test)]
pub(crate) mod test_network_receiver;
#[cfg(test)]
pub(crate) mod test_utils;
#[allow(missing_docs)]
pub mod types;
Expand Down
202 changes: 202 additions & 0 deletions crates/sequencing/papyrus_consensus/src/test_network_receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use std::task::Poll;

use futures::{Stream, StreamExt};
use lru::LruCache;
use papyrus_network::network_manager::ReportSender;
use papyrus_protobuf::consensus::ConsensusMessage;
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::BlockHash;

/// Receiver used to help run simulations of consensus. It has 2 goals in mind:
/// 1. Simulate network failures.
/// 2. Make tests repeatable - This is challenging because simulations involve a noisy environment;
/// so the actual network issues experienced may differ between 2 test runs.
/// - We expect simulations to use fairly reliable networks. That means messages arriving in
/// different order between runs will make up most of the actual noise between runs, as
/// opposed to actual drops or corruption.
/// - Tendermint is, to a large extent, unaffected by minor network reorderings. For instance it
/// doesn't matter if prevotes arrive before or after the Proposal they are for.
/// - This struct is therefore also designed not to be overly sensistive to message order. If
/// message A was dropped by this struct in one run, it should be dropped in the rerun. This
/// is as opposed to using a stateful RNG where the random number is a function of all the
/// previous calls to the RNG.
pub struct NetworkReceiver<ReceiverT> {
pub receiver: ReceiverT,
// Cache is used so that repeat sends of a message can be processed differently. For example,
// if a message is dropped resending it should result in a new decision.
pub cache: LruCache<ConsensusMessage, u32>,
pub seed: u64,
// Probability of dropping a message [0, 1].
pub drop_probability: f64,
// Probability of making a message invalid [0, 1].
pub invalid_probability: f64,
}

impl<ReceiverT> NetworkReceiver<ReceiverT>
where
ReceiverT: Stream<Item = (Result<ConsensusMessage, ProtobufConversionError>, ReportSender)>,
{
pub fn new(
receiver: ReceiverT,
cache_size: usize,
seed: u64,
drop_probability: f64,
invalid_probability: f64,
) -> Self {
assert!(0.0 <= drop_probability && drop_probability <= 1.0);
assert!(0.0 <= invalid_probability && invalid_probability <= 1.0);
Self {
receiver,
cache: LruCache::new(NonZeroUsize::new(cache_size).unwrap()),
seed,
drop_probability,
invalid_probability,
}
}

/// Determine how to handle a message. If None then the message is silently droppeds. If some,
/// the returned message is what is sent to the consensus crate.
///
/// Applies `drop_probability` followed by `invalid_probability`. So the probability of an
/// invalid message is `(1- drop_probability) * invalid_probability`.
pub fn filter_msg(&mut self, mut msg: ConsensusMessage) -> Option<ConsensusMessage> {
if !matches!(msg, ConsensusMessage::Proposal(_)) {
// TODO(matan): Add support for dropping/invalidating votes.
return Some(msg);
}

if self.should_drop_msg(&msg) {
return None;
}

if self.should_invalidate_msg(&msg) {
self.invalidate_msg(&mut msg);
}
Some(msg)
}

fn calculate_msg_hash(&mut self, msg: &ConsensusMessage) -> u64 {
let count = if let Some(count) = self.cache.get_mut(msg) {
*count += 1;
*count
} else {
self.cache.put(msg.clone(), 1);
1
};

let mut hasher = DefaultHasher::new();
msg.hash(&mut hasher);
self.seed.hash(&mut hasher);
count.hash(&mut hasher);
hasher.finish()
}

fn should_drop_msg(&mut self, msg: &ConsensusMessage) -> bool {
let prob = (self.calculate_msg_hash(&msg) as f64) / (u64::MAX as f64);
prob <= self.drop_probability
}

fn should_invalidate_msg(&mut self, msg: &ConsensusMessage) -> bool {
let prob = (self.calculate_msg_hash(&msg) as f64) / (u64::MAX as f64);
prob <= self.invalid_probability
}

fn invalidate_msg(&mut self, msg: &mut ConsensusMessage) {
match msg {
ConsensusMessage::Proposal(ref mut proposal) => {
proposal.block_hash = BlockHash(proposal.block_hash.0 + 1);
}
// TODO(matan): Allow for invalid votes based on signatures.
_ => {}
}
}
}

impl<ReceiverT> Stream for NetworkReceiver<ReceiverT>
where
ReceiverT:
Stream<Item = (Result<ConsensusMessage, ProtobufConversionError>, ReportSender)> + Unpin,
{
type Item = (Result<ConsensusMessage, ProtobufConversionError>, ReportSender);

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
let item = self.receiver.poll_next_unpin(cx);
let (msg, report_sender) = match item {
Poll::Ready(Some((Ok(msg), report_sender))) => (msg, report_sender),
_ => return item,
};
if let Some(msg) = self.filter_msg(msg) {
return Poll::Ready(Some((Ok(msg), report_sender)));
}
}
}
}

// Tests for the NetworkReceiver, which is itself a test utility.
mod test {

use futures::{SinkExt, StreamExt};
use papyrus_protobuf::consensus::ConsensusMessage;
use test_case::test_case;

use super::NetworkReceiver;

const CACHE_SIZE: usize = 10;
const SEED: u64 = 123;
const DROP_PROBABILITY: f64 = 0.5;
const INVALID_PROBABILITY: f64 = 0.5;

#[test_case(true; "distinct_messages")]
#[test_case(false; "repeat_messages")]
#[tokio::test]
async fn test_invalid(distinct_messages: bool) {
let (mut sender, receiver) = futures::channel::mpsc::unbounded();
let mut receiver =
NetworkReceiver::new(receiver, CACHE_SIZE, SEED, 0.0, INVALID_PROBABILITY);
let mut invalid_messages = 0;
for height in 0..100 {
let mut proposal = papyrus_protobuf::consensus::Proposal::default();
if distinct_messages {
proposal.height = height;
}
let report_sender = futures::channel::oneshot::channel().0;
let msg = ConsensusMessage::Proposal(proposal.clone());
sender.send((Ok(msg.clone()), report_sender)).await.unwrap();
if receiver.next().await.unwrap().0.unwrap() != msg {
invalid_messages += 1;
}
}
assert!(30 <= invalid_messages && invalid_messages <= 70, "num_invalid={invalid_messages}");
}

#[test_case(true; "distinct_messages")]
#[test_case(false; "repeat_messages")]
#[tokio::test]
async fn test_drops(distinct_messages: bool) {
let (mut sender, receiver) = futures::channel::mpsc::unbounded();
let mut receiver = NetworkReceiver::new(receiver, CACHE_SIZE, SEED, DROP_PROBABILITY, 0.0);
let mut num_received = 0;
for height in 0..100 {
let mut proposal = papyrus_protobuf::consensus::Proposal::default();
if distinct_messages {
proposal.height = height;
}
let report_sender = futures::channel::oneshot::channel().0;
let msg = ConsensusMessage::Proposal(proposal.clone());
sender.send((Ok(msg.clone()), report_sender)).await.unwrap();
}
drop(sender);

while receiver.next().await.is_some() {
num_received += 1;
}
assert!(30 <= num_received && num_received <= 70, "num_received={num_received}");
}
}

0 comments on commit d166e52

Please sign in to comment.