Skip to content

Commit

Permalink
feat(consensus): non-blocking proposal handling
Browse files Browse the repository at this point in the history
  • Loading branch information
asmaastarkware committed Oct 30, 2024
1 parent 52a20af commit 13d790c
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 135 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tokio.workspace = true
tracing.workspace = true

[dev-dependencies]
enum-as-inner = "0.6.1"
mockall.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_network_types = { workspace = true, features = ["testing"] }
Expand Down
17 changes: 8 additions & 9 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use starknet_api::core::ContractAddress;
use tracing::{debug, info, instrument};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, ShcTask, SingleHeightConsensus};
use crate::single_height_consensus::{ShcEvent, ShcReturn, ShcTask, SingleHeightConsensus};
use crate::types::{
BroadcastConsensusMessageChannel,
ConsensusContext,
Expand Down Expand Up @@ -125,13 +125,13 @@ impl MultiHeightManager {
validators,
self.timeouts.clone(),
);
let mut shc_tasks = FuturesUnordered::new();
let mut shc_events = FuturesUnordered::new();

match shc.start(context).await? {
ShcReturn::Decision(decision) => return Ok(decision),
ShcReturn::Tasks(tasks) => {
for task in tasks {
shc_tasks.push(create_task_handler(task));
shc_events.push(create_shc_event(task));
}
}
}
Expand All @@ -142,16 +142,16 @@ impl MultiHeightManager {
message = next_message(&mut current_height_messages, broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
},
Some(shc_task) = shc_tasks.next() => {
shc.handle_event(context, shc_task.event).await?
Some(shc_event) = shc_events.next() => {
shc.handle_event(context, shc_event).await?
},
};

match shc_return {
ShcReturn::Decision(decision) => return Ok(decision),
ShcReturn::Tasks(tasks) => {
for task in tasks {
shc_tasks.push(create_task_handler(task));
shc_events.push(create_shc_event(task));
}
}
}
Expand Down Expand Up @@ -279,7 +279,6 @@ where
}
}

async fn create_task_handler(task: ShcTask) -> ShcTask {
tokio::time::sleep(task.duration).await;
task
async fn create_shc_event(task: ShcTask) -> ShcEvent {
task.run().await
}
Loading

0 comments on commit 13d790c

Please sign in to comment.