From 604d1ab7faf7ab928b837fc9470ad48bc0d1301b Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 22 Sep 2023 12:35:41 +1000 Subject: [PATCH] feat(cannon): Wrap GetBeaconBlock in a semaphore (#201) * feat(cannon): Wrap GetBeaconBlock in a semaphore * refactor: Improve GetBeaconBlock function's concurrency limit --- pkg/cannon/ethereum/beacon.go | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/pkg/cannon/ethereum/beacon.go b/pkg/cannon/ethereum/beacon.go index 0177e68d..f93ebca7 100644 --- a/pkg/cannon/ethereum/beacon.go +++ b/pkg/cannon/ethereum/beacon.go @@ -203,21 +203,30 @@ func (b *BeaconNode) Synced(ctx context.Context) error { // GetBeaconBlock returns a beacon block by its identifier. Blocks can be cached internally. func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, ignoreMetrics ...bool) (*spec.VersionedSignedBeaconBlock, error) { - // Use singleflight to ensure we only make one request for a block at a time. - x, err, _ := b.sfGroup.Do(identifier, func() (interface{}, error) { - // Check the cache first. - if item := b.blockCache.Get(identifier); item != nil { - if len(ignoreMetrics) != 0 && ignoreMetrics[0] { - b.metrics.IncBlockCacheHit(string(b.Metadata().Network.Name)) - } + b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name)) - return item.Value(), nil - } + // Create a buffered channel (semaphore) to limit the number of concurrent goroutines. + sem := make(chan struct{}, b.config.BlockPreloadWorkers) + // Check the cache first. + if item := b.blockCache.Get(identifier); item != nil { if len(ignoreMetrics) != 0 && ignoreMetrics[0] { - b.metrics.IncBlockCacheMiss(string(b.Metadata().Network.Name)) + b.metrics.IncBlockCacheHit(string(b.Metadata().Network.Name)) } + return item.Value(), nil + } + + if len(ignoreMetrics) != 0 && ignoreMetrics[0] { + b.metrics.IncBlockCacheMiss(string(b.Metadata().Network.Name)) + } + + // Use singleflight to ensure we only make one request for a block at a time. + x, err, _ := b.sfGroup.Do(identifier, func() (interface{}, error) { + // Acquire a semaphore before proceeding. + sem <- struct{}{} + defer func() { <-sem }() + // Not in the cache, so fetch it. block, err := b.beacon.FetchBlock(ctx, identifier) if err != nil { @@ -237,10 +246,6 @@ func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, igno return nil, err } - if len(ignoreMetrics) != 0 && ignoreMetrics[0] { - b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name)) - } - return x.(*spec.VersionedSignedBeaconBlock), nil }