Skip to content

Commit

Permalink
Merge branch 'unstable' of https://github.com/sigp/lighthouse into up…
Browse files Browse the repository at this point in the history
…date-simulator
  • Loading branch information
realbigsean committed Apr 18, 2024
2 parents 3284c1e + 5c30afb commit 79cde4a
Show file tree
Hide file tree
Showing 41 changed files with 1,170 additions and 1,363 deletions.
1 change: 0 additions & 1 deletion account_manager/src/validator/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
"The path where the validator keystore passwords will be stored. \
Defaults to ~/.lighthouse/{network}/secrets",
)
.conflicts_with("datadir")
.takes_value(true),
)
.arg(
Expand Down
89 changes: 56 additions & 33 deletions beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
use slog::{crit, debug, Logger};
use slog::{crit, debug, error, Logger};
use std::collections::HashMap;
use std::sync::Arc;
use store::{DatabaseBlock, ExecutionPayloadDeneb};
use task_executor::TaskExecutor;
use tokio::sync::{
mpsc::{self, UnboundedSender},
RwLock,
Expand Down Expand Up @@ -395,18 +394,18 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
pub fn new(
beacon_chain: &Arc<BeaconChain<T>>,
check_caches: CheckCaches,
) -> Result<Self, BeaconChainError> {
) -> Result<Arc<Self>, BeaconChainError> {
let execution_layer = beacon_chain
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)?
.clone();

Ok(Self {
Ok(Arc::new(Self {
execution_layer,
check_caches,
beacon_chain: beacon_chain.clone(),
})
}))
}

fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
Expand All @@ -425,30 +424,44 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}
}

fn load_payloads(&self, block_roots: Vec<Hash256>) -> Vec<(Hash256, LoadResult<T::EthSpec>)> {
let mut db_blocks = Vec::new();

for root in block_roots {
if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) {
db_blocks.push((root, Ok(Some(cached_block))));
continue;
}

match self.beacon_chain.store.try_get_full_block(&root) {
Err(e) => db_blocks.push((root, Err(e.into()))),
Ok(opt_block) => db_blocks.push((
root,
Ok(opt_block.map(|db_block| match db_block {
DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)),
DatabaseBlock::Blinded(block) => {
LoadedBeaconBlock::Blinded(Box::new(block))
async fn load_payloads(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
) -> Result<Vec<(Hash256, LoadResult<T::EthSpec>)>, BeaconChainError> {
let streamer = self.clone();
// Loading from the DB is slow -> spawn a blocking task
self.beacon_chain
.spawn_blocking_handle(
move || {
let mut db_blocks = Vec::new();
for root in block_roots {
if let Some(cached_block) =
streamer.check_caches(root).map(LoadedBeaconBlock::Full)
{
db_blocks.push((root, Ok(Some(cached_block))));
continue;
}
})),
)),
}
}

db_blocks
match streamer.beacon_chain.store.try_get_full_block(&root) {
Err(e) => db_blocks.push((root, Err(e.into()))),
Ok(opt_block) => db_blocks.push((
root,
Ok(opt_block.map(|db_block| match db_block {
DatabaseBlock::Full(block) => {
LoadedBeaconBlock::Full(Arc::new(block))
}
DatabaseBlock::Blinded(block) => {
LoadedBeaconBlock::Blinded(Box::new(block))
}
})),
)),
}
}
db_blocks
},
"load_beacon_blocks",
)
.await
}

/// Pre-process the loaded blocks into execution engine requests.
Expand Down Expand Up @@ -549,7 +562,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {

// used when the execution engine doesn't support the payload bodies methods
async fn stream_blocks_fallback(
&self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
Expand All @@ -575,7 +588,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}

async fn stream_blocks(
&self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
Expand All @@ -584,7 +597,17 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
let mut n_sent = 0usize;
let mut engine_requests = 0usize;

let payloads = self.load_payloads(block_roots);
let payloads = match self.load_payloads(block_roots).await {
Ok(payloads) => payloads,
Err(e) => {
error!(
self.beacon_chain.log,
"BeaconBlockStreamer: Failed to load payloads";
"error" => ?e
);
return;
}
};
let requests = self.get_requests(payloads).await;

for (root, request) in requests {
Expand Down Expand Up @@ -624,7 +647,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}

pub async fn stream(
self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
Expand All @@ -650,16 +673,16 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}

pub fn launch_stream(
self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> impl Stream<Item = (Hash256, Arc<BlockResult<T::EthSpec>>)> {
let (block_tx, block_rx) = mpsc::unbounded_channel();
debug!(
self.beacon_chain.log,
"Launching a BeaconBlockStreamer";
"blocks" => block_roots.len(),
);
let executor = self.beacon_chain.task_executor.clone();
executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender");
UnboundedReceiverStream::new(block_rx)
}
Expand Down
8 changes: 2 additions & 6 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn get_blocks_checking_caches(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> Result<
impl Stream<
Item = (
Expand All @@ -1149,14 +1148,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>,
Error,
> {
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?
.launch_stream(block_roots, executor))
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?.launch_stream(block_roots))
}

pub fn get_blocks(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> Result<
impl Stream<
Item = (
Expand All @@ -1166,8 +1163,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>,
Error,
> {
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?
.launch_stream(block_roots, executor))
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?.launch_stream(block_roots))
}

pub fn get_blobs_checking_early_attester_cache(
Expand Down
12 changes: 4 additions & 8 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,21 +419,17 @@ where
self
}

pub fn execution_layer_from_urls(mut self, urls: &[&str]) -> Self {
pub fn execution_layer_from_url(mut self, url: &str) -> Self {
assert!(
self.execution_layer.is_none(),
"execution layer already defined"
);

let urls: Vec<SensitiveUrl> = urls
.iter()
.map(|s| SensitiveUrl::parse(s))
.collect::<Result<_, _>>()
.unwrap();
let url = SensitiveUrl::parse(url).ok();

let config = execution_layer::Config {
execution_endpoints: urls,
secret_files: vec![],
execution_endpoint: url,
secret_file: None,
suggested_fee_recipient: Some(Address::repeat_byte(42)),
..Default::default()
};
Expand Down
35 changes: 9 additions & 26 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ pub enum BlockingOrAsync {
/// queuing specifics.
pub enum Work<E: EthSpec> {
GossipAttestation {
attestation: GossipAttestationPackage<E>,
attestation: Box<GossipAttestationPackage<E>>,
process_individual: Box<dyn FnOnce(GossipAttestationPackage<E>) + Send + Sync>,
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
},
Expand All @@ -583,7 +583,7 @@ pub enum Work<E: EthSpec> {
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
},
GossipAggregate {
aggregate: GossipAggregatePackage<E>,
aggregate: Box<GossipAggregatePackage<E>>,
process_individual: Box<dyn FnOnce(GossipAggregatePackage<E>) + Send + Sync>,
process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
},
Expand Down Expand Up @@ -624,8 +624,8 @@ pub enum Work<E: EthSpec> {
ChainSegment(AsyncFn),
ChainSegmentBackfill(AsyncFn),
Status(BlockingFn),
BlocksByRangeRequest(BlockingFnWithManualSendOnIdle),
BlocksByRootsRequest(BlockingFnWithManualSendOnIdle),
BlocksByRangeRequest(AsyncFn),
BlocksByRootsRequest(AsyncFn),
BlobsByRangeRequest(BlockingFn),
BlobsByRootsRequest(BlockingFn),
GossipBlsToExecutionChange(BlockingFn),
Expand Down Expand Up @@ -1015,7 +1015,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual: _,
process_batch,
} => {
aggregates.push(aggregate);
aggregates.push(*aggregate);
if process_batch_opt.is_none() {
process_batch_opt = Some(process_batch);
}
Expand Down Expand Up @@ -1075,7 +1075,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual: _,
process_batch,
} => {
attestations.push(attestation);
attestations.push(*attestation);
if process_batch_opt.is_none() {
process_batch_opt = Some(process_batch);
}
Expand Down Expand Up @@ -1445,7 +1445,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual,
process_batch: _,
} => task_spawner.spawn_blocking(move || {
process_individual(attestation);
process_individual(*attestation);
}),
Work::GossipAttestationBatch {
attestations,
Expand All @@ -1458,7 +1458,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual,
process_batch: _,
} => task_spawner.spawn_blocking(move || {
process_individual(aggregate);
process_individual(*aggregate);
}),
Work::GossipAggregateBatch {
aggregates,
Expand Down Expand Up @@ -1493,7 +1493,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
task_spawner.spawn_blocking(process_fn)
}
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
task_spawner.spawn_blocking_with_manual_send_idle(work)
task_spawner.spawn_async(work)
}
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
Expand Down Expand Up @@ -1555,23 +1555,6 @@ impl TaskSpawner {
WORKER_TASK_NAME,
)
}

/// Spawn a blocking task, passing the `SendOnDrop` into the task.
///
/// ## Notes
///
/// Users must ensure the `SendOnDrop` is dropped at the appropriate time!
pub fn spawn_blocking_with_manual_send_idle<F>(self, task: F)
where
F: FnOnce(SendOnDrop) + Send + 'static,
{
self.executor.spawn_blocking(
|| {
task(self.send_idle_on_drop);
},
WORKER_TASK_NAME,
)
}
}

/// This struct will send a message on `self.tx` when it is dropped. An error will be logged on
Expand Down
22 changes: 8 additions & 14 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,14 @@ struct Inner<E: EthSpec> {

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Config {
/// Endpoint urls for EL nodes that are running the engine api.
pub execution_endpoints: Vec<SensitiveUrl>,
/// Endpoint url for EL nodes that are running the engine api.
pub execution_endpoint: Option<SensitiveUrl>,
/// Endpoint urls for services providing the builder api.
pub builder_url: Option<SensitiveUrl>,
/// User agent to send with requests to the builder API.
pub builder_user_agent: Option<String>,
/// JWT secrets for the above endpoints running the engine api.
pub secret_files: Vec<PathBuf>,
/// JWT secret for the above endpoint running the engine api.
pub secret_file: Option<PathBuf>,
/// The default fee recipient to use on the beacon node if none if provided from
/// the validator client during block preparation.
pub suggested_fee_recipient: Option<Address>,
Expand All @@ -386,27 +386,21 @@ impl<E: EthSpec> ExecutionLayer<E> {
/// Instantiate `Self` with an Execution engine specified in `Config`, using JSON-RPC via HTTP.
pub fn from_config(config: Config, executor: TaskExecutor, log: Logger) -> Result<Self, Error> {
let Config {
execution_endpoints: urls,
execution_endpoint: url,
builder_url,
builder_user_agent,
secret_files,
secret_file,
suggested_fee_recipient,
jwt_id,
jwt_version,
default_datadir,
execution_timeout_multiplier,
} = config;

if urls.len() > 1 {
warn!(log, "Only the first execution engine url will be used");
}
let execution_url = urls.into_iter().next().ok_or(Error::NoEngine)?;
let execution_url = url.ok_or(Error::NoEngine)?;

// Use the default jwt secret path if not provided via cli.
let secret_file = secret_files
.into_iter()
.next()
.unwrap_or_else(|| default_datadir.join(DEFAULT_JWT_FILE));
let secret_file = secret_file.unwrap_or_else(|| default_datadir.join(DEFAULT_JWT_FILE));

let jwt_key = if secret_file.exists() {
// Read secret from file if it already exists
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/execution_layer/src/test_utils/mock_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ impl<E: EthSpec> MockBuilder<E> {

// This EL should not talk to a builder
let config = Config {
execution_endpoints: vec![mock_el_url],
secret_files: vec![path],
execution_endpoint: Some(mock_el_url),
secret_file: Some(path),
suggested_fee_recipient: None,
..Default::default()
};
Expand Down
Loading

0 comments on commit 79cde4a

Please sign in to comment.