Skip to content

Commit

Permalink
refactor(consensus): consensus takes a generic network receiver
Browse files Browse the repository at this point in the history
This will enable mocking for unit tests and replacing with a fake for simulations.
  • Loading branch information
matan-starkware committed Jul 25, 2024
1 parent d1ebb90 commit eec4b9c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
2 changes: 1 addition & 1 deletion crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ type ResponsesSenderForNetwork = GenericSender<Bytes>;
type ResponsesSender<Response> =
GenericSender<Result<Response, <Response as TryFrom<Bytes>>::Error>>;

type ReportSender = oneshot::Sender<()>;
pub type ReportSender = oneshot::Sender<()>;
type ReportReceiver = oneshot::Receiver<()>;

pub struct SqmrClientPayload<Query, Response: TryFrom<Bytes>> {
Expand Down
20 changes: 15 additions & 5 deletions crates/sequencing/papyrus_consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::Stream;
use papyrus_common::metrics as papyrus_metrics;
use papyrus_network::network_manager::BroadcastSubscriberReceiver;
use papyrus_network::network_manager::ReportSender;
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
use papyrus_protobuf::converters::ProtobufConversionError;
use single_height_consensus::SingleHeightConsensus;
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::{debug, info, instrument};
Expand Down Expand Up @@ -37,14 +39,18 @@ use futures::StreamExt;

#[instrument(skip(context, validator_id, network_receiver, cached_messages), level = "info")]
#[allow(missing_docs)]
async fn run_height<BlockT: ConsensusBlock, ContextT: ConsensusContext<Block = BlockT>>(
async fn run_height<BlockT, ContextT, NetworkReceiverT>(
context: &mut ContextT,
height: BlockNumber,
validator_id: ValidatorId,
network_receiver: &mut BroadcastSubscriberReceiver<ConsensusMessage>,
network_receiver: &mut NetworkReceiverT,
cached_messages: &mut Vec<ConsensusMessage>,
) -> Result<Decision<BlockT>, ConsensusError>
where
BlockT: ConsensusBlock,
ContextT: ConsensusContext<Block = BlockT>,
NetworkReceiverT:
Stream<Item = (Result<ConsensusMessage, ProtobufConversionError>, ReportSender)> + Unpin,
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
{
Expand Down Expand Up @@ -104,14 +110,18 @@ where
// TODO(dvir): add test for this.
#[instrument(skip(context, start_height, network_receiver), level = "info")]
#[allow(missing_docs)]
pub async fn run_consensus<BlockT: ConsensusBlock, ContextT: ConsensusContext<Block = BlockT>>(
pub async fn run_consensus<BlockT, ContextT, NetworkReceiverT>(
mut context: ContextT,
start_height: BlockNumber,
validator_id: ValidatorId,
consensus_delay: Duration,
mut network_receiver: BroadcastSubscriberReceiver<ConsensusMessage>,
mut network_receiver: NetworkReceiverT,
) -> Result<(), ConsensusError>
where
BlockT: ConsensusBlock,
ContextT: ConsensusContext<Block = BlockT>,
NetworkReceiverT:
Stream<Item = (Result<ConsensusMessage, ProtobufConversionError>, ReportSender)> + Unpin,
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
{
Expand Down

0 comments on commit eec4b9c

Please sign in to comment.