Skip to content

Commit

Permalink
API for LightClientBootstrap, LightClientFinalityUpdate, `LightCl…
Browse files Browse the repository at this point in the history
…ientOptimisticUpdate` and light client events (#3954)

* rebase and add comment

* conditional test

* test

* optimistic chould be working now

* finality should be working now

* try again

* try again

* clippy fix

* add lc bootstrap beacon api

* add lc optimistic/finality update to events

* fmt

* That error isn't occuring on my computer but I think this should fix it

* Add missing test file

* Update light client types to comply with Altair light client spec.

* Fix test compilation

* Support deserializing light client structures for the Bellatrix fork

* Move `get_light_client_bootstrap` logic to `BeaconChain`. `LightClientBootstrap` API to return `ForkVersionedResponse`.

* Misc fixes.
- log cleanup
- move http_api config mutation to `config::get_config` for consistency
- fix light client API responses

* Add light client bootstrap API test and fix existing ones.

* Fix test for `light-client-server` http api config.

* Appease clippy

* Efficiency improvement when retrieving beacon state.

---------

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>
  • Loading branch information
GeemoCandama and jimmygchen authored Nov 28, 2023
1 parent 44c1817 commit 8a599ec
Show file tree
Hide file tree
Showing 24 changed files with 629 additions and 108 deletions.
33 changes: 33 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6446,6 +6446,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn data_availability_boundary(&self) -> Option<Epoch> {
self.data_availability_checker.data_availability_boundary()
}

/// Gets the `LightClientBootstrap` object for a requested block root.
///
/// Returns `None` when the state or block is not found in the database.
#[allow(clippy::type_complexity)]
pub fn get_light_client_bootstrap(
&self,
block_root: &Hash256,
) -> Result<Option<(LightClientBootstrap<T::EthSpec>, ForkName)>, Error> {
let Some((state_root, slot)) = self
.get_blinded_block(block_root)?
.map(|block| (block.state_root(), block.slot()))
else {
return Ok(None);
};

let Some(mut state) = self.get_state(&state_root, Some(slot))? else {
return Ok(None);
};

let fork_name = state
.fork_name(&self.spec)
.map_err(Error::InconsistentFork)?;

match fork_name {
ForkName::Altair | ForkName::Merge => {
LightClientBootstrap::from_beacon_state(&mut state)
.map(|bootstrap| Some((bootstrap, fork_name)))
.map_err(Error::LightClientError)
}
ForkName::Base | ForkName::Capella | ForkName::Deneb => Err(Error::UnsupportedFork),
}
}
}

impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ pub enum BeaconChainError {
ProposerHeadForkChoiceError(fork_choice::Error<proto_array::Error>),
UnableToPublish,
AvailabilityCheckError(AvailabilityCheckError),
LightClientError(LightClientError),
UnsupportedFork,
}

easy_from_to!(SlotProcessingError, BeaconChainError);
Expand Down
22 changes: 22 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub struct ServerSentEventHandler<T: EthSpec> {
contribution_tx: Sender<EventKind<T>>,
payload_attributes_tx: Sender<EventKind<T>>,
late_head: Sender<EventKind<T>>,
light_client_finality_update_tx: Sender<EventKind<T>>,
light_client_optimistic_update_tx: Sender<EventKind<T>>,
block_reward_tx: Sender<EventKind<T>>,
log: Logger,
}
Expand All @@ -40,6 +42,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
let (contribution_tx, _) = broadcast::channel(capacity);
let (payload_attributes_tx, _) = broadcast::channel(capacity);
let (late_head, _) = broadcast::channel(capacity);
let (light_client_finality_update_tx, _) = broadcast::channel(capacity);
let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity);
let (block_reward_tx, _) = broadcast::channel(capacity);

Self {
Expand All @@ -53,6 +57,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
contribution_tx,
payload_attributes_tx,
late_head,
light_client_finality_update_tx,
light_client_optimistic_update_tx,
block_reward_tx,
log,
}
Expand Down Expand Up @@ -108,6 +114,14 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
.late_head
.send(kind)
.map(|count| log_count("late head", count)),
EventKind::LightClientFinalityUpdate(_) => self
.light_client_finality_update_tx
.send(kind)
.map(|count| log_count("light client finality update", count)),
EventKind::LightClientOptimisticUpdate(_) => self
.light_client_optimistic_update_tx
.send(kind)
.map(|count| log_count("light client optimistic update", count)),
EventKind::BlockReward(_) => self
.block_reward_tx
.send(kind)
Expand Down Expand Up @@ -158,6 +172,14 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.late_head.subscribe()
}

pub fn subscribe_light_client_finality_update(&self) -> Receiver<EventKind<T>> {
self.light_client_finality_update_tx.subscribe()
}

pub fn subscribe_light_client_optimistic_update(&self) -> Receiver<EventKind<T>> {
self.light_client_optimistic_update_tx.subscribe()
}

pub fn subscribe_block_reward(&self) -> Receiver<EventKind<T>> {
self.block_reward_tx.subscribe()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientFinalityUpdate<T> {
chain: &BeaconChain<T>,
seen_timestamp: Duration,
) -> Result<Self, Error> {
let gossiped_finality_slot = light_client_finality_update.finalized_header.slot;
let gossiped_finality_slot = light_client_finality_update.finalized_header.beacon.slot;
let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0);
let signature_slot = light_client_finality_update.signature_slot;
let start_time = chain.slot_clock.start_of(signature_slot);
Expand All @@ -88,7 +88,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientFinalityUpdate<T> {
.get_blinded_block(&finalized_block_root)?
.ok_or(Error::FailedConstructingUpdate)?;
let latest_seen_finality_update_slot = match latest_seen_finality_update.as_ref() {
Some(update) => update.finalized_header.slot,
Some(update) => update.finalized_header.beacon.slot,
None => Slot::new(0),
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
chain: &BeaconChain<T>,
seen_timestamp: Duration,
) -> Result<Self, Error> {
let gossiped_optimistic_slot = light_client_optimistic_update.attested_header.slot;
let gossiped_optimistic_slot = light_client_optimistic_update.attested_header.beacon.slot;
let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0);
let signature_slot = light_client_optimistic_update.signature_slot;
let start_time = chain.slot_clock.start_of(signature_slot);
Expand All @@ -88,7 +88,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
.get_state(&attested_block.state_root(), Some(attested_block.slot()))?
.ok_or(Error::FailedConstructingUpdate)?;
let latest_seen_optimistic_update_slot = match latest_seen_optimistic_update.as_ref() {
Some(update) => update.attested_header.slot,
Some(update) => update.attested_header.beacon.slot,
None => Slot::new(0),
};

Expand All @@ -114,6 +114,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
// otherwise queue
let canonical_root = light_client_optimistic_update
.attested_header
.beacon
.canonical_root();

if canonical_root != head_block.message().parent_root() {
Expand Down
197 changes: 194 additions & 3 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ use tokio_stream::{
use types::{
Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName,
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData,
SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData,
ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBlsToExecutionChange, SignedContributionAndProof,
SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{
Expand Down Expand Up @@ -143,6 +144,7 @@ pub struct Config {
pub enable_beacon_processor: bool,
#[serde(with = "eth2::types::serde_status_code")]
pub duplicate_block_status_code: StatusCode,
pub enable_light_client_server: bool,
}

impl Default for Config {
Expand All @@ -159,6 +161,7 @@ impl Default for Config {
sse_capacity_multiplier: 1,
enable_beacon_processor: true,
duplicate_block_status_code: StatusCode::ACCEPTED,
enable_light_client_server: false,
}
}
}
Expand Down Expand Up @@ -279,6 +282,18 @@ pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::lo
})
}

fn enable(is_enabled: bool) -> impl Filter<Extract = (), Error = warp::Rejection> + Clone {
warp::any()
.and_then(move || async move {
if is_enabled {
Ok(())
} else {
Err(warp::reject::not_found())
}
})
.untuple_one()
}

/// Creates a server that will serve requests using information from `ctx`.
///
/// The server will shut down gracefully when the `shutdown` future resolves.
Expand Down Expand Up @@ -2379,6 +2394,164 @@ pub fn serve<T: BeaconChainTypes>(
},
);

/*
* beacon/light_client
*/

let beacon_light_client_path = eth_v1
.and(warp::path("beacon"))
.and(warp::path("light_client"))
.and(chain_filter.clone());

// GET beacon/light_client/bootstrap/{block_root}
let get_beacon_light_client_bootstrap = beacon_light_client_path
.clone()
.and(task_spawner_filter.clone())
.and(warp::path("bootstrap"))
.and(warp::path::param::<Hash256>().or_else(|_| async {
Err(warp_utils::reject::custom_bad_request(
"Invalid block root value".to_string(),
))
}))
.and(warp::path::end())
.and(warp::header::optional::<api_types::Accept>("accept"))
.then(
|chain: Arc<BeaconChain<T>>,
task_spawner: TaskSpawner<T::EthSpec>,
block_root: Hash256,
accept_header: Option<api_types::Accept>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let (bootstrap, fork_name) = match chain.get_light_client_bootstrap(&block_root)
{
Ok(Some(res)) => res,
Ok(None) => {
return Err(warp_utils::reject::custom_not_found(
"Light client bootstrap unavailable".to_string(),
));
}
Err(e) => {
return Err(warp_utils::reject::custom_server_error(format!(
"Unable to obtain LightClientBootstrap instance: {e:?}"
)));
}
};

match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
.header("Content-Type", "application/octet-stream")
.body(bootstrap.as_ssz_bytes().into())
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to create response: {}",
e
))
}),
_ => Ok(warp::reply::json(&ForkVersionedResponse {
version: Some(fork_name),
data: bootstrap,
})
.into_response()),
}
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
);

// GET beacon/light_client/optimistic_update
let get_beacon_light_client_optimistic_update = beacon_light_client_path
.clone()
.and(task_spawner_filter.clone())
.and(warp::path("optimistic_update"))
.and(warp::path::end())
.and(warp::header::optional::<api_types::Accept>("accept"))
.then(
|chain: Arc<BeaconChain<T>>,
task_spawner: TaskSpawner<T::EthSpec>,
accept_header: Option<api_types::Accept>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let update = chain
.latest_seen_optimistic_update
.lock()
.clone()
.ok_or_else(|| {
warp_utils::reject::custom_not_found(
"No LightClientOptimisticUpdate is available".to_string(),
)
})?;

let fork_name = chain
.spec
.fork_name_at_slot::<T::EthSpec>(update.signature_slot);
match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
.header("Content-Type", "application/octet-stream")
.body(update.as_ssz_bytes().into())
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to create response: {}",
e
))
}),
_ => Ok(warp::reply::json(&ForkVersionedResponse {
version: Some(fork_name),
data: update,
})
.into_response()),
}
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
);

// GET beacon/light_client/finality_update
let get_beacon_light_client_finality_update = beacon_light_client_path
.clone()
.and(task_spawner_filter.clone())
.and(warp::path("finality_update"))
.and(warp::path::end())
.and(warp::header::optional::<api_types::Accept>("accept"))
.then(
|chain: Arc<BeaconChain<T>>,
task_spawner: TaskSpawner<T::EthSpec>,
accept_header: Option<api_types::Accept>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let update = chain
.latest_seen_finality_update
.lock()
.clone()
.ok_or_else(|| {
warp_utils::reject::custom_not_found(
"No LightClientFinalityUpdate is available".to_string(),
)
})?;

let fork_name = chain
.spec
.fork_name_at_slot::<T::EthSpec>(update.signature_slot);
match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
.header("Content-Type", "application/octet-stream")
.body(update.as_ssz_bytes().into())
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to create response: {}",
e
))
}),
_ => Ok(warp::reply::json(&ForkVersionedResponse {
version: Some(fork_name),
data: update,
})
.into_response()),
}
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
);

/*
* beacon/rewards
*/
Expand Down Expand Up @@ -4339,6 +4512,12 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::LateHead => {
event_handler.subscribe_late_head()
}
api_types::EventTopic::LightClientFinalityUpdate => {
event_handler.subscribe_light_client_finality_update()
}
api_types::EventTopic::LightClientOptimisticUpdate => {
event_handler.subscribe_light_client_optimistic_update()
}
api_types::EventTopic::BlockReward => {
event_handler.subscribe_block_reward()
}
Expand Down Expand Up @@ -4492,6 +4671,18 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_lighthouse_database_info)
.uor(get_lighthouse_block_rewards)
.uor(get_lighthouse_attestation_performance)
.uor(
enable(ctx.config.enable_light_client_server)
.and(get_beacon_light_client_optimistic_update),
)
.uor(
enable(ctx.config.enable_light_client_server)
.and(get_beacon_light_client_finality_update),
)
.uor(
enable(ctx.config.enable_light_client_server)
.and(get_beacon_light_client_bootstrap),
)
.uor(get_lighthouse_block_packing_efficiency)
.uor(get_lighthouse_merge_readiness)
.uor(get_events)
Expand Down
Loading

0 comments on commit 8a599ec

Please sign in to comment.