From 08ce4b4ef52c4ecb5cacdc4b52bdd7021958c0c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20S=C3=A1nchez?= Date: Thu, 29 Aug 2024 08:58:04 +0100 Subject: [PATCH] feat: exclude matching metrics (#1914) * feat: add config item for exclude_matching_metrics * feat: event exclusion logic * style: use generic matcher fn type definition and derive include/exclude from it * refactor: leverage aliasing * test: fix * refactor: strict typing * test: exclude processes * style: typing * refactor: reduce function complexity (linter warning) --- internal/agent/agent.go | 13 ++- internal/agent/agent_test.go | 66 ++++++++++++++- internal/agent/event_sender_test.go | 2 +- pkg/config/config.go | 20 ++++- pkg/config/defaults.go | 3 +- pkg/metrics/sampler/matcher.go | 126 +++++++++++++++++----------- pkg/metrics/sampler/matcher_test.go | 4 +- test/infra/agent.go | 2 +- 8 files changed, 173 insertions(+), 63 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 605f9fcda..7fdbf27d6 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -160,6 +160,7 @@ type context struct { EntityMap entity.KnownIDs idLookup host.IDLookup shouldIncludeEvent sampler.IncludeSampleMatchFn + shouldExcludeEvent sampler.ExcludeSampleMatchFn } func (c *context) Context() context2.Context { @@ -206,6 +207,7 @@ func NewContext( resolver hostname.ResolverChangeNotifier, lookup host.IDLookup, sampleMatchFn sampler.IncludeSampleMatchFn, + sampleExcludeFn sampler.ExcludeSampleMatchFn, ) *context { ctx, cancel := context2.WithCancel(context2.Background()) @@ -223,6 +225,7 @@ func NewContext( resolver: resolver, idLookup: lookup, shouldIncludeEvent: sampleMatchFn, + shouldExcludeEvent: sampleExcludeFn, agentKey: agentKey, } } @@ -280,8 +283,9 @@ func NewAgent( cloudHarvester.Initialize(cloud.WithProvider(cloud.Type(cfg.CloudProvider))) idLookupTable := NewIdLookup(hostnameResolver, cloudHarvester, cfg.DisplayName) - sampleMatchFn := 
sampler.NewSampleMatchFn(cfg.EnableProcessMetrics, cfg.IncludeMetricsMatchers, ffRetriever) - ctx := NewContext(cfg, buildVersion, hostnameResolver, idLookupTable, sampleMatchFn) + sampleMatchFn := sampler.NewSampleMatchFn(cfg.EnableProcessMetrics, config.MetricsMap(cfg.IncludeMetricsMatchers), ffRetriever) + sampleExcludeFn := sampler.NewSampleMatchFn(cfg.EnableProcessMetrics, config.MetricsMap(cfg.ExcludeMetricsMatchers), ffRetriever) + ctx := NewContext(cfg, buildVersion, hostnameResolver, idLookupTable, sampler.IncludeSampleMatchFn(sampleMatchFn), sampler.ExcludeSampleMatchFn(sampleExcludeFn)) agentKey, err := idLookupTable.AgentKey() if err != nil { @@ -1158,7 +1162,10 @@ func (c *context) SendEvent(event sample.Event, entityKey entity.Key) { } } - includeSample := c.shouldIncludeEvent(event) + // check if event should be included + // include takes precedence, so the event will be included if + // it IS NOT EXCLUDED or if it IS INCLUDED + includeSample := !c.shouldExcludeEvent(event) || c.shouldIncludeEvent(event) if !includeSample { aclog. WithField("entity_key", entityKey.String()). 
diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index dc827b473..f3aa68f6e 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -63,7 +63,7 @@ func newTesting(cfg *config.Config) *Agent { cloudDetector := cloud.NewDetector(true, 0, 0, 0, false) lookups := NewIdLookup(hostname.CreateResolver("", "", true), cloudDetector, cfg.DisplayName) - ctx := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, lookups, matcher) + ctx := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, lookups, matcher, matcher) st := delta.NewStore(dataDir, "default", cfg.MaxInventorySize, true) @@ -146,7 +146,7 @@ func TestIgnoreInventory(t *testing.T) { } func TestServicePidMap(t *testing.T) { - ctx := NewContext(&config.Config{}, "", testhelpers.NullHostnameResolver, NilIDLookup, matcher) + ctx := NewContext(&config.Config{}, "", testhelpers.NullHostnameResolver, NilIDLookup, matcher, matcher) svc, ok := ctx.GetServiceForPid(1) assert.False(t, ok) assert.Len(t, svc, 0) @@ -915,6 +915,65 @@ func Test_ProcessSampling(t *testing.T) { } } +func Test_ProcessSamplingExcludes(t *testing.T) { + t.Parallel() + + someSample := &types.ProcessSample{ + ProcessDisplayName: "some-process", + } + + type testCase struct { + name string + c *config.Config + ff feature_flags.Retriever + want bool + } + testCases := []testCase{ + { + name: "Include not matching must not exclude", + c: &config.Config{IncludeMetricsMatchers: map[string][]string{"process.name": {"does-not-match"}}, DisableCloudMetadata: true}, + ff: test.NewFFRetrieverReturning(false, false), + want: false, + }, + { + name: "Include matching should not exclude", + c: &config.Config{IncludeMetricsMatchers: map[string][]string{"process.name": {"some-process"}}, DisableCloudMetadata: true}, + ff: test.NewFFRetrieverReturning(false, false), + want: false, + }, + { + name: "Exclude matching should exclude", + c: &config.Config{ExcludeMetricsMatchers: 
map[string][]string{"process.name": {"some-process"}}, DisableCloudMetadata: true}, + ff: test.NewFFRetrieverReturning(false, false), + want: true, + }, + { + name: "Exclude not matching should not exclude", + c: &config.Config{ExcludeMetricsMatchers: map[string][]string{"process.name": {"does-not-match"}}, DisableCloudMetadata: true}, + ff: test.NewFFRetrieverReturning(false, false), + want: false, + }, + { + name: "Exclude matching should exclude even if include is configured", + c: &config.Config{IncludeMetricsMatchers: map[string][]string{"process.name": {"some-process"}}, ExcludeMetricsMatchers: map[string][]string{"process.name": {"some-process"}}, DisableCloudMetadata: true}, + ff: test.NewFFRetrieverReturning(false, false), + want: true, + }, + } + + for _, tc := range testCases { + testCase := tc + a, _ := NewAgent(testCase.c, "test", "userAgent", testCase.ff) + + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + + actual := a.Context.shouldExcludeEvent(someSample) + assert.Equal(t, testCase.want, actual) + }) + } +} + type fakeEventSender struct{} func (f fakeEventSender) QueueEvent(_ sample.Event, _ entity.Key) error { @@ -941,7 +1000,8 @@ func TestContext_SendEvent_LogTruncatedEvent(t *testing.T) { "0.0.0", testhelpers.NewFakeHostnameResolver("foobar", "foo", nil), NilIDLookup, - func(sample interface{}) bool { return true }, + matcher, + matcher, ) c.eventSender = fakeEventSender{} diff --git a/internal/agent/event_sender_test.go b/internal/agent/event_sender_test.go index 16bd65df5..bbbd46a85 100644 --- a/internal/agent/event_sender_test.go +++ b/internal/agent/event_sender_test.go @@ -510,7 +510,7 @@ func TestEventSender_ResponseError(t *testing.T) { ConnectEnabled: true, PayloadCompressionLevel: gzip.NoCompression, } - c := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, host.IDLookup{}, nil) + c := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, host.IDLookup{}, nil, nil) c.setAgentKey(agentKey) 
c.SetAgentIdentity(agentIdn) diff --git a/pkg/config/config.go b/pkg/config/config.go index 902e4566e..7eca621aa 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -90,8 +90,13 @@ var ( type CustomAttributeMap map[string]interface{} +type MetricsMap map[string][]string + // IncludeMetricsMap configuration type to Map include_matching_metrics setting env var -type IncludeMetricsMap map[string][]string +type IncludeMetricsMap MetricsMap + +// ExcludeMetricsMap configuration type to Map exclude_matching_metrics setting env var. +type ExcludeMetricsMap MetricsMap // LogFilters configuration specifies which log entries should be included/excluded. type LogFilters map[string][]interface{} @@ -1229,6 +1234,16 @@ type Config struct { // Public: Yes IncludeMetricsMatchers IncludeMetricsMap `yaml:"include_matching_metrics" envconfig:"include_matching_metrics"` + // ExcludeMetricsMatchers Configuration of the metrics matchers that determine which metric data should the agent + // filter out and not send to the New Relic backend. + // If no configuration is defined, the previous behaviour is maintained, i.e., every metric data captured is sent. + // If a configuration is defined, then only metric data not matching the configuration is sent. + // Note that ALL DATA MATCHED WILL BE DROPPED. + // Also note that at present it ONLY APPLIES to metric data related to processes. All other metric data is still being sent as usual. 
+ // Default: none + // Public: Yes + ExcludeMetricsMatchers ExcludeMetricsMap `envconfig:"exclude_matching_metrics" yaml:"exclude_matching_metrics"` + // AgentMetricsEndpoint Set the endpoint (host:port) for the HTTP server the agent will use to server OpenMetrics // if empty the server will be not spawned // Default: empty @@ -1901,7 +1916,8 @@ func NewConfig() *Config { MetricsNFSSampleRate: DefaultMetricsNFSSampleRate, SmartVerboseModeEntryLimit: DefaultSmartVerboseModeEntryLimit, DefaultIntegrationsTempDir: defaultIntegrationsTempDir, - IncludeMetricsMatchers: defaultMetricsMatcherConfig, + IncludeMetricsMatchers: defaultIncludeMetricsMatcherConfig, + ExcludeMetricsMatchers: defaultExcludeMetricsMatcherConfig, InventoryQueueLen: DefaultInventoryQueue, NtpMetrics: NewNtpConfig(), Http: NewHttpConfig(), diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index bf88a45f3..fcd46d6f9 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -107,7 +107,8 @@ var ( defaultProxyValidateCerts = false defaultProxyConfigPlugin = true defaultWinRemovableDrives = true - defaultMetricsMatcherConfig = IncludeMetricsMap{} + defaultIncludeMetricsMatcherConfig = IncludeMetricsMap{} + defaultExcludeMetricsMatcherConfig = ExcludeMetricsMap{} defaultRegisterMaxRetryBoSecs = 60 defaultNtpPool = []string{} // i.e: []string{"time.cloudflare.com"} defaultNtpEnabled = false diff --git a/pkg/metrics/sampler/matcher.go b/pkg/metrics/sampler/matcher.go index f006c7aea..b934f63d4 100644 --- a/pkg/metrics/sampler/matcher.go +++ b/pkg/metrics/sampler/matcher.go @@ -26,9 +26,17 @@ var ( } ) +// MatcherFn func that returns whether an event/sample is matched. It satisfies +// the metrics matcher (processor.MatcherChain) interface. +type MatcherFn func(event any) bool + // IncludeSampleMatchFn func that returns whether an event/sample should be included, it satisfies // the metrics matcher (processor.MatcherChain) interface. 
-type IncludeSampleMatchFn func(sample interface{}) bool +type IncludeSampleMatchFn MatcherFn + +// ExcludeSampleMatchFn func that returns whether an event/sample should be excluded, it satisfies +// the metrics matcher (processor.MatcherChain) interface. +type ExcludeSampleMatchFn MatcherFn // ExpressionMatcher is an interface every evaluator must implement type ExpressionMatcher interface { @@ -214,7 +222,7 @@ type MatcherChain struct { // NewMatcherChain creates a new chain of matchers. // Each expression will generate an matcher that gets added to the chain // While the chain will be matched for each "sample", it terminates as soon as 1 match is matched (result = true) -func NewMatcherChain(expressions config.IncludeMetricsMap) MatcherChain { +func NewMatcherChain(expressions config.MetricsMap) MatcherChain { chain := MatcherChain{Matchers: map[string][]ExpressionMatcher{}, Enabled: false} // no matchers means the chain will be disabled @@ -271,82 +279,100 @@ func (ne constantMatcher) String() string { // NewSampleMatchFn creates new includeSampleMatchFn func, enableProcessMetrics might be nil when // value was not set. -func NewSampleMatchFn(enableProcessMetrics *bool, includeMetricsMatchers config.IncludeMetricsMap, ffRetriever feature_flags.Retriever) IncludeSampleMatchFn { +func NewSampleMatchFn(enableProcessMetrics *bool, metricsMatchers config.MetricsMap, ffRetriever feature_flags.Retriever) MatcherFn { // configuration option always takes precedence over FF and matchers configuration if enableProcessMetrics == nil { - // if config option is not set, check if we have rules defined. those take precedence over the FF - ec := NewMatcherChain(includeMetricsMatchers) - if ec.Enabled { - mlog. - WithField(config.TracesFieldName, config.FeatureTrace). 
- Tracef("EnableProcessMetrics is EMPTY and rules ARE defined, process metrics will be ENABLED for matching processes") - return func(sample interface{}) bool { - return ec.Evaluate(sample) - } + matcher := matcherFromMetricsMatchers(metricsMatchers) + if matcher != nil { + return matcher } // configuration option is not defined and feature flag is present, FF determines, otherwise // all process samples will be excluded - return func(sample interface{}) bool { - _, isProcessSample := sample.(*types.ProcessSample) - _, isFlatProcessSample := sample.(*types.FlatProcessSample) - - if !isProcessSample && !isFlatProcessSample { - return true - } - - enabled, exists := ffRetriever.GetFeatureFlag(fflag.FlagFullProcess) - return exists && enabled - } + return matcherFromFeatureFlag(ffRetriever) } if excludeProcessMetrics(enableProcessMetrics) { mlog. WithField(config.TracesFieldName, config.FeatureTrace). Trace("EnableProcessMetrics is FALSE, process metrics will be DISABLED") - return func(sample interface{}) bool { - switch sample.(type) { - case *types.ProcessSample: - mlog. - WithField(config.TracesFieldName, config.FeatureTrace). - Trace("Got a sample of type '*types.ProcessSample' so excluding sample.") - // no process samples are included - return false - case *types.FlatProcessSample: - mlog. - WithField(config.TracesFieldName, config.FeatureTrace). - Trace("Got a sample of type '*types.FlatProcessSample' so excluding sample.") - // no flat process samples are included - return false - default: - mlog. - WithField(config.TracesFieldName, config.FeatureTrace). - Tracef("Got a sample of type '%s' that should not be excluded.", reflect.TypeOf(sample).String()) - // other samples are included - return true - } - } + + return matcherForDisabledMetrics() } - ec := NewMatcherChain(includeMetricsMatchers) - if ec.Enabled { + matcherChain := NewMatcherChain(metricsMatchers) + if matcherChain.Enabled { mlog. WithField(config.TracesFieldName, config.FeatureTrace). 
Trace("EnableProcessMetrics is TRUE and rules ARE defined, process metrics will be ENABLED for matching processes") - return func(sample interface{}) bool { - return ec.Evaluate(sample) - } + + return matcherChain.Evaluate } mlog. WithField(config.TracesFieldName, config.FeatureTrace). Trace("EnableProcessMetrics is TRUE and rules are NOT defined, ALL process metrics will be ENABLED") + return func(sample interface{}) bool { // all process samples are included return true } } +func matcherForDisabledMetrics() MatcherFn { + return func(sample interface{}) bool { + switch sample.(type) { + case *types.ProcessSample: + mlog. + WithField(config.TracesFieldName, config.FeatureTrace). + Trace("Got a sample of type '*types.ProcessSample' so excluding sample.") + // no process samples are included + return false + case *types.FlatProcessSample: + mlog. + WithField(config.TracesFieldName, config.FeatureTrace). + Trace("Got a sample of type '*types.FlatProcessSample' so excluding sample.") + // no flat process samples are included + return false + default: + mlog. + WithField(config.TracesFieldName, config.FeatureTrace). + Tracef("Got a sample of type '%s' that should not be excluded.", reflect.TypeOf(sample).String()) + // other samples are included + return true + } + } +} + +func matcherFromMetricsMatchers(metricsMatchers config.MetricsMap) MatcherFn { + // if config option is not set, check if we have rules defined. those take precedence over the FF + matcherChain := NewMatcherChain(metricsMatchers) + if matcherChain.Enabled { + mlog. + WithField(config.TracesFieldName, config.FeatureTrace). 
+ Tracef("EnableProcessMetrics is EMPTY and rules ARE defined, process metrics will be ENABLED for matching processes") + + return matcherChain.Evaluate + } + + return nil +} + +func matcherFromFeatureFlag(ffRetriever feature_flags.Retriever) MatcherFn { + return func(sample any) bool { + _, isProcessSample := sample.(*types.ProcessSample) + _, isFlatProcessSample := sample.(*types.FlatProcessSample) + + if !isProcessSample && !isFlatProcessSample { + return true + } + + enabled, exists := ffRetriever.GetFeatureFlag(fflag.FlagFullProcess) + + return exists && enabled + } +} + func excludeProcessMetrics(enableProcessMetrics *bool) bool { if enableProcessMetrics == nil || *enableProcessMetrics { return false diff --git a/pkg/metrics/sampler/matcher_test.go b/pkg/metrics/sampler/matcher_test.go index 799dad24e..e5e1a0677 100644 --- a/pkg/metrics/sampler/matcher_test.go +++ b/pkg/metrics/sampler/matcher_test.go @@ -549,7 +549,7 @@ func Test_EvaluatorChain_LogTraceMatcher(t *testing.T) { javaProcessSample := types.ProcessSample{ProcessDisplayName: "java", CmdLine: "/bin/java"} rule := config.IncludeMetricsMap{"process.name": {"java"}} - ec := sampler.NewMatcherChain(rule) + ec := sampler.NewMatcherChain(config.MetricsMap(rule)) assert.Len(t, ec.Matchers, len(rule)) assert.EqualValues(t, true, ec.Evaluate(javaProcessSample)) @@ -741,7 +741,7 @@ func TestNewSampleMatchFn(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - matchFn := sampler.NewSampleMatchFn(tt.args.enableProcessMetrics, tt.args.includeMetricsMatchers, tt.args.ffRetriever) + matchFn := sampler.NewSampleMatchFn(tt.args.enableProcessMetrics, config.MetricsMap(tt.args.includeMetricsMatchers), tt.args.ffRetriever) assert.Equal(t, tt.include, matchFn(tt.args.sample)) }) } diff --git a/test/infra/agent.go b/test/infra/agent.go index 87bc39378..3d0d7a1cc 100644 --- a/test/infra/agent.go +++ b/test/infra/agent.go @@ -79,7 +79,7 @@ func NewAgentWithConnectClientAndConfig(connectClient 
*http.Client, dataClient b lookups := agent.NewIdLookup(hostname.CreateResolver(cfg.OverrideHostname, cfg.OverrideHostnameShort, cfg.DnsHostnameResolution), cloudDetector, cfg.DisplayName) - ctx := agent.NewContext(cfg, "1.2.3", testhelpers.NewFakeHostnameResolver("foobar", "foo", nil), lookups, matcher) + ctx := agent.NewContext(cfg, "1.2.3", testhelpers.NewFakeHostnameResolver("foobar", "foo", nil), lookups, matcher, matcher) fingerprintHarvester, err := fingerprint.NewHarvestor(cfg, testhelpers.NullHostnameResolver, cloudDetector) if err != nil {