From f8d8eba2f0450ec5a0a6058c88e36d8d906f6e06 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 22 Sep 2023 12:31:46 +1000 Subject: [PATCH] feat(cannon): Wrap GetBeaconBlock in a semaphore --- pkg/cannon/ethereum/beacon.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/pkg/cannon/ethereum/beacon.go b/pkg/cannon/ethereum/beacon.go index 0177e68d..49342aff 100644 --- a/pkg/cannon/ethereum/beacon.go +++ b/pkg/cannon/ethereum/beacon.go @@ -202,22 +202,32 @@ func (b *BeaconNode) Synced(ctx context.Context) error { } // GetBeaconBlock returns a beacon block by its identifier. Blocks can be cached internally. +// This function has been updated to use a semaphore to limit the number of concurrent goroutines. func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, ignoreMetrics ...bool) (*spec.VersionedSignedBeaconBlock, error) { - // Use singleflight to ensure we only make one request for a block at a time. - x, err, _ := b.sfGroup.Do(identifier, func() (interface{}, error) { - // Check the cache first. - if item := b.blockCache.Get(identifier); item != nil { - if len(ignoreMetrics) != 0 && ignoreMetrics[0] { - b.metrics.IncBlockCacheHit(string(b.Metadata().Network.Name)) - } + b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name)) - return item.Value(), nil - } + // Create a buffered channel (semaphore) to limit the number of concurrent goroutines. + sem := make(chan struct{}, b.config.BlockPreloadWorkers) + // Check the cache first. + if item := b.blockCache.Get(identifier); item != nil { if len(ignoreMetrics) != 0 && ignoreMetrics[0] { - b.metrics.IncBlockCacheMiss(string(b.Metadata().Network.Name)) + b.metrics.IncBlockCacheHit(string(b.Metadata().Network.Name)) } + return item.Value(), nil + } + + if len(ignoreMetrics) != 0 && ignoreMetrics[0] { + b.metrics.IncBlockCacheMiss(string(b.Metadata().Network.Name)) + } + + // Use singleflight to ensure we only make one request for a block at a time. + x, err, _ := b.sfGroup.Do(identifier, func() (interface{}, error) { + // Acquire a semaphore before proceeding. + sem <- struct{}{} + defer func() { <-sem }() + // Not in the cache, so fetch it. block, err := b.beacon.FetchBlock(ctx, identifier) if err != nil { @@ -237,10 +247,6 @@ func (b *BeaconNode) GetBeaconBlock(ctx context.Context, identifier string, igno return nil, err } - if len(ignoreMetrics) != 0 && ignoreMetrics[0] { - b.metrics.IncBlocksFetched(string(b.Metadata().Network.Name)) - } - return x.(*spec.VersionedSignedBeaconBlock), nil }