Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposer and attester slashing sse events #5327

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2422,6 +2422,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
proposer_slashing: ProposerSlashing,
) -> Result<ObservationOutcome<ProposerSlashing, T::EthSpec>, Error> {
let wall_clock_state = self.wall_clock_state()?;

Ok(self.observed_proposer_slashings.lock().verify_and_observe(
proposer_slashing,
&wall_clock_state,
Expand All @@ -2434,6 +2435,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
proposer_slashing: SigVerifiedOp<ProposerSlashing, T::EthSpec>,
) {
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_proposer_slashing_subscribers() {
event_handler.register(EventKind::ProposerSlashing(Box::new(
proposer_slashing.clone().into_inner(),
)));
}
}

if self.eth1_chain.is_some() {
self.op_pool.insert_proposer_slashing(proposer_slashing)
}
Expand All @@ -2445,6 +2454,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
attester_slashing: AttesterSlashing<T::EthSpec>,
) -> Result<ObservationOutcome<AttesterSlashing<T::EthSpec>, T::EthSpec>, Error> {
let wall_clock_state = self.wall_clock_state()?;

Ok(self.observed_attester_slashings.lock().verify_and_observe(
attester_slashing,
&wall_clock_state,
Expand All @@ -2465,6 +2475,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_write_lock()
.on_attester_slashing(attester_slashing.as_inner());

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_attester_slashing_subscribers() {
event_handler.register(EventKind::AttesterSlashing(Box::new(
attester_slashing.clone().into_inner(),
)));
}
}

// Add to the op pool (if we have the ability to propose blocks).
if self.eth1_chain.is_some() {
self.op_pool.insert_attester_slashing(attester_slashing)
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,8 +708,8 @@ where
.ok_or("Cannot build without a genesis state root")?;
let validator_monitor_config = self.validator_monitor_config.unwrap_or_default();
let head_tracker = Arc::new(self.head_tracker.unwrap_or_default());

let beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>> = <_>::default();

let mut validator_monitor = ValidatorMonitor::new(
validator_monitor_config,
beacon_proposer_cache.clone(),
Expand Down
30 changes: 30 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub struct ServerSentEventHandler<E: EthSpec> {
light_client_finality_update_tx: Sender<EventKind<E>>,
light_client_optimistic_update_tx: Sender<EventKind<E>>,
block_reward_tx: Sender<EventKind<E>>,
proposer_slashing_tx: Sender<EventKind<E>>,
attester_slashing_tx: Sender<EventKind<E>>,
log: Logger,
}

Expand All @@ -45,6 +47,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
let (light_client_finality_update_tx, _) = broadcast::channel(capacity);
let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity);
let (block_reward_tx, _) = broadcast::channel(capacity);
let (proposer_slashing_tx, _) = broadcast::channel(capacity);
let (attester_slashing_tx, _) = broadcast::channel(capacity);

Self {
attestation_tx,
Expand All @@ -60,6 +64,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
light_client_finality_update_tx,
light_client_optimistic_update_tx,
block_reward_tx,
proposer_slashing_tx,
attester_slashing_tx,
log,
}
}
Expand Down Expand Up @@ -126,6 +132,14 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.block_reward_tx
.send(kind)
.map(|count| log_count("block reward", count)),
EventKind::ProposerSlashing(_) => self
.proposer_slashing_tx
.send(kind)
.map(|count| log_count("proposer slashing", count)),
EventKind::AttesterSlashing(_) => self
.attester_slashing_tx
.send(kind)
.map(|count| log_count("attester slashing", count)),
};
if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
Expand Down Expand Up @@ -184,6 +198,14 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.block_reward_tx.subscribe()
}

pub fn subscribe_attester_slashing(&self) -> Receiver<EventKind<E>> {
self.attester_slashing_tx.subscribe()
}

pub fn subscribe_proposer_slashing(&self) -> Receiver<EventKind<E>> {
self.proposer_slashing_tx.subscribe()
}

pub fn has_attestation_subscribers(&self) -> bool {
self.attestation_tx.receiver_count() > 0
}
Expand Down Expand Up @@ -227,4 +249,12 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
pub fn has_block_reward_subscribers(&self) -> bool {
self.block_reward_tx.receiver_count() > 0
}

pub fn has_proposer_slashing_subscribers(&self) -> bool {
self.proposer_slashing_tx.receiver_count() > 0
}

pub fn has_attester_slashing_subscribers(&self) -> bool {
self.attester_slashing_tx.receiver_count() > 0
}
}
6 changes: 6 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4358,6 +4358,12 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::BlockReward => {
event_handler.subscribe_block_reward()
}
api_types::EventTopic::AttesterSlashing => {
event_handler.subscribe_attester_slashing()
}
api_types::EventTopic::ProposerSlashing => {
event_handler.subscribe_proposer_slashing()
}
};

receivers.push(
Expand Down
38 changes: 38 additions & 0 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5214,6 +5214,8 @@ impl ApiTester {
EventTopic::Block,
EventTopic::Head,
EventTopic::FinalizedCheckpoint,
EventTopic::AttesterSlashing,
EventTopic::ProposerSlashing,
];
let mut events_future = self
.client
Expand Down Expand Up @@ -5353,6 +5355,42 @@ impl ApiTester {
.await;
assert_eq!(reorg_event.as_slice(), &[expected_reorg]);

// Test attester slashing event
let mut attester_slashing_event_future = self
.client
.get_events::<E>(&[EventTopic::AttesterSlashing])
.await
.unwrap();

self.harness.add_attester_slashing(vec![1, 2, 3]).unwrap();
eserilev marked this conversation as resolved.
Show resolved Hide resolved

let attester_slashing_event = poll_events(
&mut attester_slashing_event_future,
1,
Duration::from_millis(10000),
)
.await;

assert!(attester_slashing_event.len() == 1);

// Test proposer slashing event
let mut proposer_slashing_event_future = self
.client
.get_events::<E>(&[EventTopic::ProposerSlashing])
.await
.unwrap();

self.harness.add_proposer_slashing(1).unwrap();
eserilev marked this conversation as resolved.
Show resolved Hide resolved

let proposer_slashing_event = poll_events(
&mut proposer_slashing_event_future,
1,
Duration::from_millis(10000),
)
.await;

assert!(proposer_slashing_event.len() == 1);

self
}

Expand Down
20 changes: 20 additions & 0 deletions common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,8 @@ pub enum EventKind<E: EthSpec> {
#[cfg(feature = "lighthouse")]
BlockReward(BlockReward),
PayloadAttributes(VersionedSsePayloadAttributes),
ProposerSlashing(Box<ProposerSlashing>),
AttesterSlashing(Box<AttesterSlashing<E>>),
}

impl<E: EthSpec> EventKind<E> {
Expand All @@ -1099,6 +1101,8 @@ impl<E: EthSpec> EventKind<E> {
EventKind::LightClientOptimisticUpdate(_) => "light_client_optimistic_update",
#[cfg(feature = "lighthouse")]
EventKind::BlockReward(_) => "block_reward",
EventKind::ProposerSlashing(_) => "proposer_slashing",
EventKind::AttesterSlashing(_) => "attester_slashing",
}
}

Expand Down Expand Up @@ -1179,6 +1183,16 @@ impl<E: EthSpec> EventKind<E> {
"block_reward" => Ok(EventKind::BlockReward(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Block Reward: {:?}", e)),
)?)),
"attester_slashing" => Ok(EventKind::AttesterSlashing(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Attester Slashing: {:?}", e))
})?,
)),
"proposer_slashing" => Ok(EventKind::ProposerSlashing(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Proposer Slashing: {:?}", e))
})?,
)),
_ => Err(ServerError::InvalidServerSentEvent(
"Could not parse event tag".to_string(),
)),
Expand Down Expand Up @@ -1210,6 +1224,8 @@ pub enum EventTopic {
LightClientOptimisticUpdate,
#[cfg(feature = "lighthouse")]
BlockReward,
AttesterSlashing,
ProposerSlashing,
}

impl FromStr for EventTopic {
Expand All @@ -1231,6 +1247,8 @@ impl FromStr for EventTopic {
"light_client_optimistic_update" => Ok(EventTopic::LightClientOptimisticUpdate),
#[cfg(feature = "lighthouse")]
"block_reward" => Ok(EventTopic::BlockReward),
"attester_slashing" => Ok(EventTopic::AttesterSlashing),
"proposer_slashing" => Ok(EventTopic::ProposerSlashing),
_ => Err("event topic cannot be parsed.".to_string()),
}
}
Expand All @@ -1253,6 +1271,8 @@ impl fmt::Display for EventTopic {
EventTopic::LightClientOptimisticUpdate => write!(f, "light_client_optimistic_update"),
#[cfg(feature = "lighthouse")]
EventTopic::BlockReward => write!(f, "block_reward"),
EventTopic::AttesterSlashing => write!(f, "attester_slashing"),
EventTopic::ProposerSlashing => write!(f, "proposer_slashing"),
}
}
}
Expand Down
Loading