Skip to content

Commit

Permalink
Thread control utility (#10560)
Browse files Browse the repository at this point in the history
* utility for managing group of goroutines

* refactor context to StopChan

* remove limits

* leftovers

* leftovers round #2

* lint
  • Loading branch information
amirylm authored Sep 8, 2023
1 parent 5dbc5dd commit cc85898
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
44 changes: 44 additions & 0 deletions core/utils/thread_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package utils

import (
"context"
"sync"
)

var _ ThreadControl = &threadControl{}

// ThreadControl is a helper for managing a group of goroutines.
type ThreadControl interface {
// Go starts a goroutine and tracks the lifetime of the goroutine.
Go(fn func(context.Context))
// Close cancels the goroutines and waits for all of them to exit.
Close()
}

func NewThreadControl() *threadControl {
tc := &threadControl{
stop: make(chan struct{}),
}

return tc
}

type threadControl struct {
threadsWG sync.WaitGroup
stop StopChan
}

func (tc *threadControl) Go(fn func(context.Context)) {
tc.threadsWG.Add(1)
go func() {
defer tc.threadsWG.Done()
ctx, cancel := tc.stop.NewCtx()
defer cancel()
fn(ctx)
}()
}

func (tc *threadControl) Close() {
close(tc.stop)
tc.threadsWG.Wait()
}
27 changes: 27 additions & 0 deletions core/utils/thread_control_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package utils

import (
"context"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
)

func TestThreadControl_Close(t *testing.T) {
n := 10
tc := NewThreadControl()

finished := atomic.Int32{}

for i := 0; i < n; i++ {
tc.Go(func(ctx context.Context) {
<-ctx.Done()
finished.Add(1)
})
}

tc.Close()

require.Equal(t, int32(n), finished.Load())
}

0 comments on commit cc85898

Please sign in to comment.