From 14c5f8e0304962c7d568b56e71daa06f2b3a04ea Mon Sep 17 00:00:00 2001 From: yoavGrs <97383386+yoavGrs@users.noreply.github.com> Date: Wed, 4 Dec 2024 18:43:50 +0200 Subject: [PATCH] feat(blockifier): add comprehensive state diff versioned constant (#2407) --- .../resources/versioned_constants_0_13_0.json | 1 + .../resources/versioned_constants_0_13_1.json | 1 + .../versioned_constants_0_13_1_1.json | 1 + .../resources/versioned_constants_0_13_2.json | 1 + .../versioned_constants_0_13_2_1.json | 1 + .../resources/versioned_constants_0_13_3.json | 1 + .../resources/versioned_constants_0_13_4.json | 1 + crates/blockifier/src/versioned_constants.rs | 1 + .../papyrus_consensus/src/manager.rs | 86 +++++++++++----- .../papyrus_consensus/src/manager_test.rs | 99 +++++++++---------- .../papyrus_consensus/src/test_utils.rs | 27 +---- .../src/sequencer_consensus_context.rs | 1 + .../src/sequencer_consensus_context_test.rs | 22 +++-- 13 files changed, 132 insertions(+), 111 deletions(-) diff --git a/crates/blockifier/resources/versioned_constants_0_13_0.json b/crates/blockifier/resources/versioned_constants_0_13_0.json index 294fa14ecc..fd98c3ef07 100644 --- a/crates/blockifier/resources/versioned_constants_0_13_0.json +++ b/crates/blockifier/resources/versioned_constants_0_13_0.json @@ -9,6 +9,7 @@ "segment_arena_cells": true, "disable_cairo0_redeclaration": false, "enable_stateful_compression": false, + "comprehensive_state_diff": false, "allocation_cost": { "blob_cost": { "l1_gas": 0, diff --git a/crates/blockifier/resources/versioned_constants_0_13_1.json b/crates/blockifier/resources/versioned_constants_0_13_1.json index d70e03cc55..2b634be904 100644 --- a/crates/blockifier/resources/versioned_constants_0_13_1.json +++ b/crates/blockifier/resources/versioned_constants_0_13_1.json @@ -42,6 +42,7 @@ "segment_arena_cells": true, "disable_cairo0_redeclaration": false, "enable_stateful_compression": false, + "comprehensive_state_diff": false, "allocation_cost": { "blob_cost": { "l1_gas": 0, diff --git a/crates/blockifier/resources/versioned_constants_0_13_1_1.json b/crates/blockifier/resources/versioned_constants_0_13_1_1.json index 20463ac307..17d83179e5 100644 --- a/crates/blockifier/resources/versioned_constants_0_13_1_1.json +++ b/crates/blockifier/resources/versioned_constants_0_13_1_1.json @@ -42,6 +42,7 @@ "segment_arena_cells": true, "disable_cairo0_redeclaration": false, "enable_stateful_compression": false, + "comprehensive_state_diff": false, "allocation_cost": { "blob_cost": { "l1_gas": 0, diff --git a/crates/blockifier/resources/versioned_constants_0_13_2.json b/crates/blockifier/resources/versioned_constants_0_13_2.json index e02561d466..edec011294 100644 --- a/crates/blockifier/resources/versioned_constants_0_13_2.json +++ b/crates/blockifier/resources/versioned_constants_0_13_2.json @@ -40,6 +40,7 @@ }, "disable_cairo0_redeclaration": true, "enable_stateful_compression": false, + "comprehensive_state_diff": false, "allocation_cost": { "blob_cost": { "l1_gas": 0, diff --git a/crates/blockifier/resources/versioned_constants_0_13_2_1.json b/crates/blockifier/resources/versioned_constants_0_13_2_1.json index 0cc5406078..aa76c68c45 100644 --- a/crates/blockifier/resources/versioned_constants_0_13_2_1.json +++ b/crates/blockifier/resources/versioned_constants_0_13_2_1.json @@ -40,6 +40,7 @@ }, "disable_cairo0_redeclaration": true, "enable_stateful_compression": false, + "comprehensive_state_diff": false, "allocation_cost": { "blob_cost": { "l1_gas": 0, diff --git a/crates/blockifier/resources/versioned_constants_0_13_3.json b/crates/blockifier/resources/versioned_constants_0_13_3.json index 0cc5406078..aa76c68c45 100644 --- a/crates/blockifier/resources/versioned_constants_0_13_3.json +++ b/crates/blockifier/resources/versioned_constants_0_13_3.json @@ -40,6 +40,7 @@ }, "disable_cairo0_redeclaration": true, "enable_stateful_compression": false, + "comprehensive_state_diff": false, "allocation_cost": { "blob_cost": { "l1_gas": 0, diff --git a/crates/blockifier/resources/versioned_constants_0_13_4.json b/crates/blockifier/resources/versioned_constants_0_13_4.json index f8c835060d..3993b8567d 100644 --- a/crates/blockifier/resources/versioned_constants_0_13_4.json +++ b/crates/blockifier/resources/versioned_constants_0_13_4.json @@ -40,6 +40,7 @@ }, "disable_cairo0_redeclaration": true, "enable_stateful_compression": true, + "comprehensive_state_diff": true, "allocation_cost": { "blob_cost": { "l1_gas": 0, diff --git a/crates/blockifier/src/versioned_constants.rs b/crates/blockifier/src/versioned_constants.rs index 7be354b494..5c41d18b4d 100644 --- a/crates/blockifier/src/versioned_constants.rs +++ b/crates/blockifier/src/versioned_constants.rs @@ -192,6 +192,7 @@ pub struct VersionedConstants { // Transactions settings. pub disable_cairo0_redeclaration: bool, pub enable_stateful_compression: bool, + pub comprehensive_state_diff: bool, pub ignore_inner_event_resources: bool, // Compiler settings. diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 65dea0db9a..e90b5d5ee1 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -14,7 +14,7 @@ use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_C use papyrus_network::network_manager::BroadcastTopicClientTrait; use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit}; use starknet_api::block::BlockNumber; -use tracing::{debug, info, instrument, warn}; +use tracing::{debug, info, instrument}; use crate::config::TimeoutsConfig; use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus}; @@ -94,16 +94,22 @@ where /// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly /// part of the single height consensus algorithm (e.g. messages from future heights). #[derive(Debug, Default)] -struct MultiHeightManager { +struct MultiHeightManager { validator_id: ValidatorId, cached_messages: BTreeMap>, + cached_proposals: BTreeMap)>, timeouts: TimeoutsConfig, } -impl MultiHeightManager { +impl MultiHeightManager { /// Create a new consensus manager. pub fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self { - Self { validator_id, cached_messages: BTreeMap::new(), timeouts } + Self { + validator_id, + cached_messages: BTreeMap::new(), + cached_proposals: BTreeMap::new(), + timeouts, + } } /// Run the consensus algorithm for a single height. @@ -111,17 +117,14 @@ impl MultiHeightManager { /// Assumes that `height` is monotonically increasing across calls for the sake of filtering /// `cached_messaged`. #[instrument(skip(self, context, broadcast_channels), level = "info")] - pub async fn run_height( + pub async fn run_height( &mut self, context: &mut ContextT, height: BlockNumber, is_observer: bool, broadcast_channels: &mut BroadcastConsensusMessageChannel, proposal_receiver: &mut mpsc::Receiver>, - ) -> Result - where - ContextT: ConsensusContext, - { + ) -> Result { let validators = context.validators(height).await; info!("running consensus for height {height:?} with validator set {validators:?}"); let mut shc = SingleHeightConsensus::new( @@ -143,14 +146,31 @@ impl MultiHeightManager { } let mut current_height_messages = self.get_current_height_messages(height); + // If there's already a cached proposal, handle that before looping. + if let Some((init, proposal)) = self.get_current_proposal(height) { + let shc_return = + self.handle_proposal(context, height, &mut shc, init, proposal).await?; + // Handle potential tasks like validate the proposal. + match shc_return { + ShcReturn::Decision(decision) => return Ok(decision), + ShcReturn::Tasks(tasks) => { + for task in tasks { + shc_events.push(task.run()); + } + } + } + }; + + // No cached proposal, loop over incoming proposals, messages, cached messages, and events. loop { let shc_return = tokio::select! { + // TODO(Matan): remove report peer / continue propagation, as they are not cancel safe. message = next_message(&mut current_height_messages, broadcast_channels) => { self.handle_message(context, height, &mut shc, message?).await? }, Some(mut content_receiver) = proposal_receiver.next() => { // Get the first message to verify the init was sent. - // TODO(guyn): add a timeout and panic, since StreamHandler should only send once + // TODO(guyn): add a timeout and panic, since StreamHandler should only send once // the first message (message_id=0) has arrived. let Some(first_part) = content_receiver.next().await else { return Err(ConsensusError::InternalNetworkError( @@ -177,37 +197,35 @@ impl MultiHeightManager { } // Handle a new proposal receiver from the network. - async fn handle_proposal( + async fn handle_proposal( &mut self, context: &mut ContextT, height: BlockNumber, shc: &mut SingleHeightConsensus, proposal_init: ProposalInit, content_receiver: mpsc::Receiver, - ) -> Result - where - ContextT: ConsensusContext, - { - // TODO(guyn): what is the right thing to do if proposal's height doesn't match? + ) -> Result { if proposal_init.height != height { - // TODO(guyn): add caching of heights for future use. - warn!("Received a proposal for a different height. {:?}", proposal_init); + debug!("Received a proposal for a different height. {:?}", proposal_init); + if proposal_init.height > height { + // Note: this will overwrite an existing content_receiver for this height! + self.cached_proposals + .insert(proposal_init.height.0, (proposal_init, content_receiver)); + } + return Ok(ShcReturn::Tasks(Vec::new())); } shc.handle_proposal(context, proposal_init, content_receiver).await } // Handle a single consensus message. - async fn handle_message( + async fn handle_message( &mut self, context: &mut ContextT, height: BlockNumber, shc: &mut SingleHeightConsensus, message: ConsensusMessage, - ) -> Result - where - ContextT: ConsensusContext, - { - // TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints: + ) -> Result { + // TODO(matan): We need to figure out an actual caching strategy under 2 constraints: // 1. Malicious - must be capped so a malicious peer can't DoS us. // 2. Parallel proposals - we may send/receive a proposal for (H+1, 0). // In general I think we will want to only cache (H+1, 0) messages. @@ -229,6 +247,26 @@ impl MultiHeightManager { } } + // Checks if a cached proposal already exists + // - returns the proposal if it exists and removes it from the cache. + // - returns None if no proposal exists. + // - cleans up any proposals from earlier heights. + fn get_current_proposal( + &mut self, + height: BlockNumber, + ) -> Option<(ProposalInit, mpsc::Receiver)> { + loop { + let entry = self.cached_proposals.first_entry()?; + match entry.key().cmp(&height.0) { + std::cmp::Ordering::Greater => return None, + std::cmp::Ordering::Equal => return Some(entry.remove()), + std::cmp::Ordering::Less => { + entry.remove(); + } + } + } + } + // Filters the cached messages: // - returns all of the current height messages. // - drops messages from earlier heights. diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index c531dc9359..10f98ca2df 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -27,7 +27,7 @@ use starknet_types_core::felt::Felt; use super::{run_consensus, MultiHeightManager}; use crate::config::TimeoutsConfig; -use crate::test_utils::{precommit, prevote, proposal, proposal_init}; +use crate::test_utils::{precommit, prevote, proposal_init}; use crate::types::{ConsensusContext, ConsensusError, ProposalContentId, Round, ValidatorId}; lazy_static! { @@ -93,14 +93,31 @@ async fn send(sender: &mut MockBroadcastedMessagesSender, msg: async fn send_proposal( proposal_receiver_sender: &mut mpsc::Sender>, - content: ProposalPart, + content: Vec, ) { let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE); proposal_receiver_sender.send(proposal_receiver).await.unwrap(); - proposal_sender.send(content).await.unwrap(); + for item in content { + proposal_sender.send(item).await.unwrap(); + } +} + +fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt) { + context + .expect_validate_proposal() + .return_once(move |_, _, _, _, _| { + let (block_sender, block_receiver) = oneshot::channel(); + block_sender + .send(( + BlockHash(block_hash), + ProposalFin { proposal_content_id: BlockHash(block_hash) }, + )) + .unwrap(); + block_receiver + }) + .times(1); } -#[ignore] // TODO(guyn): return this once caching proposals is implemented. #[tokio::test] async fn manager_multiple_heights_unordered() { let TestSubscriberChannels { mock_network, subscriber_channels } = @@ -108,32 +125,35 @@ async fn manager_multiple_heights_unordered() { let mut sender = mock_network.broadcasted_messages_sender; // TODO(guyn): refactor this test to pass proposals through the correct channels. - let (_proposal_receiver_sender, mut proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); + let (mut proposal_receiver_sender, mut proposal_receiver_receiver) = + mpsc::channel(CHANNEL_SIZE); // Send messages for height 2 followed by those for height 1. - send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await; + send_proposal( + &mut proposal_receiver_sender, + vec![ + ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID)), + ProposalPart::Fin(ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }), + ], + ) + .await; send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await; send(&mut sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await; - send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await; + send_proposal( + &mut proposal_receiver_sender, + vec![ + ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)), + ProposalPart::Fin(ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }), + ], + ) + .await; send(&mut sender, prevote(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await; send(&mut sender, precommit(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await; let mut context = MockTestContext::new(); // Run the manager for height 1. - context - .expect_validate_proposal() - .return_once(move |_, _, _, _, _| { - let (block_sender, block_receiver) = oneshot::channel(); - block_sender - .send(( - BlockHash(Felt::ONE), - ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }, - )) - .unwrap(); - block_receiver - }) - .times(1); + expect_validate_proposal(&mut context, Felt::ONE); context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); context.expect_proposer().returning(move |_, _| *PROPOSER_ID); context.expect_set_height_and_round().returning(move |_, _| ()); @@ -154,19 +174,7 @@ async fn manager_multiple_heights_unordered() { assert_eq!(decision.block, BlockHash(Felt::ONE)); // Run the manager for height 2. - context - .expect_validate_proposal() - .return_once(move |_, _, _, _, _| { - let (block_sender, block_receiver) = oneshot::channel(); - block_sender - .send(( - BlockHash(Felt::TWO), - ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }, - )) - .unwrap(); - block_receiver - }) - .times(1); + expect_validate_proposal(&mut context, Felt::TWO); let decision = manager .run_height( &mut context, @@ -180,7 +188,6 @@ async fn manager_multiple_heights_unordered() { assert_eq!(decision.block, BlockHash(Felt::TWO)); } -#[ignore] // TODO(guyn): return this once caching proposals is implemented. #[tokio::test] async fn run_consensus_sync() { // Set expectations. @@ -190,13 +197,7 @@ async fn run_consensus_sync() { // TODO(guyn): refactor this test to pass proposals through the correct channels. let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); - context.expect_validate_proposal().return_once(move |_, _, _, _, _| { - let (block_sender, block_receiver) = oneshot::channel(); - block_sender - .send((BlockHash(Felt::TWO), ProposalFin { proposal_content_id: BlockHash(Felt::TWO) })) - .unwrap(); - block_receiver - }); + expect_validate_proposal(&mut context, Felt::TWO); context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); context.expect_proposer().returning(move |_, _| *PROPOSER_ID); context.expect_set_height_and_round().returning(move |_, _| ()); @@ -211,7 +212,7 @@ async fn run_consensus_sync() { // Send messages for height 2. send_proposal( &mut proposal_receiver_sender, - ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID)), + vec![ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))], ) .await; let TestSubscriberChannels { mock_network, subscriber_channels } = @@ -261,13 +262,7 @@ async fn run_consensus_sync_cancellation_safety() { // TODO(guyn): refactor this test to pass proposals through the correct channels. let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); - context.expect_validate_proposal().return_once(move |_, _, _, _, _| { - let (block_sender, block_receiver) = oneshot::channel(); - block_sender - .send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) })) - .unwrap(); - block_receiver - }); + expect_validate_proposal(&mut context, Felt::ONE); context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); context.expect_proposer().returning(move |_, _| *PROPOSER_ID); context.expect_set_height_and_round().returning(move |_, _| ()); @@ -308,7 +303,7 @@ async fn run_consensus_sync_cancellation_safety() { // Send a proposal for height 1. send_proposal( &mut proposal_receiver_sender, - ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)), + vec![ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))], ) .await; proposal_handled_rx.await.unwrap(); @@ -340,7 +335,7 @@ async fn test_timeouts() { send_proposal( &mut proposal_receiver_sender, - ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)), + vec![ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await; @@ -395,7 +390,7 @@ async fn test_timeouts() { // reach a decision. send_proposal( &mut proposal_receiver_sender, - ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID)), + vec![ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *PROPOSER_ID)).await; diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index 806909a019..ae5a840cb7 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -3,14 +3,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use mockall::mock; -use papyrus_protobuf::consensus::{ - ConsensusMessage, - Proposal, - ProposalFin, - ProposalInit, - Vote, - VoteType, -}; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit, Vote, VoteType}; use papyrus_protobuf::converters::ProtobufConversionError; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_types_core::felt::Felt; @@ -130,24 +123,6 @@ pub fn precommit( voter, }) } - -pub fn proposal( - block_felt: Felt, - height: u64, - round: u32, - proposer: ValidatorId, -) -> ConsensusMessage { - let block_hash = BlockHash(block_felt); - ConsensusMessage::Proposal(Proposal { - height, - block_hash, - round, - proposer, - transactions: Vec::new(), - valid_round: None, - }) -} - pub fn proposal_init(height: u64, round: u32, proposer: ValidatorId) -> ProposalInit { ProposalInit { height: BlockNumber(height), round, proposer, valid_round: None } } diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 8f5322e6f6..5687a4d9f4 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -521,6 +521,7 @@ async fn stream_validate_proposal( let mut content = Vec::new(); let network_block_id = loop { let Some(prop_part) = content_receiver.next().await else { + // TODO(Asmaa): Tell the batcher to abort. warn!("Failed to receive proposal content: {proposal_id:?}"); return; }; diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index f89ed74604..178081a98b 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -250,15 +250,19 @@ async fn repropose() { // Receive a valid proposal. let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); - let prop_part = ProposalPart::Transactions(TransactionBatch { - transactions: vec![generate_invoke_tx()], - tx_hashes: vec![TransactionHash(Felt::TWO)], - }); - content_sender.send(prop_part).await.unwrap(); - let prop_part = ProposalPart::Fin(ProposalFin { - proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0), - }); - content_sender.send(prop_part).await.unwrap(); + content_sender + .send(ProposalPart::Transactions(TransactionBatch { + transactions: vec![generate_invoke_tx()], + tx_hashes: vec![TransactionHash(Felt::TWO)], + })) + .await + .unwrap(); + content_sender + .send(ProposalPart::Fin(ProposalFin { + proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0), + })) + .await + .unwrap(); let fin_receiver = context .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) .await;