diff --git a/Cargo.lock b/Cargo.lock index 9a27bff80a..09db8a662d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7405,6 +7405,7 @@ dependencies = [ "starknet_batcher_types", "test-case", "tokio", + "tokio-util", "tracing", ] @@ -11376,9 +11377,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.12" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" dependencies = [ "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index e962c10d0a..a51d19f4d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -254,6 +254,7 @@ tokio = "1.37.0" tokio-retry = "0.3" tokio-stream = "0.1.8" tokio-test = "0.4.4" +tokio-util = "0.7.13" toml = "0.8" tower = "0.4.13" tracing = "0.1.37" diff --git a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml index 6fb31cdd99..172f59af14 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml +++ b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml @@ -18,6 +18,7 @@ starknet-types-core.workspace = true starknet_api.workspace = true starknet_batcher_types = { workspace = true, features = ["testing"] } tokio = { workspace = true, features = ["full"] } +tokio-util.workspace = true tracing.workspace = true [dev-dependencies] diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 733acf0e8f..d3b5b5f740 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -54,8 +54,8 @@ use starknet_batcher_types::batcher_types::{ ValidateBlockInput, }; use starknet_batcher_types::communication::BatcherClient; -use tokio::sync::Notify; use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, info, trace, warn, Instrument}; // TODO(Dan, Matan): Remove this once and replace with real gas prices. @@ -104,7 +104,7 @@ pub struct SequencerConsensusContext { // Building proposals are not tracked as active, as consensus can't move on to the next // height/round until building is done. Context only works on proposals for the // current round. - active_proposal: Option<(Arc, JoinHandle<()>)>, + active_proposal: Option<(CancellationToken, JoinHandle<()>)>, // Stores proposals for future rounds until the round is reached. queued_proposals: BTreeMap)>, @@ -399,15 +399,15 @@ impl SequencerConsensusContext { }; batcher.validate_block(input).await.expect("Failed to initiate proposal validation"); - let notify = Arc::new(Notify::new()); - let notify_clone = Arc::clone(¬ify); + let token = CancellationToken::new(); + let token_clone = token.clone(); let chain_id = self.chain_id.clone(); let mut content = Vec::new(); let handle = tokio::spawn(async move { let (built_block, received_fin) = loop { tokio::select! { - _ = notify_clone.notified() => { + _ = token_clone.cancelled() => { warn!("Proposal interrupted: {:?}", proposal_id); batcher_abort_proposal(batcher.as_ref(), proposal_id).await; return; @@ -443,12 +443,12 @@ impl SequencerConsensusContext { warn!("Failed to send proposal content ids"); } }); - self.active_proposal = Some((notify, handle)); + self.active_proposal = Some((token, handle)); } fn interrupt_active_proposal(&self) { - if let Some((notify, _)) = &self.active_proposal { - notify.notify_one(); + if let Some((token, _)) = &self.active_proposal { + token.cancel(); } } }