Skip to content

Commit

Permalink
feat(cannon): update to shared blob fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Savid committed Mar 8, 2024
1 parent 6f89dae commit 7dfa90a
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 23 deletions.
57 changes: 52 additions & 5 deletions pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,20 +108,31 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) {

time.Sleep(100 * time.Millisecond)

span.AddEvent("Checking if beacon node is synced")

if err := b.beacon.Synced(ctx); err != nil {
span.SetStatus(codes.Error, err.Error())

return err
}

span.AddEvent("Grabbing next location")

// Get the next slot
location, _, err := b.iterator.Next(ctx)
location, lookAhead, err := b.iterator.Next(ctx)
if err != nil {
span.SetStatus(codes.Error, err.Error())

return err
}

span.AddEvent("Obtained next location, looking ahead...", trace.WithAttributes(attribute.Int64("location", int64(location.GetEthV1BeaconBlobSidecar().GetEpoch()))))

// Look ahead
b.lookAheadAtLocation(ctx, lookAhead)

span.AddEvent("Look ahead complete. Processing epoch...")

// Process the epoch
events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV1BeaconBlobSidecar().GetEpoch()))
if err != nil {
Expand All @@ -132,6 +143,8 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) {
return err
}

span.AddEvent("Epoch processing complete. Sending events...")

// Send the events
for _, fn := range b.onEventsCallbacks {
if err := fn(ctx, events); err != nil {
Expand All @@ -141,13 +154,17 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) {
}
}

span.AddEvent("Events sent. Updating location...")

// Update our location
if err := b.iterator.UpdateLocation(ctx, location); err != nil {
span.SetStatus(codes.Error, err.Error())

return err
}

span.AddEvent("Location updated. Done.")

bo.Reset()

return nil
Expand All @@ -162,6 +179,37 @@ func (b *BeaconBlobDeriver) run(rctx context.Context) {
}
}

// lookAheadAtLocation takes the upcoming locations and looks ahead to do any pre-processing that might be required.
func (b *BeaconBlobDeriver) lookAheadAtLocation(ctx context.Context, locations []*xatu.CannonLocation) {
_, span := observability.Tracer().Start(ctx,
"BeaconBlobDeriver.lookAheadAtLocations",
)
defer span.End()

if locations == nil {
return
}

for _, location := range locations {
// Get the next look ahead epoch
epoch := phase0.Epoch(location.GetEthV1BeaconBlobSidecar().GetEpoch())

sp, err := b.beacon.Node().Spec()
if err != nil {
b.log.WithError(err).WithField("epoch", epoch).Warn("Failed to look ahead at epoch")

return
}

for i := uint64(0); i <= uint64(sp.SlotsPerEpoch-1); i++ {
slot := phase0.Slot(i + uint64(epoch)*uint64(sp.SlotsPerEpoch))

// Add the block sidecars to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlobSidecars(xatuethv1.SlotAsString(slot))
}
}
}

func (b *BeaconBlobDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
ctx, span := observability.Tracer().Start(ctx,
"BeaconBlobDeriver.processEpoch",
Expand Down Expand Up @@ -197,8 +245,7 @@ func (b *BeaconBlobDeriver) processSlot(ctx context.Context, slot phase0.Slot) (
)
defer span.End()

// Get the block
blobs, err := b.beacon.Node().FetchBeaconBlockBlobs(ctx, xatuethv1.SlotAsString(slot))
blobs, err := b.beacon.GetBeaconBlobSidecars(ctx, xatuethv1.SlotAsString(slot))
if err != nil {
var apiErr *api.Error
if errors.As(err, &apiErr) {
Expand All @@ -210,7 +257,7 @@ func (b *BeaconBlobDeriver) processSlot(ctx context.Context, slot phase0.Slot) (
}
}

return nil, errors.Wrapf(err, "failed to get beacon block for slot %d", slot)
return nil, errors.Wrapf(err, "failed to get beacon blob sidecars for slot %d", slot)
}

if blobs == nil {
Expand All @@ -222,7 +269,7 @@ func (b *BeaconBlobDeriver) processSlot(ctx context.Context, slot phase0.Slot) (
for _, blob := range blobs {
event, err := b.createEventFromBlob(ctx, blob)
if err != nil {
return nil, errors.Wrapf(err, "failed to create event from block for slot %d", slot)
return nil, errors.Wrapf(err, "failed to create event from blob sidecars for slot %d", slot)
}

events = append(events, event)
Expand Down
73 changes: 65 additions & 8 deletions pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"strconv"
"time"

"github.com/attestantio/go-eth2-client/api"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
backoff "github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -204,6 +206,9 @@ func (b *ExecutionTransactionDeriver) lookAheadAtLocation(ctx context.Context, l

// Add the block to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlock(xatuethv1.SlotAsString(slot))

// Add the blob sidecars to the preload queue so it's available when we need it
b.beacon.LazyLoadBeaconBlobSidecars(xatuethv1.SlotAsString(slot))
}
}
}
Expand All @@ -230,6 +235,28 @@ func (b *ExecutionTransactionDeriver) processSlot(ctx context.Context, slot phas
return nil, errors.Wrapf(err, "failed to get block identifier for slot %d", slot)
}

blobSidecars, err := b.beacon.GetBeaconBlobSidecars(ctx, xatuethv1.SlotAsString(slot))
if err != nil {
var apiErr *api.Error
if errors.As(err, &apiErr) {
switch apiErr.StatusCode {
case 404:
b.log.WithError(err).WithField("slot", slot).Debug("no beacon block blob sidecars found for slot")
case 503:
return nil, errors.New("beacon node is syncing")
default:
return nil, errors.Wrapf(err, "failed to get beacon block blob sidecars for slot %d", slot)
}
}
}

blobSidecarsMap := map[string]*deneb.BlobSidecar{}

for _, blobSidecar := range blobSidecars {
versionedHash := ethereum.ConvertKzgCommitmentToVersionedHash(blobSidecar.KZGCommitment[:])
blobSidecarsMap[versionedHash.String()] = blobSidecar
}

events := []*xatu.DecoratedEvent{}

transactions, err := b.getExecutionTransactions(ctx, block)
Expand Down Expand Up @@ -282,25 +309,31 @@ func (b *ExecutionTransactionDeriver) processSlot(ctx context.Context, slot phas
if transaction.Type() == 3 {
blobHashes := make([]string, len(transaction.BlobHashes()))

sidecarsEmptySize := 0
sidecarsSize := 0

for i := 0; i < len(transaction.BlobHashes()); i++ {
hash := transaction.BlobHashes()[i]
blobHashes[i] = hash.String()
sidecar := blobSidecarsMap[hash.String()]

if sidecar != nil {
sidecarSize := len(sidecar.Blob)
sidecarsSize += sidecarSize
sidecarsEmptySize += countConsecutiveEmptyBytes(sidecar.Blob[:], 4)
} else {
b.log.WithField("versioned hash", hash.String()).WithField("transaction hash", transaction.Hash().Hex()).Warn("Missing blob sidecar")
}
}

tx.BlobGas = wrapperspb.UInt64(transaction.BlobGas())
tx.BlobGasFeeCap = transaction.BlobGasFeeCap().String()
tx.BlobHashes = blobHashes
sidecarsEmptySize := 0
sidecarsSize := 0

for i := 0; i < len(transaction.BlobTxSidecar().Blobs); i++ {
sidecar := transaction.BlobTxSidecar().Blobs[i][:]
sidecarsSize += len(sidecar)
sidecarsEmptySize += ethereum.CountConsecutiveEmptyBytes(sidecar, 4)
}

tx.BlobSidecarsSize = fmt.Sprint(sidecarsSize)
tx.BlobSidecarsEmptySize = fmt.Sprint(sidecarsEmptySize)
} else {
continue
}

event, err := b.createEvent(ctx, tx, uint64(index), blockIdentifier, transaction)
Expand All @@ -316,6 +349,30 @@ func (b *ExecutionTransactionDeriver) processSlot(ctx context.Context, slot phas
return events, nil
}

func countConsecutiveEmptyBytes(byteArray []byte, threshold int) int {
count := 0
consecutiveZeros := 0

for _, b := range byteArray {
if b == 0 {
consecutiveZeros++
} else {
if consecutiveZeros > threshold {
count += consecutiveZeros
}

consecutiveZeros = 0
}
}

// Check if the last sequence in the array is longer than the threshold and hasn't been counted yet
if consecutiveZeros > threshold {
count += consecutiveZeros
}

return count
}

func (b *ExecutionTransactionDeriver) getExecutionTransactions(ctx context.Context, block *spec.VersionedSignedBeaconBlock) ([]*types.Transaction, error) {
transactions := []*types.Transaction{}

Expand Down
99 changes: 91 additions & 8 deletions pkg/cannon/ethereum/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/ethpandaops/beacon/pkg/beacon"
"github.com/ethpandaops/xatu/pkg/cannon/ethereum/services"
"github.com/ethpandaops/xatu/pkg/networks"
Expand All @@ -32,10 +33,13 @@ type BeaconNode struct {

onReadyCallbacks []func(ctx context.Context) error

sfGroup *singleflight.Group
blockCache *ttlcache.Cache[string, *spec.VersionedSignedBeaconBlock]
blockPreloadChan chan string
blockPreloadSem chan struct{}
sfGroup *singleflight.Group
blockCache *ttlcache.Cache[string, *spec.VersionedSignedBeaconBlock]
blockPreloadChan chan string
blockPreloadSem chan struct{}
blobSidecarsCache *ttlcache.Cache[string, []*deneb.BlobSidecar]
blobSidecarsPreloadChan chan string
blobSidecarsPreloadSem chan struct{}
}

func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger) (*BeaconNode, error) {
Expand Down Expand Up @@ -71,21 +75,28 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.
}

// Create a buffered channel (semaphore) to limit the number of concurrent goroutines.
sem := make(chan struct{}, config.BlockPreloadWorkers)
blockSem := make(chan struct{}, config.BlockPreloadWorkers)
blobSidecarsSem := make(chan struct{}, config.BlobSidecarsPreloadWorkers)

return &BeaconNode{
config: config,
log: log.WithField("module", "cannon/ethereum/beacon"),
beacon: node,
services: svcs,
sfGroup: &singleflight.Group{},
blockCache: ttlcache.New(
ttlcache.WithTTL[string, *spec.VersionedSignedBeaconBlock](config.BlockCacheTTL.Duration),
ttlcache.WithCapacity[string, *spec.VersionedSignedBeaconBlock](config.BlockCacheSize),
),
sfGroup: &singleflight.Group{},
blockPreloadChan: make(chan string, config.BlockPreloadQueueSize),
blockPreloadSem: sem,
metrics: NewMetrics(namespace, name),
blockPreloadSem: blockSem,
blobSidecarsCache: ttlcache.New(
ttlcache.WithTTL[string, []*deneb.BlobSidecar](config.BlockCacheTTL.Duration),
ttlcache.WithCapacity[string, []*deneb.BlobSidecar](config.BlockCacheSize),
),
blobSidecarsPreloadChan: make(chan string, config.BlobSidecarsPreloadQueueSize),
blobSidecarsPreloadSem: blobSidecarsSem,
metrics: NewMetrics(namespace, name),
}, nil
}

Expand Down Expand Up @@ -307,3 +318,75 @@ func (b *BeaconNode) LazyLoadBeaconBlock(identifier string) {

b.blockPreloadChan <- identifier
}

// GetBeaconBlobSidecars returns a block's blob sidecars.
func (b *BeaconNode) GetBeaconBlobSidecars(ctx context.Context, identifier string, ignoreMetrics ...bool) ([]*deneb.BlobSidecar, error) {
ctx, span := observability.Tracer().Start(ctx, "ethereum.beacon.GetBeaconBlobSidecars", trace.WithAttributes(attribute.String("identifier", identifier)))

defer span.End()

b.metrics.IncBlobSidecarsFetched(string(b.Metadata().Network.Name))

// Check the cache first.
if item := b.blobSidecarsCache.Get(identifier); item != nil {
if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlobSidecarsCacheHit(string(b.Metadata().Network.Name))
}

span.SetAttributes(attribute.Bool("cached", true))

return item.Value(), nil
}

span.SetAttributes(attribute.Bool("cached", false))

if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlobSidecarsCacheMiss(string(b.Metadata().Network.Name))
}

// Use singleflight to ensure we only make one request for a block at a time.
x, err, shared := b.sfGroup.Do(identifier, func() (interface{}, error) {
span.AddEvent("Acquiring semaphore...")

// Acquire a semaphore before proceeding.
b.blobSidecarsPreloadSem <- struct{}{}
defer func() { <-b.blobSidecarsPreloadSem }()

span.AddEvent("Semaphore acquired. Fetching blob sidecars from beacon api...")

// Not in the cache, so fetch it.
blobSidecars, err := b.beacon.FetchBeaconBlockBlobs(ctx, identifier)
if err != nil {
return nil, err
}

span.AddEvent("BlobSidecar fetched from beacon node.")

// Add it to the cache.
b.blobSidecarsCache.Set(identifier, blobSidecars, time.Hour)

return blobSidecars, nil
})
if err != nil {
span.SetStatus(codes.Error, err.Error())

if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlobSidecarsFetchErrors(string(b.Metadata().Network.Name))
}

return nil, err
}

span.AddEvent("Block fetching complete.", trace.WithAttributes(attribute.Bool("shared", shared)))

return x.([]*deneb.BlobSidecar), nil
}

func (b *BeaconNode) LazyLoadBeaconBlobSidecars(identifier string) {
// Don't add the blob sidecars to the preload queue if it's already in the cache.
if item := b.blobSidecarsCache.Get(identifier); item != nil {
return
}

b.blobSidecarsPreloadChan <- identifier
}
Loading

0 comments on commit 7dfa90a

Please sign in to comment.