Skip to content

Commit

Permalink
fix leaked goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
shijiesheng committed Nov 22, 2024
1 parent 385935a commit 69ec07e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
2 changes: 1 addition & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (bw *baseWorker) runPoller() {
select {
case <-bw.shutdownCh:
return
case <-bw.dynamic.TaskPermit.AcquireChan(bw.limiterContext): // don't poll unless there is a task permit
case <-bw.dynamic.TaskPermit.AcquireChan(bw.limiterContext, &bw.shutdownWG): // don't poll unless there is a task permit
// TODO move to a centralized place inside the worker
// emit metrics on concurrent task permit quota and current task permit count
// NOTE task permit doesn't mean there is a task running, it still needs to poll until it gets a task to process
Expand Down
13 changes: 9 additions & 4 deletions internal/worker/dynamic_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package worker
import (
"context"
"fmt"
"sync"

"github.com/marusama/semaphore/v2"
)
Expand All @@ -38,7 +39,7 @@ type DynamicParams struct {
// Permit is an adaptive
type Permit interface {
Acquire(context.Context, int) error
AcquireChan(context.Context) <-chan struct{}
AcquireChan(context.Context, *sync.WaitGroup) <-chan struct{}
Quota() int
SetQuota(int)
Count() int
Expand All @@ -63,15 +64,19 @@ func (p *permit) Acquire(ctx context.Context, count int) error {
}

// AcquireChan returns a permit ready channel. It's closed then permit is acquired
func (p *permit) AcquireChan(ctx context.Context) <-chan struct{} {
func (p *permit) AcquireChan(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} {
ch := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
if err := p.sem.Acquire(ctx, 1); err != nil {
close(ch)
return
}
ch <- struct{}{}
close(ch)
select { // try to send to channel, but don't block if listener is gone
case ch <- struct{}{}:
default:
}
}()
return ch
}
Expand Down

0 comments on commit 69ec07e

Please sign in to comment.