Skip to content

Commit

Permalink
Fix duties override bug in VC (#5305)
Browse files Browse the repository at this point in the history
* Fix duties override bug in VC

* Use initial request efficiently

* Prevent expired subscriptions by construction

* Clean up selection proof logic

* Add test
  • Loading branch information
michaelsproul authored Mar 4, 2024
1 parent f919f82 commit cff6258
Showing 1 changed file with 127 additions and 68 deletions.
195 changes: 127 additions & 68 deletions validator_client/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,43 +122,37 @@ pub struct SubscriptionSlots {
slots: Vec<(Slot, AtomicBool)>,
}

impl DutyAndProof {
/// Instantiate `Self`, computing the selection proof as well.
pub async fn new_with_selection_proof<T: SlotClock + 'static, E: EthSpec>(
duty: AttesterData,
validator_store: &ValidatorStore<T, E>,
spec: &ChainSpec,
) -> Result<Self, Error> {
let selection_proof = validator_store
.produce_selection_proof(duty.pubkey, duty.slot)
.await
.map_err(Error::FailedToProduceSelectionProof)?;

let selection_proof = selection_proof
.is_aggregator(duty.committee_length as usize, spec)
.map_err(Error::InvalidModulo)
.map(|is_aggregator| {
if is_aggregator {
Some(selection_proof)
} else {
// Don't bother storing the selection proof if the validator isn't an
// aggregator, we won't need it.
None
}
})?;

let subscription_slots = SubscriptionSlots::new(duty.slot);

Ok(Self {
duty,
selection_proof,
subscription_slots,
/// Create a selection proof for `duty`.
///
/// Return `Ok(None)` if the attesting validator is not an aggregator.
async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
duty: &AttesterData,
validator_store: &ValidatorStore<T, E>,
spec: &ChainSpec,
) -> Result<Option<SelectionProof>, Error> {
let selection_proof = validator_store
.produce_selection_proof(duty.pubkey, duty.slot)
.await
.map_err(Error::FailedToProduceSelectionProof)?;

selection_proof
.is_aggregator(duty.committee_length as usize, spec)
.map_err(Error::InvalidModulo)
.map(|is_aggregator| {
if is_aggregator {
Some(selection_proof)
} else {
// Don't bother storing the selection proof if the validator isn't an
// aggregator, we won't need it.
None
}
})
}
}

impl DutyAndProof {
/// Create a new `DutyAndProof` with the selection proof waiting to be filled in.
pub fn new_without_selection_proof(duty: AttesterData) -> Self {
let subscription_slots = SubscriptionSlots::new(duty.slot);
pub fn new_without_selection_proof(duty: AttesterData, current_slot: Slot) -> Self {
let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
Self {
duty,
selection_proof: None,
Expand All @@ -168,10 +162,13 @@ impl DutyAndProof {
}

impl SubscriptionSlots {
fn new(duty_slot: Slot) -> Arc<Self> {
fn new(duty_slot: Slot, current_slot: Slot) -> Arc<Self> {
let slots = ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.filter_map(|offset| duty_slot.safe_sub(offset).ok())
// Keep only scheduled slots that haven't happened yet. This avoids sending expired
// subscriptions.
.filter(|scheduled_slot| *scheduled_slot > current_slot)
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
.collect();
Arc::new(Self { slots })
Expand Down Expand Up @@ -787,14 +784,14 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
// request for extra data unless necessary in order to save on network bandwidth.
let uninitialized_validators =
get_uninitialized_validators(duties_service, &epoch, local_pubkeys);
let indices_to_request = if !uninitialized_validators.is_empty() {
let initial_indices_to_request = if !uninitialized_validators.is_empty() {
uninitialized_validators.as_slice()
} else {
&local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())]
};

let response =
post_validator_duties_attester(duties_service, epoch, indices_to_request).await?;
post_validator_duties_attester(duties_service, epoch, initial_indices_to_request).await?;
let dependent_root = response.dependent_root;

// Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch.
Expand All @@ -818,24 +815,29 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
return Ok(());
}

// Filter out validators which have already been requested.
let initial_duties = &response.data;
// Make a request for all indices that require updating which we have not already made a request
// for.
let indices_to_request = validators_to_update
.iter()
.filter(|&&&pubkey| !initial_duties.iter().any(|duty| duty.pubkey == pubkey))
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.filter(|validator_index| !initial_indices_to_request.contains(validator_index))
.collect::<Vec<_>>();

let new_duties = if !indices_to_request.is_empty() {
// Filter the initial duties by their relevance so that we don't hit the warning below about
// overwriting duties. There was previously a bug here.
let new_initial_duties = response
.data
.into_iter()
.filter(|duty| validators_to_update.contains(&&duty.pubkey));

let mut new_duties = if !indices_to_request.is_empty() {
post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice())
.await?
.data
.into_iter()
.chain(response.data)
.collect::<Vec<_>>()
} else {
response.data
vec![]
};
new_duties.extend(new_initial_duties);

drop(fetch_timer);

Expand All @@ -854,26 +856,53 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
// Update the duties service with the new `DutyAndProof` messages.
let mut attesters = duties_service.attesters.write();
let mut already_warned = Some(());
let current_slot = duties_service
.slot_clock
.now_or_genesis()
.unwrap_or_default();
for duty in &new_duties {
let attester_map = attesters.entry(duty.pubkey).or_default();

// Create initial entries in the map without selection proofs. We'll compute them in the
// background later to avoid creating a thundering herd of signing threads whenever new
// duties are computed.
let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone());
let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone(), current_slot);

match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut occupied) => {
let mut_value = occupied.get_mut();
let (prior_dependent_root, prior_duty_and_proof) = &mut_value;

// Guard against overwriting an existing value for the same duty. If we did
// overwrite we could lose a selection proof or information from
// `subscription_slots`. Hitting this branch should be prevented by our logic for
// fetching duties only for unknown indices.
if dependent_root == *prior_dependent_root
&& prior_duty_and_proof.duty == duty_and_proof.duty
{
warn!(
log,
"Redundant attester duty update";
"dependent_root" => %dependent_root,
"validator_index" => duty.validator_index,
);
continue;
}

if let Some((prior_dependent_root, _)) =
attester_map.insert(epoch, (dependent_root, duty_and_proof))
{
// Using `already_warned` avoids excessive logs.
if dependent_root != prior_dependent_root && already_warned.take().is_some() {
warn!(
log,
"Attester duties re-org";
"prior_dependent_root" => %prior_dependent_root,
"dependent_root" => %dependent_root,
"msg" => "this may happen from time to time"
)
// Using `already_warned` avoids excessive logs.
if dependent_root != *prior_dependent_root && already_warned.take().is_some() {
warn!(
log,
"Attester duties re-org";
"prior_dependent_root" => %prior_dependent_root,
"dependent_root" => %dependent_root,
"msg" => "this may happen from time to time"
)
}
*mut_value = (dependent_root, duty_and_proof);
}
hash_map::Entry::Vacant(vacant) => {
vacant.insert((dependent_root, duty_and_proof));
}
}
}
Expand Down Expand Up @@ -1030,20 +1059,21 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
// Sign selection proofs (serially).
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
.then(|duty| async {
DutyAndProof::new_with_selection_proof(
duty,
let opt_selection_proof = make_selection_proof(
&duty,
&duties_service.validator_store,
&duties_service.spec,
)
.await
.await?;
Ok((duty, opt_selection_proof))
})
.collect::<Vec<_>>()
.await;

// Add to attesters store.
let mut attesters = duties_service.attesters.write();
for result in duty_and_proof_results {
let duty_and_proof = match result {
let (duty, selection_proof) = match result {
Ok(duty_and_proof) => duty_and_proof,
Err(Error::FailedToProduceSelectionProof(
ValidatorStoreError::UnknownPubkey(pubkey),
Expand Down Expand Up @@ -1071,12 +1101,12 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
}
};

let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default();
let epoch = duty_and_proof.duty.slot.epoch(E::slots_per_epoch());
let attester_map = attesters.entry(duty.pubkey).or_default();
let epoch = duty.slot.epoch(E::slots_per_epoch());
match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed.
let Some(selection_proof) = duty_and_proof.selection_proof else {
let Some(selection_proof) = selection_proof else {
continue;
};

Expand All @@ -1097,6 +1127,14 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
}
}
hash_map::Entry::Vacant(entry) => {
// This probably shouldn't happen, but we have enough info to fill in the
// entry so we may as well.
let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
let duty_and_proof = DutyAndProof {
duty,
selection_proof,
subscription_slots,
};
entry.insert((dependent_root, duty_and_proof));
}
}
Expand Down Expand Up @@ -1320,13 +1358,15 @@ mod test {

#[test]
fn subscription_slots_exact() {
// Set current slot in the past so no duties are considered expired.
let current_slot = Slot::new(0);
for duty_slot in [
Slot::new(32),
Slot::new(33),
Slot::new(47),
Slot::new(99),
Slot::new(1002003),
] {
let subscription_slots = SubscriptionSlots::new(duty_slot);
let subscription_slots = SubscriptionSlots::new(duty_slot, current_slot);

// Run twice to check idempotence (subscription slots shouldn't be marked as done until
// we mark them manually).
Expand Down Expand Up @@ -1360,8 +1400,9 @@ mod test {
#[test]
fn subscription_slots_mark_multiple() {
for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
let current_slot = Slot::new(0);
let duty_slot = Slot::new(64);
let subscription_slots = SubscriptionSlots::new(duty_slot);
let subscription_slots = SubscriptionSlots::new(duty_slot, current_slot);

subscription_slots.record_successful_subscription_at(duty_slot - offset);

Expand All @@ -1376,4 +1417,22 @@ mod test {
}
}
}

/// Test the boundary condition where all subscription slots are *just* expired.
#[test]
fn subscription_slots_expired() {
let current_slot = Slot::new(100);
let duty_slot = current_slot + ATTESTATION_SUBSCRIPTION_OFFSETS[0];
let subscription_slots = SubscriptionSlots::new(duty_slot, current_slot);
for offset in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter() {
let slot = duty_slot - offset;
assert!(!subscription_slots.should_send_subscription_at(slot));
}
assert!(subscription_slots.slots.is_empty());

// If the duty slot is 1 later, we get a non-empty set of duties.
let subscription_slots = SubscriptionSlots::new(duty_slot + 1, current_slot);
assert_eq!(subscription_slots.slots.len(), 1);
assert!(subscription_slots.should_send_subscription_at(current_slot + 1),);
}
}

0 comments on commit cff6258

Please sign in to comment.