Skip to content

Commit

Permalink
Add retry middleware to sync committee prover (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wizdave97 authored Mar 9, 2024
1 parent b9fbaef commit e4db05c
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 47 deletions.
51 changes: 51 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions evm/abi/src/generated/host_manager.rs

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions modules/consensus/sync-committee/prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ sync-committee-primitives = { path= "../primitives" }
sync-committee-verifier = { path= "../verifier" }
ssz-rs = { git = "https://github.com/polytope-labs/ssz-rs", branch = "main" }
reqwest = {version="0.11.14", features=["json"]}
reqwest-middleware = "0.2.4"
reqwest-chain = "0.1.0"
serde = { version = "1.0.185", features = ["derive"] }
serde_json = { version = "1.0.81"}
anyhow = "1.0.68"
Expand All @@ -24,6 +26,7 @@ bls_on_arkworks = { version = "0.2.2" }
primitive-types = { version = "0.12.1", features = ["serde_no_std", "impl-codec"] }
log = "0.4.20"
hex = "0.4.3"
async-trait = "0.1.77"


[dev-dependencies]
Expand Down
120 changes: 80 additions & 40 deletions modules/consensus/sync-committee/prover/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod middleware;
#[warn(unused_imports)]
#[warn(unused_variables)]
mod responses;
Expand All @@ -6,26 +7,25 @@ mod routes;
#[cfg(test)]
mod test;

use anyhow::anyhow;
use bls_on_arkworks::{point_to_pubkey, types::G1ProjectivePoint};
use log::debug;
use reqwest::Client;
use std::marker::PhantomData;
use sync_committee_primitives::{
consensus_types::{BeaconBlock, BeaconBlockHeader, BeaconState, Checkpoint, Validator},
types::VerifierState,
};

use crate::{
middleware::SwitchProviderMiddleware,
responses::{
finality_checkpoint_response::FinalityCheckpoint,
sync_committee_response::NodeSyncCommittee,
},
routes::*,
};
use anyhow::anyhow;
use bls_on_arkworks::{point_to_pubkey, types::G1ProjectivePoint};
use log::trace;
use primitive_types::H256;
use reqwest::{Client, Url};
use reqwest_chain::ChainMiddleware;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use ssz_rs::{Merkleized, Node};
use std::marker::PhantomData;
use sync_committee_primitives::{
consensus_types::{BeaconBlock, BeaconBlockHeader, BeaconState, Checkpoint, Validator},
constants::{
BlsPublicKey, Config, Root, BLOCK_ROOTS_INDEX, BYTES_PER_LOGS_BLOOM,
EPOCHS_PER_HISTORICAL_VECTOR, EPOCHS_PER_SLASHINGS_VECTOR, ETH1_DATA_VOTES_BOUND,
Expand All @@ -39,7 +39,7 @@ use sync_committee_primitives::{
deneb::MAX_BLOB_COMMITMENTS_PER_BLOCK,
types::{
AncestryProof, BlockRootsProof, ExecutionPayloadProof, FinalityProof, SyncCommitteeUpdate,
VerifierStateUpdate,
VerifierState, VerifierStateUpdate,
},
util::{
compute_epoch_at_slot, compute_sync_committee_period_at_slot,
Expand All @@ -63,43 +63,68 @@ pub type BeaconStateType = BeaconState<
>;

pub struct SyncCommitteeProver<C: Config> {
pub node_url: String,
pub client: Client,
pub primary_url: String,
pub providers: Vec<String>,
pub client: ClientWithMiddleware,
pub phantom: PhantomData<C>,
}

impl<C: Config> Clone for SyncCommitteeProver<C> {
fn clone(&self) -> Self {
Self { node_url: self.node_url.clone(), client: self.client.clone(), phantom: PhantomData }
Self {
primary_url: self.primary_url.clone(),
client: self.client.clone(),
providers: self.providers.clone(),
phantom: PhantomData,
}
}
}

impl<C: Config> SyncCommitteeProver<C> {
pub fn new(node_url: String) -> Self {
let client = Client::new();

SyncCommitteeProver::<C> { node_url, client, phantom: PhantomData }
pub fn new(providers: Vec<String>) -> Self {
let client = ClientBuilder::new(Client::new())
.with(ChainMiddleware::new(SwitchProviderMiddleware::_new(providers.clone())))
.build();

SyncCommitteeProver::<C> {
primary_url: providers.get(0).expect("There must be atleast one provider").clone(),
providers,
client,
phantom: PhantomData,
}
}

pub async fn fetch_finalized_checkpoint(
&self,
state_id: Option<&str>,
) -> Result<FinalityCheckpoint, anyhow::Error> {
let full_url = self.generate_route(&finality_checkpoints(state_id.unwrap_or("head")));
let response = self.client.get(full_url).send().await?;

let response_data =
response.json::<responses::finality_checkpoint_response::Response>().await?;
let full_url = self.generate_route(&finality_checkpoints(state_id.unwrap_or("head")))?;
let response = self
.client
.get(full_url)
.send()
.await
.map_err(|e| anyhow!("Failed to fetch finalized checkpoint due to error {e:?}"))?;

let response_data = response
.json::<responses::finality_checkpoint_response::Response>()
.await
.map_err(|e| anyhow!("Failed to fetch finalized checkpoint due to error {e:?}"))?;
Ok(response_data.data)
}

pub async fn fetch_header(&self, block_id: &str) -> Result<BeaconBlockHeader, anyhow::Error> {
let path = header_route(block_id);
let full_url = self.generate_route(&path);
let response = self.client.get(full_url).send().await?;
let full_url = self.generate_route(&path)?;
let response =
self.client.get(full_url).send().await.map_err(|e| {
anyhow!("Failed to fetch header with id {block_id} due to error {e:?}")
})?;

let response_data =
response.json::<responses::beacon_block_header_response::Response>().await?;
let response_data = response
.json::<responses::beacon_block_header_response::Response>()
.await
.map_err(|e| anyhow!("Failed to fetch header with id {block_id} due to error {e:?}"))?;

let beacon_block_header = response_data.data.header.message;

Expand Down Expand Up @@ -129,11 +154,17 @@ impl<C: Config> SyncCommitteeProver<C> {
anyhow::Error,
> {
let path = block_route(block_id);
let full_url = self.generate_route(&path);
let full_url = self.generate_route(&path)?;

let response = self.client.get(full_url).send().await?;
let response =
self.client.get(full_url).send().await.map_err(|e| {
anyhow!("Failed to fetch block with id {block_id} due to error {e:?}")
})?;

let response_data = response.json::<responses::beacon_block_response::Response>().await?;
let response_data = response
.json::<responses::beacon_block_response::Response>()
.await
.map_err(|e| anyhow!("Failed to fetch block with id {block_id} due to error {e:?}"))?;

let beacon_block = response_data.data.message;

Expand All @@ -145,7 +176,7 @@ impl<C: Config> SyncCommitteeProver<C> {
state_id: &str,
) -> Result<NodeSyncCommittee, anyhow::Error> {
let path = sync_committee_route(state_id);
let full_url = self.generate_route(&path);
let full_url = self.generate_route(&path)?;

let response = self.client.get(full_url).send().await?;

Expand All @@ -162,7 +193,7 @@ impl<C: Config> SyncCommitteeProver<C> {
validator_index: &str,
) -> Result<Validator, anyhow::Error> {
let path = validator_route(state_id, validator_index);
let full_url = self.generate_route(&path);
let full_url = self.generate_route(&path)?;

let response = self.client.get(full_url).send().await?;

Expand All @@ -178,19 +209,27 @@ impl<C: Config> SyncCommitteeProver<C> {
state_id: &str,
) -> Result<BeaconStateType, anyhow::Error> {
let path = beacon_state_route(state_id);
let full_url = self.generate_route(&path);
let full_url = self.generate_route(&path)?;

let response = self.client.get(full_url).send().await?;
let response = self.client.get(full_url).send().await.map_err(|e| {
anyhow!("Failed to fetch beacon state with id {state_id} due to error {e:?}")
})?;

let response_data = response.json::<responses::beacon_state_response::Response>().await?;
let response_data = response
.json::<responses::beacon_state_response::Response>()
.await
.map_err(|e| {
anyhow!("Failed to fetch beacon state with id {state_id} due to error {e:?}")
})?;

let beacon_state = response_data.data;

Ok(beacon_state)
}

fn generate_route(&self, path: &str) -> String {
format!("{}{}", self.node_url.clone(), path)
fn generate_route(&self, path: &str) -> Result<Url, anyhow::Error> {
let url = Url::parse(&format!("{}{}", self.primary_url.clone(), path))?;
Ok(url)
}

/// Fetches the latest finality update that can be verified by (state_period..=state_period+1)
Expand All @@ -201,15 +240,15 @@ impl<C: Config> SyncCommitteeProver<C> {
mut client_state: VerifierState,
finality_checkpoint: Checkpoint,
latest_block_id: Option<&str>,
debug_target: &str,
) -> Result<Option<VerifierStateUpdate>, anyhow::Error> {
if finality_checkpoint.root == Node::default() ||
client_state.latest_finalized_epoch >= finality_checkpoint.epoch
{
trace!(target: "sync-committee-prover", "No new epoch finalized yet {}", finality_checkpoint.epoch);
return Ok(None);
}

debug!(target: debug_target, "A new epoch has been finalized {}", finality_checkpoint.epoch);
trace!(target: "sync-committee-prover", "A new epoch has been finalized {}", finality_checkpoint.epoch);
// Find the highest block with the a threshhold number of sync committee signatures
let latest_header = self.fetch_header(latest_block_id.unwrap_or("head")).await?;
let latest_root = latest_header.clone().hash_tree_root()?;
Expand All @@ -230,7 +269,7 @@ impl<C: Config> SyncCommitteeProver<C> {
let parent_block_finality_checkpoint =
self.fetch_finalized_checkpoint(Some(&parent_state_id)).await?.finalized;
if parent_block_finality_checkpoint.epoch <= client_state.latest_finalized_epoch {
debug!(target: "prover", "Signature block search has reached an invalid epoch {} latest finalized_block_epoch {}", parent_block_finality_checkpoint.epoch, client_state.latest_finalized_epoch);
trace!(target: "sync-committee-prover", "Search for a block with a valid sync committee signature has reached an invalid epoch {} latest_finalized_block_epoch: {}", parent_block_finality_checkpoint.epoch, client_state.latest_finalized_epoch);
return Ok(None);
}

Expand Down Expand Up @@ -309,6 +348,7 @@ impl<C: Config> SyncCommitteeProver<C> {
let mut block = loop {
// Prevent an infinite loop
if count == 100 {
log::trace!("Prover could not find a suitable block for the sync committee: {period}, syncing will fail");
return Err(anyhow!("Error fetching blocks from selected epoch"));
}

Expand Down
Loading

0 comments on commit e4db05c

Please sign in to comment.