Skip to content

Commit

Permalink
feat(consensus): send proposal content
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-starkware committed Nov 5, 2024
1 parent 98f96a6 commit 1b85d33
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 9 deletions.
12 changes: 10 additions & 2 deletions crates/consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use futures::SinkExt;
use libp2p::PeerId;
use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError};
use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::ConsensusMessage;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart};
use starknet_batcher_types::communication::SharedBatcherClient;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
Expand All @@ -34,10 +35,17 @@ impl ConsensusManager {
}

pub async fn run(&self) -> Result<(), ConsensusError> {
let network_manager =
let mut network_manager =
NetworkManager::new(self.config.consensus_config.network_config.clone(), None);
let proposals_broadcast_channels = network_manager
.register_broadcast_topic::<ProposalPart>(
Topic::new(NETWORK_TOPIC),
BROADCAST_BUFFER_SIZE,
)
.expect("Failed to register broadcast topic");
let context = SequencerConsensusContext::new(
Arc::clone(&self.batcher_client),
proposals_broadcast_channels.broadcast_topic_client.clone(),
self.config.consensus_config.num_validators,
);
let network_handle = tokio::task::spawn(network_manager.run());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@ use papyrus_consensus::types::{
Round,
ValidatorId,
};
use papyrus_protobuf::consensus::{ConsensusMessage, Vote};
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_protobuf::consensus::{
ConsensusMessage,
ProposalInit as ProtobufProposalInit,
ProposalPart,
TransactionBatch,
Vote,
};
use starknet_api::block::{BlockHash, BlockHashAndNumber, BlockNumber};
use starknet_api::executable_transaction::Transaction;
use starknet_batcher_types::batcher_types::{
Expand All @@ -36,7 +43,7 @@ use starknet_batcher_types::batcher_types::{
ValidateProposalInput,
};
use starknet_batcher_types::communication::BatcherClient;
use tracing::{debug, debug_span, error, info, warn, Instrument};
use tracing::{debug, debug_span, error, info, trace, warn, Instrument};

// {height: {proposal_id: (content, [proposal_ids])}}
// Note that multiple proposals IDs can be associated with the same content, but we only need to
Expand All @@ -56,12 +63,18 @@ pub struct SequencerConsensusContext {
// restarting.
proposal_id: u64,
current_height: Option<BlockNumber>,
network_broadcast_client: BroadcastTopicClient<ProposalPart>,
}

impl SequencerConsensusContext {
pub fn new(batcher: Arc<dyn BatcherClient>, num_validators: u64) -> Self {
pub fn new(
batcher: Arc<dyn BatcherClient>,
network_broadcast_client: BroadcastTopicClient<ProposalPart>,
num_validators: u64,
) -> Self {
Self {
batcher,
network_broadcast_client,
validators: (0..num_validators).map(ValidatorId::from).collect(),
valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())),
proposal_id: 0,
Expand Down Expand Up @@ -112,13 +125,26 @@ impl ConsensusContext for SequencerConsensusContext {
.build_proposal(build_proposal_input)
.await
.expect("Failed to initiate proposal build");
let protobuf_consensus_init = ProtobufProposalInit {
height: proposal_init.height.0,
round: proposal_init.round,
proposer: proposal_init.proposer,
valid_round: proposal_init.valid_round,
};
debug!("Broadcasting proposal init: {protobuf_consensus_init:?}");
self.network_broadcast_client
.broadcast_message(ProposalPart::Init(protobuf_consensus_init))
.await
.expect("Failed to broadcast proposal init");
let broadcast_client = self.network_broadcast_client.clone();
tokio::spawn(
async move {
stream_build_proposal(
proposal_init.height,
proposal_id,
batcher,
valid_proposals,
broadcast_client,
fin_sender,
)
.await;
Expand Down Expand Up @@ -255,6 +281,7 @@ async fn stream_build_proposal(
proposal_id: ProposalId,
batcher: Arc<dyn BatcherClient>,
valid_proposals: Arc<Mutex<HeightToIdToContent>>,
mut broadcast_client: BroadcastTopicClient<ProposalPart>,
fin_sender: oneshot::Sender<ProposalContentId>,
) {
let mut content = Vec::new();
Expand All @@ -273,6 +300,20 @@ async fn stream_build_proposal(
// TODO: Broadcast the transactions to the network.
// TODO(matan): Convert to protobuf and make sure this isn't too large for a single
// proto message (could this be a With adapter added to the channel in `new`?).
let mut transaction_hashes = Vec::with_capacity(txs.len());
let mut transactions = Vec::with_capacity(txs.len());
txs.into_iter().for_each(|tx| {
transaction_hashes.push(tx.tx_hash());
transactions.push(tx.tx());
});
debug!("Broadcasting proposal content: {transaction_hashes:?}");
trace!("Broadcasting proposal content: {transactions:?}");
broadcast_client
.broadcast_message(ProposalPart::Transactions(TransactionBatch {
transactions,
}))
.await
.expect("Failed to broadcast proposal content");
}
GetProposalContent::Finished(id) => {
let proposal_content_id = BlockHash(id.state_diff_commitment.0.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ use futures::channel::mpsc;
use futures::SinkExt;
use lazy_static::lazy_static;
use papyrus_consensus::types::{ConsensusContext, ProposalInit};
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
TestSubscriberChannels,
};
use papyrus_network::network_manager::BroadcastTopicChannels;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::{ContractAddress, StateDiffCommitment};
use starknet_api::executable_transaction::Transaction;
Expand Down Expand Up @@ -73,7 +79,15 @@ async fn build_proposal() {
}),
})
});
let mut context = SequencerConsensusContext::new(Arc::new(batcher), NUM_VALIDATORS);
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;
let BroadcastNetworkMock {
broadcasted_messages_sender: _mock_broadcasted_messages_sender, ..
} = mock_network;
let mut context =
SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS);
let init = ProposalInit {
height: BlockNumber(0),
round: 0,
Expand Down Expand Up @@ -121,7 +135,12 @@ async fn validate_proposal_success() {
})
},
);
let mut context = SequencerConsensusContext::new(Arc::new(batcher), NUM_VALIDATORS);
let TestSubscriberChannels { mock_network: _, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;
let mut context =
SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS);
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(TX_BATCH.clone()).await.unwrap();
let fin_receiver = context.validate_proposal(BlockNumber(0), TIMEOUT, content_receiver).await;
Expand Down Expand Up @@ -154,8 +173,12 @@ async fn repropose() {
})
},
);

let mut context = SequencerConsensusContext::new(Arc::new(batcher), NUM_VALIDATORS);
let TestSubscriberChannels { mock_network: _, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;
let mut context =
SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS);

// Receive a valid proposal.
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
Expand Down
10 changes: 10 additions & 0 deletions crates/starknet_api/src/executable_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ pub enum Transaction {
}

impl Transaction {
pub fn tx(self) -> crate::transaction::Transaction {
match self {
Transaction::Declare(tx_data) => crate::transaction::Transaction::Declare(tx_data.tx),
Transaction::DeployAccount(tx_data) => {
crate::transaction::Transaction::DeployAccount(tx_data.tx)
}
Transaction::Invoke(tx_data) => crate::transaction::Transaction::Invoke(tx_data.tx),
}
}

pub fn contract_address(&self) -> ContractAddress {
match self {
Transaction::Declare(tx_data) => tx_data.tx.sender_address(),
Expand Down

0 comments on commit 1b85d33

Please sign in to comment.