From dfd04a91e13cb714713d24d9c878e11c4e6eee80 Mon Sep 17 00:00:00 2001 From: Tumas Date: Wed, 30 Oct 2024 11:49:43 +0200 Subject: [PATCH] Add BlobSidecar events --- fork_choice_control/src/messages.rs | 31 ++++++++++++++++++++++++++++- fork_choice_control/src/mutator.rs | 14 ++++++++----- http_api/src/events.rs | 4 ++++ http_api/src/task.rs | 5 +++++ 4 files changed, 48 insertions(+), 6 deletions(-) diff --git a/fork_choice_control/src/messages.rs b/fork_choice_control/src/messages.rs index 7af28216..a2775cab 100644 --- a/fork_choice_control/src/messages.rs +++ b/fork_choice_control/src/messages.rs @@ -18,7 +18,10 @@ use serde::Serialize; use tap::Pipe as _; use types::{ combined::{Attestation, BeaconState, SignedAggregateAndProof, SignedBeaconBlock}, - deneb::containers::BlobIdentifier, + deneb::{ + containers::{BlobIdentifier, BlobSidecar}, + primitives::{BlobIndex, KzgCommitment, VersionedHash}, + }, phase0::{ containers::Checkpoint, primitives::{DepositIndex, Epoch, ExecutionBlockHash, Slot, ValidatorIndex, H256}, @@ -213,6 +216,7 @@ impl ValidatorMessage { #[derive(Debug)] pub enum ApiMessage { AttestationEvent(Arc>), + BlobSidecarEvent(BlobSidecarEvent), BlockEvent(BlockEvent), ChainReorgEvent(ChainReorgEvent), FinalizedCheckpoint(FinalizedCheckpointEvent), @@ -255,6 +259,31 @@ impl SyncMessage

{ } } +#[derive(Debug, Serialize)] +pub struct BlobSidecarEvent { + pub block_root: H256, + #[serde(with = "serde_utils::string_or_native")] + pub index: BlobIndex, + #[serde(with = "serde_utils::string_or_native")] + pub slot: Slot, + pub kzg_commitment: KzgCommitment, + pub versioned_hash: VersionedHash, +} + +impl BlobSidecarEvent { + pub fn new(block_root: H256, blob_sidecar: &BlobSidecar

) -> Self { + let kzg_commitment = blob_sidecar.kzg_commitment; + + Self { + block_root, + index: blob_sidecar.index, + slot: blob_sidecar.slot(), + kzg_commitment, + versioned_hash: misc::kzg_commitment_to_versioned_hash(kzg_commitment), + } + } +} + #[derive(Debug, Serialize)] pub struct BlockEvent { #[serde(with = "serde_utils::string_or_native")] diff --git a/fork_choice_control/src/mutator.rs b/fork_choice_control/src/mutator.rs index 1887ea44..a8178a94 100644 --- a/fork_choice_control/src/mutator.rs +++ b/fork_choice_control/src/mutator.rs @@ -59,8 +59,8 @@ use types::{ use crate::{ block_processor::BlockProcessor, messages::{ - AttestationVerifierMessage, MutatorMessage, P2pMessage, PoolMessage, SubnetMessage, - SyncMessage, ValidatorMessage, + AttestationVerifierMessage, BlobSidecarEvent, MutatorMessage, P2pMessage, PoolMessage, + SubnetMessage, SyncMessage, ValidatorMessage, }, misc::{ Delayed, MutatorRejectionReason, PendingAggregateAndProof, PendingAttestation, @@ -1055,7 +1055,7 @@ where reply_to_http_api(sender, Ok(ValidationOutcome::Accept)); - self.accept_blob_sidecar(&wait_group, blob_sidecar); + self.accept_blob_sidecar(&wait_group, &blob_sidecar); } Ok(BlobSidecarAction::Ignore(publishable)) => { let (gossip_id, sender) = origin.split(); @@ -1609,12 +1609,13 @@ where Ok(()) } - fn accept_blob_sidecar(&mut self, wait_group: &W, blob_sidecar: Arc>) { + fn accept_blob_sidecar(&mut self, wait_group: &W, blob_sidecar: &Arc>) { let old_head = self.store.head().clone(); let head_was_optimistic = old_head.is_optimistic(); let block_root = blob_sidecar.signed_block_header.message.hash_tree_root(); - self.store_mut().apply_blob_sidecar(blob_sidecar); + self.store_mut() + .apply_blob_sidecar(blob_sidecar.clone_arc()); self.update_store_snapshot(); @@ -1622,6 +1623,9 @@ where self.retry_block(wait_group.clone(), pending_block.clone()); } + ApiMessage::BlobSidecarEvent(BlobSidecarEvent::new(block_root, blob_sidecar)) + .send(&self.api_tx); + self.spawn(PersistBlobSidecarsTask { store_snapshot: self.owned_store(), storage: self.storage.clone_arc(), diff --git a/http_api/src/events.rs b/http_api/src/events.rs index 488f8cc2..d8a326ce 100644 --- a/http_api/src/events.rs +++ b/http_api/src/events.rs @@ -9,6 +9,7 @@ use tokio::sync::broadcast::{self, Receiver, Sender}; pub enum Topic { Attestation, AttesterSlashing, + BlobSidecar, Block, BlsToExecutionChange, ChainReorg, @@ -28,6 +29,7 @@ impl Topic { pub struct EventChannels { pub attestations: Sender, pub attester_slashings: Sender, + pub blob_sidecars: Sender, pub blocks: Sender, pub bls_to_execution_changes: Sender, pub chain_reorgs: Sender, @@ -43,6 +45,7 @@ impl EventChannels { Self { attestations: broadcast::channel(max_events).0, attester_slashings: broadcast::channel(max_events).0, + blob_sidecars: broadcast::channel(max_events).0, blocks: broadcast::channel(max_events).0, bls_to_execution_changes: broadcast::channel(max_events).0, chain_reorgs: broadcast::channel(max_events).0, @@ -58,6 +61,7 @@ impl EventChannels { match topic { Topic::Attestation => &self.attestations, Topic::AttesterSlashing => &self.attester_slashings, + Topic::BlobSidecar => &self.blob_sidecars, Topic::Block => &self.blocks, Topic::BlsToExecutionChange => &self.bls_to_execution_changes, Topic::ChainReorg => &self.chain_reorgs, diff --git a/http_api/src/task.rs b/http_api/src/task.rs index 07b7a0c5..f112829f 100644 --- a/http_api/src/task.rs +++ b/http_api/src/task.rs @@ -188,6 +188,7 @@ async fn handle_events( let EventChannels { attestations, attester_slashings, + blob_sidecars, blocks, bls_to_execution_changes, chain_reorgs, @@ -237,6 +238,10 @@ async fn handle_events( let event = Topic::Attestation.build(attestation)?; attestations.send(event).unwrap_or_default() } + ApiMessage::BlobSidecarEvent(blob_sidecar) => { + let event = Topic::BlobSidecar.build(blob_sidecar)?; + blob_sidecars.send(event).unwrap_or_default() + } ApiMessage::BlockEvent(block_event) => { let event = Topic::Block.build(block_event)?; blocks.send(event).unwrap_or_default()