Skip to content

Commit

Permalink
Simplify provide woker goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Nov 7, 2024
1 parent 4a077bc commit 19d1d6b
Showing 1 changed file with 26 additions and 46 deletions.
72 changes: 26 additions & 46 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,7 @@ func (bs *Server) startWorkers(ctx context.Context) {
if bs.provideEnabled {
bs.waitWorkers.Add(1)
go bs.provideCollector(ctx)

// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
bs.waitWorkers.Add(1)
go bs.provideWorker(ctx)
bs.startProvideWorkers(ctx)
}
}

Expand Down Expand Up @@ -501,48 +496,33 @@ func (bs *Server) provideCollector(ctx context.Context) {
}
}

func (bs *Server) provideWorker(ctx context.Context) {
limit := make(chan struct{}, provideWorkerMax)
defer func() {
// Wait until all limitGoProvide goroutines are done before declaring
// this worker as done.
for i := 0; i < provideWorkerMax; i++ {
limit <- struct{}{}
}
bs.waitWorkers.Done()
}()

limitedGoProvide := func(k cid.Cid, wid int) {
defer func() {
// replace token when done
<-limit
}()

log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)

ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
defer cancel()

if err := bs.network.Provide(ctx, k); err != nil {
log.Warn(err)
}
}
// startProvideWorkers starts provide worker goroutines that provide CID
// supplied by provideCollector.
//
// If providing blocks bottlenecks file transfers then consider increasing
// provideWorkerMax,
func (bs *Server) startProvideWorkers(ctx context.Context) {
bs.waitWorkers.Add(provideWorkerMax)
for id := 0; id < provideWorkerMax; id++ {
go func(wid int) {
defer bs.waitWorkers.Done()

var runCount int
// Read bs.proviudeKeys until closed, when provideCollector exits.
for k := range bs.provideKeys {
runCount++
log.Debugw("Bitswap provider worker start", "ID", wid, "run", runCount, "cid", k)

ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout)
if err := bs.network.Provide(ctx, k); err != nil {
log.Warn(err)
}

Check warning on line 519 in bitswap/server/server.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/server.go#L518-L519

Added lines #L518 - L519 were not covered by tests
cancel()

// worker spawner, reads from bs.provideKeys until it closes, spawning a
// _ratelimited_ number of workers to handle each key.
wid := 2
for k := range bs.provideKeys {
log.Debug("Bitswap.ProvideWorker.Loop")
select {
case limit <- struct{}{}:
go limitedGoProvide(k, wid)
case <-ctx.Done():
return
}
wid++
log.Debugw("Bitswap provider worker done", "ID", wid, "run", runCount, "cid", k)
}
}(id)
}
log.Debug("provideKeys channel closed")
}

func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
Expand Down

0 comments on commit 19d1d6b

Please sign in to comment.