Skip to content

Commit

Permalink
feat: broadcast proposal as stream
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Dec 1, 2024
1 parent f54941b commit 0271cd9
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@ use papyrus_consensus::types::{
ValidatorId,
};
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalFin, ProposalInit, ProposalPart, TransactionBatch, Vote};
use papyrus_protobuf::consensus::{
ConsensusMessage,
Proposal,
ProposalFin,
ProposalInit,
ProposalPart,
TransactionBatch,
Vote,
};
use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,6 @@ async fn stream_build_proposal(
.entry(height)
.or_default()
.insert(proposal_content_id, (content, proposal_id));
// proposal_sender
// .send(ProposalPart::Fin(ProposalFin { proposal_content_id }))
// .await
// .expect("Failed to broadcast proposal fin");
if fin_sender.send(proposal_content_id).is_err() {
// Consensus may exit early (e.g. sync).
warn!("Failed to send proposal content id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,7 @@ async fn repropose() {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
} = subscriber_channels;
<<<<<<< HEAD
let (outbound_internal_sender, _inbound_internal_receiver, _) =
=======
let (outbound_internal_sender, _inbound_internal_receiver) =
>>>>>>> 883a253be (feat: allow a streamed proposal channel on top of existing one)
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);

let mut context = SequencerConsensusContext::new(
Expand Down
2 changes: 1 addition & 1 deletion crates/starknet_api/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl From<(Transaction, TransactionHash)> for crate::executable_transaction::Tra
),
_ => {
unimplemented!("Unsupported transaction type. Only Invoke is currently supported.")
},
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/starknet_integration_tests/src/flow_test_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::net::SocketAddr;

use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_gateway_types::errors::GatewaySpecError;
Expand Down Expand Up @@ -32,7 +32,7 @@ pub struct FlowTestSetup {
pub sequencer_node_handle: JoinHandle<Result<(), anyhow::Error>>,

// Channels for consensus proposals, used for asserting the right transactions are proposed.
pub consensus_proposals_channels: BroadcastTopicChannels<ProposalPart>,
pub consensus_proposals_channels: BroadcastTopicChannels<StreamMessage<ProposalPart>>,
}

impl FlowTestSetup {
Expand Down
9 changes: 5 additions & 4 deletions crates/starknet_integration_tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use mempool_test_utils::starknet_api_test_utils::{
use papyrus_consensus::config::ConsensusConfig;
use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
use papyrus_storage::StorageConfig;
use reqwest::{Client, Response};
use starknet_api::block::BlockNumber;
Expand Down Expand Up @@ -49,7 +49,7 @@ pub async fn create_config(
chain_info: ChainInfo,
rpc_server_addr: SocketAddr,
batcher_storage_config: StorageConfig,
) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels<ProposalPart>) {
) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels<StreamMessage<ProposalPart>>) {
let fee_token_addresses = chain_info.fee_token_addresses.clone();
let batcher_config = create_batcher_config(batcher_storage_config, chain_info.clone());
let gateway_config = create_gateway_config(chain_info.clone()).await;
Expand Down Expand Up @@ -77,11 +77,12 @@ pub async fn create_config(
}

fn create_consensus_manager_config_and_channels()
-> (ConsensusManagerConfig, BroadcastTopicChannels<ProposalPart>) {
-> (ConsensusManagerConfig, BroadcastTopicChannels<StreamMessage<ProposalPart>>) {
let (network_config, broadcast_channels) =
create_network_config_connected_to_broadcast_channels(
papyrus_network::gossipsub_impl::Topic::new(
starknet_consensus_manager::consensus_manager::NETWORK_TOPIC,
// TODO(guyn): return this to NETWORK_TOPIC once we have integrated streaming.
starknet_consensus_manager::consensus_manager::NETWORK_TOPIC2,
),
);
let consensus_manager_config = ConsensusManagerConfig {
Expand Down
48 changes: 37 additions & 11 deletions crates/starknet_integration_tests/tests/end_to_end_flow_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ use std::collections::HashSet;
use futures::StreamExt;
use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, ProposalPart};
use papyrus_protobuf::consensus::{
ProposalFin,
ProposalInit,
ProposalPart,
StreamMessage,
StreamMessageBody,
};
use papyrus_storage::test_utils::CHAIN_ID_FOR_TESTS;
use pretty_assertions::assert_eq;
use rstest::{fixture, rstest};
Expand Down Expand Up @@ -48,7 +54,7 @@ async fn end_to_end(tx_generator: MultiAccountTransactionGenerator) {
}

async fn listen_to_broadcasted_messages(
consensus_proposals_channels: &mut BroadcastTopicChannels<ProposalPart>,
consensus_proposals_channels: &mut BroadcastTopicChannels<StreamMessage<ProposalPart>>,
expected_batched_tx_hashes: &[TransactionHash],
) {
let chain_id = CHAIN_ID_FOR_TESTS.clone();
Expand All @@ -67,25 +73,45 @@ async fn listen_to_broadcasted_messages(
"0x4597ceedbef644865917bf723184538ef70d43954d63f5b7d8cb9d1bd4c2c32",
)),
};
assert_eq!(
broadcasted_messages_receiver.next().await.unwrap().0.unwrap(),
ProposalPart::Init(expected_proposal_init)
);

let incoming_message = broadcasted_messages_receiver.next().await.unwrap().0.unwrap();
let incoming_stream_id = incoming_message.stream_id;
assert_eq!(incoming_message.message_id, 0);
let incoming_message = incoming_message.message;
let StreamMessageBody::Content(ProposalPart::Init(received_proposal_init)) = incoming_message
else {
panic!("Unexpected init: {:?}", incoming_message);
};
assert_eq!(received_proposal_init, expected_proposal_init);

let mut proposal_parts_fin = false;
let mut message_body_fin = false;
loop {
match broadcasted_messages_receiver.next().await.unwrap().0.unwrap() {
ProposalPart::Init(init) => panic!("Unexpected init: {:?}", init),
ProposalPart::Fin(proposal_fin) => {
let message = broadcasted_messages_receiver.next().await.unwrap().0.unwrap();
assert_eq!(message.stream_id, incoming_stream_id);
match message.message {
StreamMessageBody::Content(ProposalPart::Init(init)) => {
panic!("Unexpected init: {:?}", init)
}
StreamMessageBody::Content(ProposalPart::Fin(proposal_fin)) => {
assert_eq!(proposal_fin, expected_proposal_fin);
break;
proposal_parts_fin = true;
}
ProposalPart::Transactions(transactions) => {
StreamMessageBody::Content(ProposalPart::Transactions(transactions)) => {
received_tx_hashes.extend(
transactions
.transactions
.iter()
.map(|tx| tx.calculate_transaction_hash(&chain_id).unwrap()),
);
}
// Ignore this, in case it comes out of the network before some of the other messages.
StreamMessageBody::Fin => {
message_body_fin = true;
}
}
if proposal_parts_fin && message_body_fin {
break;
}
}
// Using HashSet to ignore the order of the transactions (broadcast can lead to reordering).
Expand Down

0 comments on commit 0271cd9

Please sign in to comment.