Skip to content

Commit

Permalink
use dedicated channel
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Nov 15, 2023
1 parent 02d06d1 commit f1c28f4
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
14 changes: 14 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ struct PartialBeaconBlock<E: EthSpec, Payload: AbstractExecPayload<E>> {
bls_to_execution_changes: Vec<SignedBlsToExecutionChange>,
}

pub type LightclientProducerEvent<T: EthSpec> = (Hash256, Slot, SyncAggregate<T>);

pub type BeaconForkChoice<T> = ForkChoice<
BeaconForkChoiceStore<
<T as BeaconChainTypes>::EthSpec,
Expand Down Expand Up @@ -410,6 +412,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub pre_finalization_block_cache: PreFinalizationBlockCache,
/// A cache used to produce lightclient server messages
pub lightclient_server_cache: LightclientServerCache<T>,
/// Sender to signal the lightclient server to produce new updates
pub lightclient_server_tx: Option<Sender<LightclientProducerEvent<T::EthSpec>>>,
/// Sender given to tasks, so that if they encounter a state in which execution cannot
/// continue they can request that everything shuts down.
pub shutdown_sender: Sender<ShutdownReason>,
Expand Down Expand Up @@ -3477,6 +3481,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}));
}
}

if let Some(lightclient_server_tx) = self.lightclient_server_tx {
if let Ok(sync_aggregate) = block.body().sync_aggregate() {
lightclient_server_tx.try_send((
block.parent_root(),
block.slot(),
sync_aggregate.clone(),
));
}
}
}

// For the current and next epoch of this state, ensure we have the shuffling from this
Expand Down
16 changes: 15 additions & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY};
use crate::beacon_chain::{
CanonicalHead, LightclientProducerEvent, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY,
};
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
use crate::eth1_finalization_cache::Eth1FinalizationCache;
use crate::fork_choice_signal::ForkChoiceSignalTx;
Expand Down Expand Up @@ -84,6 +86,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
slot_clock: Option<T::SlotClock>,
shutdown_sender: Option<Sender<ShutdownReason>>,
lightclient_server_tx: Option<Sender<LightclientProducerEvent<T::EthSpec>>>,
head_tracker: Option<HeadTracker>,
validator_pubkey_cache: Option<ValidatorPubkeyCache<T>>,
spec: ChainSpec,
Expand Down Expand Up @@ -125,6 +128,7 @@ where
event_handler: None,
slot_clock: None,
shutdown_sender: None,
lightclient_server_tx: None,
head_tracker: None,
validator_pubkey_cache: None,
spec: TEthSpec::default_spec(),
Expand Down Expand Up @@ -561,6 +565,15 @@ where
self
}

/// Sets a `Sender` to allow the beacon chain to send shutdown signals.
pub fn lightclient_server_tx(
mut self,
sender: Sender<LightclientProducerEvent<TEthSpec>>,
) -> Self {
self.lightclient_server_tx = Some(sender);
self
}

/// Creates a new, empty operation pool.
fn empty_op_pool(mut self) -> Self {
self.op_pool = Some(OperationPool::new());
Expand Down Expand Up @@ -861,6 +874,7 @@ where
attester_cache: <_>::default(),
early_attester_cache: <_>::default(),
lightclient_server_cache: LightclientServerCache::new(),
lightclient_server_tx: self.lightclient_server_tx,
shutdown_sender: self
.shutdown_sender
.ok_or("Cannot build without a shutdown sender.")?,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ where
let broadcast_context =
runtime_context.service_context("lcserv_bcast".to_string());
let log = broadcast_context.log().clone();
broadcast_context.executor.spawn(
broadcast_context.executor.spawn_blocking(
async move {
broadcast_lightclient_updates(
&inner_chain,
Expand Down

0 comments on commit f1c28f4

Please sign in to comment.