ratelimits: Remove a couple metrics that we're not finding useful
beautifulentropy committed Dec 18, 2024
1 parent 0e5e1e9 commit 092f4ec
Showing 3 changed files with 8 additions and 155 deletions.
21 changes: 4 additions & 17 deletions ratelimits/limiter.go
@@ -37,8 +37,7 @@ type Limiter struct {
 	source Source
 	clk    clock.Clock
 
-	spendLatency       *prometheus.HistogramVec
-	overrideUsageGauge *prometheus.GaugeVec
+	spendLatency *prometheus.HistogramVec
 }
 
 // NewLimiter returns a new *Limiter. The provided source must be safe for
@@ -52,17 +51,10 @@ func NewLimiter(clk clock.Clock, source Source, stats prometheus.Registerer) (*L
 	}, []string{"limit", "decision"})
 	stats.MustRegister(spendLatency)
 
-	overrideUsageGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "ratelimits_override_usage",
-		Help: "Proportion of override limit used, by limit name and bucket key.",
-	}, []string{"limit", "bucket_key"})
-	stats.MustRegister(overrideUsageGauge)
-
 	return &Limiter{
-		source:             source,
-		clk:                clk,
-		spendLatency:       spendLatency,
-		overrideUsageGauge: overrideUsageGauge,
+		source:       source,
+		clk:          clk,
+		spendLatency: spendLatency,
 	}, nil
 }
 
@@ -284,11 +276,6 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
 		storedTAT, bucketExists := tats[txn.bucketKey]
 		d := maybeSpend(l.clk, txn, storedTAT)
 
-		if txn.limit.isOverride() {
-			utilization := float64(txn.limit.Burst-d.remaining) / float64(txn.limit.Burst)
-			l.overrideUsageGauge.WithLabelValues(txn.limit.name.String(), txn.limit.overrideKey).Set(utilization)
-		}
-
 		if d.allowed && (storedTAT != d.newTAT) && txn.spend {
 			if !bucketExists {
 				newBuckets[txn.bucketKey] = d.newTAT
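For context, the gauge deleted above followed the standard client_golang pattern: on each spend against an override limit, compute the consumed fraction of the burst capacity and set a gauge labeled by limit name and bucket key. A minimal, self-contained sketch of that pattern (the burst and remaining values and the label values are hypothetical stand-ins, not Boulder's API):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// A gauge shaped like the removed ratelimits_override_usage metric.
	usage := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "ratelimits_override_usage",
		Help: "Proportion of override limit used, by limit name and bucket key.",
	}, []string{"limit", "bucket_key"})
	prometheus.MustRegister(usage)

	// Hypothetical stand-ins for a rate limit decision: a burst of 40
	// requests with 10 still remaining in the bucket.
	var burst, remaining int64 = 40, 10

	// Utilization is the consumed fraction of the burst capacity, exactly
	// as the removed BatchSpend code computed it.
	utilization := float64(burst-remaining) / float64(burst)
	usage.WithLabelValues("NewRegistrationsPerIPAddress", "2:10.0.0.2").Set(utilization)

	fmt.Println(utilization) // 0.75
}

Because bucket_key takes one value per client, a gauge like this has unbounded label cardinality, which is a common reason to retire such a metric.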
13 changes: 0 additions & 13 deletions ratelimits/limiter_test.go
@@ -8,7 +8,6 @@ import (
 	"time"
 
 	"github.com/jmhodges/clock"
-	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/letsencrypt/boulder/config"
 	berrors "github.com/letsencrypt/boulder/errors"
@@ -60,12 +59,6 @@ func TestLimiter_CheckWithLimitOverrides(t *testing.T) {
 	testCtx, limiters, txnBuilder, clk, testIP := setup(t)
 	for name, l := range limiters {
 		t.Run(name, func(t *testing.T) {
-			// Verify our overrideUsageGauge is being set correctly. 0.0 == 0%
-			// of the bucket has been consumed.
-			test.AssertMetricWithLabelsEquals(t, l.overrideUsageGauge, prometheus.Labels{
-				"limit":      NewRegistrationsPerIPAddress.String(),
-				"bucket_key": joinWithColon(NewRegistrationsPerIPAddress.EnumString(), tenZeroZeroTwo)}, 0)
-
 			overriddenBucketKey, err := newIPAddressBucketKey(NewRegistrationsPerIPAddress, net.ParseIP(tenZeroZeroTwo))
 			test.AssertNotError(t, err, "should not error")
 			overriddenLimit, err := txnBuilder.getLimit(NewRegistrationsPerIPAddress, overriddenBucketKey)
@@ -87,12 +80,6 @@ func TestLimiter_CheckWithLimitOverrides(t *testing.T) {
 			test.AssertEquals(t, d.remaining, int64(0))
 			test.AssertEquals(t, d.resetIn, time.Second)
 
-			// Verify our overrideUsageGauge is being set correctly. 1.0 == 100%
-			// of the bucket has been consumed.
-			test.AssertMetricWithLabelsEquals(t, l.overrideUsageGauge, prometheus.Labels{
-				"limit_name": NewRegistrationsPerIPAddress.String(),
-				"bucket_key": joinWithColon(NewRegistrationsPerIPAddress.EnumString(), tenZeroZeroTwo)}, 1.0)
-
 			// Verify our RetryIn is correct. 1 second == 1000 milliseconds and
 			// 1000/40 = 25 milliseconds per request.
 			test.AssertEquals(t, d.retryIn, time.Millisecond*25)
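The deleted assertions used Boulder's test.AssertMetricWithLabelsEquals helper; note that the second one queried a "limit_name" label even though the gauge declared its label as "limit". An equivalent check can be written directly against client_golang's testutil package; a minimal sketch with hypothetical label values:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	usage := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "ratelimits_override_usage",
		Help: "Proportion of override limit used.",
	}, []string{"limit", "bucket_key"})

	// Simulate a fully consumed override bucket (hypothetical label values).
	usage.WithLabelValues("NewRegistrationsPerIPAddress", "2:10.0.0.2").Set(1.0)

	// The child metric returned by WithLabelValues is itself a Collector,
	// so testutil.ToFloat64 can read its current value directly.
	got := testutil.ToFloat64(usage.WithLabelValues("NewRegistrationsPerIPAddress", "2:10.0.0.2"))
	fmt.Println(got == 1.0) // true
}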
129 changes: 4 additions & 125 deletions ratelimits/source_redis.go
@@ -3,7 +3,6 @@ package ratelimits
 import (
 	"context"
 	"errors"
-	"net"
 	"time"
 
 	"github.com/jmhodges/clock"
@@ -16,76 +15,22 @@ var _ Source = (*RedisSource)(nil)
 
 // RedisSource is a ratelimits source backed by sharded Redis.
 type RedisSource struct {
-	client  *redis.Ring
-	clk     clock.Clock
-	latency *prometheus.HistogramVec
+	client *redis.Ring
+	clk    clock.Clock
 }
 
 // NewRedisSource returns a new Redis backed source using the provided
 // *redis.Ring client.
 func NewRedisSource(client *redis.Ring, clk clock.Clock, stats prometheus.Registerer) *RedisSource {
-	latency := prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Name: "ratelimits_latency",
-			Help: "Histogram of Redis call latencies labeled by call=[set|get|delete|ping] and result=[success|error]",
-			// Exponential buckets ranging from 0.0005s to 3s.
-			Buckets: prometheus.ExponentialBucketsRange(0.0005, 3, 8),
-		},
-		[]string{"call", "result"},
-	)
-	stats.MustRegister(latency)
-
 	return &RedisSource{
-		client:  client,
-		clk:     clk,
-		latency: latency,
+		client: client,
+		clk:    clk,
 	}
 }
 
-var errMixedSuccess = errors.New("some keys not found")
-
-// resultForError returns a string representing the result of the operation
-// based on the provided error.
-func resultForError(err error) string {
-	if errors.Is(errMixedSuccess, err) {
-		// Indicates that some of the keys in a batchset operation were not found.
-		return "mixedSuccess"
-	} else if errors.Is(redis.Nil, err) {
-		// Bucket key does not exist.
-		return "notFound"
-	} else if errors.Is(err, context.DeadlineExceeded) {
-		// Client read or write deadline exceeded.
-		return "deadlineExceeded"
-	} else if errors.Is(err, context.Canceled) {
-		// Caller canceled the operation.
-		return "canceled"
-	}
-	var netErr net.Error
-	if errors.As(err, &netErr) && netErr.Timeout() {
-		// Dialer timed out connecting to Redis.
-		return "timeout"
-	}
-	var redisErr redis.Error
-	if errors.Is(err, redisErr) {
-		// An internal error was returned by the Redis server.
-		return "redisError"
-	}
-	return "failed"
-}
-
-func (r *RedisSource) observeLatency(call string, latency time.Duration, err error) {
-	result := "success"
-	if err != nil {
-		result = resultForError(err)
-	}
-	r.latency.With(prometheus.Labels{"call": call, "result": result}).Observe(latency.Seconds())
-}
-
 // BatchSet stores TATs at the specified bucketKeys using a pipelined Redis
 // Transaction in order to reduce the number of round-trips to each Redis shard.
 func (r *RedisSource) BatchSet(ctx context.Context, buckets map[string]time.Time) error {
-	start := r.clk.Now()
-
 	pipeline := r.client.Pipeline()
 	for bucketKey, tat := range buckets {
 		// Set a TTL of TAT + 10 minutes to account for clock skew.
@@ -94,25 +39,14 @@ func (r *RedisSource) BatchSet(ctx context.Context, buckets map[string]time.Time
 	}
 	_, err := pipeline.Exec(ctx)
 	if err != nil {
-		r.observeLatency("batchset", r.clk.Since(start), err)
 		return err
 	}
 
-	totalLatency := r.clk.Since(start)
-	perSetLatency := totalLatency / time.Duration(len(buckets))
-	for range buckets {
-		r.observeLatency("batchset_entry", perSetLatency, nil)
-	}
-
-	r.observeLatency("batchset", totalLatency, nil)
 	return nil
 }
 
 // BatchSetNotExisting attempts to set TATs for the specified bucketKeys if they
 // do not already exist. Returns a map indicating which keys already existed.
 func (r *RedisSource) BatchSetNotExisting(ctx context.Context, buckets map[string]time.Time) (map[string]bool, error) {
-	start := r.clk.Now()
-
 	pipeline := r.client.Pipeline()
 	cmds := make(map[string]*redis.BoolCmd, len(buckets))
 	for bucketKey, tat := range buckets {
@@ -122,74 +56,49 @@ func (r *RedisSource) BatchSetNotExisting(ctx context.Context, buckets map[strin
 	}
 	_, err := pipeline.Exec(ctx)
 	if err != nil {
-		r.observeLatency("batchsetnotexisting", r.clk.Since(start), err)
 		return nil, err
 	}
 
 	alreadyExists := make(map[string]bool, len(buckets))
-	totalLatency := r.clk.Since(start)
-	perSetLatency := totalLatency / time.Duration(len(buckets))
 	for bucketKey, cmd := range cmds {
 		success, err := cmd.Result()
 		if err != nil {
-			r.observeLatency("batchsetnotexisting_entry", perSetLatency, err)
 			return nil, err
 		}
		if !success {
 			alreadyExists[bucketKey] = true
 		}
-		r.observeLatency("batchsetnotexisting_entry", perSetLatency, nil)
 	}
 
-	r.observeLatency("batchsetnotexisting", totalLatency, nil)
 	return alreadyExists, nil
 }
 
 // BatchIncrement updates TATs for the specified bucketKeys using a pipelined
 // Redis Transaction in order to reduce the number of round-trips to each Redis
 // shard.
 func (r *RedisSource) BatchIncrement(ctx context.Context, buckets map[string]increment) error {
-	start := r.clk.Now()
-
 	pipeline := r.client.Pipeline()
 	for bucketKey, incr := range buckets {
 		pipeline.IncrBy(ctx, bucketKey, incr.cost.Nanoseconds())
 		pipeline.Expire(ctx, bucketKey, incr.ttl)
 	}
 	_, err := pipeline.Exec(ctx)
 	if err != nil {
-		r.observeLatency("batchincrby", r.clk.Since(start), err)
 		return err
 	}
 
-	totalLatency := r.clk.Since(start)
-	perSetLatency := totalLatency / time.Duration(len(buckets))
-	for range buckets {
-		r.observeLatency("batchincrby_entry", perSetLatency, nil)
-	}
-
-	r.observeLatency("batchincrby", totalLatency, nil)
 	return nil
 }
 
 // Get retrieves the TAT at the specified bucketKey. If the bucketKey does not
 // exist, ErrBucketNotFound is returned.
 func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, error) {
-	start := r.clk.Now()
-
 	tatNano, err := r.client.Get(ctx, bucketKey).Int64()
 	if err != nil {
 		if errors.Is(err, redis.Nil) {
 			// Bucket key does not exist.
-			r.observeLatency("get", r.clk.Since(start), err)
 			return time.Time{}, ErrBucketNotFound
 		}
 		// An error occurred while retrieving the TAT.
-		r.observeLatency("get", r.clk.Since(start), err)
 		return time.Time{}, err
 	}
 
-	r.observeLatency("get", r.clk.Since(start), nil)
 	return time.Unix(0, tatNano).UTC(), nil
 }
 
@@ -198,21 +107,15 @@ func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, err
 // shard. If a bucketKey does not exist, it WILL NOT be included in the returned
 // map.
 func (r *RedisSource) BatchGet(ctx context.Context, bucketKeys []string) (map[string]time.Time, error) {
-	start := r.clk.Now()
-
 	pipeline := r.client.Pipeline()
 	for _, bucketKey := range bucketKeys {
 		pipeline.Get(ctx, bucketKey)
 	}
 	results, err := pipeline.Exec(ctx)
 	if err != nil && !errors.Is(err, redis.Nil) {
-		r.observeLatency("batchget", r.clk.Since(start), err)
 		return nil, err
 	}
 
-	totalLatency := r.clk.Since(start)
-	perEntryLatency := totalLatency / time.Duration(len(bucketKeys))
-
 	tats := make(map[string]time.Time, len(bucketKeys))
 	notFoundCount := 0
 	for i, result := range results {
@@ -221,59 +124,35 @@ func (r *RedisSource) BatchGet(ctx context.Context, bucketKeys []string) (map[st
 			if !errors.Is(err, redis.Nil) {
 				// This should never happen as any errors should have been
 				// caught after the pipeline.Exec() call.
-				r.observeLatency("batchget", r.clk.Since(start), err)
 				return nil, err
 			}
 			// Bucket key does not exist.
-			r.observeLatency("batchget_entry", perEntryLatency, err)
 			notFoundCount++
 			continue
 		}
 		tats[bucketKeys[i]] = time.Unix(0, tatNano).UTC()
-		r.observeLatency("batchget_entry", perEntryLatency, nil)
 	}
 
-	var batchErr error
-	if notFoundCount < len(results) {
-		// Some keys were not found.
-		batchErr = errMixedSuccess
-	} else if notFoundCount == len(results) {
-		// All keys were not found.
-		batchErr = redis.Nil
-	}
-
-	r.observeLatency("batchget", totalLatency, batchErr)
 	return tats, nil
 }
 
 // Delete deletes the TAT at the specified bucketKey ('name:id'). A nil return
 // value does not indicate that the bucketKey existed.
 func (r *RedisSource) Delete(ctx context.Context, bucketKey string) error {
-	start := r.clk.Now()
-
 	err := r.client.Del(ctx, bucketKey).Err()
 	if err != nil {
-		r.observeLatency("delete", r.clk.Since(start), err)
 		return err
 	}
 
-	r.observeLatency("delete", r.clk.Since(start), nil)
 	return nil
 }
 
 // Ping checks that each shard of the *redis.Ring is reachable using the PING
 // command.
 func (r *RedisSource) Ping(ctx context.Context) error {
-	start := r.clk.Now()
-
 	err := r.client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
 		return shard.Ping(ctx).Err()
 	})
 	if err != nil {
-		r.observeLatency("ping", r.clk.Since(start), err)
 		return err
 	}
 
-	r.observeLatency("ping", r.clk.Since(start), nil)
 	return nil
 }
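The instrumentation removed from source_redis.go wrapped every Redis call in the same pattern: record a start time, run the call, then observe the elapsed time into one HistogramVec labeled by call and result, with resultForError translating errors into result strings. A condensed, self-contained sketch of that pattern; the error handling here is simplified to success/error, where the removed code distinguished notFound, timeout, canceled, and other cases:

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// observeLatency mirrors the shape of the removed helper: a single histogram
// labeled by which Redis call ran and how it turned out.
func observeLatency(hist *prometheus.HistogramVec, call string, took time.Duration, err error) {
	result := "success"
	if err != nil {
		// The removed resultForError mapped errors to finer-grained results
		// (notFound, timeout, canceled, ...); "error" stands in for all of them.
		result = "error"
	}
	hist.With(prometheus.Labels{"call": call, "result": result}).Observe(took.Seconds())
}

func main() {
	latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "ratelimits_latency",
		Help: "Histogram of Redis call latencies, labeled by call and result.",
		// Exponential buckets from 0.5ms to 3s, as in the removed code.
		Buckets: prometheus.ExponentialBucketsRange(0.0005, 3, 8),
	}, []string{"call", "result"})
	prometheus.MustRegister(latency)

	start := time.Now()
	err := errors.New("simulated redis failure") // stand-in for a real Redis call
	observeLatency(latency, "get", time.Since(start), err)
	fmt.Println("observed one sample")
}

Note also how the removed batch paths attributed latency per entry: they divided one pipeline round-trip's total latency evenly across the batch (perSetLatency := totalLatency / time.Duration(len(buckets))), an even-split approximation rather than a true per-key measurement.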
