From 532206e008d47a702e4a699bcf0c26041cf72cb6 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 23 Apr 2024 02:06:46 +1000 Subject: [PATCH] Fix stuck backfill when scheduled work queue is at capacity (#5575) * Fix stuck backfill and add regression test. * Remove unnecessary `yield_now` * Merge branch 'unstable' into fix-stuck-backfill * Revert previous change and add extra comment. * Merge branch 'unstable' into fix-stuck-backfill * Update tests to use configured event schedule instead of hard coded values. * Merge branch 'unstable' of https://github.com/sigp/lighthouse into fix-stuck-backfill --- beacon_node/beacon_processor/Cargo.toml | 5 +- beacon_node/beacon_processor/src/lib.rs | 2 +- .../src/work_reprocessing_queue.rs | 163 ++++++++++++++---- common/slot_clock/src/manual_slot_clock.rs | 6 + 4 files changed, 142 insertions(+), 34 deletions(-) diff --git a/beacon_node/beacon_processor/Cargo.toml b/beacon_node/beacon_processor/Cargo.toml index 723b09b581c..6c49a28ec87 100644 --- a/beacon_node/beacon_processor/Cargo.toml +++ b/beacon_node/beacon_processor/Cargo.toml @@ -23,4 +23,7 @@ lazy_static = { workspace = true } lighthouse_metrics = { workspace = true } parking_lot = { workspace = true } num_cpus = { workspace = true } -serde = { workspace = true } \ No newline at end of file +serde = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["test-util"] } diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 9b83e9cacbc..fee55b39adc 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -851,7 +851,7 @@ impl BeaconProcessor { ready_work_tx, work_reprocessing_rx, &self.executor, - slot_clock, + Arc::new(slot_clock), self.log.clone(), maximum_gossip_clock_disparity, )?; diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs index c9be28444c8..496fa683d2c 100644 --- a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs @@ -22,6 +22,7 @@ use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::time::Duration; use strum::AsRefStr; @@ -243,7 +244,7 @@ struct ReprocessQueue { attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, next_backfill_batch_event: Option>>, - slot_clock: Pin>, + slot_clock: Arc, } pub type QueuedLightClientUpdateId = usize; @@ -362,7 +363,7 @@ pub fn spawn_reprocess_scheduler( ready_work_tx: Sender, work_reprocessing_rx: Receiver, executor: &TaskExecutor, - slot_clock: S, + slot_clock: Arc, log: Logger, maximum_gossip_clock_disparity: Duration, ) -> Result<(), String> { @@ -370,34 +371,12 @@ pub fn spawn_reprocess_scheduler( if ADDITIONAL_QUEUED_BLOCK_DELAY >= maximum_gossip_clock_disparity { return Err("The block delay and gossip disparity don't match.".to_string()); } - let mut queue = ReprocessQueue { - work_reprocessing_rx, - ready_work_tx, - gossip_block_delay_queue: DelayQueue::new(), - rpc_block_delay_queue: DelayQueue::new(), - attestations_delay_queue: DelayQueue::new(), - lc_updates_delay_queue: DelayQueue::new(), - queued_gossip_block_roots: HashSet::new(), - queued_lc_updates: FnvHashMap::default(), - queued_aggregates: FnvHashMap::default(), - queued_unaggregates: FnvHashMap::default(), - awaiting_attestations_per_root: HashMap::new(), - awaiting_lc_updates_per_parent_root: HashMap::new(), - queued_backfill_batches: Vec::new(), - next_attestation: 0, - next_lc_update: 0, - early_block_debounce: TimeLatch::default(), - rpc_block_debounce: TimeLatch::default(), - attestation_delay_debounce: TimeLatch::default(), - lc_update_delay_debounce: TimeLatch::default(), - next_backfill_batch_event: None, - slot_clock: Box::pin(slot_clock.clone()), - }; + let mut queue = ReprocessQueue::new(ready_work_tx, work_reprocessing_rx, slot_clock); executor.spawn( async move { while let Some(msg) = queue.next().await { - queue.handle_message(msg, &slot_clock, &log); + queue.handle_message(msg, &log); } debug!( @@ -412,7 +391,37 @@ pub fn spawn_reprocess_scheduler( } impl ReprocessQueue { - fn handle_message(&mut self, msg: InboundEvent, slot_clock: &S, log: &Logger) { + fn new( + ready_work_tx: Sender, + work_reprocessing_rx: Receiver, + slot_clock: Arc, + ) -> Self { + ReprocessQueue { + work_reprocessing_rx, + ready_work_tx, + gossip_block_delay_queue: DelayQueue::new(), + rpc_block_delay_queue: DelayQueue::new(), + attestations_delay_queue: DelayQueue::new(), + lc_updates_delay_queue: DelayQueue::new(), + queued_gossip_block_roots: HashSet::new(), + queued_lc_updates: FnvHashMap::default(), + queued_aggregates: FnvHashMap::default(), + queued_unaggregates: FnvHashMap::default(), + awaiting_attestations_per_root: HashMap::new(), + awaiting_lc_updates_per_parent_root: HashMap::new(), + queued_backfill_batches: Vec::new(), + next_attestation: 0, + next_lc_update: 0, + early_block_debounce: TimeLatch::default(), + rpc_block_debounce: TimeLatch::default(), + attestation_delay_debounce: TimeLatch::default(), + lc_update_delay_debounce: TimeLatch::default(), + next_backfill_batch_event: None, + slot_clock, + } + } + + fn handle_message(&mut self, msg: InboundEvent, log: &Logger) { use ReprocessQueueMessage::*; match msg { // Some block has been indicated as "early" and should be processed when the @@ -426,7 +435,7 @@ impl ReprocessQueue { return; } - if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) { + if let Some(duration_till_slot) = self.slot_clock.duration_to_slot(block_slot) { // Check to ensure this won't over-fill the queue. if self.queued_gossip_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS { if self.early_block_debounce.elapsed() { @@ -459,7 +468,7 @@ impl ReprocessQueue { // This logic is slightly awkward since `SlotClock::duration_to_slot` // doesn't distinguish between a slot that has already arrived and an // error reading the slot clock. - if let Some(now) = slot_clock.now() { + if let Some(now) = self.slot_clock.now() { if block_slot <= now && self .ready_work_tx @@ -860,7 +869,8 @@ impl ReprocessQueue { } } InboundEvent::ReadyBackfillSync(queued_backfill_batch) => { - let millis_from_slot_start = slot_clock + let millis_from_slot_start = self + .slot_clock .millis_from_current_slot_start() .map_or("null".to_string(), |duration| { duration.as_millis().to_string() @@ -886,7 +896,12 @@ impl ReprocessQueue { "Failed to send scheduled backfill work"; "info" => "sending work back to queue" ); - self.queued_backfill_batches.insert(0, batch) + self.queued_backfill_batches.insert(0, batch); + + // only recompute if there is no `next_backfill_batch_event` already scheduled + if self.next_backfill_batch_event.is_none() { + self.recompute_next_backfill_batch_event(); + } } // The message was not sent and we didn't get the correct // return result. This is a logic error. @@ -963,7 +978,11 @@ impl ReprocessQueue { #[cfg(test)] mod tests { use super::*; - use slot_clock::TestingSlotClock; + use logging::test_logger; + use slot_clock::{ManualSlotClock, TestingSlotClock}; + use std::ops::Add; + use std::sync::Arc; + use task_executor::test_utils::TestRuntime; #[test] fn backfill_processing_schedule_calculation() { @@ -1002,4 +1021,84 @@ mod tests { duration_to_next_slot + event_times[0] ); } + + // Regression test for issue #5504. + // See: https://github.com/sigp/lighthouse/issues/5504#issuecomment-2050930045 + #[tokio::test] + async fn backfill_schedule_failed_should_reschedule() { + let runtime = TestRuntime::default(); + let log = test_logger(); + let (work_reprocessing_tx, work_reprocessing_rx) = mpsc::channel(1); + let (ready_work_tx, mut ready_work_rx) = mpsc::channel(1); + let slot_duration = 12; + let slot_clock = Arc::new(testing_slot_clock(slot_duration)); + + spawn_reprocess_scheduler( + ready_work_tx.clone(), + work_reprocessing_rx, + &runtime.task_executor, + slot_clock.clone(), + log, + Duration::from_millis(500), + ) + .unwrap(); + + // Pause time so it only advances manually + tokio::time::pause(); + + // Send some random work to `ready_work_tx` to fill up the capacity first. + ready_work_tx + .try_send(ReadyWork::IgnoredRpcBlock(IgnoredRpcBlock { + process_fn: Box::new(|| {}), + })) + .unwrap(); + + // Now queue a backfill sync batch. + work_reprocessing_tx + .try_send(ReprocessQueueMessage::BackfillSync(QueuedBackfillBatch( + Box::pin(async {}), + ))) + .unwrap(); + tokio::task::yield_now().await; + + // Advance the time by more than 1/2 the slot to trigger a scheduled backfill batch to be sent. + // This should fail as the `ready_work` channel is at capacity, and it should be rescheduled. + let duration_to_next_event = + ReprocessQueue::duration_until_next_backfill_batch_event(slot_clock.as_ref()); + let one_ms = Duration::from_millis(1); + advance_time(&slot_clock, duration_to_next_event.add(one_ms)).await; + + // Now drain the `ready_work` channel. + assert!(matches!( + ready_work_rx.try_recv(), + Ok(ReadyWork::IgnoredRpcBlock { .. }) + )); + assert!(ready_work_rx.try_recv().is_err()); + + // Advance time again, and assert that the re-scheduled batch is successfully sent. + let duration_to_next_event = + ReprocessQueue::duration_until_next_backfill_batch_event(slot_clock.as_ref()); + advance_time(&slot_clock, duration_to_next_event.add(one_ms)).await; + assert!(matches!( + ready_work_rx.try_recv(), + Ok(ReadyWork::BackfillSync { .. }) + )); + } + + /// Advances slot clock and test clock time by the same duration. + async fn advance_time(slot_clock: &ManualSlotClock, duration: Duration) { + slot_clock.advance_time(duration); + tokio::time::advance(duration).await; + // NOTE: The `tokio::time::advance` fn actually calls `yield_now()` after advancing the + // clock. Why do we need an extra `yield_now`? + tokio::task::yield_now().await; + } + + fn testing_slot_clock(slot_duration: u64) -> ManualSlotClock { + TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_secs(slot_duration), + ) + } } diff --git a/common/slot_clock/src/manual_slot_clock.rs b/common/slot_clock/src/manual_slot_clock.rs index 7b42fa9062d..1d71533de15 100644 --- a/common/slot_clock/src/manual_slot_clock.rs +++ b/common/slot_clock/src/manual_slot_clock.rs @@ -1,5 +1,6 @@ use super::SlotClock; use parking_lot::RwLock; +use std::ops::Add; use std::sync::Arc; use std::time::Duration; use types::Slot; @@ -41,6 +42,11 @@ impl ManualSlotClock { *self.current_time.write() = duration; } + pub fn advance_time(&self, duration: Duration) { + let current_time = *self.current_time.read(); + *self.current_time.write() = current_time.add(duration); + } + pub fn advance_slot(&self) { self.set_slot(self.now().unwrap().as_u64() + 1) }