Fix stuck backfill when scheduled work queue is at capacity (#5575)
* Fix stuck backfill and add regression test.

* Remove unnecessary `yield_now`

* Merge branch 'unstable' into fix-stuck-backfill

* Revert previous change and add extra comment.

* Merge branch 'unstable' into fix-stuck-backfill

* Update tests to use configured event schedule instead of hard coded values.

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into fix-stuck-backfill
jimmygchen authored Apr 22, 2024
1 parent f7aca97 commit 532206e
Showing 4 changed files with 142 additions and 34 deletions.
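
Roughly, the bug: when the bounded ready-work channel was already at capacity, a due backfill batch was pushed back onto the scheduler's local queue but no new timer event was armed, so the batch was never retried and backfill stalled. The sketch below is a minimal, self-contained illustration of the retry-and-re-arm pattern the diff introduces, using simplified stand-in types and names and assuming tokio is available; it is not the Lighthouse code itself.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{channel, error::TrySendError};

#[tokio::main]
async fn main() {
    // Bounded "ready work" channel, standing in for the beacon processor's scheduled work queue.
    let (ready_work_tx, mut ready_work_rx) = channel::<&'static str>(1);
    let mut queued_backfill_batches: VecDeque<&'static str> = VecDeque::from(["batch-0"]);
    let mut next_backfill_batch_event: Option<Instant> = None;

    // Fill the channel so the next send fails, mimicking a processor at capacity.
    ready_work_tx.try_send("other work").unwrap();

    if let Some(batch) = queued_backfill_batches.pop_front() {
        match ready_work_tx.try_send(batch) {
            Ok(()) => {}
            Err(TrySendError::Full(batch)) => {
                // Put the batch back at the front so it is retried later...
                queued_backfill_batches.push_front(batch);
                // ...and re-arm the timer. Skipping this step is what left
                // backfill permanently stuck before this commit.
                if next_backfill_batch_event.is_none() {
                    next_backfill_batch_event = Some(Instant::now() + Duration::from_secs(1));
                }
            }
            Err(TrySendError::Closed(_)) => unreachable!("receiver still alive"),
        }
    }

    assert!(next_backfill_batch_event.is_some());
    assert_eq!(ready_work_rx.recv().await, Some("other work"));
}
```
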
5 changes: 4 additions & 1 deletion beacon_node/beacon_processor/Cargo.toml
@@ -23,4 +23,7 @@ lazy_static = { workspace = true }
lighthouse_metrics = { workspace = true }
parking_lot = { workspace = true }
num_cpus = { workspace = true }
serde = { workspace = true }
serde = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }
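
For context, the new `test-util` feature on the tokio dev-dependency is what lets the regression test below pause the runtime clock and advance it manually. A rough, standalone illustration of that facility (not part of this commit):

```rust
use std::time::Duration;

#[tokio::test]
async fn paused_time_example() {
    // With `test-util` enabled, the runtime clock can be paused and advanced
    // manually, so timer-driven code can be tested without real delays.
    tokio::time::pause();
    let start = tokio::time::Instant::now();
    tokio::time::advance(Duration::from_secs(12)).await;
    assert!(start.elapsed() >= Duration::from_secs(12));
}
```
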
2 changes: 1 addition & 1 deletion beacon_node/beacon_processor/src/lib.rs
@@ -851,7 +851,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
ready_work_tx,
work_reprocessing_rx,
&self.executor,
slot_clock,
Arc::new(slot_clock),
self.log.clone(),
maximum_gossip_clock_disparity,
)?;
163 changes: 131 additions & 32 deletions beacon_node/beacon_processor/src/work_reprocessing_queue.rs
@@ -22,6 +22,7 @@ use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::time::Duration;
use strum::AsRefStr;
@@ -243,7 +244,7 @@ struct ReprocessQueue<S> {
attestation_delay_debounce: TimeLatch,
lc_update_delay_debounce: TimeLatch,
next_backfill_batch_event: Option<Pin<Box<tokio::time::Sleep>>>,
slot_clock: Pin<Box<S>>,
slot_clock: Arc<S>,
}

pub type QueuedLightClientUpdateId = usize;
@@ -362,42 +363,20 @@ pub fn spawn_reprocess_scheduler<S: SlotClock + 'static>(
ready_work_tx: Sender<ReadyWork>,
work_reprocessing_rx: Receiver<ReprocessQueueMessage>,
executor: &TaskExecutor,
slot_clock: S,
slot_clock: Arc<S>,
log: Logger,
maximum_gossip_clock_disparity: Duration,
) -> Result<(), String> {
// Sanity check
if ADDITIONAL_QUEUED_BLOCK_DELAY >= maximum_gossip_clock_disparity {
return Err("The block delay and gossip disparity don't match.".to_string());
}
let mut queue = ReprocessQueue {
work_reprocessing_rx,
ready_work_tx,
gossip_block_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(),
awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(),
next_attestation: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
rpc_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(),
lc_update_delay_debounce: TimeLatch::default(),
next_backfill_batch_event: None,
slot_clock: Box::pin(slot_clock.clone()),
};
let mut queue = ReprocessQueue::new(ready_work_tx, work_reprocessing_rx, slot_clock);

executor.spawn(
async move {
while let Some(msg) = queue.next().await {
queue.handle_message(msg, &slot_clock, &log);
queue.handle_message(msg, &log);
}

debug!(
@@ -412,7 +391,37 @@ pub fn spawn_reprocess_scheduler<S: SlotClock + 'static>(
}

impl<S: SlotClock> ReprocessQueue<S> {
fn handle_message(&mut self, msg: InboundEvent, slot_clock: &S, log: &Logger) {
fn new(
ready_work_tx: Sender<ReadyWork>,
work_reprocessing_rx: Receiver<ReprocessQueueMessage>,
slot_clock: Arc<S>,
) -> Self {
ReprocessQueue {
work_reprocessing_rx,
ready_work_tx,
gossip_block_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(),
awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(),
next_attestation: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
rpc_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(),
lc_update_delay_debounce: TimeLatch::default(),
next_backfill_batch_event: None,
slot_clock,
}
}

fn handle_message(&mut self, msg: InboundEvent, log: &Logger) {
use ReprocessQueueMessage::*;
match msg {
// Some block has been indicated as "early" and should be processed when the
@@ -426,7 +435,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
return;
}

if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) {
if let Some(duration_till_slot) = self.slot_clock.duration_to_slot(block_slot) {
// Check to ensure this won't over-fill the queue.
if self.queued_gossip_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS {
if self.early_block_debounce.elapsed() {
@@ -459,7 +468,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
// This logic is slightly awkward since `SlotClock::duration_to_slot`
// doesn't distinguish between a slot that has already arrived and an
// error reading the slot clock.
if let Some(now) = slot_clock.now() {
if let Some(now) = self.slot_clock.now() {
if block_slot <= now
&& self
.ready_work_tx
@@ -860,7 +869,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
InboundEvent::ReadyBackfillSync(queued_backfill_batch) => {
let millis_from_slot_start = slot_clock
let millis_from_slot_start = self
.slot_clock
.millis_from_current_slot_start()
.map_or("null".to_string(), |duration| {
duration.as_millis().to_string()
@@ -886,7 +896,12 @@ impl<S: SlotClock> ReprocessQueue<S> {
"Failed to send scheduled backfill work";
"info" => "sending work back to queue"
);
self.queued_backfill_batches.insert(0, batch)
self.queued_backfill_batches.insert(0, batch);

// only recompute if there is no `next_backfill_batch_event` already scheduled
if self.next_backfill_batch_event.is_none() {
self.recompute_next_backfill_batch_event();
}
}
// The message was not sent and we didn't get the correct
// return result. This is a logic error.
@@ -963,7 +978,11 @@ impl<S: SlotClock> ReprocessQueue<S> {
#[cfg(test)]
mod tests {
use super::*;
use slot_clock::TestingSlotClock;
use logging::test_logger;
use slot_clock::{ManualSlotClock, TestingSlotClock};
use std::ops::Add;
use std::sync::Arc;
use task_executor::test_utils::TestRuntime;

#[test]
fn backfill_processing_schedule_calculation() {
@@ -1002,4 +1021,84 @@ mod tests {
duration_to_next_slot + event_times[0]
);
}

// Regression test for issue #5504.
// See: https://github.com/sigp/lighthouse/issues/5504#issuecomment-2050930045
#[tokio::test]
async fn backfill_schedule_failed_should_reschedule() {
let runtime = TestRuntime::default();
let log = test_logger();
let (work_reprocessing_tx, work_reprocessing_rx) = mpsc::channel(1);
let (ready_work_tx, mut ready_work_rx) = mpsc::channel(1);
let slot_duration = 12;
let slot_clock = Arc::new(testing_slot_clock(slot_duration));

spawn_reprocess_scheduler(
ready_work_tx.clone(),
work_reprocessing_rx,
&runtime.task_executor,
slot_clock.clone(),
log,
Duration::from_millis(500),
)
.unwrap();

// Pause time so it only advances manually
tokio::time::pause();

// Send some random work to `ready_work_tx` to fill up the capacity first.
ready_work_tx
.try_send(ReadyWork::IgnoredRpcBlock(IgnoredRpcBlock {
process_fn: Box::new(|| {}),
}))
.unwrap();

// Now queue a backfill sync batch.
work_reprocessing_tx
.try_send(ReprocessQueueMessage::BackfillSync(QueuedBackfillBatch(
Box::pin(async {}),
)))
.unwrap();
tokio::task::yield_now().await;

// Advance the time by more than 1/2 the slot to trigger a scheduled backfill batch to be sent.
// This should fail as the `ready_work` channel is at capacity, and it should be rescheduled.
let duration_to_next_event =
ReprocessQueue::duration_until_next_backfill_batch_event(slot_clock.as_ref());
let one_ms = Duration::from_millis(1);
advance_time(&slot_clock, duration_to_next_event.add(one_ms)).await;

// Now drain the `ready_work` channel.
assert!(matches!(
ready_work_rx.try_recv(),
Ok(ReadyWork::IgnoredRpcBlock { .. })
));
assert!(ready_work_rx.try_recv().is_err());

// Advance time again, and assert that the re-scheduled batch is successfully sent.
let duration_to_next_event =
ReprocessQueue::duration_until_next_backfill_batch_event(slot_clock.as_ref());
advance_time(&slot_clock, duration_to_next_event.add(one_ms)).await;
assert!(matches!(
ready_work_rx.try_recv(),
Ok(ReadyWork::BackfillSync { .. })
));
}

/// Advances slot clock and test clock time by the same duration.
async fn advance_time(slot_clock: &ManualSlotClock, duration: Duration) {
slot_clock.advance_time(duration);
tokio::time::advance(duration).await;
// NOTE: The `tokio::time::advance` fn actually calls `yield_now()` after advancing the
// clock. Why do we need an extra `yield_now`?
tokio::task::yield_now().await;
}

fn testing_slot_clock(slot_duration: u64) -> ManualSlotClock {
TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(0),
Duration::from_secs(slot_duration),
)
}
}
6 changes: 6 additions & 0 deletions common/slot_clock/src/manual_slot_clock.rs
@@ -1,5 +1,6 @@
use super::SlotClock;
use parking_lot::RwLock;
use std::ops::Add;
use std::sync::Arc;
use std::time::Duration;
use types::Slot;
@@ -41,6 +42,11 @@ impl ManualSlotClock {
*self.current_time.write() = duration;
}

pub fn advance_time(&self, duration: Duration) {
let current_time = *self.current_time.read();
*self.current_time.write() = current_time.add(duration);
}

pub fn advance_slot(&self) {
self.set_slot(self.now().unwrap().as_u64() + 1)
}
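
A brief usage sketch of the new `advance_time` helper, assuming `ManualSlotClock` is constructed through the `SlotClock` trait as in the test above (genesis slot, genesis time, slot duration); advancing by one slot duration moves `now()` to the next slot without touching genesis:

```rust
use slot_clock::{ManualSlotClock, SlotClock};
use std::time::Duration;
use types::Slot;

fn main() {
    // Genesis at slot 0, time 0, with 12-second slots (mirroring the test above).
    let clock = ManualSlotClock::new(Slot::new(0), Duration::from_secs(0), Duration::from_secs(12));
    assert_eq!(clock.now(), Some(Slot::new(0)));

    // Advance wall-clock time by one slot duration; `now()` now reports slot 1.
    clock.advance_time(Duration::from_secs(12));
    assert_eq!(clock.now(), Some(Slot::new(1)));
}
```
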
