From 40cd57cd195e79c96774b284edb3ea0f65df950b Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Fri, 6 Dec 2024 13:36:16 -0800 Subject: [PATCH] fix unit test and address comment on AcquireChan return --- internal/internal_poller_autoscaler.go | 5 +++++ internal/internal_poller_autoscaler_test.go | 5 +++-- internal/internal_worker_base.go | 9 ++++---- internal/worker/concurrency.go | 9 +------- internal/worker/resizable_permit.go | 10 +++++---- internal/worker/resizable_permit_test.go | 24 ++++++++++++++++++--- 6 files changed, 40 insertions(+), 22 deletions(-) diff --git a/internal/internal_poller_autoscaler.go b/internal/internal_poller_autoscaler.go index 9185d5641..2dc81e7ba 100644 --- a/internal/internal_poller_autoscaler.go +++ b/internal/internal_poller_autoscaler.go @@ -135,6 +135,11 @@ func (p *pollerAutoScaler) Start() { p.permit.SetQuota(int(proposedResource)) } p.pollerUsageEstimator.Reset() + + // hooks + for i := range p.onAutoScale { + p.onAutoScale[i]() + } } } }() diff --git a/internal/internal_poller_autoscaler_test.go b/internal/internal_poller_autoscaler_test.go index c1a3dfb4f..4a441b642 100644 --- a/internal/internal_poller_autoscaler_test.go +++ b/internal/internal_poller_autoscaler_test.go @@ -193,7 +193,8 @@ func Test_pollerAutoscaler(t *testing.T) { go func() { defer wg.Done() for pollResult := range pollChan { - pollerScaler.permit.Acquire(context.Background()) + err := pollerScaler.permit.Acquire(context.Background()) + assert.NoError(t, err) pollerScaler.CollectUsage(pollResult) pollerScaler.permit.Release() } @@ -202,7 +203,7 @@ func Test_pollerAutoscaler(t *testing.T) { assert.Eventually(t, func() bool { return autoscalerEpoch.Load() == uint64(tt.args.autoScalerEpoch) - }, tt.args.cooldownTime+20*time.Millisecond, 10*time.Millisecond) + }, tt.args.cooldownTime+100*time.Millisecond, 10*time.Millisecond) pollerScaler.Stop() res := pollerScaler.permit.Quota() - pollerScaler.permit.Count() assert.Equal(t, tt.want, int(res)) diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 79e6b9086..b4bfb0ad6 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -248,14 +248,13 @@ func (bw *baseWorker) runPoller() { bw.metricsScope.Counter(metrics.PollerStartCounter).Inc(1) for { - // permitChannel can be blocking without passing context because shutdownCh is used - permitChannel := bw.concurrency.PollerPermit.AcquireChan(context.Background()) + permitChannel, channelDone := bw.concurrency.TaskPermit.AcquireChan(bw.limiterContext) select { case <-bw.shutdownCh: - permitChannel.Close() + channelDone() return - case <-permitChannel.C(): // don't poll unless there is a task permit - permitChannel.Close() + case <-permitChannel: // don't poll unless there is a task permit + channelDone() // TODO move to a centralized place inside the worker // emit metrics on concurrent task permit quota and current task permit count // NOTE task permit doesn't mean there is a task running, it still needs to poll until it gets a task to process diff --git a/internal/worker/concurrency.go b/internal/worker/concurrency.go index aa246bacf..8d0771b91 100644 --- a/internal/worker/concurrency.go +++ b/internal/worker/concurrency.go @@ -33,16 +33,9 @@ type ConcurrencyLimit struct { // Permit is an adaptive permit issuer to control concurrency type Permit interface { Acquire(context.Context) error - AcquireChan(context.Context) PermitChannel + AcquireChan(context.Context) (channel <-chan struct{}, done func()) Count() int Quota() int Release() SetQuota(int) } - -// PermitChannel is a channel that can be used to wait for a permit to be available -// Remember to call Close() to avoid goroutine leak -type PermitChannel interface { - C() <-chan struct{} - Close() -} diff --git a/internal/worker/resizable_permit.go b/internal/worker/resizable_permit.go index c0ab89d13..31c504f63 100644 --- a/internal/worker/resizable_permit.go +++ b/internal/worker/resizable_permit.go @@ -70,7 +70,7 @@ func (p *resizablePermit) Count() int { // After usage: // 1. avoid goroutine leak by calling permitChannel.Close() // 2. release permit by calling permit.Release() -func (p *resizablePermit) AcquireChan(ctx context.Context) PermitChannel { +func (p *resizablePermit) AcquireChan(ctx context.Context) (<-chan struct{}, func()) { ctx, cancel := context.WithCancel(ctx) pc := &permitChannel{ p: p, @@ -79,8 +79,10 @@ func (p *resizablePermit) AcquireChan(ctx context.Context) PermitChannel { cancel: cancel, wg: &sync.WaitGroup{}, } - pc.start() - return pc + pc.Start() + return pc.C(), func() { + pc.Close() + } } type permitChannel struct { @@ -95,7 +97,7 @@ func (ch *permitChannel) C() <-chan struct{} { return ch.c } -func (ch *permitChannel) start() { +func (ch *permitChannel) Start() { ch.wg.Add(1) go func() { defer ch.wg.Done() diff --git a/internal/worker/resizable_permit_test.go b/internal/worker/resizable_permit_test.go index 4b7d39f01..21903fb91 100644 --- a/internal/worker/resizable_permit_test.go +++ b/internal/worker/resizable_permit_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/atomic" + "go.uber.org/goleak" ) func TestPermit_Simulation(t *testing.T) { @@ -86,6 +87,7 @@ func TestPermit_Simulation(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + defer goleak.VerifyNone(t) wg := &sync.WaitGroup{} permit := NewResizablePermit(tt.capacity[0]) wg.Add(1) @@ -117,15 +119,15 @@ func TestPermit_Simulation(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - permitChan := permit.AcquireChan(ctx) + permitChan, done := permit.AcquireChan(ctx) select { - case <-permitChan.C(): + case <-permitChan: time.Sleep(time.Duration(100+rand.Intn(50)) * time.Millisecond) permit.Release() case <-ctx.Done(): failures.Inc() } - permitChan.Close() + done() }() } @@ -142,3 +144,19 @@ func TestPermit_Simulation(t *testing.T) { }) } } + +func Test_Permit_AcquireChan(t *testing.T) { + permit := NewResizablePermit(2) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + permitChan, done := permit.AcquireChan(ctx) + select { + case <-permitChan: + assert.Equal(t, 2, permit.Quota()) + assert.Equal(t, 1, permit.Count()) + case <-ctx.Done(): + t.Error("unexpected timeout") + } + done() + permit.Release() +}