Skip to content

Commit

Permalink
feat: exclude matching metrics (#1914)
Browse files Browse the repository at this point in the history
* feat: add config item for exclude_matching_metrics

* feat: event exclusion logic

* style: use generic matcher fn type definition and derive include/exclude from it

* refactor: leverage aliasing

* test: fix

* refactor: strict typing

* test: exclude processes

* style: typing

* refactor: reduce function complexity (linter warning)
  • Loading branch information
DavSanchez authored Aug 29, 2024
1 parent dab967b commit 08ce4b4
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 63 deletions.
13 changes: 10 additions & 3 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type context struct {
EntityMap entity.KnownIDs
idLookup host.IDLookup
shouldIncludeEvent sampler.IncludeSampleMatchFn
shouldExcludeEvent sampler.ExcludeSampleMatchFn
}

func (c *context) Context() context2.Context {
Expand Down Expand Up @@ -206,6 +207,7 @@ func NewContext(
resolver hostname.ResolverChangeNotifier,
lookup host.IDLookup,
sampleMatchFn sampler.IncludeSampleMatchFn,
sampleExcludeFn sampler.ExcludeSampleMatchFn,
) *context {
ctx, cancel := context2.WithCancel(context2.Background())

Expand All @@ -223,6 +225,7 @@ func NewContext(
resolver: resolver,
idLookup: lookup,
shouldIncludeEvent: sampleMatchFn,
shouldExcludeEvent: sampleExcludeFn,
agentKey: agentKey,
}
}
Expand Down Expand Up @@ -280,8 +283,9 @@ func NewAgent(
cloudHarvester.Initialize(cloud.WithProvider(cloud.Type(cfg.CloudProvider)))

idLookupTable := NewIdLookup(hostnameResolver, cloudHarvester, cfg.DisplayName)
sampleMatchFn := sampler.NewSampleMatchFn(cfg.EnableProcessMetrics, cfg.IncludeMetricsMatchers, ffRetriever)
ctx := NewContext(cfg, buildVersion, hostnameResolver, idLookupTable, sampleMatchFn)
sampleMatchFn := sampler.NewSampleMatchFn(cfg.EnableProcessMetrics, config.MetricsMap(cfg.IncludeMetricsMatchers), ffRetriever)
sampleExcludeFn := sampler.NewSampleMatchFn(cfg.EnableProcessMetrics, config.MetricsMap(cfg.ExcludeMetricsMatchers), ffRetriever)
ctx := NewContext(cfg, buildVersion, hostnameResolver, idLookupTable, sampler.IncludeSampleMatchFn(sampleMatchFn), sampler.ExcludeSampleMatchFn(sampleExcludeFn))

agentKey, err := idLookupTable.AgentKey()
if err != nil {
Expand Down Expand Up @@ -1158,7 +1162,10 @@ func (c *context) SendEvent(event sample.Event, entityKey entity.Key) {
}
}

includeSample := c.shouldIncludeEvent(event)
// check if event should be included
// include takes precedence, so the event will be included if
// it IS NOT EXCLUDED or if it IS INCLUDED
includeSample := !c.shouldExcludeEvent(event) || c.shouldIncludeEvent(event)
if !includeSample {
aclog.
WithField("entity_key", entityKey.String()).
Expand Down
66 changes: 63 additions & 3 deletions internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newTesting(cfg *config.Config) *Agent {
cloudDetector := cloud.NewDetector(true, 0, 0, 0, false)
lookups := NewIdLookup(hostname.CreateResolver("", "", true), cloudDetector, cfg.DisplayName)

ctx := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, lookups, matcher)
ctx := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, lookups, matcher, matcher)

st := delta.NewStore(dataDir, "default", cfg.MaxInventorySize, true)

Expand Down Expand Up @@ -146,7 +146,7 @@ func TestIgnoreInventory(t *testing.T) {
}

func TestServicePidMap(t *testing.T) {
ctx := NewContext(&config.Config{}, "", testhelpers.NullHostnameResolver, NilIDLookup, matcher)
ctx := NewContext(&config.Config{}, "", testhelpers.NullHostnameResolver, NilIDLookup, matcher, matcher)
svc, ok := ctx.GetServiceForPid(1)
assert.False(t, ok)
assert.Len(t, svc, 0)
Expand Down Expand Up @@ -915,6 +915,65 @@ func Test_ProcessSampling(t *testing.T) {
}
}

func Test_ProcessSamplingExcludes(t *testing.T) {
t.Parallel()

someSample := &types.ProcessSample{
ProcessDisplayName: "some-process",
}

type testCase struct {
name string
c *config.Config
ff feature_flags.Retriever
want bool
}
testCases := []testCase{
{
name: "Include not matching must not exclude",
c: &config.Config{IncludeMetricsMatchers: map[string][]string{"process.name": {"does-not-match"}}, DisableCloudMetadata: true},
ff: test.NewFFRetrieverReturning(false, false),
want: false,
},
{
name: "Include matching should not exclude",
c: &config.Config{IncludeMetricsMatchers: map[string][]string{"process.name": {"some-process"}}, DisableCloudMetadata: true},
ff: test.NewFFRetrieverReturning(false, false),
want: false,
},
{
name: "Exclude matching should exclude",
c: &config.Config{ExcludeMetricsMatchers: map[string][]string{"process.name": {"some-process"}}, DisableCloudMetadata: true},
ff: test.NewFFRetrieverReturning(false, false),
want: true,
},
{
name: "Exclude not matching should not exclude",
c: &config.Config{ExcludeMetricsMatchers: map[string][]string{"process.name": {"does-not-match"}}, DisableCloudMetadata: true},
ff: test.NewFFRetrieverReturning(false, false),
want: false,
},
{
name: "Exclude matching should exclude even if include is configured",
c: &config.Config{IncludeMetricsMatchers: map[string][]string{"process.name": {"some-process"}}, ExcludeMetricsMatchers: map[string][]string{"process.name": {"some-process"}}, DisableCloudMetadata: true},
ff: test.NewFFRetrieverReturning(false, false),
want: true,
},
}

for _, tc := range testCases {
testCase := tc
a, _ := NewAgent(testCase.c, "test", "userAgent", testCase.ff)

t.Run(testCase.name, func(t *testing.T) {
t.Parallel()

actual := a.Context.shouldExcludeEvent(someSample)
assert.Equal(t, testCase.want, actual)
})
}
}

type fakeEventSender struct{}

func (f fakeEventSender) QueueEvent(_ sample.Event, _ entity.Key) error {
Expand All @@ -941,7 +1000,8 @@ func TestContext_SendEvent_LogTruncatedEvent(t *testing.T) {
"0.0.0",
testhelpers.NewFakeHostnameResolver("foobar", "foo", nil),
NilIDLookup,
func(sample interface{}) bool { return true },
matcher,
matcher,
)
c.eventSender = fakeEventSender{}

Expand Down
2 changes: 1 addition & 1 deletion internal/agent/event_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func TestEventSender_ResponseError(t *testing.T) {
ConnectEnabled: true,
PayloadCompressionLevel: gzip.NoCompression,
}
c := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, host.IDLookup{}, nil)
c := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, host.IDLookup{}, nil, nil)
c.setAgentKey(agentKey)
c.SetAgentIdentity(agentIdn)

Expand Down
20 changes: 18 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,13 @@ var (

type CustomAttributeMap map[string]interface{}

type MetricsMap map[string][]string

// IncludeMetricsMap configuration type to Map include_matching_metrics setting env var
type IncludeMetricsMap map[string][]string
type IncludeMetricsMap MetricsMap

// IncludeMetricsMap configuration type to Map exclude_matching_metrics setting env var.
type ExcludeMetricsMap MetricsMap

// LogFilters configuration specifies which log entries should be included/excluded.
type LogFilters map[string][]interface{}
Expand Down Expand Up @@ -1229,6 +1234,16 @@ type Config struct {
// Public: Yes
IncludeMetricsMatchers IncludeMetricsMap `yaml:"include_matching_metrics" envconfig:"include_matching_metrics"`

// ExcludeMetricsMatchers Configuration of the metrics matchers that determine which metric data should the agent
// filter out and not send to the New Relic backend.
// If no configuration is defined, the previous behaviour is maintained, i.e., every metric data captured is sent.
// If a configuration is defined, then only metric data not matching the configuration is sent.
// Note that ALL DATA MATCHED WILL BE DROPPED.
// Also note that at present it ONLY APPLIES to metric data related to processes. All other metric data is still being sent as usual.
// Default: none
// Public: Yes
ExcludeMetricsMatchers ExcludeMetricsMap `envconfig:"exclude_matching_metrics" yaml:"exclude_matching_metrics"`

// AgentMetricsEndpoint Set the endpoint (host:port) for the HTTP server the agent will use to server OpenMetrics
// if empty the server will be not spawned
// Default: empty
Expand Down Expand Up @@ -1901,7 +1916,8 @@ func NewConfig() *Config {
MetricsNFSSampleRate: DefaultMetricsNFSSampleRate,
SmartVerboseModeEntryLimit: DefaultSmartVerboseModeEntryLimit,
DefaultIntegrationsTempDir: defaultIntegrationsTempDir,
IncludeMetricsMatchers: defaultMetricsMatcherConfig,
IncludeMetricsMatchers: defaultIncludeMetricsMatcherConfig,
ExcludeMetricsMatchers: defaultExcludeMetricsMatcherConfig,
InventoryQueueLen: DefaultInventoryQueue,
NtpMetrics: NewNtpConfig(),
Http: NewHttpConfig(),
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ var (
defaultProxyValidateCerts = false
defaultProxyConfigPlugin = true
defaultWinRemovableDrives = true
defaultMetricsMatcherConfig = IncludeMetricsMap{}
defaultIncludeMetricsMatcherConfig = IncludeMetricsMap{}
defaultExcludeMetricsMatcherConfig = ExcludeMetricsMap{}
defaultRegisterMaxRetryBoSecs = 60
defaultNtpPool = []string{} // i.e: []string{"time.cloudflare.com"}
defaultNtpEnabled = false
Expand Down
126 changes: 76 additions & 50 deletions pkg/metrics/sampler/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@ var (
}
)

// MatcherFn func that returns whether an event/sample is matched. It satisfies
// the metrics matcher (processor.MatcherChain) interface.
type MatcherFn func(event any) bool

// IncludeSampleMatchFn func that returns whether an event/sample should be included, it satisfies
// the metrics matcher (processor.MatcherChain) interface.
type IncludeSampleMatchFn func(sample interface{}) bool
type IncludeSampleMatchFn MatcherFn

// ExcludeSampleMatchFn func that returns whether an event/sample should be excluded, it satisfies
// the metrics matcher (processor.MatcherChain) interface.
type ExcludeSampleMatchFn MatcherFn

// ExpressionMatcher is an interface every evaluator must implement
type ExpressionMatcher interface {
Expand Down Expand Up @@ -214,7 +222,7 @@ type MatcherChain struct {
// NewMatcherChain creates a new chain of matchers.
// Each expression will generate an matcher that gets added to the chain
// While the chain will be matched for each "sample", it terminates as soon as 1 match is matched (result = true)
func NewMatcherChain(expressions config.IncludeMetricsMap) MatcherChain {
func NewMatcherChain(expressions config.MetricsMap) MatcherChain {
chain := MatcherChain{Matchers: map[string][]ExpressionMatcher{}, Enabled: false}

// no matchers means the chain will be disabled
Expand Down Expand Up @@ -271,82 +279,100 @@ func (ne constantMatcher) String() string {

// NewSampleMatchFn creates new includeSampleMatchFn func, enableProcessMetrics might be nil when
// value was not set.
func NewSampleMatchFn(enableProcessMetrics *bool, includeMetricsMatchers config.IncludeMetricsMap, ffRetriever feature_flags.Retriever) IncludeSampleMatchFn {
func NewSampleMatchFn(enableProcessMetrics *bool, metricsMatchers config.MetricsMap, ffRetriever feature_flags.Retriever) MatcherFn {
// configuration option always takes precedence over FF and matchers configuration
if enableProcessMetrics == nil {
// if config option is not set, check if we have rules defined. those take precedence over the FF
ec := NewMatcherChain(includeMetricsMatchers)
if ec.Enabled {
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Tracef("EnableProcessMetrics is EMPTY and rules ARE defined, process metrics will be ENABLED for matching processes")
return func(sample interface{}) bool {
return ec.Evaluate(sample)
}
matcher := matcherFromMetricsMatchers(metricsMatchers)
if matcher != nil {
return matcher
}

// configuration option is not defined and feature flag is present, FF determines, otherwise
// all process samples will be excluded
return func(sample interface{}) bool {
_, isProcessSample := sample.(*types.ProcessSample)
_, isFlatProcessSample := sample.(*types.FlatProcessSample)

if !isProcessSample && !isFlatProcessSample {
return true
}

enabled, exists := ffRetriever.GetFeatureFlag(fflag.FlagFullProcess)
return exists && enabled
}
return matcherFromFeatureFlag(ffRetriever)
}

if excludeProcessMetrics(enableProcessMetrics) {
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("EnableProcessMetrics is FALSE, process metrics will be DISABLED")
return func(sample interface{}) bool {
switch sample.(type) {
case *types.ProcessSample:
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("Got a sample of type '*types.ProcessSample' so excluding sample.")
// no process samples are included
return false
case *types.FlatProcessSample:
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("Got a sample of type '*types.FlatProcessSample' so excluding sample.")
// no flat process samples are included
return false
default:
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Tracef("Got a sample of type '%s' that should not be excluded.", reflect.TypeOf(sample).String())
// other samples are included
return true
}
}

return matcherForDisabledMetrics()
}

ec := NewMatcherChain(includeMetricsMatchers)
if ec.Enabled {
matcherChain := NewMatcherChain(metricsMatchers)
if matcherChain.Enabled {
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("EnableProcessMetrics is TRUE and rules ARE defined, process metrics will be ENABLED for matching processes")
return func(sample interface{}) bool {
return ec.Evaluate(sample)
}

return matcherChain.Evaluate
}

mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("EnableProcessMetrics is TRUE and rules are NOT defined, ALL process metrics will be ENABLED")

return func(sample interface{}) bool {
// all process samples are included
return true
}
}

func matcherForDisabledMetrics() MatcherFn {
return func(sample interface{}) bool {
switch sample.(type) {
case *types.ProcessSample:
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("Got a sample of type '*types.ProcessSample' so excluding sample.")
// no process samples are included
return false
case *types.FlatProcessSample:
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("Got a sample of type '*types.FlatProcessSample' so excluding sample.")
// no flat process samples are included
return false
default:
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Tracef("Got a sample of type '%s' that should not be excluded.", reflect.TypeOf(sample).String())
// other samples are included
return true
}
}
}

func matcherFromMetricsMatchers(metricsMatchers config.MetricsMap) MatcherFn {
// if config option is not set, check if we have rules defined. those take precedence over the FF
matcherChain := NewMatcherChain(metricsMatchers)
if matcherChain.Enabled {
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Tracef("EnableProcessMetrics is EMPTY and rules ARE defined, process metrics will be ENABLED for matching processes")

return matcherChain.Evaluate
}

return nil
}

func matcherFromFeatureFlag(ffRetriever feature_flags.Retriever) MatcherFn {
return func(sample any) bool {
_, isProcessSample := sample.(*types.ProcessSample)
_, isFlatProcessSample := sample.(*types.FlatProcessSample)

if !isProcessSample && !isFlatProcessSample {
return true
}

enabled, exists := ffRetriever.GetFeatureFlag(fflag.FlagFullProcess)

return exists && enabled
}
}

func excludeProcessMetrics(enableProcessMetrics *bool) bool {
if enableProcessMetrics == nil || *enableProcessMetrics {
return false
Expand Down
4 changes: 2 additions & 2 deletions pkg/metrics/sampler/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func Test_EvaluatorChain_LogTraceMatcher(t *testing.T) {
javaProcessSample := types.ProcessSample{ProcessDisplayName: "java", CmdLine: "/bin/java"}

rule := config.IncludeMetricsMap{"process.name": {"java"}}
ec := sampler.NewMatcherChain(rule)
ec := sampler.NewMatcherChain(config.MetricsMap(rule))

assert.Len(t, ec.Matchers, len(rule))
assert.EqualValues(t, true, ec.Evaluate(javaProcessSample))
Expand Down Expand Up @@ -741,7 +741,7 @@ func TestNewSampleMatchFn(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
matchFn := sampler.NewSampleMatchFn(tt.args.enableProcessMetrics, tt.args.includeMetricsMatchers, tt.args.ffRetriever)
matchFn := sampler.NewSampleMatchFn(tt.args.enableProcessMetrics, config.MetricsMap(tt.args.includeMetricsMatchers), tt.args.ffRetriever)
assert.Equal(t, tt.include, matchFn(tt.args.sample))
})
}
Expand Down
Loading

0 comments on commit 08ce4b4

Please sign in to comment.