diff --git a/gc/reaper/reaper.go b/gc/reaper/reaper.go index 96518cf26..eeaf5f58b 100644 --- a/gc/reaper/reaper.go +++ b/gc/reaper/reaper.go @@ -664,7 +664,8 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error { }() segSize := s.reaper.segmentSize - segment := deque.New[adInfo](segSize, segSize) + segment := new(deque.Deque[adInfo]) + segment.SetBaseCap(segSize) for segEnd := gcState.LastProcessedAdCid; segEnd != latestAdCid; { for adCid := latestAdCid; adCid != segEnd; { diff --git a/go.mod b/go.mod index 936b43279..766919957 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,8 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.30.2 github.com/aws/smithy-go v1.13.5 github.com/cockroachdb/pebble v0.0.0-20240822181941-1b4021bcfe22 - github.com/gammazero/channelqueue v0.2.2 - github.com/gammazero/deque v0.2.1 + github.com/gammazero/chanqueue v1.0.0 + github.com/gammazero/deque v1.0.0 github.com/gammazero/targz v0.0.3 github.com/ipfs/boxo v0.24.2 github.com/ipfs/go-cid v0.4.1 @@ -91,6 +91,7 @@ require ( github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/gammazero/channelqueue v0.2.2 // indirect github.com/gammazero/radixtree v0.3.1 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-kit/log v0.2.1 // indirect diff --git a/go.sum b/go.sum index e75e51def..0bb465b8b 100644 --- a/go.sum +++ b/go.sum @@ -216,8 +216,10 @@ github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4 github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/gammazero/channelqueue v0.2.2 h1:ufNzIbeDBxNfHj0m5uwUfOwvTmHF/O40hu2ZNnvF+/8= github.com/gammazero/channelqueue v0.2.2/go.mod h1:824o5HHE+yO1xokh36BIuSv8YWwXW0364ku91eRMFS4= -github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= -github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= +github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o= +github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc= +github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= +github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/gammazero/radixtree v0.3.1 h1:kYVZUH/XgIktiGjUBERExxP6Ock7epWuCAlrB2FKsmk= github.com/gammazero/radixtree v0.3.1/go.mod h1:wcCpc8wCLVlPg2FVRYQLtBCnj9MNHQI1qB1hVrHvEDs= github.com/gammazero/targz v0.0.3 h1:vlODyfU9QdRgD1AgdEgLQ7lhg+rc4eDQVoSoMoC1ruc= diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index 6f1c8c806..74201a5cc 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -11,7 +11,7 @@ import ( "sync/atomic" "time" - "github.com/gammazero/channelqueue" + "github.com/gammazero/chanqueue" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" @@ -587,7 +587,7 @@ func (ing *Ingester) onAdProcessed(peerID peer.ID) (<-chan adProcessedEvent, con // before being read. If this channel blocked and then caused // distributeEvents to block, that could prevent this channel from being // read, causing deadlock. - cq := channelqueue.New[adProcessedEvent](-1) + cq := chanqueue.New[adProcessedEvent]() events := cq.In() cancel := func() { // Drain channel to prevent deadlock if blocked writes are preventing