Skip to content

Commit

Permalink
feat(sentry): Fetch proposer duties
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Apr 2, 2024
1 parent 7555492 commit f86a6ae
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 2 deletions.
2 changes: 2 additions & 0 deletions deploy/migrations/clickhouse/030_v1_proposer_duties.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS beacon_api_eth_v1_proposer_duty ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS default.beacon_api_eth_v1_proposer_duty_local ON CLUSTER '{cluster}';
73 changes: 73 additions & 0 deletions deploy/migrations/clickhouse/030_v1_proposer_duties.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
CREATE TABLE default.beacon_api_eth_v1_proposer_duty_local on cluster '{cluster}'
(
unique_key Int64,
updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)),
slot UInt32 CODEC(DoubleDelta, ZSTD(1)),
slot_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
epoch UInt32 CODEC(DoubleDelta, ZSTD(1)),
epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
proposer_validator_index UInt32 CODEC(ZSTD(1)),
proposer_pubkey String CODEC(ZSTD(1)),
meta_client_name LowCardinality(String),
meta_client_id String CODEC(ZSTD(1)),
meta_client_version LowCardinality(String),
meta_client_implementation LowCardinality(String),
meta_client_os LowCardinality(String),
meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)),
meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)),
meta_client_geo_latitude Nullable(Float64) CODEC(ZSTD(1)),
meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)),
meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)),
meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)),
meta_network_name LowCardinality(String),
meta_consensus_version LowCardinality(String),
meta_consensus_version_major LowCardinality(String),
meta_consensus_version_minor LowCardinality(String),
meta_consensus_version_patch LowCardinality(String),
meta_consensus_implementation LowCardinality(String),
meta_labels Map(String, String) CODEC(ZSTD(1))
) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time)
PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY (slot_start_date_time, unique_key, meta_network_name);

ALTER TABLE default.beacon_api_eth_v1_proposer_duty_local ON CLUSTER '{cluster}'
MODIFY COMMENT 'Contains a proposer duty from a beacon block.',
COMMENT COLUMN unique_key 'Unique key for the row generated from seahash',
COMMENT COLUMN updated_date_time 'When this row was last updated',
COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node',
COMMENT COLUMN slot 'The slot number from beacon block payload',
COMMENT COLUMN slot_start_date_time 'The wall clock time when the slot started',
COMMENT COLUMN epoch 'The epoch number from beacon block payload',
COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started',
COMMENT COLUMN proposer_validator_index 'The validator index from the proposer duty payload',
COMMENT COLUMN proposer_pubkey 'The BLS public key of the validator from the proposer duty payload',
COMMENT COLUMN meta_client_name 'Name of the client that generated the event',
COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.',
COMMENT COLUMN meta_client_version 'Version of the client that generated the event',
COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event',
COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event',
COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event',
COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event',
COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event',
COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event',
COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event',
COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event',
COMMENT COLUMN meta_network_id 'Ethereum network ID',
COMMENT COLUMN meta_network_name 'Ethereum network name',
COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event',
COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event',
COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event',
COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event',
COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event',
COMMENT COLUMN meta_labels 'Labels associated with the event';

CREATE TABLE beacon_api_eth_v1_proposer_duty on cluster '{cluster}' AS beacon_api_eth_v1_proposer_duty_local
ENGINE = Distributed('{cluster}', default, beacon_api_eth_v1_proposer_duty_local, rand());
7 changes: 7 additions & 0 deletions pkg/proto/eth/v1/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package v1

var (
StateIDFinalized = "finalized"
StateIDHead = "head"
StateIDJustified = "justified"
)
5 changes: 3 additions & 2 deletions pkg/sentry/ethereum/services/duties.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,13 @@ func (m *DutiesService) Start(ctx context.Context) error {

// Anticipate the next epoch and fetch the next epoch's beacon committees.
m.metadata.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) {
// Sleep until just before the start of the next epoch to fetch the next epoch's duties.
time.Sleep(epoch.TimeWindow().EndsIn() - 2*time.Second)

m.log.
WithField("current_epoch", epoch.Number()).
WithField("next_epoch", epoch.Number()+1).
Debug("Fetching beacon committees for next epoch")
// Sleep until just before the start of the next epoch to fetch the next epoch's duties.
time.Sleep(epoch.TimeWindow().EndsIn() - 2*time.Second)

if err := m.fetchBeaconCommittee(ctx, phase0.Epoch(epoch.Number()+1), true); err != nil {
m.log.WithError(err).Warn("Failed to fetch required epoch duties in anticipation of an epoch change")
Expand Down
125 changes: 125 additions & 0 deletions pkg/sentry/event/beacon/eth/v1/proposer_duty.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package event

import (
"context"
"encoding/hex"
"fmt"
"time"

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/ethpandaops/xatu/pkg/sentry/ethereum"
"github.com/google/uuid"
ttlcache "github.com/jellydator/ttlcache/v3"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

type ProposerDuty struct {
log logrus.FieldLogger

now time.Time

duty *v1.ProposerDuty
epoch phase0.Epoch

beacon *ethereum.BeaconNode
clientMeta *xatu.ClientMeta
duplicateCache *ttlcache.Cache[string, time.Time]
id uuid.UUID
}

func NewProposerDuty(log logrus.FieldLogger, duty *v1.ProposerDuty, epoch phase0.Epoch, now time.Time, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta, duplicateCache *ttlcache.Cache[string, time.Time]) *ProposerDuty {
return &ProposerDuty{
log: log.WithField("event", "BEACON_API_ETH_V1_PROPOSER_DUTY"),
now: now,
duty: duty,
epoch: epoch,
duplicateCache: duplicateCache,
beacon: beacon,
clientMeta: clientMeta,
id: uuid.New(),
}
}

func (e *ProposerDuty) Decorate(ctx context.Context) (*xatu.DecoratedEvent, error) {
// Make a clone of the metadata
metadata, ok := proto.Clone(e.clientMeta).(*xatu.ClientMeta)
if !ok {
return nil, fmt.Errorf("failed to clone client metadata")
}

decoratedEvent := &xatu.DecoratedEvent{
Event: &xatu.Event{
Name: xatu.Event_BEACON_API_ETH_V1_PROPOSER_DUTY,
DateTime: timestamppb.New(e.now),
Id: e.id.String(),
},
Meta: &xatu.Meta{
Client: metadata,
},
Data: &xatu.DecoratedEvent_EthV1ProposerDuty{
EthV1ProposerDuty: &xatuethv1.ProposerDuty{
Slot: wrapperspb.UInt64(uint64(e.duty.Slot)),
Pubkey: fmt.Sprintf("0x%s", hex.EncodeToString(e.duty.PubKey[:])),
ValidatorIndex: wrapperspb.UInt64(uint64(e.duty.ValidatorIndex)),
},
},
}

additionalData, err := e.getAdditionalData(ctx)
if err != nil {
e.log.WithError(err).Error("Failed to get extra proposer duty data")
} else {
decoratedEvent.Meta.Client.AdditionalData = &xatu.ClientMeta_EthV1ProposerDuty{
EthV1ProposerDuty: additionalData,
}
}

return decoratedEvent, nil
}

func (e *ProposerDuty) ShouldIgnore(ctx context.Context) (bool, error) {
if err := e.beacon.Synced(ctx); err != nil {
return true, err
}

key := fmt.Sprintf("%d-%d", e.epoch, e.duty.Slot)

item, retrieved := e.duplicateCache.GetOrSet(key, e.now, ttlcache.WithTTL[string, time.Time](ttlcache.DefaultTTL))
if retrieved {
e.log.WithFields(logrus.Fields{
"epoch": e.epoch,
"time_since_first_item": time.Since(item.Value()),
}).Debug("Duplicate proposer duty event received")

return true, nil
}

return false, nil
}

func (e *ProposerDuty) getAdditionalData(_ context.Context) (*xatu.ClientMeta_AdditionalEthV1ProposerDutyData, error) {
extra := &xatu.ClientMeta_AdditionalEthV1ProposerDutyData{
StateId: xatuethv1.StateIDHead,
}

slot := e.beacon.Metadata().Wallclock().Slots().FromNumber(uint64(e.duty.Slot))
epoch := e.beacon.Metadata().Wallclock().Epochs().FromSlot(uint64(e.duty.Slot))

extra.Slot = &xatu.SlotV2{
StartDateTime: timestamppb.New(slot.TimeWindow().Start()),
Number: &wrapperspb.UInt64Value{Value: uint64(e.duty.Slot)},
}

extra.Epoch = &xatu.EpochV2{
Number: &wrapperspb.UInt64Value{Value: epoch.Number()},
StartDateTime: timestamppb.New(epoch.TimeWindow().Start()),
}

return extra, nil
}
79 changes: 79 additions & 0 deletions pkg/sentry/proposer_duty.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package sentry

import (
"context"
"time"

eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
v1 "github.com/ethpandaops/xatu/pkg/sentry/event/beacon/eth/v1"
)

func (s *Sentry) startProposerDutyWatcher(ctx context.Context) error {
if !s.Config.BeaconCommittees.Enabled {
return nil
}

s.beacon.OnReady(ctx, func(ctx context.Context) error {
// Subscribe to future proposer duty events.
s.beacon.Duties().OnProposerDuties(func(epoch phase0.Epoch, duties []*eth2v1.ProposerDuty) error {
if err := s.createNewProposerDutyEvent(ctx, epoch, duties); err != nil {
s.log.WithError(err).Error("Failed to create new proposer duties event")
}

return nil
})

// Grab the current epoch duties.
now := s.beacon.Metadata().Wallclock().Epochs().Current()
epochs := []phase0.Epoch{
phase0.Epoch(now.Number()),
}

for _, epoch := range epochs {
duties, err := s.beacon.Duties().GetProposerDuties(epoch)
if err != nil {
s.log.WithError(err).Error("Failed to obtain proposer duties")

continue
}

if err := s.createNewProposerDutyEvent(ctx, epoch, duties); err != nil {
s.log.WithError(err).Error("Failed to create new proposer duties event")
}
}

return nil
})

return nil
}

func (s *Sentry) createNewProposerDutyEvent(ctx context.Context, epoch phase0.Epoch, duties []*eth2v1.ProposerDuty) error {
now := time.Now()

// Create an event for every duty.
for _, duty := range duties {
meta, err := s.createNewClientMeta(ctx)
if err != nil {
s.log.WithError(err).Error("Failed to create client meta when handling beacon committee event")

continue
}

event := v1.NewProposerDuty(s.log, duty, epoch, now, s.beacon, meta, s.duplicateCache.BeaconEthV1BeaconCommittee)

decoratedEvent, err := event.Decorate(ctx)
if err != nil {
s.log.WithError(err).Error("Failed to decorate beacon committee event")

return err
}

if err := s.handleNewDecoratedEvent(ctx, decoratedEvent); err != nil {
s.log.WithError(err).Error("Failed to handle decorated beacon committee event")
}
}

return nil
}

0 comments on commit f86a6ae

Please sign in to comment.