ratelimits: Account for expired buckets
beautifulentropy committed Dec 13, 2024
1 parent 62f1a26 commit 82ace44
Showing 3 changed files with 79 additions and 8 deletions.
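
In short, BatchSpend now distinguishes three bucket states instead of two: buckets that do not exist yet, buckets whose theoretical arrival time (TAT) is already in the past (expired), and buckets whose TAT is still in the future (active). The sketch below is illustrative only; the helper name is not part of this commit. It mirrors how the new code routes each state to BatchSetNotExisting, BatchSet, or BatchIncrement.

// Sketch: classify mirrors the three persistence paths introduced below.
// "new" buckets are created with BatchSetNotExisting, "expired" buckets
// (TAT in the past) are overwritten with BatchSet, and "active" buckets
// (TAT in the future) are advanced with BatchIncrement.
package main

import (
    "fmt"
    "time"
)

func classify(bucketExists bool, tat, now time.Time) string {
    switch {
    case !bucketExists:
        return "new"
    case tat.Before(now):
        return "expired"
    default:
        return "active"
    }
}

func main() {
    now := time.Now()
    fmt.Println(classify(false, time.Time{}, now))          // new
    fmt.Println(classify(true, now.Add(-time.Minute), now)) // expired
    fmt.Println(classify(true, now.Add(time.Minute), now))  // active
}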
36 changes: 28 additions & 8 deletions ratelimits/limiter.go
@@ -277,12 +277,14 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
    batchDecision := allowedDecision
    newBuckets := make(map[string]time.Time)
    incrBuckets := make(map[string]increment)
+   expiredBuckets := make(map[string]time.Time)
    txnOutcomes := make(map[Transaction]string)

    for _, txn := range batch {
        tat, bucketExists := tats[txn.bucketKey]
-       if !bucketExists {
-           // First request from this client.
+       originalTAT := tat
+       if !bucketExists || l.clk.Now().After(originalTAT) {
+           // First request from this client or the bucket has expired.
            tat = l.clk.Now()
        }

@@ -294,14 +296,18 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
        }

        if d.allowed && (tat != d.newTAT) && txn.spend {
-           // New bucket state should be persisted.
-           if bucketExists {
+           if !bucketExists {
+               // No bucket exists, initialize a new bucket with BatchSetNotExisting.
+               newBuckets[txn.bucketKey] = d.newTAT
+           } else if originalTAT.Before(l.clk.Now()) {
+               // A bucket exists, with a TAT in the past, update it with BatchSet.
+               expiredBuckets[txn.bucketKey] = d.newTAT
+           } else {
+               // A bucket exists, with a TAT in the future, update it with BatchIncrement.
                incrBuckets[txn.bucketKey] = increment{
                    cost: time.Duration(txn.cost * txn.limit.emissionInterval),
                    ttl:  time.Duration(txn.limit.burstOffset),
                }
-           } else {
-               newBuckets[txn.bucketKey] = d.newTAT
            }
        }

@@ -319,14 +325,28 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision

    if batchDecision.allowed {
        if len(newBuckets) > 0 {
-           err = l.source.BatchSet(ctx, newBuckets)
+           createdBuckets, err := l.source.BatchSetNotExisting(ctx, newBuckets)
            if err != nil {
                return nil, fmt.Errorf("batch set for %d keys: %w", len(newBuckets), err)
            }
+           for k, created := range createdBuckets {
+               if !created {
+                   // The bucket was already created by another request; fall
+                   // back to BatchSet.
+                   expiredBuckets[k] = newBuckets[k]
+               }
+           }
        }

+       if len(expiredBuckets) > 0 {
+           err := l.source.BatchSet(ctx, expiredBuckets)
+           if err != nil {
+               return nil, fmt.Errorf("batch set for %d keys: %w", len(expiredBuckets), err)
+           }
+       }
+
        if len(incrBuckets) > 0 {
-           err = l.source.BatchIncrement(ctx, incrBuckets)
+           err := l.source.BatchIncrement(ctx, incrBuckets)
            if err != nil {
                return nil, fmt.Errorf("batch increment for %d keys: %w", len(incrBuckets), err)
            }
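
For context on the increment built above: in this package's GCRA-style accounting a spend advances the TAT by cost * emissionInterval, and the bucket's TTL is the burst offset. The worked example below assumes emissionInterval = period/count and burstOffset = emissionInterval*burst, both kept as nanosecond counts; the concrete limit values are illustrative.

// Worked example (assumed limit values): 20 requests per second with a burst
// of 20 gives an emission interval of 50ms and a burst offset of 1s. Spending
// a cost of 2 advances the TAT by 100ms, and the key's TTL is the burst offset.
package main

import (
    "fmt"
    "time"
)

func main() {
    period := time.Second
    count := int64(20)
    burst := int64(20)

    emissionInterval := period.Nanoseconds() / count // 50ms per request
    burstOffset := emissionInterval * burst          // 1s of burst capacity

    cost := int64(2)
    advance := time.Duration(cost * emissionInterval) // mirrors the increment's cost field
    ttl := time.Duration(burstOffset)                 // mirrors the increment's ttl field

    fmt.Println(advance, ttl) // 100ms 1s
}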
19 changes: 19 additions & 0 deletions ratelimits/source.go
@@ -20,6 +20,11 @@ type Source interface {
    // the underlying storage client implementation).
    BatchSet(ctx context.Context, bucketKeys map[string]time.Time) error

+   // BatchSetNotExisting attempts to set TATs for the specified bucketKeys if
+   // they do not already exist. Returns a map indicating which keys were set
+   // successfully.
+   BatchSetNotExisting(ctx context.Context, buckets map[string]time.Time) (map[string]bool, error)
+
    // BatchIncrement updates the TATs for the specified bucketKeys, similar to
    // BatchSet. Implementations MUST ensure non-blocking operations by either:
    // a) applying a deadline or timeout to the context WITHIN the method, or
@@ -79,6 +84,20 @@ func (in *inmem) BatchSet(_ context.Context, bucketKeys map[string]time.Time) er
    return nil
}

+func (in *inmem) BatchSetNotExisting(_ context.Context, bucketKeys map[string]time.Time) (map[string]bool, error) {
+   in.Lock()
+   defer in.Unlock()
+   results := make(map[string]bool, len(bucketKeys))
+   for k, v := range bucketKeys {
+       _, ok := in.m[k]
+       if !ok {
+           in.m[k] = v
+           results[k] = true
+       }
+   }
+   return results, nil
+}
+
func (in *inmem) BatchIncrement(_ context.Context, bucketKeys map[string]increment) error {
    in.Lock()
    defer in.Unlock()
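
One detail worth noting before the Redis implementation below: the Redis source reports a true/false result for every key it attempted, while the in-memory source above only includes the keys it actually created. The standalone sketch below uses a plain map in place of the unexported inmem type to illustrate the set-if-not-exists contract; the store and key names are illustrative and not part of the ratelimits package.

// Sketch of the BatchSetNotExisting contract using a plain map.
package main

import (
    "fmt"
    "time"
)

// batchSetNotExisting writes only the keys that are not already present and
// reports those keys as created, mirroring the in-memory source above.
func batchSetNotExisting(store, buckets map[string]time.Time) map[string]bool {
    results := make(map[string]bool, len(buckets))
    for k, v := range buckets {
        if _, ok := store[k]; !ok {
            store[k] = v
            results[k] = true
        }
    }
    return results
}

func main() {
    now := time.Now()
    store := map[string]time.Time{"existing": now}

    created := batchSetNotExisting(store, map[string]time.Time{
        "existing": now.Add(time.Second),
        "fresh":    now.Add(time.Second),
    })
    // "fresh" was created; "existing" was left untouched and is absent from
    // the result map, so indexing it yields the zero value false.
    fmt.Println(created["fresh"], created["existing"]) // true false
}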
32 changes: 32 additions & 0 deletions ratelimits/source_redis.go
@@ -108,6 +108,38 @@ func (r *RedisSource) BatchSet(ctx context.Context, buckets map[string]time.Time
    return nil
}

+// BatchSetNotExisting attempts to set TATs for the specified bucketKeys if they
+// do not already exist. Returns a map indicating which keys were set successfully.
+func (r *RedisSource) BatchSetNotExisting(ctx context.Context, buckets map[string]time.Time) (map[string]bool, error) {
+   start := r.clk.Now()
+
+   pipeline := r.client.Pipeline()
+   cmds := make(map[string]*redis.BoolCmd, len(buckets))
+   for bucketKey, tat := range buckets {
+       // Set a TTL of TAT + 10 minutes to account for clock skew.
+       ttl := tat.UTC().Sub(r.clk.Now()) + 10*time.Minute
+       cmds[bucketKey] = pipeline.SetNX(ctx, bucketKey, tat.UTC().UnixNano(), ttl)
+   }
+
+   _, err := pipeline.Exec(ctx)
+   totalLatency := r.clk.Since(start)
+   perSetLatency := totalLatency / time.Duration(len(buckets))
+   if err != nil {
+       r.observeLatency("batchsetnotexisting", totalLatency, err)
+       return nil, err
+   }
+
+   results := make(map[string]bool, len(buckets))
+   for bucketKey, cmd := range cmds {
+       success, _ := cmd.Result()
+       results[bucketKey] = success
+       r.observeLatency("batchsetnotexisting_entry", perSetLatency, nil)
+   }
+
+   r.observeLatency("batchsetnotexisting", totalLatency, nil)
+   return results, nil
+}
+
// BatchIncrement updates TATs for the specified bucketKeys using a pipelined
// Redis Transaction in order to reduce the number of round-trips to each Redis
// shard.
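
The Redis method above relies on SETNX semantics: when several requests race to initialize the same bucket, exactly one SETNX succeeds and the rest receive false, which is what lets BatchSpend fall back to BatchSet for those keys. A minimal sketch of that behavior with go-redis follows; it assumes a Redis instance at localhost:6379, and the key name is illustrative.

// Sketch: two SETNX calls against the same key; only the first creates it.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    defer client.Close()

    key := "ratelimit-sketch:bucket"
    ttl := 10 * time.Minute

    first, err := client.SetNX(ctx, key, time.Now().UnixNano(), ttl).Result()
    if err != nil {
        panic(err)
    }
    second, err := client.SetNX(ctx, key, time.Now().UnixNano(), ttl).Result()
    if err != nil {
        panic(err)
    }

    // The second writer learns the key already existed and would fall back to
    // an unconditional BatchSet, just as the new BatchSpend code does.
    fmt.Println(first, second) // true false (false false on a repeat run before the TTL expires)
}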
