Skip to content

Commit

Permalink
feat: broadcast proposal as stream
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Nov 27, 2024
1 parent fd5c84b commit f54941b
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::time::Duration;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use futures::{SinkExt, StreamExt};
use papyrus_consensus::types::{
ConsensusContext,
ConsensusError,
Expand All @@ -23,7 +23,7 @@ use papyrus_consensus::types::{
ValidatorId,
};
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, ProposalPart, Vote};
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalFin, ProposalInit, ProposalPart, TransactionBatch, Vote};
use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader};
Expand All @@ -36,10 +36,12 @@ use tracing::{debug, debug_span, info, warn, Instrument};

type HeightToIdToContent = BTreeMap<BlockNumber, HashMap<ProposalContentId, Vec<Transaction>>>;

const CHANNEL_SIZE: usize = 100;

pub struct PapyrusConsensusContext {
storage_reader: StorageReader,
network_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
_network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
validators: Vec<ValidatorId>,
sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
// Proposal building/validating returns immediately, leaving the actual processing to a spawned
Expand All @@ -52,14 +54,14 @@ impl PapyrusConsensusContext {
pub fn new(
storage_reader: StorageReader,
network_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
_network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
num_validators: u64,
sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
) -> Self {
Self {
storage_reader,
network_broadcast_client,
_network_proposal_sender,
network_proposal_sender,
validators: (0..num_validators).map(ContractAddress::from).collect(),
sync_broadcast_sender,
valid_proposals: Arc::new(Mutex::new(BTreeMap::new())),
Expand All @@ -77,7 +79,7 @@ impl ConsensusContext for PapyrusConsensusContext {
proposal_init: ProposalInit,
_timeout: Duration,
) -> oneshot::Receiver<ProposalContentId> {
let mut network_broadcast_sender = self.network_broadcast_client.clone();
let mut proposal_sender_sender = self.network_proposal_sender.clone();
let (fin_sender, fin_receiver) = oneshot::channel();

let storage_reader = self.storage_reader.clone();
Expand Down Expand Up @@ -113,18 +115,27 @@ impl ConsensusContext for PapyrusConsensusContext {
})
.block_hash;

let proposal = Proposal {
height: proposal_init.height.0,
round: proposal_init.round,
proposer: proposal_init.proposer,
transactions: transactions.clone(),
block_hash,
valid_round: proposal_init.valid_round,
};
network_broadcast_sender
.broadcast_message(ConsensusMessage::Proposal(proposal))
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
let stream_id = proposal_init.height.0;
proposal_sender_sender
.send((stream_id, proposal_receiver))
.await
.expect("Failed to send proposal receiver");
proposal_sender
.send(Self::ProposalPart::Init(proposal_init.clone()))
.await
.expect("Failed to send proposal init");
proposal_sender
.send(ProposalPart::Transactions(TransactionBatch {
transactions: transactions.clone(),
tx_hashes: vec![],
}))
.await
.expect("Failed to send transactions");
proposal_sender
.send(ProposalPart::Fin(ProposalFin { proposal_content_id: block_hash }))
.await
.expect("Failed to send proposal");
.expect("Failed to send fin");
{
let mut proposals = valid_proposals
.lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use std::time::Duration;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use futures::{SinkExt, StreamExt};
use papyrus_consensus::types::{
ConsensusContext,
ConsensusError,
ProposalContentId,
Round,
ValidatorId,
};
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_network::network_manager::BroadcastTopicClient;
use papyrus_protobuf::consensus::{
ConsensusMessage,
ProposalFin,
Expand Down Expand Up @@ -51,6 +51,8 @@ use tracing::{debug, debug_span, error, info, trace, warn, Instrument};
type HeightToIdToContent =
BTreeMap<BlockNumber, HashMap<ProposalContentId, (Vec<Transaction>, ProposalId)>>;

const CHANNEL_SIZE: usize = 100;

pub struct SequencerConsensusContext {
batcher: Arc<dyn BatcherClient>,
validators: Vec<ValidatorId>,
Expand All @@ -63,21 +65,21 @@ pub struct SequencerConsensusContext {
// restarting.
proposal_id: u64,
current_height: Option<BlockNumber>,
network_broadcast_client: BroadcastTopicClient<ProposalPart>,
_outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
_network_broadcast_client: BroadcastTopicClient<ProposalPart>,
outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
}

impl SequencerConsensusContext {
pub fn new(
batcher: Arc<dyn BatcherClient>,
network_broadcast_client: BroadcastTopicClient<ProposalPart>,
_outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
_network_broadcast_client: BroadcastTopicClient<ProposalPart>,
outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
num_validators: u64,
) -> Self {
Self {
batcher,
network_broadcast_client,
_outbound_proposal_sender,
_network_broadcast_client,
outbound_proposal_sender,
validators: (0..num_validators).map(ValidatorId::from).collect(),
valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())),
proposal_id: 0,
Expand Down Expand Up @@ -130,19 +132,24 @@ impl ConsensusContext for SequencerConsensusContext {
.await
.expect("Failed to initiate proposal build");
debug!("Broadcasting proposal init: {proposal_init:?}");
self.network_broadcast_client
.broadcast_message(ProposalPart::Init(proposal_init.clone()))
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
let stream_id = proposal_init.height.0;
self.outbound_proposal_sender
.send((stream_id, proposal_receiver))
.await
.expect("Failed to send proposal receiver");
proposal_sender
.send(ProposalPart::Init(proposal_init.clone()))
.await
.expect("Failed to broadcast proposal init");
let broadcast_client = self.network_broadcast_client.clone();
.expect("Failed to send proposal init");
tokio::spawn(
async move {
stream_build_proposal(
proposal_init.height,
proposal_id,
batcher,
valid_proposals,
broadcast_client,
proposal_sender,
fin_sender,
)
.await;
Expand Down Expand Up @@ -270,16 +277,16 @@ impl SequencerConsensusContext {

// Handles building a new proposal without blocking consensus:
// 1. Receive chunks of content from the batcher.
// 2. Forward these to consensus to be streamed out to the network.
// 2. Forward these to the stream handler to be streamed out to the network.
// 3. Once finished, receive the commitment from the batcher.
// 4. Store the proposal for re-proposal.
// 5. Send the commitment to consensus.
// 5. Send the commitment to the stream handler (to send fin).
async fn stream_build_proposal(
height: BlockNumber,
proposal_id: ProposalId,
batcher: Arc<dyn BatcherClient>,
valid_proposals: Arc<Mutex<HeightToIdToContent>>,
mut broadcast_client: BroadcastTopicClient<ProposalPart>,
mut proposal_sender: mpsc::Sender<ProposalPart>,
fin_sender: oneshot::Sender<ProposalContentId>,
) {
let mut content = Vec::new();
Expand All @@ -306,8 +313,8 @@ async fn stream_build_proposal(
}
debug!("Broadcasting proposal content: {transaction_hashes:?}");
trace!("Broadcasting proposal content: {transactions:?}");
broadcast_client
.broadcast_message(ProposalPart::Transactions(TransactionBatch {
proposal_sender
.send(ProposalPart::Transactions(TransactionBatch {
transactions,
tx_hashes: transaction_hashes,
}))
Expand All @@ -325,8 +332,8 @@ async fn stream_build_proposal(
height
);
debug!("Broadcasting proposal fin: {proposal_content_id:?}");
broadcast_client
.broadcast_message(ProposalPart::Fin(ProposalFin { proposal_content_id }))
proposal_sender
.send(ProposalPart::Fin(ProposalFin { proposal_content_id }))
.await
.expect("Failed to broadcast proposal fin");
// Update valid_proposals before sending fin to avoid a race condition
Expand All @@ -336,6 +343,10 @@ async fn stream_build_proposal(
.entry(height)
.or_default()
.insert(proposal_content_id, (content, proposal_id));
// proposal_sender
// .send(ProposalPart::Fin(ProposalFin { proposal_content_id }))
// .await
// .expect("Failed to broadcast proposal fin");
if fin_sender.send(proposal_content_id).is_err() {
// Consensus may exit early (e.g. sync).
warn!("Failed to send proposal content id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,7 @@ async fn validate_proposal_success() {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
} = subscriber_channels;
<<<<<<< HEAD
let (outbound_internal_sender, _inbound_internal_receiver, _) =
=======
let (outbound_internal_sender, _inbound_internal_receiver) =
>>>>>>> 883a253be (feat: allow a streamed proposal channel on top of existing one)
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);

let mut context = SequencerConsensusContext::new(
Expand Down

0 comments on commit f54941b

Please sign in to comment.