Skip to content

Commit

Permalink
Distributor: Move resource attribute promotion specialization to Over…
Browse files Browse the repository at this point in the history
…rides

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Dec 11, 2024
1 parent 5086788 commit 48281d9
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 15 deletions.
3 changes: 2 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,14 @@ const OTLPPushEndpoint = "/otlp/v1/metrics"
// RegisterDistributor registers the endpoints associated with the distributor.
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
limits.SpecializeResourceAttributePromotionConfig(pushConfig.OTelResourceAttributePromotionConfig)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(
pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader,
a.cfg.SkipLabelCountValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger,
), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(
pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.OTelResourceAttributePromotionConfig,
pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits,
pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger,
), true, false, "POST")

Expand Down
10 changes: 2 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,8 @@ type Distributor struct {
func defaultSleep(d time.Duration) { time.Sleep(d) }
func defaultNow() time.Time { return time.Now() }

// OTelResourceAttributePromotionConfig contains methods for configuring OTel resource attribute promotion.
type OTelResourceAttributePromotionConfig interface {
// PromoteOTelResourceAttributes returns which OTel resource attributes to promote for tenant ID.
PromoteOTelResourceAttributes(id string) []string
}

// Config contains the configuration required to
// create a Distributor
// create a Distributor.
type Config struct {
PoolConfig PoolConfig `yaml:"pool"`

Expand Down Expand Up @@ -247,7 +241,7 @@ type Config struct {
ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"`

// OTelResourceAttributePromotionConfig allows for specializing OTel resource attribute promotion.
OTelResourceAttributePromotionConfig OTelResourceAttributePromotionConfig `yaml:"-"`
OTelResourceAttributePromotionConfig validation.OTelResourceAttributePromotionConfig `yaml:"-"`
}

// PushWrapper wraps around a push. It is similar to middleware.Interface.
Expand Down
9 changes: 3 additions & 6 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ const (
)

type OTLPHandlerLimits interface {
validation.OTelResourceAttributePromotionConfig

OTelMetricSuffixesEnabled(id string) bool
OTelCreatedTimestampZeroIngestionEnabled(id string) bool
PromoteOTelResourceAttributes(id string) []string
OTelKeepIdentifyingResourceAttributes(id string) bool
}

Expand All @@ -60,7 +61,6 @@ func OTLPHandler(
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
limits OTLPHandlerLimits,
resourceAttributePromotionConfig OTelResourceAttributePromotionConfig,
retryCfg RetryConfig,
push PushFunc,
pushMetrics *PushMetrics,
Expand Down Expand Up @@ -171,10 +171,7 @@ func OTLPHandler(
}
addSuffixes := limits.OTelMetricSuffixesEnabled(tenantID)
enableCTZeroIngestion := limits.OTelCreatedTimestampZeroIngestionEnabled(tenantID)
if resourceAttributePromotionConfig == nil {
resourceAttributePromotionConfig = limits
}
promoteResourceAttributes := resourceAttributePromotionConfig.PromoteOTelResourceAttributes(tenantID)
promoteResourceAttributes := limits.PromoteOTelResourceAttributes(tenantID)
keepIdentifyingResourceAttributes := limits.OTelKeepIdentifyingResourceAttributes(tenantID)

pushMetrics.IncOTLPRequest(tenantID)
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ type TenantLimits interface {
type Overrides struct {
defaultLimits *Limits
tenantLimits TenantLimits

// otelResourceAttributePromotionCfg, if set, specializes OTel resource attribute promotion configuration.
otelResourceAttributePromotionCfg OTelResourceAttributePromotionConfig
}

// NewOverrides makes a new Overrides.
Expand Down Expand Up @@ -1112,9 +1115,18 @@ func (o *Overrides) OTelCreatedTimestampZeroIngestionEnabled(tenantID string) bo
}

func (o *Overrides) PromoteOTelResourceAttributes(tenantID string) []string {
if o.otelResourceAttributePromotionCfg != nil {
return o.otelResourceAttributePromotionCfg.PromoteOTelResourceAttributes(tenantID)
}
return o.getOverridesForUser(tenantID).PromoteOTelResourceAttributes
}

// SpecializeResourceAttributePromotionConfig specializes OTel resource attribute promotion configuration.
// This is to allow for plugging in a non-default method for configuring resource attribute promotion.
func (o *Overrides) SpecializeResourceAttributePromotionConfig(specialization OTelResourceAttributePromotionConfig) {
o.otelResourceAttributePromotionCfg = specialization
}

func (o *Overrides) OTelKeepIdentifyingResourceAttributes(tenantID string) bool {
return o.getOverridesForUser(tenantID).OTelKeepIdentifyingResourceAttributes
}
Expand Down Expand Up @@ -1147,6 +1159,12 @@ func (o *Overrides) getOverridesForUser(userID string) *Limits {
return o.defaultLimits
}

// OTelResourceAttributePromotionConfig contains methods for configuring OTel resource attribute promotion.
type OTelResourceAttributePromotionConfig interface {
// PromoteOTelResourceAttributes returns which OTel resource attributes to promote for tenant ID.
PromoteOTelResourceAttributes(id string) []string
}

// AllTrueBooleansPerTenant returns true only if limit func is true for all given tenants
func AllTrueBooleansPerTenant(tenantIDs []string, f func(string) bool) bool {
for _, tenantID := range tenantIDs {
Expand Down
37 changes: 37 additions & 0 deletions pkg/util/validation/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,43 @@ alertmanager_max_grafana_state_size_bytes: "0"
}
}

func TestOverrides_PromoteOTelResourceAttributes(t *testing.T) {
const tenant = "tenant"

t.Run("default implementation", func(t *testing.T) {
overrides, err := NewOverrides(
Limits{PromoteOTelResourceAttributes: []string{"default.attribute"}},
NewMockTenantLimits(map[string]*Limits{}),
)
require.NoError(t, err)

attrs := overrides.PromoteOTelResourceAttributes(tenant)
require.Equal(t, []string{"default.attribute"}, attrs)
})

t.Run("specialized implementation", func(t *testing.T) {
overrides, err := NewOverrides(
Limits{PromoteOTelResourceAttributes: []string{"default.attribute"}},
NewMockTenantLimits(map[string]*Limits{}),
)
require.NoError(t, err)

overrides.SpecializeResourceAttributePromotionConfig(fakeOTelResourceAttributePromotionConfig{
resourceAttrs: map[string][]string{tenant: []string{"specialized.attribute"}},
})
attrs := overrides.PromoteOTelResourceAttributes(tenant)
require.Equal(t, []string{"specialized.attribute"}, attrs)
})
}

type fakeOTelResourceAttributePromotionConfig struct {
resourceAttrs map[string][]string
}

func (c fakeOTelResourceAttributePromotionConfig) PromoteOTelResourceAttributes(tenant string) []string {
return c.resourceAttrs[tenant]
}

func getDefaultLimits() Limits {
limits := Limits{}
flagext.DefaultValues(&limits)
Expand Down

0 comments on commit 48281d9

Please sign in to comment.