feat(sequencing): broadcast proposal in a stream #2286

Merged · 5 commits · Dec 2, 2024
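
This PR switches proposal propagation from a single `ConsensusMessage::Proposal` broadcast to a stream of `ProposalPart` messages (`Init`, then `TransactionBatch` chunks, then `Fin`). For each proposal the context opens a dedicated channel, pushes the parts into its sender, and hands the receiver to the stream handler through an `mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>`, keyed by a stream id (currently the block height). The sketch below only illustrates that shape; it is not the repository code, and the types are simplified stand-ins.

```rust
// A minimal, self-contained sketch of the streaming pattern this PR introduces
// (assumed simplifications: the stand-in ProposalPart below replaces the
// papyrus_protobuf::consensus types, and error handling is just unwrap).
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

const CHANNEL_SIZE: usize = 100;

// Stand-in for papyrus_protobuf::consensus::ProposalPart.
#[derive(Debug)]
enum ProposalPart {
    Init { height: u64, round: u32 },
    Transactions(Vec<u64>), // placeholder for TransactionBatch
    Fin { proposal_content_id: u64 },
}

#[tokio::main]
async fn main() {
    // The context keeps the sender side of a "channel of channels": for every
    // proposal it opens a dedicated mpsc channel and hands the receiver, keyed
    // by a stream id (here the block height), to the stream handler.
    let (mut proposal_sender_sender, mut proposal_receiver_receiver) =
        mpsc::channel::<(u64, mpsc::Receiver<ProposalPart>)>(CHANNEL_SIZE);

    let height = 7u64;
    let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
    proposal_sender_sender.send((height, proposal_receiver)).await.unwrap();

    // The proposal is then streamed out as Init -> Transactions -> Fin.
    proposal_sender.send(ProposalPart::Init { height, round: 0 }).await.unwrap();
    proposal_sender.send(ProposalPart::Transactions(vec![1, 2, 3])).await.unwrap();
    proposal_sender.send(ProposalPart::Fin { proposal_content_id: 42 }).await.unwrap();
    drop(proposal_sender); // closing the sender ends the stream

    // Something stream-handler-like drains the per-proposal stream.
    let (stream_id, mut parts) = proposal_receiver_receiver.next().await.unwrap();
    while let Some(part) = parts.next().await {
        println!("stream {stream_id}: {part:?}");
    }
}
```
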
@@ -14,7 +14,7 @@ use std::time::Duration;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use futures::{SinkExt, StreamExt};
use papyrus_consensus::types::{
ConsensusContext,
ConsensusError,
@@ -23,7 +23,15 @@ use papyrus_consensus::types::{
ValidatorId,
};
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, ProposalPart, Vote};
use papyrus_protobuf::consensus::{
ConsensusMessage,
Proposal,
ProposalFin,
ProposalInit,
ProposalPart,
TransactionBatch,
Vote,
};
use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader};
@@ -36,10 +44,12 @@ use tracing::{debug, debug_span, info, warn, Instrument};

type HeightToIdToContent = BTreeMap<BlockNumber, HashMap<ProposalContentId, Vec<Transaction>>>;

const CHANNEL_SIZE: usize = 100;

pub struct PapyrusConsensusContext {
storage_reader: StorageReader,
network_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
_network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
validators: Vec<ValidatorId>,
sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
// Proposal building/validating returns immediately, leaving the actual processing to a spawned
@@ -52,14 +62,14 @@ impl PapyrusConsensusContext {
pub fn new(
storage_reader: StorageReader,
network_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
_network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
num_validators: u64,
sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
) -> Self {
Self {
storage_reader,
network_broadcast_client,
_network_proposal_sender,
network_proposal_sender,
validators: (0..num_validators).map(ContractAddress::from).collect(),
sync_broadcast_sender,
valid_proposals: Arc::new(Mutex::new(BTreeMap::new())),
@@ -77,7 +87,7 @@ impl ConsensusContext for PapyrusConsensusContext {
proposal_init: ProposalInit,
_timeout: Duration,
) -> oneshot::Receiver<ProposalContentId> {
let mut network_broadcast_sender = self.network_broadcast_client.clone();
let mut proposal_sender_sender = self.network_proposal_sender.clone();
let (fin_sender, fin_receiver) = oneshot::channel();

let storage_reader = self.storage_reader.clone();
@@ -113,18 +123,27 @@ impl ConsensusContext for PapyrusConsensusContext {
})
.block_hash;

let proposal = Proposal {
height: proposal_init.height.0,
round: proposal_init.round,
proposer: proposal_init.proposer,
transactions: transactions.clone(),
block_hash,
valid_round: proposal_init.valid_round,
};
network_broadcast_sender
.broadcast_message(ConsensusMessage::Proposal(proposal))
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
let stream_id = proposal_init.height.0;
proposal_sender_sender
.send((stream_id, proposal_receiver))
.await
.expect("Failed to send proposal receiver");
proposal_sender
.send(Self::ProposalPart::Init(proposal_init.clone()))
.await
.expect("Failed to send proposal init");
proposal_sender
.send(ProposalPart::Transactions(TransactionBatch {
transactions: transactions.clone(),
tx_hashes: vec![],
}))
.await
.expect("Failed to send transactions");
proposal_sender
.send(ProposalPart::Fin(ProposalFin { proposal_content_id: block_hash }))
.await
.expect("Failed to send proposal");
.expect("Failed to send fin");
{
let mut proposals = valid_proposals
.lock()
@@ -11,15 +11,15 @@ use std::time::Duration;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use futures::{SinkExt, StreamExt};
use papyrus_consensus::types::{
ConsensusContext,
ConsensusError,
ProposalContentId,
Round,
ValidatorId,
};
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_network::network_manager::BroadcastTopicClient;
use papyrus_protobuf::consensus::{
ConsensusMessage,
ProposalFin,
@@ -54,6 +54,8 @@ type HeightToIdToContent =
BTreeMap<BlockNumber, HashMap<ProposalContentId, (Vec<Transaction>, ProposalId)>>;
type ValidationParams = (BlockNumber, Duration, mpsc::Receiver<Vec<Transaction>>);

const CHANNEL_SIZE: usize = 100;

pub struct SequencerConsensusContext {
batcher: Arc<dyn BatcherClient>,
validators: Vec<ValidatorId>,
@@ -69,28 +71,28 @@ pub struct SequencerConsensusContext {
current_round: Round,
// Used to broadcast proposals to other consensus nodes.
// TODO(Guy) switch to the actual streaming struct.
proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
_proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
// The active proposal refers to the proposal being validated at the current height/round.
// Building proposals are not tracked as active, as consensus can't move on to the next
// height/round until building is done. Context only works on proposals for the
// current round.
active_proposal: Option<(Arc<Notify>, JoinHandle<()>)>,
// Stores proposals for future rounds until the round is reached.
queued_proposals: BTreeMap<Round, (ValidationParams, oneshot::Sender<ProposalContentId>)>,
_outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
}

impl SequencerConsensusContext {
pub fn new(
batcher: Arc<dyn BatcherClient>,
proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
_outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
_proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
num_validators: u64,
) -> Self {
Self {
batcher,
proposal_streaming_client,
_outbound_proposal_sender,
_proposal_streaming_client,
outbound_proposal_sender,
validators: (0..num_validators).map(ValidatorId::from).collect(),
valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())),
proposal_id: 0,
@@ -147,19 +149,24 @@ impl ConsensusContext for SequencerConsensusContext {
.await
.expect("Failed to initiate proposal build");
debug!("Broadcasting proposal init: {proposal_init:?}");
self.proposal_streaming_client
.broadcast_message(ProposalPart::Init(proposal_init.clone()))
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
let stream_id = proposal_init.height.0;
self.outbound_proposal_sender
.send((stream_id, proposal_receiver))
.await
.expect("Failed to send proposal receiver");
proposal_sender
.send(ProposalPart::Init(proposal_init.clone()))
.await
.expect("Failed to broadcast proposal init");
let broadcast_client = self.proposal_streaming_client.clone();
.expect("Failed to send proposal init");
tokio::spawn(
async move {
stream_build_proposal(
proposal_init.height,
proposal_id,
batcher,
valid_proposals,
broadcast_client,
proposal_sender,
fin_sender,
)
.await;
@@ -352,16 +359,16 @@ impl SequencerConsensusContext {

// Handles building a new proposal without blocking consensus:
// 1. Receive chunks of content from the batcher.
// 2. Forward these to consensus to be streamed out to the network.
// 2. Forward these to the stream handler to be streamed out to the network.
// 3. Once finished, receive the commitment from the batcher.
// 4. Store the proposal for re-proposal.
// 5. Send the commitment to consensus.
// 5. Send the commitment to the stream handler (to send fin).
async fn stream_build_proposal(
height: BlockNumber,
proposal_id: ProposalId,
batcher: Arc<dyn BatcherClient>,
valid_proposals: Arc<Mutex<HeightToIdToContent>>,
mut broadcast_client: BroadcastTopicClient<ProposalPart>,
mut proposal_sender: mpsc::Sender<ProposalPart>,
fin_sender: oneshot::Sender<ProposalContentId>,
) {
let mut content = Vec::new();
@@ -388,8 +395,8 @@ async fn stream_build_proposal(
}
debug!("Broadcasting proposal content: {transaction_hashes:?}");
trace!("Broadcasting proposal content: {transactions:?}");
broadcast_client
.broadcast_message(ProposalPart::Transactions(TransactionBatch {
proposal_sender
.send(ProposalPart::Transactions(TransactionBatch {
transactions,
tx_hashes: transaction_hashes,
}))
@@ -407,8 +414,8 @@ async fn stream_build_proposal(
height
);
debug!("Broadcasting proposal fin: {proposal_content_id:?}");
broadcast_client
.broadcast_message(ProposalPart::Fin(ProposalFin { proposal_content_id }))
proposal_sender
.send(ProposalPart::Fin(ProposalFin { proposal_content_id }))
.await
.expect("Failed to broadcast proposal fin");
// Update valid_proposals before sending fin to avoid a race condition
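
On the consuming side, these `(stream_id, receiver)` pairs go to the `StreamHandler` (see `StreamHandler::get_channels` in the test changes below), which wraps each part into a `StreamMessage<ProposalPart>` and broadcasts it on the proposals topic. As a rough illustration only, the outbound loop could look like the hypothetical stand-in below; this is not the actual StreamHandler implementation, and all types are placeholders.

```rust
// Hypothetical, simplified stand-in for the outbound half of the stream handler.
// The real StreamHandler (papyrus_consensus) multiplexes many streams at once and
// uses the papyrus_protobuf StreamMessage/ProposalPart types; everything here is
// a placeholder meant to show the shape of the loop only.
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

#[allow(dead_code)]
#[derive(Debug)]
enum ProposalPart {
    Init,
    Transactions(Vec<u64>),
    Fin,
}

// Parts are tagged with a stream id and a per-stream sequence number so the
// receiving side can reassemble them in order.
#[derive(Debug)]
struct StreamMessage {
    stream_id: u64,
    message_id: u64,
    part: ProposalPart,
}

async fn pump_outbound(
    mut inbound: mpsc::Receiver<(u64, mpsc::Receiver<ProposalPart>)>,
    mut network: mpsc::Sender<StreamMessage>,
) {
    // For each new proposal stream, forward its parts to the network topic.
    // (Streams are drained one at a time here purely for brevity.)
    while let Some((stream_id, mut parts)) = inbound.next().await {
        let mut message_id = 0;
        while let Some(part) = parts.next().await {
            network
                .send(StreamMessage { stream_id, message_id, part })
                .await
                .expect("network send failed");
            message_id += 1;
        }
    }
}
```
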
@@ -9,10 +9,11 @@ use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::ConsensusContext;
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
TestSubscriberChannels,
};
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::{ProposalInit, ProposalPart};
use papyrus_protobuf::consensus::{ProposalInit, ProposalPart, StreamMessage};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::{ContractAddress, StateDiffCommitment};
use starknet_api::executable_transaction::{AccountTransaction, Transaction};
@@ -52,18 +53,20 @@ fn generate_invoke_tx(tx_hash: Felt) -> Transaction {
})))
}

fn make_streaming_channels()
-> (mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>, mpsc::Receiver<mpsc::Receiver<ProposalPart>>)
{
let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } =
fn make_streaming_channels() -> (
mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
mpsc::Receiver<mpsc::Receiver<ProposalPart>>,
BroadcastNetworkMock<StreamMessage<ProposalPart>>,
) {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
} = subscriber_channels;
let (outbound_internal_sender, inbound_internal_receiver, _) =
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);
(outbound_internal_sender, inbound_internal_receiver)
(outbound_internal_sender, inbound_internal_receiver, mock_network)
}

#[tokio::test]
@@ -99,7 +102,8 @@ async fn build_proposal() {
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels();
let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
@@ -160,7 +164,8 @@ async fn validate_proposal_success() {
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels();
let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
@@ -210,7 +215,8 @@ async fn repropose() {
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels();
let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
@@ -280,7 +286,8 @@ async fn proposals_from_different_rounds() {
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels();
let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
@@ -363,7 +370,8 @@ async fn interrupt_active_proposal() {
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels();
let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
2 changes: 0 additions & 2 deletions crates/starknet_api/src/transaction.rs
@@ -155,12 +155,10 @@ impl From<(Transaction, TransactionHash)> for crate::executable_transaction::Tra
#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
pub struct TransactionOptions {
/// Transaction that shouldn't be broadcasted to StarkNet. For example, users that want to
/// test the execution result of a transaction without the risk of it being rebroadcasted (the
/// signature will be different while the execution remain the same). Using this flag will
/// modify the transaction version by setting the 128-th bit to 1.
pub only_query: bool,
}

macro_rules! implement_v3_tx_getters {
($(($field:ident, $field_type:ty)),*) => {
$(pub fn $field(&self) -> $field_type {
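
An aside on the `TransactionOptions::only_query` doc comment in the hunk above: setting the 128-th bit of the version is equivalent to adding 2^128 to it, which is how query-only transactions are told apart from broadcastable ones. A small illustration, using `num_bigint` as a stand-in for the field-element type actually used:

```rust
use num_bigint::BigUint;

/// Query-only transactions carry `version + 2^128`, i.e. the version with the
/// 128-th bit set, as described in the `TransactionOptions::only_query` docs.
fn query_version(version: u64) -> BigUint {
    let query_base = BigUint::from(1u8) << 128usize; // 2^128
    BigUint::from(version) + query_base
}

fn main() {
    // For an INVOKE v3 transaction this yields 2^128 + 3 (printed in hex).
    println!("query-only version = {:#x}", query_version(3));
}
```
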
4 changes: 2 additions & 2 deletions crates/starknet_integration_tests/src/flow_test_setup.rs
@@ -2,7 +2,7 @@ use std::net::SocketAddr;

use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_gateway_types::errors::GatewaySpecError;
@@ -33,7 +33,7 @@ pub struct FlowTestSetup {
pub sequencer_node_handle: JoinHandle<Result<(), anyhow::Error>>,

// Channels for consensus proposals, used for asserting the right transactions are proposed.
pub consensus_proposals_channels: BroadcastTopicChannels<ProposalPart>,
pub consensus_proposals_channels: BroadcastTopicChannels<StreamMessage<ProposalPart>>,
}

impl FlowTestSetup {
10 changes: 6 additions & 4 deletions crates/starknet_integration_tests/src/utils.rs
@@ -9,7 +9,7 @@ use mempool_test_utils::starknet_api_test_utils::{AccountId, MultiAccountTransac
use papyrus_consensus::config::ConsensusConfig;
use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
use papyrus_storage::StorageConfig;
use starknet_api::block::BlockNumber;
use starknet_api::contract_address;
@@ -42,7 +42,7 @@ pub async fn create_config(
chain_info: ChainInfo,
rpc_server_addr: SocketAddr,
batcher_storage_config: StorageConfig,
) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels<ProposalPart>) {
) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels<StreamMessage<ProposalPart>>) {
let fee_token_addresses = chain_info.fee_token_addresses.clone();
let batcher_config = create_batcher_config(batcher_storage_config, chain_info.clone());
let gateway_config = create_gateway_config(chain_info.clone()).await;
@@ -72,14 +72,16 @@ pub async fn create_config(

fn create_consensus_manager_configs_and_channels(
n_managers: usize,
) -> (Vec<ConsensusManagerConfig>, BroadcastTopicChannels<ProposalPart>) {
) -> (Vec<ConsensusManagerConfig>, BroadcastTopicChannels<StreamMessage<ProposalPart>>) {
let (network_configs, broadcast_channels) =
create_network_configs_connected_to_broadcast_channels(
n_managers,
papyrus_network::gossipsub_impl::Topic::new(
starknet_consensus_manager::consensus_manager::CONSENSUS_PROPOSALS_TOPIC,
// TODO(guyn): return this to NETWORK_TOPIC once we have integrated streaming.
starknet_consensus_manager::consensus_manager::NETWORK_TOPIC2,
),
);
// TODO: Need to also add a channel for votes, in addition to the proposals channel.

let consensus_manager_configs = network_configs
.into_iter()