Skip to content

Commit

Permalink
feat(cannon): Wrap GetBeaconBlock in a semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Sep 22, 2023
1 parent 2da39d4 commit f8d8eba
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions pkg/cannon/ethereum/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,22 +202,32 @@ func (b *BeaconNode) Synced(ctx context.Context) error {
}

// GetBeaconBlock returns a beacon block by its identifier. Blocks can be cached internally.
// This function has been updated to use a semaphore to limit the number of concurrent goroutines.
func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, ignoreMetrics ...bool) (*spec.VersionedSignedBeaconBlock, error) {
// Use singleflight to ensure we only make one request for a block at a time.
x, err, _ := b.sfGroup.Do(identifier, func() (interface{}, error) {
// Check the cache first.
if item := b.blockCache.Get(identifier); item != nil {
if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlockCacheHit(string(b.Metadata().Network.Name))
}
b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name))

return item.Value(), nil
}
// Create a buffered channel (semaphore) to limit the number of concurrent goroutines.
sem := make(chan struct{}, b.config.BlockPreloadWorkers)

// Check the cache first.
if item := b.blockCache.Get(identifier); item != nil {
if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlockCacheMiss(string(b.Metadata().Network.Name))
b.metrics.IncBlockCacheHit(string(b.Metadata().Network.Name))
}

return item.Value(), nil
}

if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlockCacheMiss(string(b.Metadata().Network.Name))
}

// Use singleflight to ensure we only make one request for a block at a time.
x, err, _ := b.sfGroup.Do(identifier, func() (interface{}, error) {
// Acquire a semaphore before proceeding.
sem <- struct{}{}
defer func() { <-sem }()

// Not in the cache, so fetch it.
block, err := b.beacon.FetchBlock(ctx, identifier)
if err != nil {
Expand All @@ -237,10 +247,6 @@ func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, igno
return nil, err
}

if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name))
}

return x.(*spec.VersionedSignedBeaconBlock), nil
}

Expand Down

0 comments on commit f8d8eba

Please sign in to comment.