feat(sequencing): cache proposals from bigger heights #2325

Merged · 1 commit · Dec 5, 2024

@@ -9,6 +9,7 @@
"segment_arena_cells": true,
"disable_cairo0_redeclaration": false,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,

@@ -42,6 +42,7 @@
"segment_arena_cells": true,
"disable_cairo0_redeclaration": false,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,

@@ -42,6 +42,7 @@
"segment_arena_cells": true,
"disable_cairo0_redeclaration": false,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,

@@ -40,6 +40,7 @@
},
"disable_cairo0_redeclaration": true,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,

@@ -40,6 +40,7 @@
},
"disable_cairo0_redeclaration": true,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,

@@ -40,6 +40,7 @@
},
"disable_cairo0_redeclaration": true,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,

@@ -40,6 +40,7 @@
},
"disable_cairo0_redeclaration": true,
"enable_stateful_compression": true,
"comprehensive_state_diff": true,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,

crates/blockifier/src/versioned_constants.rs (1 addition, 0 deletions)
@@ -192,6 +192,7 @@ pub struct VersionedConstants {
// Transactions settings.
pub disable_cairo0_redeclaration: bool,
pub enable_stateful_compression: bool,
pub comprehensive_state_diff: bool,
pub ignore_inner_event_resources: bool,

// Compiler settings.
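
For reference, the new flag is just another boolean in each versioned-constants file above, and only the latest constants enable it. Below is a minimal sketch of how such a field surfaces when the JSON is deserialized, assuming serde-based loading; the struct name, the standalone main, and the inline JSON are invented for illustration and are not the crate's actual loader.

// Illustrative sketch (not part of this diff).
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct TxSettingsSketch {
    disable_cairo0_redeclaration: bool,
    enable_stateful_compression: bool,
    // New flag: older constants files set it to `false`; the latest sets it to `true`.
    comprehensive_state_diff: bool,
}

fn main() {
    let raw = r#"{
        "disable_cairo0_redeclaration": true,
        "enable_stateful_compression": true,
        "comprehensive_state_diff": true
    }"#;
    let settings: TxSettingsSketch = serde_json::from_str(raw).expect("valid constants JSON");
    assert!(settings.comprehensive_state_diff);
    println!("{settings:?}");
}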

crates/sequencing/papyrus_consensus/src/manager.rs (62 additions, 24 deletions)
@@ -14,7 +14,7 @@ use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_C
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument, warn};
use tracing::{debug, info, instrument};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};

@@ -94,34 +94,37 @@ where
/// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly
/// part of the single height consensus algorithm (e.g. messages from future heights).
#[derive(Debug, Default)]
struct MultiHeightManager {
struct MultiHeightManager<ContextT: ConsensusContext> {
validator_id: ValidatorId,
cached_messages: BTreeMap<u64, Vec<ConsensusMessage>>,
cached_proposals: BTreeMap<u64, (ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)>,
timeouts: TimeoutsConfig,
}

impl MultiHeightManager {
impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
/// Create a new consensus manager.
pub fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
Self { validator_id, cached_messages: BTreeMap::new(), timeouts }
Self {
validator_id,
cached_messages: BTreeMap::new(),
cached_proposals: BTreeMap::new(),
timeouts,
}
}
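
// Illustrative sketch (not part of this diff): caching proposal receivers keyed by height
// is what forces MultiHeightManager to become generic over ContextT, since the receiver's
// item type is the context's associated ProposalPart type. The trait and types below are
// invented stand-ins that only mirror the shape of the change.
mod generic_cache_sketch {
    use std::collections::BTreeMap;

    use futures::channel::mpsc;

    pub trait ContextLike {
        type ProposalPart;
    }

    pub struct ManagerSketch<C: ContextLike> {
        // Same shape as `cached_proposals` above, minus the ProposalInit.
        cached_proposals: BTreeMap<u64, mpsc::Receiver<C::ProposalPart>>,
    }

    impl<C: ContextLike> ManagerSketch<C> {
        pub fn new() -> Self {
            Self { cached_proposals: BTreeMap::new() }
        }

        pub fn cache(&mut self, height: u64, receiver: mpsc::Receiver<C::ProposalPart>) {
            // As in handle_proposal below, a later receiver for the same height overwrites
            // the earlier one.
            self.cached_proposals.insert(height, receiver);
        }
    }
}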

/// Run the consensus algorithm for a single height.
///
/// Assumes that `height` is monotonically increasing across calls for the sake of filtering
/// `cached_messages`.
#[instrument(skip(self, context, broadcast_channels), level = "info")]
pub async fn run_height<ContextT>(
pub async fn run_height(
&mut self,
context: &mut ContextT,
height: BlockNumber,
is_observer: bool,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
) -> Result<Decision, ConsensusError>
where
ContextT: ConsensusContext,
{
) -> Result<Decision, ConsensusError> {
let validators = context.validators(height).await;
info!("running consensus for height {height:?} with validator set {validators:?}");
let mut shc = SingleHeightConsensus::new(

@@ -143,14 +146,31 @@ impl MultiHeightManager {
}

let mut current_height_messages = self.get_current_height_messages(height);
// If there's already a cached proposal, handle that before looping.
if let Some((init, proposal)) = self.get_current_proposal(height) {
let shc_return =
self.handle_proposal(context, height, &mut shc, init, proposal).await?;
// Handle potential tasks, like validating the proposal.
match shc_return {
ShcReturn::Decision(decision) => return Ok(decision),
ShcReturn::Tasks(tasks) => {
for task in tasks {
shc_events.push(task.run());
}
}
}
};

// No cached proposal, loop over incoming proposals, messages, cached messages, and events.
loop {
let shc_return = tokio::select! {
// TODO(Matan): remove report peer / continue propagation, as they are not cancel safe.
message = next_message(&mut current_height_messages, broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
},
Some(mut content_receiver) = proposal_receiver.next() => {
// Get the first message to verify the init was sent.
// TODO(guyn): add a timeout and panic, since StreamHandler should only send once
// the first message (message_id=0) has arrived.
let Some(first_part) = content_receiver.next().await else {
return Err(ConsensusError::InternalNetworkError(

@@ -177,37 +197,35 @@ impl MultiHeightManager {
}
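
// Illustrative sketch (not part of this diff): the shape of the select loop in run_height,
// racing ordinary consensus messages against newly opened proposal streams. Channel item
// types are simplified to String; assumes the tokio (macros feature) and futures crates.
mod select_loop_sketch {
    use futures::channel::mpsc;
    use futures::StreamExt;

    pub async fn run(
        mut messages: mpsc::Receiver<String>,
        mut proposal_receivers: mpsc::Receiver<mpsc::Receiver<String>>,
    ) {
        loop {
            tokio::select! {
                Some(message) = messages.next() => {
                    // In the real code this is handle_message.
                    println!("handling message: {message}");
                }
                Some(mut content_receiver) = proposal_receivers.next() => {
                    // As above, the first part of a proposal stream must carry the init.
                    match content_receiver.next().await {
                        Some(first_part) => println!("proposal init: {first_part}"),
                        None => eprintln!("proposal stream closed before sending an init"),
                    }
                }
                else => break, // both channels closed
            }
        }
    }
}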

// Handle a new proposal receiver from the network.
async fn handle_proposal<ContextT>(
async fn handle_proposal(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
proposal_init: ProposalInit,
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
{
// TODO(guyn): what is the right thing to do if proposal's height doesn't match?
) -> Result<ShcReturn, ConsensusError> {
if proposal_init.height != height {
// TODO(guyn): add caching of heights for future use.
warn!("Received a proposal for a different height. {:?}", proposal_init);
debug!("Received a proposal for a different height. {:?}", proposal_init);
if proposal_init.height > height {
// Note: this will overwrite an existing content_receiver for this height!
self.cached_proposals
.insert(proposal_init.height.0, (proposal_init, content_receiver));
}
return Ok(ShcReturn::Tasks(Vec::new()));
}
shc.handle_proposal(context, proposal_init, content_receiver).await
}

// Handle a single consensus message.
async fn handle_message<ContextT>(
async fn handle_message(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
message: ConsensusMessage,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
{
// TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints:
) -> Result<ShcReturn, ConsensusError> {
// TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
// In general I think we will want to only cache (H+1, 0) messages.

@@ -229,6 +247,26 @@ impl MultiHeightManager {
}
}
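
// Illustrative sketch (not part of this diff): one possible shape for the caching strategy
// the TODO above describes, i.e. keep only messages for height H+1 and cap how many are
// stored so a malicious peer cannot grow the cache without bound. The constant, the String
// message type, and the function itself are invented for illustration.
mod capped_cache_sketch {
    use std::collections::BTreeMap;

    const MAX_CACHED_PER_HEIGHT: usize = 100;

    pub fn cache_future_message(
        cache: &mut BTreeMap<u64, Vec<String>>,
        current_height: u64,
        message_height: u64,
        message: String,
    ) {
        // Only messages for the immediately following height are worth keeping.
        if message_height != current_height + 1 {
            return;
        }
        let bucket = cache.entry(message_height).or_default();
        // Hard cap per height; anything beyond it is dropped.
        if bucket.len() < MAX_CACHED_PER_HEIGHT {
            bucket.push(message);
        }
    }
}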

// Checks if a cached proposal already exists:
// - returns the proposal if it exists and removes it from the cache.
// - returns None if no proposal exists.
// - cleans up any proposals from earlier heights.
fn get_current_proposal(
&mut self,
height: BlockNumber,
) -> Option<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
loop {
let entry = self.cached_proposals.first_entry()?;
match entry.key().cmp(&height.0) {
std::cmp::Ordering::Greater => return None,
std::cmp::Ordering::Equal => return Some(entry.remove()),
std::cmp::Ordering::Less => {
entry.remove();
}
}
}
}
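
// Illustrative sketch (not part of this diff): the pop-lowest-key pattern used by
// get_current_proposal above, shown on a plain BTreeMap. Entries below the current height
// are discarded, an exact match is removed and returned, and higher heights are left cached.
mod prune_sketch {
    use std::collections::BTreeMap;

    pub fn take_current(
        cache: &mut BTreeMap<u64, &'static str>,
        height: u64,
    ) -> Option<&'static str> {
        loop {
            let entry = cache.first_entry()?;
            match entry.key().cmp(&height) {
                std::cmp::Ordering::Greater => return None, // keep for a future height
                std::cmp::Ordering::Equal => return Some(entry.remove()),
                std::cmp::Ordering::Less => {
                    // Stale entry from an earlier height; drop it and keep scanning.
                    entry.remove();
                }
            }
        }
    }

    #[test]
    fn drops_old_returns_current_keeps_future() {
        let mut cache = BTreeMap::from([(9, "old"), (10, "current"), (11, "future")]);
        assert_eq!(take_current(&mut cache, 10), Some("current"));
        assert_eq!(cache.keys().copied().collect::<Vec<_>>(), vec![11]);
    }
}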

// Filters the cached messages:
// - returns all of the current height messages.
// - drops messages from earlier heights.