Skip to content

Commit

Permalink
Distributor: Move resource attribute promotion specialization to Over…
Browse files Browse the repository at this point in the history
…rides

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Dec 11, 2024
1 parent 5086788 commit 7f8f55e
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 23 deletions.
3 changes: 2 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,14 @@ const OTLPPushEndpoint = "/otlp/v1/metrics"
// RegisterDistributor registers the endpoints associated with the distributor.
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
limits.SpecializeResourceAttributePromotionConfig(pushConfig.OTelResourceAttributePromotionConfig)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(
pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader,
a.cfg.SkipLabelCountValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger,
), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(
pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.OTelResourceAttributePromotionConfig,
pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits,
pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger,
), true, false, "POST")

Expand Down
10 changes: 2 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,8 @@ type Distributor struct {
func defaultSleep(d time.Duration) { time.Sleep(d) }
func defaultNow() time.Time { return time.Now() }

// OTelResourceAttributePromotionConfig contains methods for configuring OTel resource attribute promotion.
type OTelResourceAttributePromotionConfig interface {
// PromoteOTelResourceAttributes returns which OTel resource attributes to promote for tenant ID.
PromoteOTelResourceAttributes(id string) []string
}

// Config contains the configuration required to
// create a Distributor
// create a Distributor.
type Config struct {
PoolConfig PoolConfig `yaml:"pool"`

Expand Down Expand Up @@ -247,7 +241,7 @@ type Config struct {
ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"`

// OTelResourceAttributePromotionConfig allows for specializing OTel resource attribute promotion.
OTelResourceAttributePromotionConfig OTelResourceAttributePromotionConfig `yaml:"-"`
OTelResourceAttributePromotionConfig validation.OTelResourceAttributePromotionConfig `yaml:"-"`
}

// PushWrapper wraps around a push. It is similar to middleware.Interface.
Expand Down
9 changes: 3 additions & 6 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ const (
)

type OTLPHandlerLimits interface {
validation.OTelResourceAttributePromotionConfig

OTelMetricSuffixesEnabled(id string) bool
OTelCreatedTimestampZeroIngestionEnabled(id string) bool
PromoteOTelResourceAttributes(id string) []string
OTelKeepIdentifyingResourceAttributes(id string) bool
}

Expand All @@ -60,7 +61,6 @@ func OTLPHandler(
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
limits OTLPHandlerLimits,
resourceAttributePromotionConfig OTelResourceAttributePromotionConfig,
retryCfg RetryConfig,
push PushFunc,
pushMetrics *PushMetrics,
Expand Down Expand Up @@ -171,10 +171,7 @@ func OTLPHandler(
}
addSuffixes := limits.OTelMetricSuffixesEnabled(tenantID)
enableCTZeroIngestion := limits.OTelCreatedTimestampZeroIngestionEnabled(tenantID)
if resourceAttributePromotionConfig == nil {
resourceAttributePromotionConfig = limits
}
promoteResourceAttributes := resourceAttributePromotionConfig.PromoteOTelResourceAttributes(tenantID)
promoteResourceAttributes := limits.PromoteOTelResourceAttributes(tenantID)
keepIdentifyingResourceAttributes := limits.OTelKeepIdentifyingResourceAttributes(tenantID)

pushMetrics.IncOTLPRequest(tenantID)
Expand Down
16 changes: 9 additions & 7 deletions pkg/distributor/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func BenchmarkOTLPHandler(b *testing.B) {
validation.NewMockTenantLimits(map[string]*validation.Limits{}),
)
require.NoError(b, err)
handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger())
handler := OTLPHandler(100000, nil, nil, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger())

b.Run("protobuf", func(b *testing.B) {
req := createOTLPProtoRequest(b, exportReq, "")
Expand Down Expand Up @@ -491,7 +491,7 @@ func TestHandlerOTLPPush(t *testing.T) {
expectedRetryHeader bool
promoteResourceAttributes []string
expectedAttributePromotions map[string]string
resourceAttributePromotionConfig OTelResourceAttributePromotionConfig
resourceAttributePromotionConfig validation.OTelResourceAttributePromotionConfig
}

samplesVerifierFunc := func(t *testing.T, pushReq *Request, tc testCase) error {
Expand Down Expand Up @@ -742,6 +742,8 @@ func TestHandlerOTLPPush(t *testing.T) {
}),
)
require.NoError(t, err)
limits.SpecializeResourceAttributePromotionConfig(tt.resourceAttributePromotionConfig)

pusher := func(_ context.Context, pushReq *Request) error {
t.Helper()
t.Cleanup(pushReq.CleanUp)
Expand All @@ -750,7 +752,7 @@ func TestHandlerOTLPPush(t *testing.T) {

logs := &concurrency.SyncBuffer{}
retryConfig := RetryConfig{Enabled: true, MinBackoff: 5 * time.Second, MaxBackoff: 5 * time.Second}
handler := OTLPHandler(tt.maxMsgSize, nil, nil, limits, tt.resourceAttributePromotionConfig, retryConfig, pusher, nil, nil, level.NewFilter(log.NewLogfmtLogger(logs), level.AllowInfo()))
handler := OTLPHandler(tt.maxMsgSize, nil, nil, limits, retryConfig, pusher, nil, nil, level.NewFilter(log.NewLogfmtLogger(logs), level.AllowInfo()))

resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
Expand Down Expand Up @@ -823,7 +825,7 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) {

req := createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "")
resp := httptest.NewRecorder()
handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
handler := OTLPHandler(100000, nil, nil, limits, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
request, err := pushReq.WriteRequest()
assert.NoError(t, err)
assert.Len(t, request.Timeseries, 3)
Expand Down Expand Up @@ -869,7 +871,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {

req := createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "")
resp := httptest.NewRecorder()
handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
handler := OTLPHandler(100000, nil, nil, limits, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
request, err := pushReq.WriteRequest()
t.Cleanup(pushReq.CleanUp)
require.NoError(t, err)
Expand All @@ -895,7 +897,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {

req = createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "")
resp = httptest.NewRecorder()
handler = OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
handler = OTLPHandler(100000, nil, nil, limits, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
request, err := pushReq.WriteRequest()
t.Cleanup(pushReq.CleanUp)
require.NoError(t, err)
Expand Down Expand Up @@ -923,7 +925,7 @@ func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) {

resp := httptest.NewRecorder()

handler := OTLPHandler(140, nil, nil, nil, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger())
handler := OTLPHandler(140, nil, nil, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusRequestEntityTooLarge, resp.Code)
body, err := io.ReadAll(resp.Body)
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) {

return nil
}
h := OTLPHandler(200, util.NewBufferPool(0), nil, otlpLimitsMock{}, nil, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
h := OTLPHandler(200, util.NewBufferPool(0), nil, otlpLimitsMock{}, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
srv.HTTP.Handle("/otlp", h)

// start the server
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ type TenantLimits interface {
type Overrides struct {
defaultLimits *Limits
tenantLimits TenantLimits

// otelResourceAttributePromotionCfg, if set, specializes OTel resource attribute promotion configuration.
otelResourceAttributePromotionCfg OTelResourceAttributePromotionConfig
}

// NewOverrides makes a new Overrides.
Expand Down Expand Up @@ -1112,9 +1115,18 @@ func (o *Overrides) OTelCreatedTimestampZeroIngestionEnabled(tenantID string) bo
}

func (o *Overrides) PromoteOTelResourceAttributes(tenantID string) []string {
if o.otelResourceAttributePromotionCfg != nil {
return o.otelResourceAttributePromotionCfg.PromoteOTelResourceAttributes(tenantID)
}
return o.getOverridesForUser(tenantID).PromoteOTelResourceAttributes
}

// SpecializeResourceAttributePromotionConfig specializes OTel resource attribute promotion configuration.
// This is to allow for plugging in a non-default method for configuring resource attribute promotion.
func (o *Overrides) SpecializeResourceAttributePromotionConfig(specialization OTelResourceAttributePromotionConfig) {
o.otelResourceAttributePromotionCfg = specialization
}

func (o *Overrides) OTelKeepIdentifyingResourceAttributes(tenantID string) bool {
return o.getOverridesForUser(tenantID).OTelKeepIdentifyingResourceAttributes
}
Expand Down Expand Up @@ -1147,6 +1159,12 @@ func (o *Overrides) getOverridesForUser(userID string) *Limits {
return o.defaultLimits
}

// OTelResourceAttributePromotionConfig contains methods for configuring OTel resource attribute promotion.
type OTelResourceAttributePromotionConfig interface {
// PromoteOTelResourceAttributes returns which OTel resource attributes to promote for tenant ID.
PromoteOTelResourceAttributes(id string) []string
}

// AllTrueBooleansPerTenant returns true only if limit func is true for all given tenants
func AllTrueBooleansPerTenant(tenantIDs []string, f func(string) bool) bool {
for _, tenantID := range tenantIDs {
Expand Down
37 changes: 37 additions & 0 deletions pkg/util/validation/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,43 @@ alertmanager_max_grafana_state_size_bytes: "0"
}
}

func TestOverrides_PromoteOTelResourceAttributes(t *testing.T) {
const tenant = "tenant"

t.Run("default implementation", func(t *testing.T) {
overrides, err := NewOverrides(
Limits{PromoteOTelResourceAttributes: []string{"default.attribute"}},
NewMockTenantLimits(map[string]*Limits{}),
)
require.NoError(t, err)

attrs := overrides.PromoteOTelResourceAttributes(tenant)
require.Equal(t, []string{"default.attribute"}, attrs)
})

t.Run("specialized implementation", func(t *testing.T) {
overrides, err := NewOverrides(
Limits{PromoteOTelResourceAttributes: []string{"default.attribute"}},
NewMockTenantLimits(map[string]*Limits{}),
)
require.NoError(t, err)

overrides.SpecializeResourceAttributePromotionConfig(fakeOTelResourceAttributePromotionConfig{
resourceAttrs: map[string][]string{tenant: {"specialized.attribute"}},
})
attrs := overrides.PromoteOTelResourceAttributes(tenant)
require.Equal(t, []string{"specialized.attribute"}, attrs)
})
}

type fakeOTelResourceAttributePromotionConfig struct {
resourceAttrs map[string][]string
}

func (c fakeOTelResourceAttributePromotionConfig) PromoteOTelResourceAttributes(tenant string) []string {
return c.resourceAttrs[tenant]
}

func getDefaultLimits() Limits {
limits := Limits{}
flagext.DefaultValues(&limits)
Expand Down

0 comments on commit 7f8f55e

Please sign in to comment.