Skip to content

Commit

Permalink
fix(sequencing): remove old proposal pipes from consensus (#2452)
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware authored Dec 5, 2024
1 parent 75de4cd commit e1047bc
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ pub struct SequencerConsensusContext {
proposal_id: u64,
current_height: Option<BlockNumber>,
current_round: Round,
// Used to broadcast proposals to other consensus nodes.
// TODO(Guy) switch to the actual streaming struct.
_proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
// The active proposal refers to the proposal being validated at the current height/round.
// Building proposals are not tracked as active, as consensus can't move on to the next
// height/round until building is done. Context only works on proposals for the
Expand All @@ -112,14 +109,12 @@ pub struct SequencerConsensusContext {
impl SequencerConsensusContext {
pub fn new(
batcher: Arc<dyn BatcherClient>,
_proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
vote_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
num_validators: u64,
) -> Self {
Self {
batcher,
_proposal_streaming_client,
outbound_proposal_sender,
vote_broadcast_client,
// TODO(Matan): Set the actual validator IDs (contract addresses).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ fn generate_executable_invoke_tx(tx_hash: Felt) -> ExecutableTransaction {
// Structs which aren't utilized but should not be dropped.
struct NetworkDependencies {
_vote_network: BroadcastNetworkMock<ConsensusMessage>,
_old_proposal_network: BroadcastNetworkMock<ProposalPart>,
_new_proposal_network: BroadcastNetworkMock<StreamMessage<ProposalPart>>,
}

Expand All @@ -85,28 +84,20 @@ fn setup(batcher: MockBatcherClient) -> (SequencerConsensusContext, NetworkDepen
let (outbound_proposal_stream_sender, _, _) =
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);

// TODO(guyn): remove this first set of channels once we are using only the streaming channels.
let TestSubscriberChannels { mock_network: mock_proposal_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcast_topic_client: proposal_streaming_client, .. } =
subscriber_channels;

let TestSubscriberChannels { mock_network: mock_vote_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcast_topic_client: votes_topic_client, .. } =
subscriber_channels;

let context = SequencerConsensusContext::new(
Arc::new(batcher),
proposal_streaming_client,
outbound_proposal_stream_sender,
votes_topic_client,
NUM_VALIDATORS,
);

let network_dependencies = NetworkDependencies {
_vote_network: mock_vote_network,
_old_proposal_network: mock_proposal_network,
_new_proposal_network: mock_proposal_stream_network,
};

Expand Down
23 changes: 1 addition & 22 deletions crates/starknet_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::any::type_name;
use std::sync::Arc;

use async_trait::async_trait;
use futures::StreamExt;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::ConsensusError;
use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext;
Expand All @@ -20,8 +19,6 @@ use crate::config::ConsensusManagerConfig;
pub const BROADCAST_BUFFER_SIZE: usize = 100;
pub const CONSENSUS_PROPOSALS_TOPIC: &str = "consensus_proposals";
pub const CONSENSUS_VOTES_TOPIC: &str = "consensus_votes";
// TODO(guyn): remove this once we have integrated streaming.
pub const NETWORK_TOPIC2: &str = "streamed_consensus_proposals";

#[derive(Clone)]
pub struct ConsensusManager {
Expand All @@ -38,17 +35,9 @@ impl ConsensusManager {
let mut network_manager =
NetworkManager::new(self.config.consensus_config.network_config.clone(), None);

// TODO(guyn): remove this channel once we have integrated streaming.
let mut old_proposals_broadcast_channels = network_manager
.register_broadcast_topic::<ProposalPart>(
Topic::new(CONSENSUS_PROPOSALS_TOPIC),
BROADCAST_BUFFER_SIZE,
)
.expect("Failed to register broadcast topic");

let proposals_broadcast_channels = network_manager
.register_broadcast_topic::<StreamMessage<ProposalPart>>(
Topic::new(NETWORK_TOPIC2),
Topic::new(CONSENSUS_PROPOSALS_TOPIC),
BROADCAST_BUFFER_SIZE,
)
.expect("Failed to register broadcast topic");
Expand All @@ -70,7 +59,6 @@ impl ConsensusManager {

let context = SequencerConsensusContext::new(
Arc::clone(&self.batcher_client),
old_proposals_broadcast_channels.broadcast_topic_client.clone(),
outbound_internal_sender,
votes_broadcast_channels.broadcast_topic_client.clone(),
self.config.consensus_config.num_validators,
Expand Down Expand Up @@ -103,15 +91,6 @@ impl ConsensusManager {
stream_handler_result = &mut stream_handler_task_handle => {
panic!("Consensus' stream handler task finished unexpectedly: {:?}", stream_handler_result);
}
_ = async {
while let Some(_broadcasted_message) =
old_proposals_broadcast_channels.broadcasted_messages_receiver.next().await
{
// TODO(matan): pass receiver to consensus and sender to context.
}
} => {
panic!("Broadcasted messages channel finished unexpectedly");
}
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions crates/starknet_integration_tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ fn create_consensus_manager_configs_and_channels(
create_network_configs_connected_to_broadcast_channels(
n_managers,
papyrus_network::gossipsub_impl::Topic::new(
// TODO(guyn): return this to NETWORK_TOPIC once we have integrated streaming.
starknet_consensus_manager::consensus_manager::NETWORK_TOPIC2,
starknet_consensus_manager::consensus_manager::CONSENSUS_PROPOSALS_TOPIC,
),
);
// TODO: Need to also add a channel for votes, in addition to the proposals channel.
Expand Down

0 comments on commit e1047bc

Please sign in to comment.