Skip to content

Commit

Permalink
feat(consensus): add sync receiver to consensus
Browse files Browse the repository at this point in the history
This allows for the block propogation to tell consensus to move on if it's working on an
old height. Special was taken to make this cancel safe.
  • Loading branch information
matan-starkware committed Aug 13, 2024
1 parent 50717da commit 33df538
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 10 deletions.
5 changes: 5 additions & 0 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use futures::channel::mpsc;
use futures::future::BoxFuture;
use futures::FutureExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
Expand Down Expand Up @@ -109,6 +110,8 @@ fn run_consensus(
config.num_validators,
);
let start_height = config.start_height;
// TODO(matan): connect this to an actual channel.
let (_, sync_rx) = mpsc::channel(1);
match config.test {
Some(test_config) => {
let network_receiver = NetworkReceiver::new(
Expand All @@ -124,6 +127,7 @@ fn run_consensus(
validator_id,
config.consensus_delay,
network_receiver,
sync_rx,
)))
}
None => Ok(tokio::spawn(papyrus_consensus::run_consensus(
Expand All @@ -132,6 +136,7 @@ fn run_consensus(
validator_id,
config.consensus_delay,
consensus_channels.broadcasted_messages_receiver,
sync_rx,
))),
}
}
Expand Down
54 changes: 46 additions & 8 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use papyrus_network::network_manager::ReportSender;
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::{debug, instrument};
use tracing::{debug, info, instrument};

use crate::single_height_consensus::SingleHeightConsensus;
use crate::types::{
Expand All @@ -26,20 +26,22 @@ use crate::types::{
};

// TODO(dvir): add test for this.
#[instrument(skip(context, start_height, network_receiver), level = "info")]
#[instrument(skip(context, start_height, network_receiver, sync_receiver), level = "info")]
#[allow(missing_docs)]
pub async fn run_consensus<BlockT, ContextT, NetworkReceiverT>(
pub async fn run_consensus<BlockT, ContextT, NetworkReceiverT, SyncReceiverT>(
mut context: ContextT,
start_height: BlockNumber,
validator_id: ValidatorId,
consensus_delay: Duration,
mut network_receiver: NetworkReceiverT,
mut sync_receiver: SyncReceiverT,
) -> Result<(), ConsensusError>
where
BlockT: ConsensusBlock,
ContextT: ConsensusContext<Block = BlockT>,
NetworkReceiverT:
Stream<Item = (Result<ConsensusMessage, ProtobufConversionError>, ReportSender)> + Unpin,
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
{
Expand All @@ -48,11 +50,23 @@ where
let mut current_height = start_height;
let mut manager = MultiHeightManager::new();
loop {
let decision = manager
.run_height(&mut context, current_height, validator_id, &mut network_receiver)
.await?;
context.decision(decision.block, decision.precommits).await?;
current_height = current_height.unchecked_next();
let run_height =
manager.run_height(&mut context, current_height, validator_id, &mut network_receiver);

// `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop
// it. We also cannot restart the height; when we dropped the future we dropped the state it
// built and risk equivocating. Therefore, we must only enter the other select branches if
// we are certain to leave this height.
tokio::select! {
decision = run_height => {
let decision = decision?;
context.decision(decision.block, decision.precommits).await?;
current_height = current_height.unchecked_next();
},
sync_height = sync_height(current_height, &mut sync_receiver) => {
current_height = sync_height?.unchecked_next();
}
}
}
}

Expand Down Expand Up @@ -184,3 +198,27 @@ where
}
}
}

// Return only when a height is reached that is greater than or equal to the current height.
async fn sync_height<SyncReceiverT>(
height: BlockNumber,
mut sync_receiver: SyncReceiverT,
) -> Result<BlockNumber, ConsensusError>
where
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
{
loop {
match sync_receiver.next().await {
Some(sync_height) if sync_height >= height => {
info!("Sync to height: {}. current_height={}", sync_height, height);
return Ok(sync_height);
}
Some(sync_height) => {
debug!("Ignoring sync to height: {}. current_height={}", sync_height, height);
}
None => {
return Err(ConsensusError::SyncError("Sync receiver closed".to_string()));
}
}
}
}
124 changes: 122 additions & 2 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use std::time::Duration;
use std::vec;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::SinkExt;
use lazy_static::lazy_static;
use mockall::mock;
use mockall::predicate::eq;
use papyrus_network::network_manager::ReportSender;
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote, VoteType};
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::transaction::Transaction;
use starknet_types_core::felt::Felt;

use super::MultiHeightManager;
use super::{run_consensus, MultiHeightManager};
use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};

lazy_static! {
Expand Down Expand Up @@ -120,7 +122,7 @@ fn precommit(block_hash: Option<BlockHash>, height: u64, voter: ValidatorId) ->
}

#[tokio::test]
async fn run_multiple_heights_unordered() {
async fn manager_multiple_heights_unordered() {
let mut context = MockTestContext::new();

let (mut sender, mut receiver) = mpsc::unbounded();
Expand Down Expand Up @@ -167,3 +169,121 @@ async fn run_multiple_heights_unordered() {
.unwrap();
assert_eq!(decision.block.id(), BlockHash(Felt::TWO));
}

#[tokio::test]
async fn run_consensus_sync() {
// Set expectations.
let mut context = MockTestContext::new();
let (decision_tx, decision_rx) = oneshot::channel();

context.expect_validate_proposal().return_once(move |_, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::TWO) }).unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_broadcast().returning(move |_| Ok(()));
context.expect_decision().return_once(move |block, votes| {
assert_eq!(block.id(), BlockHash(Felt::TWO));
assert_eq!(votes[0].height, 2);
decision_tx.send(()).unwrap();
Ok(())
});

// Send messages for height 2.
let (mut network_sender, mut network_receiver) = mpsc::unbounded();
send(&mut network_sender, proposal(BlockHash(Felt::TWO), 2)).await;
send(&mut network_sender, prevote(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await;
send(&mut network_sender, precommit(Some(BlockHash(Felt::TWO)), 2, *PROPOSER_ID)).await;

// Start at height 1.
let (mut sync_sender, mut sync_receiver) = mpsc::unbounded();
let consensus_handle = tokio::spawn(async move {
run_consensus(
context,
BlockNumber(1),
*VALIDATOR_ID,
Duration::ZERO,
&mut network_receiver,
&mut sync_receiver,
)
.await
});

// Send sync for height 1.
sync_sender.send(BlockNumber(1)).await.unwrap();
// Make sure the sync is processed before the upcoming messages.
tokio::time::sleep(Duration::from_millis(100)).await;

// Decision for height 2.
decision_rx.await.unwrap();

// Drop the sender to close consensus and gracefully shut down.
drop(sync_sender);
assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_))));
}

// Check for cancellation safety when ignoring old heights. If the current height check was done
// within the select branch this test would hang.
#[tokio::test]
async fn run_consensus_sync_cancellation_safety() {
let mut context = MockTestContext::new();
let (proposal_handled_tx, proposal_handled_rx) = oneshot::channel();
let (decision_tx, decision_rx) = oneshot::channel();

context.expect_validate_proposal().return_once(move |_, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::ONE) }).unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context
.expect_broadcast()
.with(eq(prevote(Some(BlockHash(Felt::ONE)), 1, *VALIDATOR_ID)))
.return_once(move |_| {
proposal_handled_tx.send(()).unwrap();
Ok(())
});
context.expect_broadcast().returning(move |_| Ok(()));
context.expect_decision().return_once(|block, votes| {
assert_eq!(block.id(), BlockHash(Felt::ONE));
assert_eq!(votes[0].height, 1);
decision_tx.send(()).unwrap();
Ok(())
});

let (mut network_sender, mut network_receiver) = mpsc::unbounded();
let (mut sync_sender, mut sync_receiver) = mpsc::unbounded();

let consensus_handle = tokio::spawn(async move {
run_consensus(
context,
BlockNumber(1),
*VALIDATOR_ID,
Duration::ZERO,
&mut network_receiver,
&mut sync_receiver,
)
.await
});

// Send a proposal for height 1.
send(&mut network_sender, proposal(BlockHash(Felt::ONE), 1)).await;
proposal_handled_rx.await.unwrap();

// Send an old sync. This should not cancel the current height.
sync_sender.send(BlockNumber(0)).await.unwrap();
// Make sure the sync is processed before the upcoming messages.
tokio::time::sleep(Duration::from_millis(100)).await;

// Finished messages for 1
send(&mut network_sender, prevote(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await;
send(&mut network_sender, precommit(Some(BlockHash(Felt::ONE)), 1, *PROPOSER_ID)).await;
decision_rx.await.unwrap();

// Drop the sender to close consensus and gracefully shut down.
drop(sync_sender);
assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_))));
}
2 changes: 2 additions & 0 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,6 @@ pub enum ConsensusError {
// As opposed to an error between this node and peer nodes.
#[error("{0}")]
InternalNetworkError(String),
#[error("{0}")]
SyncError(String),
}

0 comments on commit 33df538

Please sign in to comment.