Skip to content

Commit

Permalink
Return not synced errors for endpoints that require syncing (#5136)
Browse files Browse the repository at this point in the history
* add not synced filter into then blocks

* refactor
  • Loading branch information
eserilev authored Apr 4, 2024
1 parent 7825af4 commit f4cdcea
Showing 1 changed file with 34 additions and 8 deletions.
42 changes: 34 additions & 8 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ use warp::http::StatusCode;
use warp::hyper::Body;
use warp::sse::Event;
use warp::Reply;
use warp::{http::Response, Filter};
use warp::{http::Response, Filter, Rejection};
use warp_utils::{query::multi_key_query, uor::UnifyingOrFilter};

const API_PREFIX: &str = "eth";
Expand Down Expand Up @@ -453,7 +453,7 @@ pub fn serve<T: BeaconChainTypes>(
warp::any()
.and(network_globals.clone())
.and(chain_filter.clone())
.and_then(
.then(
move |network_globals: Arc<NetworkGlobals<T::EthSpec>>,
chain: Arc<BeaconChain<T>>| async move {
match *network_globals.sync_state.read() {
Expand Down Expand Up @@ -488,8 +488,7 @@ pub fn serve<T: BeaconChainTypes>(
)),
}
},
)
.untuple_one();
);

// Create a `warp` filter that provides access to the logger.
let inner_ctx = ctx.clone();
Expand Down Expand Up @@ -3058,10 +3057,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(log_filter.clone())
.then(
|epoch: Epoch,
not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
proposer_duties::proposer_duties(epoch, &chain, &log)
})
},
Expand All @@ -3087,6 +3088,7 @@ pub fn serve<T: BeaconChainTypes>(
|endpoint_version: EndpointVersion,
slot: Slot,
accept_header: Option<api_types::Accept>,
not_synced_filter: Result<(), Rejection>,
query: api_types::ValidatorBlocksQuery,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
Expand All @@ -3098,6 +3100,8 @@ pub fn serve<T: BeaconChainTypes>(
"slot" => slot
);

not_synced_filter?;

if endpoint_version == V3 {
produce_block_v3(accept_header, chain, slot, query).await
} else {
Expand All @@ -3124,11 +3128,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|slot: Slot,
not_synced_filter: Result<(), Rejection>,
query: api_types::ValidatorBlocksQuery,
accept_header: Option<api_types::Accept>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
not_synced_filter?;
produce_blinded_block_v2(EndpointVersion(2), accept_header, chain, slot, query)
.await
})
Expand All @@ -3146,9 +3152,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|query: api_types::ValidatorAttestationDataQuery,
not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;

let current_slot = chain
.slot()
.map_err(warp_utils::reject::beacon_chain_error)?;
Expand Down Expand Up @@ -3181,9 +3190,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|query: api_types::ValidatorAggregateAttestationQuery,
not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
chain
.get_aggregated_attestation_by_slot_and_root(
query.slot,
Expand Down Expand Up @@ -3222,10 +3233,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|epoch: Epoch,
not_synced_filter: Result<(), Rejection>,
indices: api_types::ValidatorIndexData,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
attester_duties::attester_duties(epoch, &indices.0, &chain)
})
},
Expand All @@ -3248,10 +3261,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|epoch: Epoch,
not_synced_filter: Result<(), Rejection>,
indices: api_types::ValidatorIndexData,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
sync_committees::sync_committee_duties(epoch, &indices.0, &chain)
})
},
Expand All @@ -3268,9 +3283,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|sync_committee_data: SyncContributionData,
not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
chain
.get_aggregated_sync_committee_contribution(&sync_committee_data)
.map_err(|e| {
Expand Down Expand Up @@ -3301,11 +3318,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
|not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
aggregates: Vec<SignedAggregateAndProof<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>, log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
let seen_timestamp = timestamp_now();
let mut verified_aggregates = Vec::with_capacity(aggregates.len());
let mut messages = Vec::with_capacity(aggregates.len());
Expand Down Expand Up @@ -3414,12 +3433,14 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter)
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
|not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
contributions: Vec<SignedContributionAndProof<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
sync_committees::process_signed_contribution_and_proofs(
contributions,
network_tx,
Expand Down Expand Up @@ -3494,11 +3515,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(log_filter.clone())
.and(warp_utils::json::json())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
|not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
log: Logger,
preparation_data: Vec<ProposerPreparationData>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
not_synced_filter?;
let execution_layer = chain
.execution_layer
.as_ref()
Expand Down Expand Up @@ -4197,8 +4220,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
|not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
not_synced_filter?;
chain.store_migrator.process_reconstruction();
Ok("success")
})
Expand Down

0 comments on commit f4cdcea

Please sign in to comment.