Skip to content

Commit

Permalink
feat(cannon): Add BEACON_API_ETH_V1_BEACON_BLOB_SIDECAR
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Oct 10, 2023
1 parent 3c0a14c commit 8a61ca1
Show file tree
Hide file tree
Showing 16 changed files with 2,564 additions and 1,899 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9
github.com/creasty/defaults v1.7.0
github.com/ethereum/go-ethereum v1.12.0
github.com/ethpandaops/beacon v0.29.0
github.com/ethpandaops/beacon v0.30.0
github.com/ethpandaops/ethcore v0.0.0-20230804013106-6453c36c8c30
github.com/ethpandaops/ethwallclock v0.3.0
github.com/go-co-op/gocron v1.27.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/ethereum/go-ethereum v1.12.0 h1:bdnhLPtqETd4m3mS8BGMNvBTf36bO5bx/hxE2zljOa0=
github.com/ethereum/go-ethereum v1.12.0/go.mod h1:/oo2X/dZLJjf2mJ6YT9wcWxa4nNJDBKDBU6sFIpx1Gs=
github.com/ethpandaops/beacon v0.29.0 h1:sUNcr2BAoqUQgKyg0PuQwH4wCiDj33n13AG6JlsV7bs=
github.com/ethpandaops/beacon v0.29.0/go.mod h1:m4zoHS/lI/g4rdGEPeXRIJdsC0HkhJmMjmNgzABfuV4=
github.com/ethpandaops/beacon v0.30.0 h1:bDzadzl7z8fE9/PCAss+BC3z/1dG2PI3dsNOEBtJVZE=
github.com/ethpandaops/beacon v0.30.0/go.mod h1:m4zoHS/lI/g4rdGEPeXRIJdsC0HkhJmMjmNgzABfuV4=
github.com/ethpandaops/ethcore v0.0.0-20230804013106-6453c36c8c30 h1:xziHHIPT9iHoocM91UXwxICQoF121cdfontVAFwtapM=
github.com/ethpandaops/ethcore v0.0.0-20230804013106-6453c36c8c30/go.mod h1:5UfUQ+9IBe1iZsk3KBoF/jtnRv7WVnMjebik4EW6ULE=
github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4=
Expand Down
18 changes: 18 additions & 0 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
aBlockprint "github.com/ethpandaops/xatu/pkg/cannon/blockprint"
"github.com/ethpandaops/xatu/pkg/cannon/coordinator"
"github.com/ethpandaops/xatu/pkg/cannon/deriver"
v1 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v1"
v2 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v2"
"github.com/ethpandaops/xatu/pkg/cannon/deriver/blockprint"
"github.com/ethpandaops/xatu/pkg/cannon/ethereum"
Expand Down Expand Up @@ -492,6 +493,23 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
clientMeta,
blockprintClient,
),
v1.NewBeaconBlobDeriver(
c.log,
&c.Config.Derivers.BeaconBlobSidecarConfig,
iterator.NewCheckpointIterator(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V1_BEACON_BLOB_SIDECAR,
c.coordinatorClient,
wallclock,
&checkpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
),
c.beacon,
clientMeta,
),
}

c.eventDerivers = eventDerivers
Expand Down
279 changes: 279 additions & 0 deletions pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
package v1

import (
"context"
"fmt"
"time"

"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
backoff "github.com/cenkalti/backoff/v4"
"github.com/ethpandaops/xatu/pkg/cannon/ethereum"
"github.com/ethpandaops/xatu/pkg/cannon/iterator"
"github.com/ethpandaops/xatu/pkg/observability"
xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

const (
BeaconBlobDeriverName = xatu.CannonType_BEACON_API_ETH_V1_BEACON_BLOB_SIDECAR
)

type BeaconBlobDeriverConfig struct {
Enabled bool `yaml:"enabled" default:"false"`
}

type BeaconBlobDeriver struct {
log logrus.FieldLogger
cfg *BeaconBlobDeriverConfig
iterator *iterator.CheckpointIterator
onEventsCallbacks []func(ctx context.Context, events []*xatu.DecoratedEvent) error
beacon *ethereum.BeaconNode
clientMeta *xatu.ClientMeta
}

func NewBeaconBlobDeriver(log logrus.FieldLogger, config *BeaconBlobDeriverConfig, iter *iterator.CheckpointIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *BeaconBlobDeriver {
return &BeaconBlobDeriver{
log: log.WithField("module", "cannon/event/beacon/eth/v1/beacon_blob"),
cfg: config,
iterator: iter,
beacon: beacon,
clientMeta: clientMeta,
}
}

func (b *BeaconBlobDeriver) CannonType() xatu.CannonType {
return BeaconBlobDeriverName
}

func (b *BeaconBlobDeriver) Name() string {
return BeaconBlobDeriverName.String()
}

func (b *BeaconBlobDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) {
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}

func (b *BeaconBlobDeriver) Start(ctx context.Context) error {
if !b.cfg.Enabled {
b.log.Info("Beacon blob deriver disabled")

return nil
}

b.log.Info("Beacon blob deriver enabled")

// Start our main loop
go b.run(ctx)

return nil
}

func (b *BeaconBlobDeriver) Stop(ctx context.Context) error {
return nil
}

func (b *BeaconBlobDeriver) run(rctx context.Context) {
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = 3 * time.Minute

tracer := observability.Tracer()

for {
select {
case <-rctx.Done():
return
default:
operation := func() error {
ctx, span := tracer.Start(rctx, fmt.Sprintf("Derive %s", b.Name()),
trace.WithAttributes(
attribute.String("network", string(b.beacon.Metadata().Network.Name))),
)
defer span.End()

time.Sleep(100 * time.Millisecond)

if err := b.beacon.Synced(ctx); err != nil {
span.SetStatus(codes.Error, err.Error())

return err
}

// Get the next slot
location, _, err := b.iterator.Next(ctx)
if err != nil {
span.SetStatus(codes.Error, err.Error())

return err
}

// Process the epoch
events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV1BeaconBlobSidecar().GetEpoch()))
if err != nil {
b.log.WithError(err).Error("Failed to process epoch")

span.SetStatus(codes.Error, err.Error())

return err
}

// Send the events
for _, fn := range b.onEventsCallbacks {
if err := fn(ctx, events); err != nil {
span.SetStatus(codes.Error, err.Error())

return errors.Wrap(err, "failed to send events")
}
}

// Update our location
if err := b.iterator.UpdateLocation(ctx, location); err != nil {
span.SetStatus(codes.Error, err.Error())

return err
}

bo.Reset()

return nil
}

if err := backoff.Retry(operation, bo); err != nil {
b.log.WithError(err).Error("Failed to process location")
}
}
}
}

func (b *BeaconBlobDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
ctx, span := observability.Tracer().Start(ctx,
"BeaconBlobDeriver.processEpoch",
trace.WithAttributes(attribute.Int64("epoch", int64(epoch))),
)
defer span.End()

sp, err := b.beacon.Node().Spec()
if err != nil {
return nil, errors.Wrap(err, "failed to obtain spec")
}

allEvents := []*xatu.DecoratedEvent{}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch-1); i++ {
slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch))

events, err := b.processSlot(ctx, slot)
if err != nil {
return nil, errors.Wrapf(err, "failed to process slot %d", slot)
}

allEvents = append(allEvents, events...)
}

return allEvents, nil
}

func (b *BeaconBlobDeriver) processSlot(ctx context.Context, slot phase0.Slot) ([]*xatu.DecoratedEvent, error) {
ctx, span := observability.Tracer().Start(ctx,
"BeaconBlobDeriver.processSlot",
trace.WithAttributes(attribute.Int64("slot", int64(slot))),
)
defer span.End()

// Get the block
blobs, err := b.beacon.Node().FetchBeaconBlockBlobs(ctx, xatuethv1.SlotAsString(slot))
if err != nil {
return nil, errors.Wrapf(err, "failed to get beacon block for slot %d", slot)
}

if blobs == nil {
return []*xatu.DecoratedEvent{}, nil
}

events := []*xatu.DecoratedEvent{}

for _, blob := range blobs {
event, err := b.createEventFromBlob(ctx, blob)
if err != nil {
return nil, errors.Wrapf(err, "failed to create event from block for slot %d", slot)
}

events = append(events, event)
}

return events, nil
}

func (b *BeaconBlobDeriver) createEventFromBlob(ctx context.Context, blob *deneb.BlobSidecar) (*xatu.DecoratedEvent, error) {
// Make a clone of the metadata
metadata, ok := proto.Clone(b.clientMeta).(*xatu.ClientMeta)
if !ok {
return nil, errors.New("failed to clone client metadata")
}

decoratedEvent := &xatu.DecoratedEvent{
Event: &xatu.Event{
Name: xatu.Event_BEACON_API_ETH_V1_BEACON_BLOB_SIDECAR,
DateTime: timestamppb.New(time.Now()),
Id: uuid.New().String(),
},
Meta: &xatu.Meta{
Client: metadata,
},
Data: &xatu.DecoratedEvent_EthV1BeaconBlockBlobSidecar{
EthV1BeaconBlockBlobSidecar: &xatuethv1.BlobSidecar{
Slot: &wrapperspb.UInt64Value{Value: uint64(blob.Slot)},
Data: blob.Blob[:],
Index: &wrapperspb.UInt64Value{Value: uint64(blob.Index)},
BlockRoot: blob.BlockRoot.String(),
BlockParentRoot: blob.BlockParentRoot.String(),
ProposerIndex: &wrapperspb.UInt64Value{Value: uint64(blob.ProposerIndex)},
KzgCommitment: blob.KzgCommitment.String(),
KzgProof: blob.KzgProof.String(),
},
},
}

additionalData, err := b.getAdditionalData(ctx, blob)
if err != nil {
b.log.WithError(err).Error("Failed to get extra beacon blob data")

return nil, err
} else {
decoratedEvent.Meta.Client.AdditionalData = &xatu.ClientMeta_EthV1BeaconBlobSidecar{
EthV1BeaconBlobSidecar: additionalData,
}
}

return decoratedEvent, nil
}

func (b *BeaconBlobDeriver) getAdditionalData(_ context.Context, blob *deneb.BlobSidecar) (*xatu.ClientMeta_AdditionalEthV1BeaconBlobSidecarData, error) {
extra := &xatu.ClientMeta_AdditionalEthV1BeaconBlobSidecarData{
DataSizeKb: &wrapperspb.UInt64Value{Value: uint64(len(blob.Blob) / 1024)},
}

slot := b.beacon.Metadata().Wallclock().Slots().FromNumber(uint64(blob.Slot))
epoch := b.beacon.Metadata().Wallclock().Epochs().FromSlot(uint64(blob.Slot))

extra.Slot = &xatu.SlotV2{
StartDateTime: timestamppb.New(slot.TimeWindow().Start()),
Number: &wrapperspb.UInt64Value{Value: uint64(blob.Slot)},
}

extra.Epoch = &xatu.EpochV2{
Number: &wrapperspb.UInt64Value{Value: epoch.Number()},
StartDateTime: timestamppb.New(epoch.TimeWindow().Start()),
}

return extra, nil
}
2 changes: 2 additions & 0 deletions pkg/cannon/deriver/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deriver

import (
v1 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v1"
v2 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v2"
"github.com/ethpandaops/xatu/pkg/cannon/deriver/blockprint"
"github.com/pkg/errors"
Expand All @@ -16,6 +17,7 @@ type Config struct {
WithdrawalConfig v2.WithdrawalDeriverConfig `yaml:"withdrawal"`
BeaconBlockConfig v2.BeaconBlockDeriverConfig `yaml:"beaconBlock"`
BlockClassificationConfig blockprint.BlockClassificationDeriverConfig `yaml:"blockClassification"`
BeaconBlobSidecarConfig v1.BeaconBlobDeriverConfig `yaml:"beaconBlobSidecar"`
}

func (c *Config) Validate() error {
Expand Down
10 changes: 9 additions & 1 deletion pkg/cannon/iterator/checkpoint_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c *CheckpointIterator) Next(ctx context.Context) (next *xatu.CannonLocatio
// If location is empty we haven't started yet, start at the network default for the type. If the network default
// is empty, we'll start at epoch 0.
if location == nil {
location, err = c.createLocationFromEpochNumber(phase0.Epoch(GetDefaultSlotLocationForNetworkAndType(c.networkName, c.cannonType) / 32))
location, err = c.createLocationFromEpochNumber(phase0.Epoch(GetDefaultSlotLocation(c.beaconNode.Metadata().Spec.ForkEpochs, c.cannonType) / 32))
if err != nil {
return nil, []*xatu.CannonLocation{}, errors.Wrap(err, "failed to create location from slot number 0")
}
Expand Down Expand Up @@ -211,6 +211,8 @@ func (c *CheckpointIterator) getEpochFromLocation(location *xatu.CannonLocation)
return phase0.Epoch(location.GetEthV2BeaconBlockWithdrawal().Epoch), nil
case xatu.CannonType_BEACON_API_ETH_V2_BEACON_BLOCK:
return phase0.Epoch(location.GetEthV2BeaconBlock().Epoch), nil
case xatu.CannonType_BEACON_API_ETH_V1_BEACON_BLOB_SIDECAR:
return phase0.Epoch(location.GetEthV1BeaconBlobSidecar().Epoch), nil
default:
return 0, errors.Errorf("unknown cannon type %s", location.Type)
}
Expand Down Expand Up @@ -271,6 +273,12 @@ func (c *CheckpointIterator) createLocationFromEpochNumber(epoch phase0.Epoch) (
Epoch: uint64(epoch),
},
}
case xatu.CannonType_BEACON_API_ETH_V1_BEACON_BLOB_SIDECAR:
location.Data = &xatu.CannonLocation_EthV1BeaconBlobSidecar{
EthV1BeaconBlobSidecar: &xatu.CannonLocationEthV1BeaconBlobSidecar{
Epoch: uint64(epoch),
},
}
default:
return location, errors.Errorf("unknown cannon type %s", location.Type)
}
Expand Down
Loading

0 comments on commit 8a61ca1

Please sign in to comment.