From f1c28f497f3e8450d9c5f3c01afc3af3e50d1f27 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 15 Nov 2023 19:47:41 +0300 Subject: [PATCH] use dedicated channel --- beacon_node/beacon_chain/src/beacon_chain.rs | 14 ++++++++++++++ beacon_node/beacon_chain/src/builder.rs | 16 +++++++++++++++- beacon_node/client/src/builder.rs | 2 +- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 588506459ab..66fe7c85d31 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -289,6 +289,8 @@ struct PartialBeaconBlock> { bls_to_execution_changes: Vec, } +pub type LightclientProducerEvent = (Hash256, Slot, SyncAggregate); + pub type BeaconForkChoice = ForkChoice< BeaconForkChoiceStore< ::EthSpec, @@ -410,6 +412,8 @@ pub struct BeaconChain { pub pre_finalization_block_cache: PreFinalizationBlockCache, /// A cache used to produce lightclient server messages pub lightclient_server_cache: LightclientServerCache, + /// Sender to signal the lightclient server to produce new updates + pub lightclient_server_tx: Option>>, /// Sender given to tasks, so that if they encounter a state in which execution cannot /// continue they can request that everything shuts down. pub shutdown_sender: Sender, @@ -3477,6 +3481,16 @@ impl BeaconChain { })); } } + + if let Some(lightclient_server_tx) = self.lightclient_server_tx { + if let Ok(sync_aggregate) = block.body().sync_aggregate() { + lightclient_server_tx.try_send(( + block.parent_root(), + block.slot(), + sync_aggregate.clone(), + )); + } + } } // For the current and next epoch of this state, ensure we have the shuffling from this diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 1076a433344..c3e46d3d15d 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,4 +1,6 @@ -use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; +use crate::beacon_chain::{ + CanonicalHead, LightclientProducerEvent, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY, +}; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; use crate::eth1_finalization_cache::Eth1FinalizationCache; use crate::fork_choice_signal::ForkChoiceSignalTx; @@ -84,6 +86,7 @@ pub struct BeaconChainBuilder { event_handler: Option>, slot_clock: Option, shutdown_sender: Option>, + lightclient_server_tx: Option>>, head_tracker: Option, validator_pubkey_cache: Option>, spec: ChainSpec, @@ -125,6 +128,7 @@ where event_handler: None, slot_clock: None, shutdown_sender: None, + lightclient_server_tx: None, head_tracker: None, validator_pubkey_cache: None, spec: TEthSpec::default_spec(), @@ -561,6 +565,15 @@ where self } + /// Sets a `Sender` to allow the beacon chain to send shutdown signals. + pub fn lightclient_server_tx( + mut self, + sender: Sender>, + ) -> Self { + self.lightclient_server_tx = Some(sender); + self + } + /// Creates a new, empty operation pool. fn empty_op_pool(mut self) -> Self { self.op_pool = Some(OperationPool::new()); @@ -861,6 +874,7 @@ where attester_cache: <_>::default(), early_attester_cache: <_>::default(), lightclient_server_cache: LightclientServerCache::new(), + lightclient_server_tx: self.lightclient_server_tx, shutdown_sender: self .shutdown_sender .ok_or("Cannot build without a shutdown sender.")?, diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 545a28a9651..d333d4e1576 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -838,7 +838,7 @@ where let broadcast_context = runtime_context.service_context("lcserv_bcast".to_string()); let log = broadcast_context.log().clone(); - broadcast_context.executor.spawn( + broadcast_context.executor.spawn_blocking( async move { broadcast_lightclient_updates( &inner_chain,