Skip to content

Commit

Permalink
Only wait for recent newPayload calls (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul authored Sep 1, 2023
1 parent fbd3e1a commit 461497d
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 18 deletions.
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ pub struct Config {
/// the behaviour of a full execution engine.
#[arg(long, value_name = "MILLIS", default_value = "2000")]
pub new_payload_wait_millis: u128,
/// Maximum age of a payload that will trigger a wait on `newPayload`
///
/// Payloads older than this age receive an instant SYNCING response. See docs for
/// `--new-payload-wait-millis` for the purpose of this wait.
#[arg(long, value_name = "NUM_BLOCKS", default_value = "64")]
pub new_payload_wait_cutoff: u64,
/// Maximum time that a consensus node should wait for a forkchoiceUpdated response from the
/// cache.
///
Expand Down
7 changes: 6 additions & 1 deletion src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::sync::Mutex;
pub struct Multiplexer<E: EthSpec> {
pub engine: Engine,
pub fcu_cache: Mutex<LruCache<JsonForkchoiceStateV1, JsonPayloadStatusV1>>,
pub new_payload_cache: Mutex<LruCache<ExecutionBlockHash, JsonPayloadStatusV1>>,
pub new_payload_cache: Mutex<LruCache<ExecutionBlockHash, NewPayloadCacheEntry>>,
pub justified_block_cache: Mutex<LruCache<ExecutionBlockHash, ()>>,
pub finalized_block_cache: Mutex<LruCache<ExecutionBlockHash, ()>>,
pub payload_builder: Mutex<PayloadBuilder<E>>,
Expand All @@ -30,6 +30,11 @@ pub struct Multiplexer<E: EthSpec> {
_phantom: PhantomData<E>,
}

pub struct NewPayloadCacheEntry {
pub status: JsonPayloadStatusV1,
pub block_number: u64,
}

impl<E: EthSpec> Multiplexer<E> {
pub fn new(config: Config, executor: TaskExecutor, log: Logger) -> Result<Self, String> {
let engine: Engine = {
Expand Down
67 changes: 50 additions & 17 deletions src/new_payload.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Handler for new payload.
use crate::{
multiplexer::Multiplexer,
multiplexer::{Multiplexer, NewPayloadCacheEntry},
types::{
ErrorResponse, JsonExecutionPayload, JsonPayloadStatusV1, JsonPayloadStatusV1Status,
JsonValue, QuantityU64, Request, Response,
Expand All @@ -20,6 +20,7 @@ impl<E: EthSpec> Multiplexer<E> {

// TODO: verify block hash
let block_hash = *json_execution_payload.block_hash();
let block_number = *json_execution_payload.block_number();

let status = if let Some(status) = self.get_cached_payload_status(&block_hash, true).await {
status
Expand All @@ -31,10 +32,13 @@ impl<E: EthSpec> Multiplexer<E> {
let json_status = JsonPayloadStatusV1::from(status);

// Update newPayload cache.
self.new_payload_cache
.lock()
.await
.put(block_hash, json_status.clone());
self.new_payload_cache.lock().await.put(
block_hash,
NewPayloadCacheEntry {
status: json_status.clone(),
block_number,
},
);

// Update payload builder.
self.register_canonical_payload(&execution_payload, json_status.status)
Expand Down Expand Up @@ -63,15 +67,19 @@ impl<E: EthSpec> Multiplexer<E> {

// TODO: verify block hash
let block_hash = execution_payload.block_hash();

// Wait a short time for a definite response from the EL. Chances are it's busy processing
// the payload sent by the controlling BN.
let start = Instant::now();
while start.elapsed().as_millis() < self.config.new_payload_wait_millis {
if let Some(status) = self.get_cached_payload_status(block_hash, true).await {
return Response::new(id, status);
let block_number = *execution_payload.block_number();

// If this is a *recent* payload, wait a short time for a definite response from the EL.
// Chances are it's busy processing the payload sent by the controlling BN.
let is_recent = self.is_recent_payload(block_number).await;
if is_recent {
let start = Instant::now();
while start.elapsed().as_millis() < self.config.new_payload_wait_millis {
if let Some(status) = self.get_cached_payload_status(block_hash, true).await {
return Response::new(id, status);
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}

// Try again to get any status from the cache, or fall back on a SYNCING response.
Expand All @@ -81,7 +89,11 @@ impl<E: EthSpec> Multiplexer<E> {
}
status
} else {
tracing::info!("sending SYNCING response on newPayload");
if is_recent {
tracing::info!("sending SYNCING response on recent newPayload");
} else {
tracing::info!("sending instant SYNCING response for old newPayload");
}
// Synthetic syncing response.
JsonPayloadStatusV1 {
status: JsonPayloadStatusV1Status::Syncing,
Expand Down Expand Up @@ -145,11 +157,32 @@ impl<E: EthSpec> Multiplexer<E> {
definite_only: bool,
) -> Option<JsonPayloadStatusV1> {
let mut cache = self.new_payload_cache.lock().await;
if let Some(existing_status) = cache.get(execution_block_hash) {
if !definite_only || Self::is_definite(existing_status) {
return Some(existing_status.clone());
if let Some(existing) = cache.get(execution_block_hash) {
if !definite_only || Self::is_definite(&existing.status) {
return Some(existing.status.clone());
}
}
None
}

/// Return the highest `block_number` of any cached payload, or 0 if none is cached.
///
/// This is useful for approximately time-based cutoffs & heuristics.
pub async fn highest_cached_payload_number(&self) -> u64 {
let cache = self.new_payload_cache.lock().await;
cache
.iter()
.map(|(_, entry)| entry.block_number)
.max()
.unwrap_or(0)
}

/// Check if the given block number is recent based on the `highest_cached_payload_number`.
pub async fn is_recent_payload(&self, block_number: u64) -> bool {
let cutoff = self
.highest_cached_payload_number()
.await
.saturating_sub(self.config.new_payload_wait_cutoff);
block_number >= cutoff
}
}

0 comments on commit 461497d

Please sign in to comment.