Skip to content

Commit

Permalink
feat(cannon): Wrap GetBeaconBlock in a semaphore (#201)
Browse files Browse the repository at this point in the history
* feat(cannon): Wrap GetBeaconBlock in a semaphore

* refactor: Improve GetBeaconBlock function's concurrency limit
  • Loading branch information
samcm authored Sep 22, 2023
1 parent 687d680 commit 604d1ab
Showing 1 changed file with 19 additions and 14 deletions.
33 changes: 19 additions & 14 deletions pkg/cannon/ethereum/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,21 +203,30 @@ func (b *BeaconNode) Synced(ctx context.Context) error {

// GetBeaconBlock returns a beacon block by its identifier. Blocks can be cached internally.
func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, ignoreMetrics ...bool) (*spec.VersionedSignedBeaconBlock, error) {
// Use singleflight to ensure we only make one request for a block at a time.
x, err, _ := b.sfGroup.Do(identifier, func() (interface{}, error) {
// Check the cache first.
if item := b.blockCache.Get(identifier); item != nil {
if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlockCacheHit(string(b.Metadata().Network.Name))
}
b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name))

return item.Value(), nil
}
// Create a buffered channel (semaphore) to limit the number of concurrent goroutines.
sem := make(chan struct{}, b.config.BlockPreloadWorkers)

// Check the cache first.
if item := b.blockCache.Get(identifier); item != nil {
if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlockCacheMiss(string(b.Metadata().Network.Name))
b.metrics.IncBlockCacheHit(string(b.Metadata().Network.Name))
}

return item.Value(), nil
}

if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlockCacheMiss(string(b.Metadata().Network.Name))
}

// Use singleflight to ensure we only make one request for a block at a time.
x, err, _ := b.sfGroup.Do(identifier, func() (interface{}, error) {
// Acquire a semaphore before proceeding.
sem <- struct{}{}
defer func() { <-sem }()

// Not in the cache, so fetch it.
block, err := b.beacon.FetchBlock(ctx, identifier)
if err != nil {
Expand All @@ -237,10 +246,6 @@ func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, igno
return nil, err
}

if len(ignoreMetrics) != 0 && ignoreMetrics[0] {
b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name))
}

return x.(*spec.VersionedSignedBeaconBlock), nil
}

Expand Down

0 comments on commit 604d1ab

Please sign in to comment.