From fcb948f8a50a99ec2cd0cac5bb1dff7ecebe4af8 Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Fri, 8 Mar 2024 22:46:14 +1000 Subject: [PATCH] feat(cannon): update to shared blob fetcher (#284) * feat(cannon): update to shared blob fetcher * cleanup * adjust config ttl * cleanup --- .../deriver/beacon/eth/v1/beacon_blob.go | 57 ++++++++++- .../beacon/eth/v2/execution_transaction.go | 48 +++++++-- pkg/cannon/ethereum/beacon.go | 99 +++++++++++++++++-- pkg/cannon/ethereum/config.go | 12 ++- pkg/cannon/ethereum/metrics.go | 60 +++++++++++ 5 files changed, 253 insertions(+), 23 deletions(-) diff --git a/pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go b/pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go index 0ca21749..3eee2aab 100644 --- a/pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go +++ b/pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go @@ -108,20 +108,31 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) { time.Sleep(100 * time.Millisecond) + span.AddEvent("Checking if beacon node is synced") + if err := b.beacon.Synced(ctx); err != nil { span.SetStatus(codes.Error, err.Error()) return err } + span.AddEvent("Grabbing next location") + // Get the next slot - location, _, err := b.iterator.Next(ctx) + location, lookAhead, err := b.iterator.Next(ctx) if err != nil { span.SetStatus(codes.Error, err.Error()) return err } + span.AddEvent("Obtained next location, looking ahead...", trace.WithAttributes(attribute.Int64("location", int64(location.GetEthV1BeaconBlobSidecar().GetEpoch())))) + + // Look ahead + b.lookAheadAtLocation(ctx, lookAhead) + + span.AddEvent("Look ahead complete. Processing epoch...") + // Process the epoch events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV1BeaconBlobSidecar().GetEpoch())) if err != nil { @@ -132,6 +143,8 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) { return err } + span.AddEvent("Epoch processing complete. Sending events...") + // Send the events for _, fn := range b.onEventsCallbacks { if err := fn(ctx, events); err != nil { @@ -141,6 +154,8 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) { } } + span.AddEvent("Events sent. Updating location...") + // Update our location if err := b.iterator.UpdateLocation(ctx, location); err != nil { span.SetStatus(codes.Error, err.Error()) @@ -148,6 +163,8 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) { return err } + span.AddEvent("Location updated. Done.") + bo.Reset() return nil @@ -162,6 +179,37 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) { } } +// lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required. +func (b *BeaconBlobDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) { + _, span := observability.Tracer().Start(ctx, + "BeaconBlobDeriver.lookAheadAtLocations", + ) + defer span.End() + + if locations == nil { + return + } + + for _, location := range locations { + // Get the next look ahead epoch + epoch := phase0.Epoch(location.GetEthV1BeaconBlobSidecar().GetEpoch()) + + sp, err := b.beacon.Node().Spec() + if err != nil { + b.log.WithError(err).WithField("epoch", epoch).Warn("Failed to look ahead at epoch") + + return + } + + for i := uint64(0); i <= uint64(sp.SlotsPerEpoch-1); i++ { + slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch)) + + // Add the block sidecars to the preload queue so it's available when we need it + b.beacon.LazyLoadBeaconBlobSidecars(xatuethv1.SlotAsString(slot)) + } + } +} + func (b *BeaconBlobDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { ctx, span := observability.Tracer().Start(ctx, "BeaconBlobDeriver.processEpoch", @@ -197,8 +245,7 @@ func (b *BeaconBlobDeriver) processSlot(ctx context.Context, slot phase0.Slot) ( ) defer span.End() - // Get the block - blobs, err := b.beacon.Node().FetchBeaconBlockBlobs(ctx, xatuethv1.SlotAsString(slot)) + blobs, err := b.beacon.GetBeaconBlobSidecars(ctx, xatuethv1.SlotAsString(slot)) if err != nil { var apiErr *api.Error if errors.As(err, &apiErr) { @@ -210,7 +257,7 @@ func (b *BeaconBlobDeriver) processSlot(ctx context.Context, slot phase0.Slot) ( } } - return nil, errors.Wrapf(err, "failed to get beacon block for slot %d", slot) + return nil, errors.Wrapf(err, "failed to get beacon blob sidecars for slot %d", slot) } if blobs == nil { @@ -222,7 +269,7 @@ func (b *BeaconBlobDeriver) processSlot(ctx context.Context, slot phase0.Slot) ( for _, blob := range blobs { event, err := b.createEventFromBlob(ctx, blob) if err != nil { - return nil, errors.Wrapf(err, "failed to create event from block for slot %d", slot) + return nil, errors.Wrapf(err, "failed to create event from blob sidecars for slot %d", slot) } events = append(events, event) diff --git a/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go b/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go index e8c39f13..7ba80cec 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go +++ b/pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go @@ -7,7 +7,9 @@ import ( "strconv" "time" + "github.com/attestantio/go-eth2-client/api" "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/deneb" "github.com/attestantio/go-eth2-client/spec/phase0" backoff "github.com/cenkalti/backoff/v4" "github.com/ethereum/go-ethereum/core/types" @@ -204,6 +206,9 @@ func (b *ExecutionTransactionDeriver) lookAheadAtLocation(ctx context.Context, l // Add the block to the preload queue so it's available when we need it b.beacon.LazyLoadBeaconBlock(xatuethv1.SlotAsString(slot)) + + // Add the blob sidecars to the preload queue so it's available when we need it + b.beacon.LazyLoadBeaconBlobSidecars(xatuethv1.SlotAsString(slot)) } } } @@ -230,6 +235,30 @@ func (b *ExecutionTransactionDeriver) processSlot(ctx context.Context, slot phas return nil, errors.Wrapf(err, "failed to get block identifier for slot %d", slot) } + blobSidecars, err := b.beacon.GetBeaconBlobSidecars(ctx, xatuethv1.SlotAsString(slot)) + if err != nil { + var apiErr *api.Error + if errors.As(err, &apiErr) { + switch apiErr.StatusCode { + case 404: + b.log.WithError(err).WithField("slot", slot).Debug("no beacon block blob sidecars found for slot") + case 503: + return nil, errors.New("beacon node is syncing") + default: + return nil, errors.Wrapf(err, "failed to get beacon block blob sidecars for slot %d", slot) + } + } else { + return nil, errors.Wrapf(err, "failed to get beacon block blob sidecars for slot %d", slot) + } + } + + blobSidecarsMap := map[string]*deneb.BlobSidecar{} + + for _, blobSidecar := range blobSidecars { + versionedHash := ethereum.ConvertKzgCommitmentToVersionedHash(blobSidecar.KZGCommitment[:]) + blobSidecarsMap[versionedHash.String()] = blobSidecar + } + events := []*xatu.DecoratedEvent{} transactions, err := b.getExecutionTransactions(ctx, block) @@ -282,22 +311,25 @@ func (b *ExecutionTransactionDeriver) processSlot(ctx context.Context, slot phas if transaction.Type() == 3 { blobHashes := make([]string, len(transaction.BlobHashes())) + sidecarsEmptySize := 0 + sidecarsSize := 0 + for i := 0; i < len(transaction.BlobHashes()); i++ { hash := transaction.BlobHashes()[i] blobHashes[i] = hash.String() + sidecar := blobSidecarsMap[hash.String()] + + if sidecar != nil { + sidecarsSize += len(sidecar.Blob) + sidecarsEmptySize += ethereum.CountConsecutiveEmptyBytes(sidecar.Blob[:], 4) + } else { + b.log.WithField("versioned hash", hash.String()).WithField("transaction", transaction.Hash().Hex()).Warn("missing blob sidecar") + } } tx.BlobGas = wrapperspb.UInt64(transaction.BlobGas()) tx.BlobGasFeeCap = transaction.BlobGasFeeCap().String() tx.BlobHashes = blobHashes - sidecarsEmptySize := 0 - sidecarsSize := 0 - - for i := 0; i < len(transaction.BlobTxSidecar().Blobs); i++ { - sidecar := transaction.BlobTxSidecar().Blobs[i][:] - sidecarsSize += len(sidecar) - sidecarsEmptySize += ethereum.CountConsecutiveEmptyBytes(sidecar, 4) - } tx.BlobSidecarsSize = fmt.Sprint(sidecarsSize) tx.BlobSidecarsEmptySize = fmt.Sprint(sidecarsEmptySize) diff --git a/pkg/cannon/ethereum/beacon.go b/pkg/cannon/ethereum/beacon.go index 8de40063..7504e715 100644 --- a/pkg/cannon/ethereum/beacon.go +++ b/pkg/cannon/ethereum/beacon.go @@ -7,6 +7,7 @@ import ( "time" "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/deneb" "github.com/ethpandaops/beacon/pkg/beacon" "github.com/ethpandaops/xatu/pkg/cannon/ethereum/services" "github.com/ethpandaops/xatu/pkg/networks" @@ -32,10 +33,13 @@ type BeaconNode struct { onReadyCallbacks []func(ctx context.Context) error - sfGroup *singleflight.Group - blockCache *ttlcache.Cache[string, *spec.VersionedSignedBeaconBlock] - blockPreloadChan chan string - blockPreloadSem chan struct{} + sfGroup *singleflight.Group + blockCache *ttlcache.Cache[string, *spec.VersionedSignedBeaconBlock] + blockPreloadChan chan string + blockPreloadSem chan struct{} + blobSidecarsCache *ttlcache.Cache[string, []*deneb.BlobSidecar] + blobSidecarsPreloadChan chan string + blobSidecarsPreloadSem chan struct{} } func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger) (*BeaconNode, error) { @@ -71,21 +75,28 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus. } // Create a buffered channel (semaphore) to limit the number of concurrent goroutines. - sem := make(chan struct{}, config.BlockPreloadWorkers) + blockSem := make(chan struct{}, config.BlockPreloadWorkers) + blobSidecarsSem := make(chan struct{}, config.BlobSidecarsPreloadWorkers) return &BeaconNode{ config: config, log: log.WithField("module", "cannon/ethereum/beacon"), beacon: node, services: svcs, + sfGroup: &singleflight.Group{}, blockCache: ttlcache.New( ttlcache.WithTTL[string, *spec.VersionedSignedBeaconBlock](config.BlockCacheTTL.Duration), ttlcache.WithCapacity[string, *spec.VersionedSignedBeaconBlock](config.BlockCacheSize), ), - sfGroup: &singleflight.Group{}, blockPreloadChan: make(chan string, config.BlockPreloadQueueSize), - blockPreloadSem: sem, - metrics: NewMetrics(namespace, name), + blockPreloadSem: blockSem, + blobSidecarsCache: ttlcache.New( + ttlcache.WithTTL[string, []*deneb.BlobSidecar](config.BlobSidecarsCacheTTL.Duration), + ttlcache.WithCapacity[string, []*deneb.BlobSidecar](config.BlobSidecarsCacheSize), + ), + blobSidecarsPreloadChan: make(chan string, config.BlobSidecarsPreloadQueueSize), + blobSidecarsPreloadSem: blobSidecarsSem, + metrics: NewMetrics(namespace, name), }, nil } @@ -307,3 +318,75 @@ func (b *BeaconNode) LazyLoadBeaconBlock(identifier string) { b.blockPreloadChan <- identifier } + +// GetBeaconBlobSidecars returns a block's blob sidecars. +func (b *BeaconNode) GetBeaconBlobSidecars(ctx context.Context, identifier string, ignoreMetrics ...bool) ([]*deneb.BlobSidecar, error) { + ctx, span := observability.Tracer().Start(ctx, "ethereum.beacon.GetBeaconBlobSidecars", trace.WithAttributes(attribute.String("identifier", identifier))) + + defer span.End() + + b.metrics.IncBlobSidecarsFetched(string(b.Metadata().Network.Name)) + + // Check the cache first. + if item := b.blobSidecarsCache.Get(identifier); item != nil { + if len(ignoreMetrics) != 0 && ignoreMetrics[0] { + b.metrics.IncBlobSidecarsCacheHit(string(b.Metadata().Network.Name)) + } + + span.SetAttributes(attribute.Bool("cached", true)) + + return item.Value(), nil + } + + span.SetAttributes(attribute.Bool("cached", false)) + + if len(ignoreMetrics) != 0 && ignoreMetrics[0] { + b.metrics.IncBlobSidecarsCacheMiss(string(b.Metadata().Network.Name)) + } + + // Use singleflight to ensure we only make one request for a block at a time. + x, err, shared := b.sfGroup.Do(identifier, func() (interface{}, error) { + span.AddEvent("Acquiring semaphore...") + + // Acquire a semaphore before proceeding. + b.blobSidecarsPreloadSem <- struct{}{} + defer func() { <-b.blobSidecarsPreloadSem }() + + span.AddEvent("Semaphore acquired. Fetching blob sidecars from beacon api...") + + // Not in the cache, so fetch it. + blobSidecars, err := b.beacon.FetchBeaconBlockBlobs(ctx, identifier) + if err != nil { + return nil, err + } + + span.AddEvent("BlobSidecar fetched from beacon node.") + + // Add it to the cache. + b.blobSidecarsCache.Set(identifier, blobSidecars, time.Hour) + + return blobSidecars, nil + }) + if err != nil { + span.SetStatus(codes.Error, err.Error()) + + if len(ignoreMetrics) != 0 && ignoreMetrics[0] { + b.metrics.IncBlobSidecarsFetchErrors(string(b.Metadata().Network.Name)) + } + + return nil, err + } + + span.AddEvent("Block fetching complete.", trace.WithAttributes(attribute.Bool("shared", shared))) + + return x.([]*deneb.BlobSidecar), nil +} + +func (b *BeaconNode) LazyLoadBeaconBlobSidecars(identifier string) { + // Don't add the blob sidecars to the preload queue if it's already in the cache. + if item := b.blobSidecarsCache.Get(identifier); item != nil { + return + } + + b.blobSidecarsPreloadChan <- identifier +} diff --git a/pkg/cannon/ethereum/config.go b/pkg/cannon/ethereum/config.go index 90b476f7..7d2fc122 100644 --- a/pkg/cannon/ethereum/config.go +++ b/pkg/cannon/ethereum/config.go @@ -12,16 +12,24 @@ type Config struct { // OverrideNetworkName is the name of the network to use for the sentry. // If not set, the network name will be retrieved from the beacon node. OverrideNetworkName string `yaml:"overrideNetworkName" default:""` + // BeaconNodeHeaders is a map of headers to send to the beacon node. + BeaconNodeHeaders map[string]string `yaml:"beaconNodeHeaders"` // BlockCacheSize is the number of blocks to cache. BlockCacheSize uint64 `yaml:"blockCacheSize" default:"1000"` // BlockCacheTTL is the time to live for blocks in the cache. BlockCacheTTL human.Duration `yaml:"blockCacheTtl" default:"1h"` - // BeaconNodeHeaders is a map of headers to send to the beacon node. - BeaconNodeHeaders map[string]string `yaml:"beaconNodeHeaders"` // BlockPreloadWorkers is the number of workers to use for preloading blocks. BlockPreloadWorkers uint64 `yaml:"blockPreloadWorkers" default:"5"` // BlockPreloadQueueSize is the size of the queue for preloading blocks. BlockPreloadQueueSize uint64 `yaml:"blockPreloadQueueSize" default:"5000"` + // BlobSidecarsCacheSize is the number of blob sidecars to cache. + BlobSidecarsCacheSize uint64 `yaml:"blobSidecarsCacheSize" default:"500"` + // BlobSidecarsCacheTTL is the time to live for blob sidecars in the cache. + BlobSidecarsCacheTTL human.Duration `yaml:"blobSidecarsCacheTtl" default:"10m"` + // BlobSidecarsPreloadWorkers is the number of workers to use for preloading blob sidecars. + BlobSidecarsPreloadWorkers uint64 `yaml:"blobSidecarsPreloadWorkers" default:"5"` + // BlobSidecarsPreloadQueueSize is the size of the queue for preloading blob sidecars. + BlobSidecarsPreloadQueueSize uint64 `yaml:"blobSidecarsPreloadQueueSize" default:"5000"` } func (c *Config) Validate() error { diff --git a/pkg/cannon/ethereum/metrics.go b/pkg/cannon/ethereum/metrics.go index 9e0c9cf1..3a440360 100644 --- a/pkg/cannon/ethereum/metrics.go +++ b/pkg/cannon/ethereum/metrics.go @@ -14,6 +14,16 @@ type Metrics struct { blockCacheMiss *prometheus.CounterVec // PreloadBlockQueueSize is the number of blocks in the preload queue. preloadBlockQueueSize *prometheus.GaugeVec + // The number of blob sidecars that have been fetched. + blobSidecarsFetched *prometheus.CounterVec + // The number of blob sidecars fetches that have failed. + blobSidecarsFetchErrors *prometheus.CounterVec + // blobSidecarsCacheHit is the number of times a blob sidecars was found in the cache. + blobSidecarsCacheHit *prometheus.CounterVec + // blobSidecarsCacheMiss is the number of times a blob sidecars was not found in the cache. + blobSidecarsCacheMiss *prometheus.CounterVec + // preloadBlobSidecarsQueueSize is the number of blob sidecars in the preload queue. + preloadBlobSidecarsQueueSize *prometheus.GaugeVec } func NewMetrics(namespace, beaconNodeName string) *Metrics { @@ -46,6 +56,31 @@ func NewMetrics(namespace, beaconNodeName string) *Metrics { Name: "preload_block_queue_size", Help: "The number of blocks in the preload queue", }, []string{"network", "beacon"}), + blobSidecarsFetched: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "blob_sidecars_fetched_total", + Help: "The number of blob sidecars that have been fetched", + }, []string{"network", "beacon"}), + blobSidecarsFetchErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "blob_sidecars_fetch_errors_total", + Help: "The number of blob sidecars that have failed to be fetched", + }, []string{"network", "beacon"}), + blobSidecarsCacheHit: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "blob_sidecars_cache_hit_total", + Help: "The number of times a blob sidecars from a block was found in the cache", + }, []string{"network", "beacon"}), + blobSidecarsCacheMiss: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "blob_sidecars_cache_miss_total", + Help: "The number of times a blob sidecars from a block was not found in the cache", + }, []string{"network", "beacon"}), + preloadBlobSidecarsQueueSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "preload_blob_sidecars_queue_size", + Help: "The number of blob sidecars in the preload queue", + }, []string{"network", "beacon"}), } prometheus.MustRegister(m.blocksFetched) @@ -53,6 +88,11 @@ func NewMetrics(namespace, beaconNodeName string) *Metrics { prometheus.MustRegister(m.blockCacheHit) prometheus.MustRegister(m.blockCacheMiss) prometheus.MustRegister(m.preloadBlockQueueSize) + prometheus.MustRegister(m.blobSidecarsFetched) + prometheus.MustRegister(m.blobSidecarsFetchErrors) + prometheus.MustRegister(m.blobSidecarsCacheHit) + prometheus.MustRegister(m.blobSidecarsCacheMiss) + prometheus.MustRegister(m.preloadBlobSidecarsQueueSize) return m } @@ -76,3 +116,23 @@ func (m *Metrics) IncBlockCacheMiss(network string) { func (m *Metrics) SetPreloadBlockQueueSize(network string, size int) { m.preloadBlockQueueSize.WithLabelValues(network, m.beacon).Set(float64(size)) } + +func (m *Metrics) IncBlobSidecarsFetched(network string) { + m.blobSidecarsFetched.WithLabelValues(network, m.beacon).Inc() +} + +func (m *Metrics) IncBlobSidecarsFetchErrors(network string) { + m.blobSidecarsFetchErrors.WithLabelValues(network, m.beacon).Inc() +} + +func (m *Metrics) IncBlobSidecarsCacheHit(network string) { + m.blobSidecarsCacheHit.WithLabelValues(network, m.beacon).Inc() +} + +func (m *Metrics) IncBlobSidecarsCacheMiss(network string) { + m.blobSidecarsCacheMiss.WithLabelValues(network, m.beacon).Inc() +} + +func (m *Metrics) SetPreloadBlobSidecarsQueueSize(network string, size int) { + m.preloadBlobSidecarsQueueSize.WithLabelValues(network, m.beacon).Set(float64(size)) +}