feat(consensus): handle timeouts in shc and manager
asmaastarkware committed Aug 21, 2024
1 parent 8299959 commit df519dc
Showing 5 changed files with 310 additions and 97 deletions.
2 changes: 2 additions & 0 deletions crates/papyrus_node/src/main.rs
@@ -134,6 +134,7 @@ fn run_consensus(
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
network_receiver,
sync_receiver,
)))
@@ -149,6 +150,7 @@ fn run_consensus(
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
network_channels.broadcasted_messages_receiver,
futures::stream::pending(),
)))
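Both call sites now pass `config.timeouts.clone()` into `run_consensus`. The `TimeoutsConfig` itself is defined in the consensus crate's config module (one of the five changed files, not expanded on this page); the sketch below is only an assumed shape, with illustrative field names and defaults, to show what the new parameter carries.

use std::time::Duration;

/// Assumed shape of the timeouts configuration threaded through `run_consensus`.
/// The real definition lives in crates/sequencing/papyrus_consensus/src/config.rs,
/// which this page does not expand; field names and defaults here are illustrative.
#[derive(Clone, Debug)]
pub struct TimeoutsConfig {
    /// How long a validator waits for a proposal before prevoting NIL.
    pub proposal_timeout: Duration,
    /// How long to wait for a prevote quorum before precommitting NIL.
    pub prevote_timeout: Duration,
    /// How long to wait for a precommit quorum before moving to the next round.
    pub precommit_timeout: Duration,
}

impl Default for TimeoutsConfig {
    fn default() -> Self {
        // Example values only; the crate's actual defaults may differ.
        Self {
            proposal_timeout: Duration::from_secs(3),
            prevote_timeout: Duration::from_secs(1),
            precommit_timeout: Duration::from_secs(1),
        }
    }
}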
132 changes: 96 additions & 36 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -8,6 +8,7 @@ use std::collections::BTreeMap;
use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt};
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::ReportSender;
@@ -16,7 +17,8 @@ use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::{debug, info, instrument};

use crate::single_height_consensus::SingleHeightConsensus;
use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, ShcTask, SingleHeightConsensus};
use crate::types::{
ConsensusBlock,
ConsensusContext,
@@ -27,13 +29,14 @@ use crate::types::{
};

// TODO(dvir): add test for this.
#[instrument(skip(context, start_height, network_receiver, sync_receiver), level = "info")]
#[instrument(skip_all, level = "info")]
#[allow(missing_docs)]
pub async fn run_consensus<BlockT, ContextT, NetworkReceiverT, SyncReceiverT>(
mut context: ContextT,
start_height: BlockNumber,
validator_id: ValidatorId,
consensus_delay: Duration,
timeouts: TimeoutsConfig,
mut network_receiver: NetworkReceiverT,
mut sync_receiver: SyncReceiverT,
) -> Result<(), ConsensusError>
@@ -46,17 +49,22 @@ where
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
{
info!("Running consensus");
info!(
"Running consensus, start_height={}, validator_id={}, consensus_delay={}, timeouts={:?}",
start_height,
validator_id,
consensus_delay.as_secs(),
timeouts
);

// Add a short delay to allow peers to connect and avoid "InsufficientPeers" error
tokio::time::sleep(consensus_delay).await;
let mut current_height = start_height;
let mut manager = MultiHeightManager::new();
let mut manager = MultiHeightManager::new(validator_id, timeouts);
loop {
metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);

let run_height =
manager.run_height(&mut context, current_height, validator_id, &mut network_receiver);
let run_height = manager.run_height(&mut context, current_height, &mut network_receiver);

// `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop
// it. We also cannot restart the height; when we dropped the future we dropped the state it
@@ -85,25 +93,26 @@ pub struct ProposalWrapper(pub Proposal);
/// part of the single height consensus algorithm (e.g. messages from future heights).
#[derive(Debug, Default)]
struct MultiHeightManager {
validator_id: ValidatorId,
cached_messages: BTreeMap<u64, Vec<ConsensusMessage>>,
timeouts: TimeoutsConfig,
}

impl MultiHeightManager {
/// Create a new consensus manager.
pub fn new() -> Self {
Self { cached_messages: BTreeMap::new() }
pub fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
Self { validator_id, cached_messages: BTreeMap::new(), timeouts }
}

/// Run the consensus algorithm for a single height.
///
/// Assumes that `height` is monotonically increasing across calls for the sake of filtering
/// `cached_messages`.
#[instrument(skip(self, context, validator_id, network_receiver), level = "info")]
#[instrument(skip(self, context, network_receiver), level = "info")]
pub async fn run_height<BlockT, ContextT, NetworkReceiverT>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
validator_id: ValidatorId,
network_receiver: &mut NetworkReceiverT,
) -> Result<Decision<BlockT>, ConsensusError>
where
@@ -118,40 +127,86 @@ impl MultiHeightManager {
)>,
{
let validators = context.validators(height).await;
let mut shc = SingleHeightConsensus::new(height, validator_id, validators);
let mut shc = SingleHeightConsensus::new(
height,
self.validator_id,
validators,
self.timeouts.clone(),
);
let mut shc_tasks = FuturesUnordered::new();

if let Some(decision) = shc.start(context).await? {
return Ok(decision);
match shc.start(context).await? {
ShcReturn::Decision(decision) => return Ok(decision),
ShcReturn::Tasks(tasks) => {
for task in tasks {
shc_tasks.push(create_task_handler(task));
}
}
}

let mut current_height_messages = self.get_current_height_messages(height);
loop {
let message = next_message(&mut current_height_messages, network_receiver).await?;
// TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
// In general I think we will want to only cache (H+1, 0) messages.
if message.height() != height.0 {
debug!("Received a message for a different height. {:?}", message);
if message.height() > height.0 {
self.cached_messages.entry(message.height()).or_default().push(message);
}
continue;
}
let shc_return = tokio::select! {
message = next_message(&mut current_height_messages, network_receiver) => {
self.handle_message(context, height, &mut shc, message?).await?
},
Some(shc_task) = shc_tasks.next() => {
shc.handle_task(context, shc_task).await?
},
};

let maybe_decision = match message {
ConsensusMessage::Proposal(proposal) => {
// Special case due to fake streaming.
let (proposal_init, content_receiver, fin_receiver) =
ProposalWrapper(proposal).into();
shc.handle_proposal(context, proposal_init, content_receiver, fin_receiver)
.await?
match shc_return {
ShcReturn::Decision(decision) => return Ok(decision),
ShcReturn::Tasks(tasks) => {
for task in tasks {
shc_tasks.push(create_task_handler(task));
}
}
_ => shc.handle_message(context, message).await?,
};
}
}
}

if let Some(decision) = maybe_decision {
return Ok(decision);
// Handle a single consensus message.
async fn handle_message<BlockT, ContextT>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus<BlockT>,
message: ConsensusMessage,
) -> Result<ShcReturn<BlockT>, ConsensusError>
where
BlockT: ConsensusBlock,
ContextT: ConsensusContext<Block = BlockT>,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<BlockT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
{
// TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
// In general I think we will want to only cache (H+1, 0) messages.
if message.height() != height.0 {
debug!("Received a message for a different height. {:?}", message);
if message.height() > height.0 {
self.cached_messages.entry(message.height()).or_default().push(message);
}
return Ok(ShcReturn::Tasks(vec![]));
}
match message {
ConsensusMessage::Proposal(proposal) => {
// Special case due to fake streaming.
let (proposal_init, content_receiver, fin_receiver) =
ProposalWrapper(proposal).into();
let res = shc
.handle_proposal(context, proposal_init, content_receiver, fin_receiver)
.await?;
Ok(res)
}
_ => {
let res = shc.handle_message(context, message).await?;
Ok(res)
}
}
}
@@ -228,3 +283,8 @@
}
}
}

async fn create_task_handler(task: ShcTask) -> ShcTask {
tokio::time::sleep(task.duration).await;
task
}
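`run_height` now drives two event sources through `tokio::select!`: incoming network messages and expired timeout tasks collected in the `FuturesUnordered` set. The `ShcReturn` and `ShcTask` types it matches on come from `single_height_consensus.rs`, which this page does not expand; the sketch below shows only the shapes the manager code above relies on, with import paths assumed and anything beyond the `Decision`/`Tasks` variants and the `duration` field omitted.

use std::time::Duration;

// Import paths assumed: `ConsensusBlock` is in crate::types per the diff above,
// and `Decision` is presumed to live alongside it.
use crate::types::{ConsensusBlock, Decision};

/// What `SingleHeightConsensus` hands back after each step: either a final
/// decision for the height, or timeout tasks for the manager to schedule.
pub enum ShcReturn<BlockT: ConsensusBlock> {
    Decision(Decision<BlockT>),
    Tasks(Vec<ShcTask>),
}

/// A pending timeout. The manager sleeps for `duration` in `create_task_handler`
/// above, then hands the task back to the SHC via `handle_task`. The real struct
/// also carries the timeout event to apply; that payload is omitted here.
pub struct ShcTask {
    pub duration: Duration,
}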
78 changes: 65 additions & 13 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -15,6 +15,7 @@ use starknet_api::transaction::Transaction;
use starknet_types_core::felt::Felt;

use super::{run_consensus, MultiHeightManager};
use crate::config::TimeoutsConfig;
use crate::test_utils::{precommit, prevote, proposal};
use crate::types::{
ConsensusBlock,
@@ -26,8 +27,11 @@ use crate::types::{
};

lazy_static! {
static ref VALIDATOR_ID: ValidatorId = 1_u32.into();
static ref PROPOSER_ID: ValidatorId = 0_u32.into();
static ref VALIDATOR_ID: ValidatorId = 1_u32.into();
static ref VALIDATOR_ID_2: ValidatorId = 2_u32.into();
static ref VALIDATOR_ID_3: ValidatorId = 3_u32.into();
static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default();
}

// TODO(matan): Switch to using TestBlock & MockTestContext in `test_utils` once streaming is
Expand Down Expand Up @@ -102,8 +106,6 @@ async fn send(sender: &mut Sender, msg: ConsensusMessage) {

#[tokio::test]
async fn manager_multiple_heights_unordered() {
let mut context = MockTestContext::new();

let (mut sender, mut receiver) = mpsc::unbounded();
// Send messages for height 2 followed by those for height 1.
send(&mut sender, proposal(BlockHash(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
@@ -113,8 +115,7 @@ async fn manager_multiple_heights_unordered() {
send(&mut sender, prevote(Some(BlockHash(Felt::ONE)), 1, 0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(BlockHash(Felt::ONE)), 1, 0, *PROPOSER_ID)).await;

let mut manager = MultiHeightManager::new();

let mut context = MockTestContext::new();
// Run the manager for height 1.
context
.expect_validate_proposal()
@@ -127,10 +128,9 @@ async fn manager_multiple_heights_unordered() {
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_broadcast().returning(move |_| Ok(()));
let decision = manager
.run_height(&mut context, BlockNumber(1), *VALIDATOR_ID, &mut receiver)
.await
.unwrap();

let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone());
let decision = manager.run_height(&mut context, BlockNumber(1), &mut receiver).await.unwrap();
assert_eq!(decision.block.id(), BlockHash(Felt::ONE));

// Run the manager for height 2.
@@ -142,10 +142,7 @@ async fn manager_multiple_heights_unordered() {
block_receiver
})
.times(1);
let decision = manager
.run_height(&mut context, BlockNumber(2), *VALIDATOR_ID, &mut receiver)
.await
.unwrap();
let decision = manager.run_height(&mut context, BlockNumber(2), &mut receiver).await.unwrap();
assert_eq!(decision.block.id(), BlockHash(Felt::TWO));
}

@@ -184,6 +181,7 @@ async fn run_consensus_sync() {
BlockNumber(1),
*VALIDATOR_ID,
Duration::ZERO,
TIMEOUTS.clone(),
&mut network_receiver,
&mut sync_receiver,
)
@@ -242,6 +240,7 @@ async fn run_consensus_sync_cancellation_safety() {
BlockNumber(1),
*VALIDATOR_ID,
Duration::ZERO,
TIMEOUTS.clone(),
&mut network_receiver,
&mut sync_receiver,
)
@@ -266,3 +265,56 @@ async fn run_consensus_sync_cancellation_safety() {
drop(sync_sender);
assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_))));
}

#[tokio::test]
async fn test_timeouts() {
let (mut sender, mut receiver) = mpsc::unbounded();
send(&mut sender, proposal(BlockHash(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await;
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_3)).await;
send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_2)).await;
send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_3)).await;

let mut context = MockTestContext::new();
context.expect_validate_proposal().returning(move |_, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(TestBlock { content: vec![], id: BlockHash(Felt::ONE) }).unwrap();
block_receiver
});
context
.expect_validators()
.returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID, *VALIDATOR_ID_2, *VALIDATOR_ID_3]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);

let (timeout_send, timeout_receive) = oneshot::channel();
// The node should handle the timeout event and respond with a NIL prevote.
context
.expect_broadcast()
.times(1)
.withf(move |msg: &ConsensusMessage| msg == &prevote(None, 1, 1, *VALIDATOR_ID))
.return_once(move |_| {
timeout_send.send(()).unwrap();
Ok(())
});
context.expect_broadcast().returning(move |_| Ok(()));

let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone());
let manager_handle = tokio::spawn(async move {
let decision =
manager.run_height(&mut context, BlockNumber(1), &mut receiver).await.unwrap();
assert_eq!(decision.block.id(), BlockHash(Felt::ONE));
});

// Wait for the timeout to be triggered.
timeout_receive.await.unwrap();
// Show that after the timeout is triggered we can still precommit in favor of the block and
// reach a decision.
send(&mut sender, proposal(BlockHash(Felt::ONE), 1, 1, *PROPOSER_ID)).await;
send(&mut sender, prevote(Some(BlockHash(Felt::ONE)), 1, 1, *PROPOSER_ID)).await;
send(&mut sender, prevote(Some(BlockHash(Felt::ONE)), 1, 1, *VALIDATOR_ID_2)).await;
send(&mut sender, prevote(Some(BlockHash(Felt::ONE)), 1, 1, *VALIDATOR_ID_3)).await;
send(&mut sender, precommit(Some(BlockHash(Felt::ONE)), 1, 1, *VALIDATOR_ID_2)).await;
send(&mut sender, precommit(Some(BlockHash(Felt::ONE)), 1, 1, *VALIDATOR_ID_3)).await;

manager_handle.await.unwrap();
}
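The test drives the manager with `TimeoutsConfig::default()`. If those defaults are on the order of seconds, a test-local override keeps the round-0 timeout fast; the field names below follow the assumed sketch given after main.rs above, not the crate's actual config.

use std::time::Duration;

use crate::config::TimeoutsConfig;

// Hypothetical helper for tests: timeouts short enough that the round-0 timeout
// in `test_timeouts` fires almost immediately. Field names are assumptions.
fn fast_timeouts() -> TimeoutsConfig {
    TimeoutsConfig {
        proposal_timeout: Duration::from_millis(100),
        prevote_timeout: Duration::from_millis(50),
        precommit_timeout: Duration::from_millis(50),
    }
}

Passing `fast_timeouts()` instead of `TIMEOUTS.clone()` to `MultiHeightManager::new` would exercise the same timeout path without relying on the production defaults.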