Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cannon): Create WITHDRAWAL event #191

Merged
merged 2 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,23 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
c.beacon,
clientMeta,
),
v2.NewWithdrawalDeriver(
c.log,
&c.Config.Derivers.WithdrawalConfig,
iterator.NewCheckpointIterator(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_WITHDRAWAL,
c.coordinatorClient,
wallclock,
&checkpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
),
c.beacon,
clientMeta,
),
}

c.eventDerivers = eventDerivers
Expand Down
3 changes: 1 addition & 2 deletions pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ const (
)

type AttesterSlashingDeriverConfig struct {
Enabled bool `yaml:"enabled" default:"true"`
HeadSlotLag *uint64 `yaml:"headSlotLag" default:"5"`
Enabled bool `yaml:"enabled" default:"true"`
}

type AttesterSlashingDeriver struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ const (
)

type BLSToExecutionChangeDeriverConfig struct {
Enabled bool `yaml:"enabled" default:"true"`
HeadSlotLag *uint64 `yaml:"headSlotLag" default:"5"`
Enabled bool `yaml:"enabled" default:"true"`
}

type BLSToExecutionChangeDeriver struct {
Expand Down
3 changes: 1 addition & 2 deletions pkg/cannon/deriver/beacon/eth/v2/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ const (
)

type DepositDeriverConfig struct {
Enabled bool `yaml:"enabled" default:"true"`
HeadSlotLag *uint64 `yaml:"headSlotLag" default:"5"`
Enabled bool `yaml:"enabled" default:"true"`
}

type DepositDeriver struct {
Expand Down
3 changes: 1 addition & 2 deletions pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ type ExecutionTransactionDeriver struct {
}

type ExecutionTransactionDeriverConfig struct {
Enabled bool `yaml:"enabled" default:"true"`
HeadSlotLag *uint64 `yaml:"headSlotLag" default:"5"`
Enabled bool `yaml:"enabled" default:"true"`
}

const (
Expand Down
3 changes: 1 addition & 2 deletions pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ const (
)

type ProposerSlashingDeriverConfig struct {
Enabled bool `yaml:"enabled" default:"true"`
HeadSlotLag *uint64 `yaml:"headSlotLag" default:"5"`
Enabled bool `yaml:"enabled" default:"true"`
}

type ProposerSlashingDeriver struct {
Expand Down
3 changes: 1 addition & 2 deletions pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ const (
)

type VoluntaryExitDeriverConfig struct {
Enabled bool `yaml:"enabled" default:"true"`
HeadSlotLag *uint64 `yaml:"headSlotLag" default:"5"`
Enabled bool `yaml:"enabled" default:"true"`
}

type VoluntaryExitDeriver struct {
Expand Down
278 changes: 278 additions & 0 deletions pkg/cannon/deriver/beacon/eth/v2/withdrawal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
package v2

import (
"context"
"time"

"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/phase0"
backoff "github.com/cenkalti/backoff/v4"
"github.com/ethpandaops/xatu/pkg/cannon/ethereum"
"github.com/ethpandaops/xatu/pkg/cannon/iterator"
xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

const (
WithdrawalDeriverName = xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_WITHDRAWAL
)

type WithdrawalDeriverConfig struct {
Enabled bool `yaml:"enabled" default:"true"`
}

type WithdrawalDeriver struct {
log logrus.FieldLogger
cfg *WithdrawalDeriverConfig
iterator *iterator.CheckpointIterator
onEventsCallbacks []func(ctx context.Context, events []*xatu.DecoratedEvent) error
onLocationCallbacks []func(ctx context.Context, location uint64) error
beacon *ethereum.BeaconNode
clientMeta *xatu.ClientMeta
}

func NewWithdrawalDeriver(log logrus.FieldLogger, config *WithdrawalDeriverConfig, iter *iterator.CheckpointIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *WithdrawalDeriver {
return &WithdrawalDeriver{
log: log.WithField("module", "cannon/event/beacon/eth/v2/withdrawal"),
cfg: config,
iterator: iter,
beacon: beacon,
clientMeta: clientMeta,
}
}

func (b *WithdrawalDeriver) CannonType() xatu.CannonType {
return WithdrawalDeriverName
}

func (b *WithdrawalDeriver) Name() string {
return WithdrawalDeriverName.String()
}

func (b *WithdrawalDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) {
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}

func (b *WithdrawalDeriver) OnLocationUpdated(ctx context.Context, fn func(ctx context.Context, location uint64) error) {
b.onLocationCallbacks = append(b.onLocationCallbacks, fn)
}

func (b *WithdrawalDeriver) Start(ctx context.Context) error {
if !b.cfg.Enabled {
b.log.Info("Withdrawal deriver disabled")

return nil
}

b.log.Info("Withdrawal deriver enabled")

// Start our main loop
go b.run(ctx)

return nil
}

func (b *WithdrawalDeriver) Stop(ctx context.Context) error {
return nil
}

func (b *WithdrawalDeriver) run(ctx context.Context) {
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = 3 * time.Minute

for {
select {
case <-ctx.Done():
return
default:
operation := func() error {
time.Sleep(100 * time.Millisecond)

if err := b.beacon.Synced(ctx); err != nil {
return err
}

// Get the next slot
location, lookAhead, err := b.iterator.Next(ctx)
if err != nil {
return err
}

// Look ahead
b.lookAheadAtLocation(ctx, lookAhead)

for _, fn := range b.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockWithdrawal().GetEpoch()); errr != nil {
b.log.WithError(errr).Error("Failed to send location")
}
}

// Process the epoch
events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV2BeaconBlockWithdrawal().GetEpoch()))
if err != nil {
b.log.WithError(err).Error("Failed to process epoch")

return err
}

for _, fn := range b.onEventsCallbacks {
if errr := fn(ctx, events); errr != nil {
return errors.Wrapf(errr, "failed to send events")
}
}

// Update our location
if err := b.iterator.UpdateLocation(ctx, location); err != nil {
return err
}

bo.Reset()

return nil
}

if err := backoff.Retry(operation, bo); err != nil {
b.log.WithError(err).Error("Failed to process epoch")
}
}
}
}

func (b *WithdrawalDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
sp, err := b.beacon.Node().Spec()
if err != nil {
return nil, errors.Wrap(err, "failed to obtain spec")
}

allEvents := []*xatu.DecoratedEvent{}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch))

events, err := b.processSlot(ctx, slot)
if err != nil {
return nil, errors.Wrapf(err, "failed to process slot %d", slot)
}

allEvents = append(allEvents, events...)
}

return allEvents, nil
}

func (b *WithdrawalDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) {
// Get the block
block, err := b.beacon.GetBeaconBlock(ctx, xatuethv1.SlotAsString(slot))
if err != nil {
return nil, errors.Wrapf(err, "failed to get beacon block for slot %d", slot)
}

if block == nil {
return []*xatu.DecoratedEvent{}, nil
}

blockIdentifier, err := GetBlockIdentifier(block, b.beacon.Metadata().Wallclock())
if err != nil {
return nil, errors.Wrapf(err, "failed to get block identifier for slot %d", slot)
}

events := []*xatu.DecoratedEvent{}

withdrawals, err := b.getWithdrawals(ctx, block)
if err != nil {
return nil, errors.Wrap(err, "failed to get withdrawals")
}

for _, withdrawal := range withdrawals {
event, err := b.createEvent(ctx, withdrawal, blockIdentifier)
if err != nil {
return nil, errors.Wrapf(err, "failed to create event for withdrawal %s", withdrawal.String())
}

events = append(events, event)
}

return events, nil
}

// lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required.
func (b *WithdrawalDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) {
if locations == nil {
return
}

for _, location := range locations {
// Get the next look ahead epoch
epoch := phase0.Epoch(location.GetEthV2BeaconBlockVoluntaryExit().GetEpoch())

sp, err := b.beacon.Node().Spec()
if err != nil {
b.log.WithError(err).WithField("epoch", epoch).Warn("Failed to look ahead at epoch")

return
}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch); i++ {
slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch))

// Add the block to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlock(xatuethv1.SlotAsString(slot))
}
}
}

func (b *WithdrawalDeriver) getWithdrawals(ctx context.Context, block *spec.VersionedSignedBeaconBlock) ([]*xatuethv1.WithdrawalV2, error) {
withdrawals := []*xatuethv1.WithdrawalV2{}

switch block.Version {
case spec.DataVersionPhase0, spec.DataVersionAltair, spec.DataVersionBellatrix:
return withdrawals, nil
}

for _, withdrawal := range block.Capella.Message.Body.ExecutionPayload.Withdrawals {
withdrawals = append(withdrawals, &xatuethv1.WithdrawalV2{
Index: &wrapperspb.UInt64Value{Value: uint64(withdrawal.Index)},
ValidatorIndex: &wrapperspb.UInt64Value{Value: uint64(withdrawal.ValidatorIndex)},
Address: withdrawal.Address.String(),
Amount: &wrapperspb.UInt64Value{Value: uint64(withdrawal.Amount)},
})
}

return withdrawals, nil
}

func (b *WithdrawalDeriver) createEvent(ctx context.Context, withdrawal *xatuethv1.WithdrawalV2, identifier *xatu.BlockIdentifier) (*xatu.DecoratedEvent, error) {
// Make a clone of the metadata
metadata, ok := proto.Clone(b.clientMeta).(*xatu.ClientMeta)
if !ok {
return nil, errors.New("failed to clone client metadata")
}

decoratedEvent := &xatu.DecoratedEvent{
Event: &xatu.Event{
Name: xatu.Event_BEACON_API_ETH_V2_BEACON_BLOCK_WITHDRAWAL,
DateTime: timestamppb.New(time.Now()),
Id: uuid.New().String(),
},
Meta: &xatu.Meta{
Client: metadata,
},
Data: &xatu.DecoratedEvent_EthV2BeaconBlockWithdrawal{
EthV2BeaconBlockWithdrawal: withdrawal,
},
}

decoratedEvent.Meta.Client.AdditionalData = &xatu.ClientMeta_EthV2BeaconBlockWithdrawal{
EthV2BeaconBlockWithdrawal: &xatu.ClientMeta_AdditionalEthV2BeaconBlockWithdrawalData{
Block: identifier,
},
}

return decoratedEvent, nil
}
1 change: 1 addition & 0 deletions pkg/cannon/deriver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Config struct {
ExecutionTransactionConfig v2.ExecutionTransactionDeriverConfig `yaml:"executionTransaction"`
ProposerSlashingConfig v2.ProposerSlashingDeriverConfig `yaml:"proposerSlashing"`
VoluntaryExitConfig v2.VoluntaryExitDeriverConfig `yaml:"voluntaryExit"`
WithdrawalConfig v2.WithdrawalDeriverConfig `yaml:"withdrawal"`
}

func (c *Config) Validate() error {
Expand Down
1 change: 1 addition & 0 deletions pkg/cannon/deriver/event_deriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ var _ EventDeriver = &v2.DepositDeriver{}
var _ EventDeriver = &v2.VoluntaryExitDeriver{}
var _ EventDeriver = &v2.ExecutionTransactionDeriver{}
var _ EventDeriver = &v2.BLSToExecutionChangeDeriver{}
var _ EventDeriver = &v2.WithdrawalDeriver{}
8 changes: 8 additions & 0 deletions pkg/cannon/iterator/checkpoint_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ func (c *CheckpointIterator) getEpochFromLocation(location *xatu.CannonLocation)
return phase0.Epoch(location.GetEthV2BeaconBlockVoluntaryExit().Epoch), nil
case xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_DEPOSIT:
return phase0.Epoch(location.GetEthV2BeaconBlockDeposit().Epoch), nil
case xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_WITHDRAWAL:
return phase0.Epoch(location.GetEthV2BeaconBlockWithdrawal().Epoch), nil
default:
return 0, errors.Errorf("unknown cannon type %s", location.Type)
}
Expand Down Expand Up @@ -226,6 +228,12 @@ func (c *CheckpointIterator) createLocationFromEpochNumber(epoch phase0.Epoch) (
Epoch: uint64(epoch),
},
}
case xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK_WITHDRAWAL:
location.Data = &xatu.CannonLocation_EthV2BeaconBlockWithdrawal{
EthV2BeaconBlockWithdrawal: &xatu.CannonLocationEthV2BeaconBlockWithdrawal{
Epoch: uint64(epoch),
},
}
default:
return location, errors.Errorf("unknown cannon type %s", location.Type)
}
Expand Down
Loading
Loading