diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7b457fed370..aceb0178c23 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2551,6 +2551,24 @@ impl BeaconChain { } } + /// Verify and import a consolidation and queue it for inclusion in an appropriate block. + pub fn verify_and_import_consolidation( + &self, + consolidation: SignedConsolidation, + ) -> Result<(), Error> { + // Add to the op pool (if we have the ability to propose blocks). + if self.eth1_chain.is_some() { + let head_snapshot = self.head().snapshot; + let head_state = &head_snapshot.beacon_state; + + // TODO(maxeb): may double insert, since there's no observable cache + self.op_pool + .insert_consolidation(consolidation.validate(head_state, &self.spec)?) + } + + Ok(()) + } + /// Attempt to obtain sync committee duties from the head. pub fn sync_committee_duties_from_head( &self, diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 9c1ba06f853..1ea100c3dad 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -21,8 +21,8 @@ use state_processing::{ block_signature_verifier::Error as BlockSignatureVerifierError, per_block_processing::errors::{ AttestationValidationError, AttesterSlashingValidationError, - BlsExecutionChangeValidationError, ExitValidationError, ProposerSlashingValidationError, - SyncCommitteeMessageValidationError, + BlsExecutionChangeValidationError, ConsolidationValidationError, ExitValidationError, + ProposerSlashingValidationError, SyncCommitteeMessageValidationError, }, signature_sets::Error as SignatureSetError, state_advance::Error as StateAdvanceError, @@ -75,6 +75,7 @@ pub enum BeaconChainError { ProposerSlashingValidationError(ProposerSlashingValidationError), AttesterSlashingValidationError(AttesterSlashingValidationError), BlsExecutionChangeValidationError(BlsExecutionChangeValidationError), + ConsolidationValidationError(ConsolidationValidationError), StateSkipTooLarge { start_slot: Slot, requested_slot: Slot, @@ -233,6 +234,7 @@ easy_from_to!(ExitValidationError, BeaconChainError); easy_from_to!(ProposerSlashingValidationError, BeaconChainError); easy_from_to!(AttesterSlashingValidationError, BeaconChainError); easy_from_to!(BlsExecutionChangeValidationError, BeaconChainError); +easy_from_to!(ConsolidationValidationError, BeaconChainError); easy_from_to!(SszTypesError, BeaconChainError); easy_from_to!(OpPoolError, BeaconChainError); easy_from_to!(NaiveAggregationError, BeaconChainError); diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 91f9047a62d..3b5880b5825 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -82,8 +82,8 @@ use types::{ AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, - SyncCommitteeMessage, SyncContributionData, + SignedConsolidation, SignedContributionAndProof, SignedValidatorRegistrationData, + SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -2226,6 +2226,47 @@ pub fn serve( }, ); + // GET beacon/pool/consolidations + let get_beacon_pool_consolidations = beacon_pool_path + .clone() + .and(warp::path("consolidations")) + .and(warp::path::end()) + .then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let consolidations = chain.op_pool.get_all_consolidations(); + Ok(api_types::GenericResponse::from(consolidations)) + }) + }, + ); + + // POST beacon/pool/consolidations + let post_beacon_pool_consolidations = beacon_pool_path + .clone() + .and(warp::path("consolidations")) + .and(warp::path::end()) + .and(warp::body::json()) + .then( + |task_spawner: TaskSpawner, + chain: Arc>, + consolidation: SignedConsolidation| { + task_spawner.blocking_json_task(Priority::P0, move || { + chain + .verify_and_import_consolidation(consolidation) + .map_err(|e| { + warp_utils::reject::object_invalid(format!( + "gossip verification failed: {:?}", + e + )) + })?; + + // TODO(maxeb): register to validator monitor? + + Ok(()) + }) + }, + ); + // GET beacon/deposit_snapshot let get_beacon_deposit_snapshot = eth_v1 .and(warp::path("beacon")) @@ -4567,6 +4608,7 @@ pub fn serve( .uor(get_beacon_pool_proposer_slashings) .uor(get_beacon_pool_voluntary_exits) .uor(get_beacon_pool_bls_to_execution_changes) + .uor(get_beacon_pool_consolidations) .uor(get_beacon_deposit_snapshot) .uor(get_beacon_rewards_blocks) .uor(get_config_fork_schedule) @@ -4645,6 +4687,7 @@ pub fn serve( .uor(post_beacon_pool_voluntary_exits) .uor(post_beacon_pool_sync_committees) .uor(post_beacon_pool_bls_to_execution_changes) + .uor(post_beacon_pool_consolidations) .uor(post_beacon_state_validators) .uor(post_beacon_state_validator_balances) .uor(post_beacon_rewards_attestations) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 16801be8e23..1eb004a4f3b 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -1372,6 +1372,39 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST beacon/pool/consolidations` + pub async fn post_beacon_pool_consolidations( + &self, + consolidation: &SignedConsolidation, + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("consolidations"); + + self.post(path, &consolidation).await?; + + Ok(()) + } + + /// `GET beacon/pool/consolidations` + pub async fn get_beacon_pool_consolidations( + &self, + ) -> Result>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("consolidations"); + + self.get(path).await + } + /// `GET beacon/deposit_snapshot` pub async fn get_deposit_snapshot(&self) -> Result, Error> { let mut path = self.eth_path(V1)?;