refactor(consensus): create manager which encapsulates messages cached between heights
  • Loading branch information
matan-starkware committed Jul 30, 2024
1 parent 63a79b9 commit cf37628
Showing 4 changed files with 298 additions and 95 deletions.
97 changes: 7 additions & 90 deletions crates/sequencing/papyrus_consensus/src/lib.rs
@@ -7,23 +7,17 @@ use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::Stream;
use manager::Manager;
use papyrus_common::metrics as papyrus_metrics;
use papyrus_network::network_manager::ReportSender;
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
use papyrus_protobuf::converters::ProtobufConversionError;
use single_height_consensus::SingleHeightConsensus;
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::{debug, info, instrument};
use types::{
ConsensusBlock,
ConsensusContext,
ConsensusError,
Decision,
ProposalInit,
ValidatorId,
};
use types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};

pub mod config;
pub mod manager;
#[allow(missing_docs)]
pub mod papyrus_consensus_context;
#[allow(missing_docs)]
@@ -37,78 +31,6 @@ pub(crate) mod test_utils;
#[allow(missing_docs)]
pub mod types;

use futures::StreamExt;

#[instrument(skip(context, validator_id, network_receiver, cached_messages), level = "info")]
#[allow(missing_docs)]
async fn run_height<BlockT, ContextT, NetworkReceiverT>(
context: &mut ContextT,
height: BlockNumber,
validator_id: ValidatorId,
network_receiver: &mut NetworkReceiverT,
cached_messages: &mut Vec<ConsensusMessage>,
) -> Result<Decision<BlockT>, ConsensusError>
where
BlockT: ConsensusBlock,
ContextT: ConsensusContext<Block = BlockT>,
NetworkReceiverT:
Stream<Item = (Result<ConsensusMessage, ProtobufConversionError>, ReportSender)> + Unpin,
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
{
let validators = context.validators(height).await;
let mut shc = SingleHeightConsensus::new(height, validator_id, validators);

if let Some(decision) = shc.start(context).await? {
return Ok(decision);
}

let mut current_height_messages = Vec::new();
for msg in std::mem::take(cached_messages) {
match height.0.cmp(&msg.height()) {
std::cmp::Ordering::Less => cached_messages.push(msg),
std::cmp::Ordering::Equal => current_height_messages.push(msg),
std::cmp::Ordering::Greater => {}
}
}

loop {
let message = if let Some(msg) = current_height_messages.pop() {
msg
} else {
// TODO(matan): Handle parsing failures and utilize ReportCallback.
network_receiver
.next()
.await
.expect("Network receiver closed unexpectedly")
.0
.expect("Failed to parse consensus message")
};

if message.height() != height.0 {
debug!("Received a message for a different height. {:?}", message);
if message.height() > height.0 {
cached_messages.push(message);
}
continue;
}

let maybe_decision = match message {
ConsensusMessage::Proposal(proposal) => {
// Special case due to fake streaming.
let (proposal_init, content_receiver, fin_receiver) =
ProposalWrapper(proposal).into();
shc.handle_proposal(context, proposal_init, content_receiver, fin_receiver).await?
}
_ => shc.handle_message(context, message).await?,
};

if let Some(decision) = maybe_decision {
return Ok(decision);
}
}
}

// TODO(dvir): add test for this.
#[instrument(skip(context, start_height, network_receiver), level = "info")]
#[allow(missing_docs)]
@@ -130,16 +52,11 @@ where
// Add a short delay to allow peers to connect and avoid "InsufficientPeers" error
tokio::time::sleep(consensus_delay).await;
let mut current_height = start_height;
let mut future_messages = Vec::new();
let mut manager = Manager::new();
loop {
let decision = run_height(
&mut context,
current_height,
validator_id,
&mut network_receiver,
&mut future_messages,
)
.await?;
let decision = manager
.run_height(&mut context, current_height, validator_id, &mut network_receiver)
.await?;

info!(
"Finished consensus for height: {current_height}. Agreed on block with id: {:x}",
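The lib.rs side of this change replaces the `future_messages` vector that used to be threaded into the free `run_height` function with a `Manager` that owns the cache across heights. Below is a minimal standalone sketch of that pattern, using plain `u64` heights and `(height, payload)` tuples in place of the real `BlockNumber` and `ConsensusMessage` types; the struct and method names are illustrative, not the papyrus API:

```rust
// Illustrative analogue of the new Manager: the cache of future-height
// messages lives inside the struct and survives across calls, instead of
// being passed around as `&mut Vec<ConsensusMessage>`.
struct CachingManager {
    cached_messages: Vec<(u64, u64)>, // (height, payload)
}

impl CachingManager {
    fn new() -> Self {
        Self { cached_messages: Vec::new() }
    }

    /// Returns the messages relevant to `height`, keeping future-height
    /// messages cached and silently dropping stale ones, mirroring the
    /// Ordering match in Manager::run_height.
    fn take_current_height_messages(&mut self, height: u64) -> Vec<(u64, u64)> {
        let mut current = Vec::new();
        for msg in std::mem::take(&mut self.cached_messages) {
            match height.cmp(&msg.0) {
                std::cmp::Ordering::Less => self.cached_messages.push(msg), // future: keep
                std::cmp::Ordering::Equal => current.push(msg),             // current: process
                std::cmp::Ordering::Greater => {}                           // past: drop
            }
        }
        current
    }
}

fn main() {
    let mut manager = CachingManager::new();
    manager.cached_messages = vec![(5, 50), (6, 60), (4, 40)];
    // Running height 5: the height-6 message stays cached, height-4 is dropped.
    assert_eq!(manager.take_current_height_messages(5), vec![(5, 50)]);
    assert_eq!(manager.cached_messages, vec![(6, 60)]);
}
```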
114 changes: 114 additions & 0 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -0,0 +1,114 @@
//! Consensus manager, see Manager struct.
#[cfg(test)]
#[path = "manager_test.rs"]
mod manager_test;

use futures::channel::{mpsc, oneshot};
use futures::{Stream, StreamExt};
use papyrus_network::network_manager::ReportSender;
use papyrus_protobuf::consensus::ConsensusMessage;
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::{debug, instrument};

use crate::single_height_consensus::SingleHeightConsensus;
use crate::types::{
ConsensusBlock,
ConsensusContext,
ConsensusError,
Decision,
ProposalInit,
ValidatorId,
};
use crate::ProposalWrapper;

/// Used to run Tendermint. Handles issues which are not explicitly part of the single height consensus
/// algorithm (e.g. messages from future heights).
pub struct Manager {
cached_messages: Vec<ConsensusMessage>,
}

impl Manager {
/// Create a new consensus manager.
pub fn new() -> Self {
Self { cached_messages: Vec::new() }
}

/// Run the consensus algorithm for a single height.
///
/// Assumes that `height` is monotonically increasing across calls for the sake of filtering
/// `cached_messages`.
#[instrument(skip(self, context, validator_id, network_receiver), level = "info")]
pub async fn run_height<BlockT, ContextT, NetworkReceiverT>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
validator_id: ValidatorId,
network_receiver: &mut NetworkReceiverT,
) -> Result<Decision<BlockT>, ConsensusError>
where
BlockT: ConsensusBlock,
ContextT: ConsensusContext<Block = BlockT>,
NetworkReceiverT: Stream<Item = (Result<ConsensusMessage, ProtobufConversionError>, ReportSender)>
+ Unpin,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<BlockT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
{
let validators = context.validators(height).await;
let mut shc = SingleHeightConsensus::new(height, validator_id, validators);

if let Some(decision) = shc.start(context).await? {
return Ok(decision);
}

let mut current_height_messages = Vec::new();
for msg in std::mem::take(&mut self.cached_messages) {
match height.0.cmp(&msg.height()) {
std::cmp::Ordering::Less => self.cached_messages.push(msg),
std::cmp::Ordering::Equal => current_height_messages.push(msg),
std::cmp::Ordering::Greater => {}
}
}

loop {
let message = if let Some(msg) = current_height_messages.pop() {
msg
} else {
// TODO(matan): Handle parsing failures and utilize ReportCallback.
network_receiver
.next()
.await
.expect("Network receiver closed unexpectedly")
.0
.expect("Failed to parse consensus message")
};

if message.height() != height.0 {
debug!("Received a message for a different height. {:?}", message);
if message.height() > height.0 {
self.cached_messages.push(message);
}
continue;
}

let maybe_decision = match message {
ConsensusMessage::Proposal(proposal) => {
// Special case due to fake streaming.
let (proposal_init, content_receiver, fin_receiver) =
ProposalWrapper(proposal).into();
shc.handle_proposal(context, proposal_init, content_receiver, fin_receiver)
.await?
}
_ => shc.handle_message(context, message).await?,
};

if let Some(decision) = maybe_decision {
return Ok(decision);
}
}
}
}
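In the `ConsensusMessage::Proposal` arm above, a fully received proposal is converted into channel endpoints ("fake streaming") so that `SingleHeightConsensus::handle_proposal` can consume the content as if it arrived incrementally. Below is a minimal sketch of that conversion pattern, assuming a made-up `FakeProposal` of `u64` chunks in place of the papyrus `Proposal`, `ProposalInit`, and `BlockHash` types:

```rust
use futures::channel::{mpsc, oneshot};
use futures::executor::block_on;
use futures::StreamExt;

// Stand-in for a proposal that has already been fully received.
struct FakeProposal {
    chunks: Vec<u64>,
    block_hash: u64,
}

// Pre-fill the channels so the consumer can treat the proposal as a stream of
// chunks followed by a fin hash, even though nothing is streamed incrementally.
fn into_streams(p: FakeProposal) -> (mpsc::Receiver<u64>, oneshot::Receiver<u64>) {
    let (mut content_tx, content_rx) = mpsc::channel(p.chunks.len());
    let (fin_tx, fin_rx) = oneshot::channel();
    for chunk in p.chunks {
        content_tx.try_send(chunk).expect("channel sized to fit all chunks");
    }
    fin_tx.send(p.block_hash).expect("receiver still alive");
    (content_rx, fin_rx)
    // content_tx is dropped here, closing the stream after the last chunk.
}

fn main() {
    let proposal = FakeProposal { chunks: vec![1, 2, 3], block_hash: 0xabc };
    let (content_rx, fin_rx) = into_streams(proposal);
    let chunks: Vec<u64> = block_on(content_rx.collect());
    let fin = block_on(fin_rx).expect("sender not dropped");
    assert_eq!(chunks, vec![1, 2, 3]);
    assert_eq!(fin, 0xabc);
}
```

Because the channels are sized to hold the whole proposal up front, the conversion never blocks; a real streaming protocol would presumably replace this wrapper without changing the `handle_proposal` interface.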