Skip to content

Commit

Permalink
chore(consensus): add documentation to consensus crate
Browse files Browse the repository at this point in the history
  • Loading branch information
matan-starkware committed Dec 16, 2024
1 parent 2602630 commit 7af7abc
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 41 deletions.
41 changes: 31 additions & 10 deletions crates/sequencing/papyrus_consensus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
#![warn(missing_docs)]
// TODO(Matan): Add a description of the crate.
// TODO(Matan): Add links to the spec.
// TODO(Matan): fix #[allow(missing_docs)].
//! A consensus implementation for a [`Starknet`](https://www.starknet.io/) node.
//! A consensus implementation for a [Starknet](https://www.starknet.io/) node. The consensus
//! algorithm is based on [Tendermint](https://arxiv.org/pdf/1807.04938).
//!
//! Consensus communicates with other nodes via a gossip network; sending and receiving votes on one
//! topic and streaming proposals on a separate topic. [details](https://github.com/starknet-io/starknet-p2p-specs/tree/main/p2p/proto/consensus).
//!
//! In addition to the network inputs, consensus reaches out to the rest of the node via the
//! [`Context`](types::ConsensusContext) API.
//!
//! Consensus is generic over the content of the proposals, and merely requires an identifier to be
//! produced by the Context.
//!
//! Consensus can run in two modes:
//! 1. Observer - receive consensus messages and update the node when a decision is reached.
//! 2. Active - in addition to receiving messages the node can send messages to the network.
//!
//! Observer mode is lower latency than sync, since we process Proposals and votes as they happen,
//! not after the decision is made.
//!
//! Consensus is an active component, it doesn't follow the server/client model:
//! 1. The outbound messages are not sent as responses to the inbound messages.
//! 2. It generates and runs its own events (e.g. timeouts).
pub mod config;
pub mod manager;
#[allow(missing_docs)]
pub mod types;
pub use manager::run_consensus;
#[allow(missing_docs)]
pub mod simulation_network_receiver;
pub mod stream_handler;

mod manager;
#[allow(missing_docs)]
pub mod single_height_consensus;
mod single_height_consensus;
#[allow(missing_docs)]
pub mod state_machine;
pub mod stream_handler;
mod state_machine;
#[cfg(test)]
pub(crate) mod test_utils;
#[allow(missing_docs)]
pub mod types;

pub use manager::run_consensus;
52 changes: 43 additions & 9 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
//! Consensus manager, see Manager struct.
//! Top level of consensus, used to run multiple heights of consensus.
//!
//! [`run_consensus`] - This is the primary entrypoint for running the consensus component.
//!
//! [`MultiHeightManager`] - Run consensus repeatedly across different heights
//! ([`run_height`](MultiHeightManager::run_height)).
#[cfg(test)]
#[path = "manager_test.rs"]
Expand Down Expand Up @@ -28,7 +33,26 @@ use crate::types::{
ValidatorId,
};

/// Run consensus indefinitely.
///
/// If a decision is reached via consensus the context is updated. If a decision is learned via the
/// sync protocol, consensus silently moves on to the next height.
///
/// Inputs:
/// - `context`: The API for consensus to reach out to the rest of the node.
/// - `start_active_height`: The height at which the node may participate in consensus (if it is a
/// validator).
/// - `start_observe_height`: The height at which the node begins to run consensus.
/// - `validator_id`: The ID of this node.
/// - `consensus_delay`: The delay before starting consensus. There to allow the network to connect
/// to peers.
/// - `timeouts`: The timeouts for the consensus algorithm.
/// - `inbound_vote_receiver`: The channels to receive votes from the network. These are self
/// contained messages.
/// - `inbound_proposal_receiver`: The channel to receive proposals from the network. Proposals are
/// represented as streams (ProposalInit, Content.*, ProposalFin).
// TODO(dvir): add test for this.
// TODO(Asmaa): Update documentation when we update for the real sync.
#[instrument(skip_all, level = "info")]
#[allow(missing_docs)]
#[allow(clippy::too_many_arguments)]
Expand All @@ -39,8 +63,8 @@ pub async fn run_consensus<ContextT, SyncReceiverT>(
validator_id: ValidatorId,
consensus_delay: Duration,
timeouts: TimeoutsConfig,
mut broadcast_channels: BroadcastConsensusMessageChannel,
mut inbound_proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
mut vote_receiver: BroadcastConsensusMessageChannel,
mut proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
mut sync_receiver: SyncReceiverT,
) -> Result<(), ConsensusError>
where
Expand Down Expand Up @@ -72,8 +96,8 @@ where
&mut context,
current_height,
is_observer,
&mut broadcast_channels,
&mut inbound_proposal_receiver,
&mut vote_receiver,
&mut proposal_receiver,
&mut sync_receiver,
)
.await?
Expand Down Expand Up @@ -115,7 +139,7 @@ struct MultiHeightManager<ContextT: ConsensusContext> {

impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
/// Create a new consensus manager.
pub fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
pub(crate) fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
Self {
validator_id,
cached_messages: BTreeMap::new(),
Expand All @@ -126,10 +150,20 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {

/// Run the consensus algorithm for a single height.
///
/// Assumes that `height` is monotonically increasing across calls for the sake of filtering
/// `cached_messaged`.
/// A height of consensus ends either when the node learns of a decision, either by consensus
/// directly or via the sync protocol.
/// - An error implies that consensus cannot continue, not just that the current height failed.
///
/// This is the "top level" task of consensus, which is able to multiplex across activities:
/// network messages and self generated events.
///
/// Assumes that `height` is monotonically increasing across calls.
///
/// Inputs - see [`run_consensus`].
/// - `is_observer`: Whether the node must observe or if it is allowed to be active (assuming it
/// is in the validator set).
#[instrument(skip(self, context, broadcast_channels, sync_receiver), level = "info")]
pub async fn run_height<SyncReceiverT>(
pub(crate) async fn run_height<SyncReceiverT>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,8 @@ use starknet_api::block::BlockHash;
use starknet_api::core::{ContractAddress, PatriciaKey};
use tracing::{debug, instrument};

/// Receiver used to help run simulations of consensus. It has 2 goals in mind:
/// 1. Simulate network failures.
/// 2. Make tests repeatable - This is challenging because simulations involve a noisy environment;
/// so the actual network issues experienced may differ between 2 test runs.
/// - We expect simulations to use fairly reliable networks. That means messages arriving in
/// different order between runs will make up most of the actual noise between runs, as
/// opposed to actual drops or corruption.
/// - Tendermint is, to a large extent, unaffected by minor network reorderings. For instance it
/// doesn't matter if prevotes arrive before or after the Proposal they are for.
/// - This struct is therefore also designed not to be overly sensistive to message order. If
/// message A was dropped by this struct in one run, it should be dropped in the rerun. This
/// is as opposed to using a stateful RNG where the random number is a function of all the
/// previous calls to the RNG.
/// Receiver which can simulate network issues in a repeatable manner. Simulates drops and network
/// corruption. The errors are meant to be repeatable regardless of the order of messages received.
pub struct NetworkReceiver {
pub broadcasted_messages_receiver: BroadcastTopicServer<ConsensusMessage>,
// Cache is used so that repeat sends of a message can be processed differently. For example,
Expand All @@ -43,6 +32,14 @@ pub struct NetworkReceiver {
}

impl NetworkReceiver {
/// Create a new NetworkReceiver.
///
/// Inputs:
/// - `broadcasted_messages_receiver`: The receiver to listen to.
/// - `cache_size`: A small cache risks repeat messages all being handled the same way.
/// - `seed`: Seed for the random number generator.
/// - `drop_probability`: Probability of dropping a message [0, 1].
/// - `invalid_probability`: Probability of making a message invalid [0, 1].
pub fn new(
broadcasted_messages_receiver: BroadcastTopicServer<ConsensusMessage>,
cache_size: usize,
Expand Down
26 changes: 22 additions & 4 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//! Run a single height of consensus.
//!
//! [`SingleHeightConsensus`] (SHC) - run consensus for a single height.
//!
//! [`ShcTask`] - a task which should be run without blocking consensus.
//!
//! [`ShcEvent`] - an event, generated from an `ShcTask` which should be handled by the SHC.
#[cfg(test)]
#[path = "single_height_consensus_test.rs"]
mod single_height_consensus_test;
Expand All @@ -24,13 +32,16 @@ use crate::types::{
ValidatorId,
};

/// The SHC can either update the manager of a decision or return tasks that should be run without
/// blocking further calls to itself.
#[derive(Debug, PartialEq)]
#[cfg_attr(test, derive(EnumAsInner))]
pub enum ShcReturn {
Tasks(Vec<ShcTask>),
Decision(Decision),
}

/// Events produced from tasks for the SHC to handle.
#[derive(Debug, Clone)]
pub enum ShcEvent {
TimeoutPropose(StateMachineEvent),
Expand All @@ -43,6 +54,7 @@ pub enum ShcEvent {
ValidateProposal(StateMachineEvent, Option<ProposalFin>),
}

/// A task which should be run without blocking calls to SHC.
#[derive(Debug)]
#[cfg_attr(test, derive(EnumAsInner))]
pub enum ShcTask {
Expand Down Expand Up @@ -135,10 +147,16 @@ impl ShcTask {
}
}

/// Struct which represents a single height of consensus. Each height is expected to be begun with a
/// call to `start`, which is relevant if we are the proposer for this height's first round.
/// SingleHeightConsensus receives messages directly as parameters to function calls. It can send
/// out messages "directly" to the network, and returning a decision to the caller.
/// Represents a single height of consensus. It is responsible for mapping between the idealized
/// view of consensus represented in the StateMachine and the real world implementation. Example:
/// - Timeouts: the SM returns an event timeout, but SHC then maps that to a task which can be run
/// by the Manager. The manager though unaware of the specific task as it has minimal consensus
/// logic.
///
/// Each height is begun with a call to `start`, with no further calls to it.
///
/// SHC is not a top level task, it is called directly and returns values (doesn't directly run sub
/// tasks). SHC does have side effects, such as sending messages to the network via the context.
pub(crate) struct SingleHeightConsensus {
height: BlockNumber,
validators: Vec<ValidatorId>,
Expand Down
11 changes: 6 additions & 5 deletions crates/sequencing/papyrus_consensus/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::types::{ProposalContentId, Round, ValidatorId};
pub enum StateMachineEvent {
/// Sent by the state machine when a block is required to propose (ProposalContentId is always
/// None). While waiting for the response of GetProposal, the state machine will buffer all
/// other events. The caller must respond with a valid proposal id for this height to the
/// other events. The caller *must* respond with a valid proposal id for this height to the
/// state machine, and the same round sent out.
GetProposal(Option<ProposalContentId>, Round),
/// Consensus message, can be both sent from and to the state machine.
Expand Down Expand Up @@ -48,9 +48,10 @@ pub enum Step {
}

/// State Machine. Major assumptions:
/// 1. SHC handles replays and conflicts.
/// 1. SHC handles: authentication, replays, and conflicts.
/// 2. SM must handle "out of order" messages (E.g. vote arrives before proposal).
/// 3. No network failures.
///
/// Each height is begun with a call to `start`, with no further calls to it.
pub struct StateMachine {
id: ValidatorId,
round: Round,
Expand Down Expand Up @@ -117,8 +118,8 @@ impl StateMachine {

/// Process the incoming event.
///
/// If we are waiting for a response to `GetProposal` all other incoming events are buffered
/// until that response arrives.
/// If we are waiting for a response to [`GetProposal`](`StateMachineEvent::GetProposal`) all
/// other incoming events are buffered until that response arrives.
///
/// Returns a set of events for the caller to handle. The caller should not mirror the output
/// events back to the state machine, as it makes sure to handle them before returning.
Expand Down
4 changes: 4 additions & 0 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Types for interfacing between consensus and the node.
use std::fmt::Debug;
use std::time::Duration;

Expand Down Expand Up @@ -28,6 +30,8 @@ pub type ProposalContentId = BlockHash;
pub const DEFAULT_VALIDATOR_ID: u64 = 100;

/// Interface for consensus to call out to the node.
///
/// Function calls should be assumed to not be cancel safe.
#[async_trait]
pub trait ConsensusContext {
/// The parts of the proposal that are streamed in.
Expand Down

0 comments on commit 7af7abc

Please sign in to comment.