
add ConcurrencyLimit to worker to enable dynamic tuning of concurrencies #1410

Merged (5 commits into cadence-workflow:master, Dec 6, 2024)

Conversation

@shijiesheng (Member) commented Dec 5, 2024

What changed?

[High Risk]

  • replaced buffered channel with resizable semaphore to control task concurrency

[Low Risk]

  • added worker package for modularity
  • added ConcurrencyLimit entity to worker
  • removed unused methods of autoscaler interface

Why?

Needed as a first step to enable dynamic tuning of poller and task concurrencies.
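
To illustrate the core change, here is a minimal sketch only, assuming the marusama/semaphore package that is linked later in this review (import path and exact ConcurrencyLimit wiring are assumptions, not the PR's actual code): a buffered channel fixes its capacity at creation time, while a resizable semaphore keeps the same acquire/release contract but lets the limit be tuned while workers are running.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/marusama/semaphore/v2"
)

func main() {
	// Before: permits := make(chan struct{}, 10) // capacity can never change.
	// After: a semaphore whose limit can be resized at runtime.
	sem := semaphore.New(10)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := sem.Acquire(ctx, 1); err != nil {
		fmt.Println("no permit:", err)
		return
	}
	defer sem.Release(1)

	// Dynamic tuning: shrink (or grow) the limit without restarting workers.
	sem.SetLimit(5)
	fmt.Println("limit:", sem.GetLimit(), "in use:", sem.GetCount())
}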

How did you test it?

Unit Test
[WIP] Canary Test + Bench Test

Potential risks


// AcquireChan creates a PermitChannel that can be used to wait for a permit
// After usage:
// 1. avoid goroutine leak by calling permitChannel.Close()
@Groxx (Member) Dec 5, 2024

I suppose this is also "or cancel the context".
A cancel-helper does ensure waiting and needs fewer temp-vars though, so I kinda like it 👍. It's also easier to ensure it happens because it's special.


It's somewhat more common to return a <-chan, cancel func() tuple instead of an interface though (like context.WithCancel), and a bit more forget-resistant because you're forced to notice that there are two values instead of one value with an unknown number of methods.

I don't feel too strongly, but if ^ that's convincing to you I'd be happy to re-stamp. Changes should be pretty simple.
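
For illustration, a minimal sketch of what the suggested tuple shape could look like; this is hypothetical code, not the PR's implementation, and the resizablePermit/sem names are borrowed from the snippets quoted elsewhere in this review.

package worker

import (
	"context"

	"github.com/marusama/semaphore/v2"
)

// hypothetical shape, mirroring the p.sem field quoted later in this review
type resizablePermit struct {
	sem semaphore.Semaphore
}

// AcquireChan returns a channel that yields at most one permit, plus a cancel
// func that must always be called so the helper goroutine does not leak.
func (p *resizablePermit) AcquireChan(ctx context.Context) (<-chan struct{}, func()) {
	ctx, cancel := context.WithCancel(ctx)
	ch := make(chan struct{}) // unbuffered: a send only succeeds if the caller reads
	go func() {
		if err := p.sem.Acquire(ctx, 1); err != nil {
			return // canceled or timed out before a permit was available
		}
		select {
		case ch <- struct{}{}: // handed over; the caller is now responsible for Release
		case <-ctx.Done(): // caller canceled after acquisition; hand the permit back
			p.sem.Release(1)
		}
	}()
	return ch, cancel
}

// Caller side: the two return values make the cleanup hard to forget.
//
//	ch, cancel := permit.AcquireChan(ctx)
//	defer cancel()
//	select {
//	case <-ch:
//		defer permit.Release()
//		// ... use the permit ...
//	case <-shutdownC:
//	}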

@shijiesheng (Member Author)

Makes sense. I've changed it to use this approach and removed the interface.


// Release release one permit
func (p *resizablePermit) Release() {
p.sem.Release(1)
Member

How does the semaphore behave if Release() is called multiple times? Does it increase the capacity?

@Groxx (Member) Dec 5, 2024

It'll (essentially) increase capacity, it's not a sync.Once or equivalent that can guarantee at-most-once-per-acquire.

When it exceeds the limit by counting negative use (more releases than acquires, possibly/probably at some later time), it panics: https://github.com/marusama/semaphore/blob/master/semaphore.go#L170

@Groxx (Member) Dec 5, 2024

there's a bit of a fundamental tradeoff between "ignore misuse, allow multiple calls" and "require exactly correct use, but it might be hard to find the cause".

tbh personally I prefer "require exactly correct use" in most cases, because the alternative might be releasing too early.
plus it's easy to convert "exactly correct" to "ignore misuse" with a sync.Once.Do(cancel) wrapper, e.g. for stuff like https://github.com/cadence-workflow/cadence/blob/master/common/clock/ratelimiter.go#L354 where it's convenient to use both defer and early-release to guarantee it happens in a func, and not have to worry about the many combinations possible.
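
A small sketch of that wrapper idea (illustrative only; permitLike is a hypothetical stand-in whose method names come from the snippets quoted in this review):

package worker

import (
	"context"
	"sync"
)

// permitLike is a hypothetical stand-in for the permit used in this PR.
type permitLike interface {
	Acquire(ctx context.Context) error
	Release()
}

// releaseOnce turns "require exactly correct use" into "ignore misuse":
// the returned func releases at most once, so defer and early release can coexist.
func releaseOnce(p permitLike) func() {
	var once sync.Once
	return func() { once.Do(p.Release) }
}

func doWork(ctx context.Context, p permitLike) error {
	if err := p.Acquire(ctx); err != nil {
		return err
	}
	release := releaseOnce(p)
	defer release() // safety net for early returns

	// ... work that needs the permit ...

	release() // explicit early release; the deferred call becomes a no-op
	// ... work that no longer needs the permit ...
	return nil
}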

@shijiesheng (Member Author)

Correct. But due to the complexity of our poller task processing logic, we have to pass the issued permit from the poller goroutine to the task goroutine, which further complicates "require exactly correct use":

task permit acquire -> poll goroutine finish -> pass to task processing goroutine -> release task permit when task goroutine finish
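
A rough sketch of that hand-off, continuing the hypothetical permitLike from the sketch above; the names, channels, and stub functions here are illustrative, not the client's actual poller code.

// polledTask carries the permit's release func along with the task payload,
// so ownership moves from the poller goroutine to the task goroutine.
type polledTask struct {
	payload interface{}
	release func() // must be called exactly once when processing finishes
}

func pollLoop(ctx context.Context, p permitLike, taskC chan<- polledTask) {
	for ctx.Err() == nil {
		if err := p.Acquire(ctx); err != nil {
			return // shutting down
		}
		task, err := poll(ctx) // hypothetical poll call
		if err != nil {
			p.Release() // nothing to hand off, release immediately
			continue
		}
		taskC <- polledTask{payload: task, release: p.Release}
	}
}

func taskLoop(taskC <-chan polledTask) {
	for t := range taskC {
		processTask(t.payload) // hypothetical processing
		t.release()            // release only after the task goroutine is done
	}
}

func poll(ctx context.Context) (interface{}, error) { return "task", nil } // stub for the sketch
func processTask(task interface{})                  {}                     // stub for the sketch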

Member

yea, there are a few unfortunately-complicated delayed-release things in the client :\ I'd love to get rid of as many as possible, or make them much clearer if not possible.

Comment on lines 69 to 72
// AcquireChan creates a PermitChannel that can be used to wait for a permit
// After usage:
// 1. avoid goroutine leak by calling permitChannel.Close()
// 2. release permit by calling permit.Release()
Member

Suggested change
// AcquireChan creates a PermitChannel that can be used to wait for a permit
// After usage:
// 1. avoid goroutine leak by calling permitChannel.Close()
// 2. release permit by calling permit.Release()
// AcquireChan creates a PermitChannel that can be used to wait for a single permit
// After usage:
// 1. avoid goroutine leak by calling permitChannel.Close()
// 2. if the read succeeded, release permit by calling permit.Release()

^ single-space indentation is so it's a list in godoc: https://tip.golang.org/doc/comment#lists (unordered lists use 2 spaces, dunno why it's different)

@shijiesheng (Member Author)

Addressed.

maxTestDuration: 250 * time.Millisecond, // at least need 100ms * 1000 / 200 = 500ms to acquire all permit
capacity: []int{200},
goroutines: 1000,
expectFailuresRange: []int{1, 999}, // should at least pass some acquires
Member

probably means we can make the bounds more precise, like 400,600? Loose bounds make sense to avoid noise, but it's targeting 500 and tends to be fairly close.

or maybe just a "should be ~500" comment.

@shijiesheng (Member Author)

Makes sense. I've loosened the bound.

Comment on lines 73 to 76
maxTestDuration: 250 * time.Millisecond,
capacity: []int{600, 400, 200},
goroutines: 1000,
expectFailuresRange: []int{1, 999},
@Groxx (Member) Dec 5, 2024

should expect...

  • 0ms: 1k enter, 600 accepted, 400 waiting
  • 50ms: resized down to 400, 400 still waiting because none have released
  • 100ms: resized down to 200, some of the original 600 begin releasing (100-150ms)
  • 150ms: all 600 released, 200 additional ones acquired (waiting until at least 200ms more before any release, from earliest at 100ms)
  • 200ms: earliest 100+100ms start releasing (upper bound is 800+200=1000, if no jitter, and the last 200 are still holding when it times out)
  • 250ms: times out, any with >100+100+(50ms total rand) time out and fail

so... expecting 800-1000 success? ignoring jitter.

@shijiesheng (Member Author)

I'll just say at most 500 failures. Jitter is making things more complicated

"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)

Member

the simulation's great but it does hide any bugs that'd occur from using only one or the other mode, and that's how this is going to be used AFAICT.

seems worth some very simple tests for "can get" + "can fail from timeout/cancel", but I believe they'll all pass.

@shijiesheng (Member Author)

Makes sense. Added unit tests.

@Groxx (Member) left a comment

Tests are failing (extra release) and there are minor comments (mostly on comments), but 👍 looks good AFAICT; let me know if you tackle the other tests / change the return format.

@Groxx self-requested a review, December 5, 2024 22:40
@@ -190,9 +193,9 @@ func Test_pollerAutoscaler(t *testing.T) {
go func() {
defer wg.Done()
for pollResult := range pollChan {
pollerScaler.Acquire(1)
pollerScaler.permit.Acquire(context.Background())
Member

Acquire returns an error which should be handled; otherwise Release might be called without a successful acquire, which leads to a panic.
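
For example, the test loop quoted above could handle the error along these lines; this is a fragment sketching the shape only, and the actual fix in the PR may differ.

for pollResult := range pollChan {
	// Handle the Acquire error instead of assuming success; otherwise a later
	// Release without a matching successful Acquire can panic the semaphore.
	if err := pollerScaler.permit.Acquire(context.Background()); err != nil {
		t.Errorf("failed to acquire permit: %v", err)
		continue
	}
	// ... existing handling of pollResult ...
	pollerScaler.permit.Release()
}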

@shijiesheng (Member Author)

Good catch. I have been searching for the issue for hours.

codecov bot commented Dec 6, 2024

Codecov Report

Attention: Patch coverage is 98.55072% with 1 line in your changes missing coverage. Please review.

Project coverage is 82.58%. Comparing base (641e4a7) to head (6342ff8).
Report is 1 commit behind head on master.

Files with missing lines             Patch %    Lines
internal/internal_worker_base.go     95.45%     1 Missing ⚠️

Files with missing lines                  Coverage             Δ
internal/internal_poller_autoscaler.go    92.22% <100.00%>     (-0.49%) ⬇️
internal/worker/resizable_permit.go       100.00% <100.00%>    (ø)
internal/internal_worker_base.go          82.62% <95.45%>      (-0.79%) ⬇️


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 641e4a7...6342ff8.

@shijiesheng merged commit 9ffbb1f into cadence-workflow:master on Dec 6, 2024
10 checks passed
Comment on lines +159 to +176
t.Run("acquire timeout", func(t *testing.T) {
permit := NewResizablePermit(1)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
time.Sleep(100 * time.Millisecond)
err := permit.Acquire(ctx)
assert.ErrorContains(t, err, "context deadline exceeded")
assert.Empty(t, permit.Count())
})

t.Run("cancel acquire", func(t *testing.T) {
permit := NewResizablePermit(1)
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := permit.Acquire(ctx)
assert.ErrorContains(t, err, "canceled")
assert.Empty(t, permit.Count())
})
Member

these two tests are the same fwiw - you probably want to have one that does the following (a rough sketch appears after this list):

  • acquire (use up the whole semaphore)
  • acquire again (blocks until timeout)
  • make sure it didn't return immediately (elapsed time > like 5ms)
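
A rough sketch of such a test; the NewResizablePermit / Acquire / Count names come from the snippets quoted above, and the durations are illustrative rather than taken from the PR.

t.Run("acquire blocks until timeout", func(t *testing.T) {
	permit := NewResizablePermit(1)
	assert.NoError(t, permit.Acquire(context.Background())) // use up the whole semaphore

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	start := time.Now()
	err := permit.Acquire(ctx) // should block until the deadline
	elapsed := time.Since(start)

	assert.ErrorContains(t, err, "context deadline exceeded")
	assert.True(t, elapsed >= 5*time.Millisecond, "acquire should not return immediately, took %v", elapsed)
	assert.EqualValues(t, 1, permit.Count(), "only the first permit should still be held")
})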

permit := NewResizablePermit(1)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
time.Sleep(100 * time.Millisecond)
Member

same as above, this makes it identical to the cancel case below (the chan is closed before it starts)

@Groxx (Member) commented Dec 7, 2024

Minor test gap, but 👍 looks good.
Did you find out where the earlier double-release failure was coming from, or was that maybe just a flaky test?
