-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add fixed window count rate limiter and retry on 429 status code
- Loading branch information
1 parent
5191907
commit 72650ed
Showing
7 changed files
with
303 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
package internal | ||
|
||
type Log interface { | ||
Println(v ...interface{}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package internal | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"time" | ||
) | ||
|
||
type RateLimiter interface { | ||
// Wait will verify one request can be sent or wait if it can't. | ||
Wait(ctx context.Context) error | ||
// Update the rate limiter when the server returns more information about the current limits. | ||
Update(remaining int) error | ||
} | ||
|
||
// A fixedWindowCountRateLimiter is a rate limiter that will count the number of requests within a period (or window) | ||
// and block the caller for the expected remaining period in the window. | ||
// | ||
// The window will start again after the last one closes and the count will be reset. | ||
// Since other requests can happen outside the SDK, callers can calls the Update() function to update the remaining | ||
// event in the window. | ||
// | ||
// This rate limiter tries to model the server-side behaviour as best it can, however, it doesn't know exactly when | ||
// the server-side window starts or ends, so it can be misaligned. Therefore, the callers still need to retry requests | ||
// if a status code 429 (Too Many Requests) is received. | ||
type fixedWindowCountRateLimiter struct { | ||
limit int | ||
period time.Duration | ||
windowStart *time.Time | ||
count int | ||
mu *sync.Mutex | ||
} | ||
|
||
func newFixedWindowCountRateLimiter(limit int, period time.Duration) *fixedWindowCountRateLimiter { | ||
return &fixedWindowCountRateLimiter{ | ||
limit: limit, | ||
period: period, | ||
mu: &sync.Mutex{}, | ||
} | ||
} | ||
|
||
// Wait will block the caller when the number of requests has exceeded the limit in the current window. | ||
// This function allows bursting so it will only block when the limit is reached. | ||
func (rl *fixedWindowCountRateLimiter) Wait(ctx context.Context) error { | ||
rl.mu.Lock() | ||
defer rl.mu.Unlock() | ||
|
||
// Start window on first requests | ||
if rl.windowStart == nil { | ||
now := time.Now() | ||
rl.windowStart = &now | ||
} | ||
|
||
windowEnd := rl.windowStart.Add(rl.period) | ||
if time.Now().After(windowEnd) { | ||
rl.count = 0 | ||
rl.windowStart = &windowEnd | ||
windowEnd = rl.windowStart.Add(rl.period) | ||
} | ||
|
||
if rl.count == rl.limit { | ||
delay := windowEnd.Sub(time.Now()) | ||
err := sleepWithContext(ctx, delay) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
rl.count++ | ||
return nil | ||
} | ||
|
||
func (rl *fixedWindowCountRateLimiter) Update(remaining int) error { | ||
rl.mu.Lock() | ||
defer rl.mu.Unlock() | ||
rl.count = rl.limit - remaining | ||
return nil | ||
} | ||
|
||
func sleepWithContext(ctx context.Context, d time.Duration) error { | ||
timer := time.NewTimer(d) | ||
select { | ||
case <-ctx.Done(): | ||
if !timer.Stop() { | ||
return fmt.Errorf("context expired before timer stopped") | ||
} | ||
case <-timer.C: | ||
} | ||
return nil | ||
} | ||
|
||
var _ RateLimiter = &fixedWindowCountRateLimiter{} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
package internal | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestFixedWindowCountRateLimiter_Wait(t *testing.T) { | ||
windowSize := 2 * time.Second | ||
windowLimit := 10 | ||
|
||
limiter := newFixedWindowCountRateLimiter(windowLimit, windowSize) | ||
|
||
ctx := context.Background() | ||
start := time.Now() | ||
runs := 3 | ||
count := 0 | ||
for range windowLimit * runs { | ||
err := limiter.Wait(ctx) | ||
require.NoError(t, err) | ||
count++ | ||
} | ||
end := time.Now() | ||
assert.Equal(t, runs*windowLimit, count) | ||
assert.Greater(t, end.Sub(start), windowSize.Nanoseconds()*int64(runs-1)) | ||
} | ||
|
||
func TestFixedWindowCountRateLimiter_Update(t *testing.T) { | ||
windowSize := 2 * time.Second | ||
windowLimit := 10 | ||
|
||
limiter := newFixedWindowCountRateLimiter(windowLimit, windowSize) | ||
|
||
ctx := context.Background() | ||
start := time.Now() | ||
runs := 2 | ||
count := 0 | ||
assert.NoError(t, limiter.Update(0)) | ||
for range windowLimit * runs { | ||
t.Logf("%s\n", time.Now().String()) | ||
err := limiter.Wait(ctx) | ||
require.NoError(t, err) | ||
count++ | ||
} | ||
end := time.Now() | ||
assert.Equal(t, runs*windowLimit, count) | ||
assert.Greater(t, end.Sub(start), windowSize.Nanoseconds()*int64(runs)) | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters