From 19d1d6b9df169d38489da37aaefb78e269607ff4 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Tue, 5 Nov 2024 09:51:11 -1000
Subject: [PATCH] Simplify provide worker goroutines

---
 bitswap/server/server.go | 72 +++++++++++++++------------------------
 1 file changed, 26 insertions(+), 46 deletions(-)

diff --git a/bitswap/server/server.go b/bitswap/server/server.go
index 2b45b324b..2cc3e2474 100644
--- a/bitswap/server/server.go
+++ b/bitswap/server/server.go
@@ -299,12 +299,7 @@ func (bs *Server) startWorkers(ctx context.Context) {
 	if bs.provideEnabled {
 		bs.waitWorkers.Add(1)
 		go bs.provideCollector(ctx)
-
-		// Spawn up multiple workers to handle incoming blocks
-		// consider increasing number if providing blocks bottlenecks
-		// file transfers
-		bs.waitWorkers.Add(1)
-		go bs.provideWorker(ctx)
+		bs.startProvideWorkers(ctx)
 	}
 }
 
@@ -501,48 +496,33 @@ func (bs *Server) provideCollector(ctx context.Context) {
 		}
 	}
 }
 
-func (bs *Server) provideWorker(ctx context.Context) {
-	limit := make(chan struct{}, provideWorkerMax)
-	defer func() {
-		// Wait until all limitGoProvide goroutines are done before declaring
-		// this worker as done.
-		for i := 0; i < provideWorkerMax; i++ {
-			limit <- struct{}{}
-		}
-		bs.waitWorkers.Done()
-	}()
-
-	limitedGoProvide := func(k cid.Cid, wid int) {
-		defer func() {
-			// replace token when done
-			<-limit
-		}()
-
-		log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
-		defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)
-
-		ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
-		defer cancel()
-
-		if err := bs.network.Provide(ctx, k); err != nil {
-			log.Warn(err)
-		}
-	}
+// startProvideWorkers starts provide worker goroutines that provide CIDs
+// supplied by provideCollector.
+//
+// If providing blocks bottlenecks file transfers, then consider increasing
+// provideWorkerMax.
+func (bs *Server) startProvideWorkers(ctx context.Context) {
+	bs.waitWorkers.Add(provideWorkerMax)
+	for id := 0; id < provideWorkerMax; id++ {
+		go func(wid int) {
+			defer bs.waitWorkers.Done()
+
+			var runCount int
+			// Read bs.provideKeys until closed, when provideCollector exits.
+			for k := range bs.provideKeys {
+				runCount++
+				log.Debugw("Bitswap provider worker start", "ID", wid, "run", runCount, "cid", k)
+
+				ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout)
+				if err := bs.network.Provide(ctx, k); err != nil {
+					log.Warn(err)
+				}
+				cancel()
-
-	// worker spawner, reads from bs.provideKeys until it closes, spawning a
-	// _ratelimited_ number of workers to handle each key.
-	wid := 2
-	for k := range bs.provideKeys {
-		log.Debug("Bitswap.ProvideWorker.Loop")
-		select {
-		case limit <- struct{}{}:
-			go limitedGoProvide(k, wid)
-		case <-ctx.Done():
-			return
-		}
-		wid++
+				log.Debugw("Bitswap provider worker done", "ID", wid, "run", runCount, "cid", k)
+			}
+		}(id)
 	}
-	log.Debug("provideKeys channel closed")
 }
 
 func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
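
The patch replaces the old token-channel spawner (one short-lived goroutine per key, rate-limited by a semaphore channel) with a fixed pool of long-lived workers that all range over the same channel until it is closed. For readers unfamiliar with the idiom, below is a minimal, self-contained sketch of that fixed worker pool pattern. It is not code from this patch: numWorkers, processKey, and the string keys are illustrative stand-ins for provideWorkerMax, bs.network.Provide, and the CIDs fed in by provideCollector.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

const numWorkers = 8 // stand-in for provideWorkerMax

// processKey is a stand-in for the timeout-bounded provide operation.
func processKey(ctx context.Context, key string) error {
	select {
	case <-time.After(10 * time.Millisecond): // simulated work
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	keys := make(chan string)
	var wg sync.WaitGroup

	// Start a fixed number of workers up front, as startProvideWorkers
	// does. Each worker reads keys until the channel is closed.
	wg.Add(numWorkers)
	for id := 0; id < numWorkers; id++ {
		go func(wid int) {
			defer wg.Done()
			for k := range keys {
				// Bound each operation with its own timeout and
				// release the timer promptly via cancel().
				ctx, cancel := context.WithTimeout(context.Background(), time.Second)
				if err := processKey(ctx, k); err != nil {
					fmt.Printf("worker %d: %s: %v\n", wid, k, err)
				}
				cancel()
			}
		}(id)
	}

	// The producer plays the role of provideCollector: it feeds keys,
	// then closes the channel so every worker's range loop ends and
	// wg.Wait() can return.
	for i := 0; i < 20; i++ {
		keys <- fmt.Sprintf("key-%d", i)
	}
	close(keys)
	wg.Wait()
}

Closing the channel is the shutdown signal in this design, which is why the new read loop no longer selects on ctx.Done(): when provideCollector exits and provideKeys is closed, every worker's range loop terminates on its own and waitWorkers.Done() runs.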