Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cannon): Add Inclusion Distance to elaborated attestation #270

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,8 @@ transforms:
.committee_index = .data.data.index
.beacon_block_root = .data.data.beacon_block_root
.slot = .data.data.slot

.inclusion_distance = .meta.client.additional_data.inclusion_distance

slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
Expand Down Expand Up @@ -1693,6 +1695,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.target_root = .data.data.target.root

key, err = .block_root + .block_slot + .position_in_block + .signature + .beacon_block_root + .slot + .committee_index + .source_root + .target_root
if err != null {
.error = err
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE canonical_beacon_elaborated_attestation_local on cluster '{cluster}'
DROP COLUMN inclusion_distance;

ALTER TABLE canonical_beacon_elaborated_attestation on cluster '{cluster}'
DROP COLUMN inclusion_distance;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE canonical_beacon_elaborated_attestation_local on cluster '{cluster}'
ADD COLUMN inclusion_distance UInt16 CODEC(DoubleDelta, ZSTD(1)) AFTER position_in_block;

ALTER TABLE canonical_beacon_elaborated_attestation on cluster '{cluster}'
ADD COLUMN inclusion_distance UInt16 CODEC(DoubleDelta, ZSTD(1)) AFTER position_in_block;
27 changes: 23 additions & 4 deletions pkg/cannon/deriver/beacon/eth/v2/elaborated_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,15 @@ func (b *ElaboratedAttestationDeriver) processSlot(ctx context.Context, slot pha
return nil, errors.Wrapf(err, "failed to get elaborated attestations for slot %d", slot)
}

attestations, err := block.Attestations()
if err != nil {
return nil, errors.Wrapf(err, "failed to get attestations for block %s", block.String())
}

if len(events) != len(attestations) {
return nil, errors.Errorf("number of events (%d) does not match number of attestations (%d)", len(events), len(attestations))
}

return events, nil
}

Expand Down Expand Up @@ -309,6 +318,15 @@ func (b *ElaboratedAttestationDeriver) getElaboratedAttestations(ctx context.Con
indexes = append(indexes, wrapperspb.UInt64(uint64(validatorIndex)))
}

duty, err := b.beacon.Duties().GetBeaconCommitteeByEpochAndIndex(ctx, phase0.Epoch(epoch.Number()), attestation.Data.Index)
if err != nil {
return nil, errors.Wrapf(err, "failed to get beacon committee for epoch %d and index %d", epoch.Number(), attestation.Data.Index)
}

// Calculate the inclusion distance of the attestation by comparing the slot of the attestation with the slot of when the validator
// was scheduled to attest.
inclusionDistance := uint64(attestation.Data.Slot) - uint64(duty.Slot)

elaboratedAttestation := &xatuethv1.ElaboratedAttestation{
Signature: attestation.Signature.String(),
Data: &xatuethv1.AttestationDataV2{
Expand All @@ -327,7 +345,7 @@ func (b *ElaboratedAttestationDeriver) getElaboratedAttestations(ctx context.Con
ValidatorIndexes: indexes,
}

event, err := b.createEventFromElaboratedAttestation(ctx, elaboratedAttestation, uint64(positionInBlock), blockIdentifier)
event, err := b.createEventFromElaboratedAttestation(ctx, elaboratedAttestation, uint64(positionInBlock), blockIdentifier, inclusionDistance)
if err != nil {
return nil, errors.Wrapf(err, "failed to create event for attestation %s", attestation.String())
}
Expand All @@ -338,7 +356,7 @@ func (b *ElaboratedAttestationDeriver) getElaboratedAttestations(ctx context.Con
return events, nil
}

func (b *ElaboratedAttestationDeriver) createEventFromElaboratedAttestation(ctx context.Context, attestation *xatuethv1.ElaboratedAttestation, positionInBlock uint64, blockIdentifier *xatu.BlockIdentifier) (*xatu.DecoratedEvent, error) {
func (b *ElaboratedAttestationDeriver) createEventFromElaboratedAttestation(ctx context.Context, attestation *xatuethv1.ElaboratedAttestation, positionInBlock uint64, blockIdentifier *xatu.BlockIdentifier, inclusionDistance uint64) (*xatu.DecoratedEvent, error) {
// Make a clone of the metadata
metadata, ok := proto.Clone(b.clientMeta).(*xatu.ClientMeta)
if !ok {
Expand Down Expand Up @@ -392,8 +410,9 @@ func (b *ElaboratedAttestationDeriver) createEventFromElaboratedAttestation(ctx
Number: &wrapperspb.UInt64Value{Value: epoch.Number()},
StartDateTime: timestamppb.New(epoch.TimeWindow().Start()),
},
Source: source,
Target: target,
Source: source,
Target: target,
InclusionDistance: wrapperspb.UInt64(inclusionDistance),
},
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/cannon/ethereum/services/duties.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,21 @@ func (m *DutiesService) FetchBeaconCommittee(ctx context.Context, stateID string
return committees, nil
}

func (m *DutiesService) GetBeaconCommitteeByEpochAndIndex(ctx context.Context, epoch phase0.Epoch, commiteeIndex phase0.CommitteeIndex) (*v1.BeaconCommittee, error) {
duties, err := m.FetchBeaconCommittee(context.Background(), "head", epoch)
if err != nil {
return nil, fmt.Errorf("error fetching beacon committee for epoch %d: %w", epoch, err)
}

for _, duty := range duties {
if duty.Index == commiteeIndex {
return duty, nil
}
}

return nil, fmt.Errorf("beacon committee not found")
}

func (m *DutiesService) GetValidatorIndex(epoch phase0.Epoch, slot phase0.Slot, committeeIndex phase0.CommitteeIndex, position uint64) (phase0.ValidatorIndex, error) {
if _, err := m.FetchBeaconCommittee(context.Background(), "head", epoch); err != nil {
return 0, fmt.Errorf("error fetching beacon committee for epoch %d: %w", epoch, err)
Expand Down
Loading
Loading