From 8bb0b00446bfbe41e87cd8246ccc77be21527522 Mon Sep 17 00:00:00 2001 From: Ken Schneider <103530259+ken-schneider@users.noreply.github.com> Date: Tue, 6 Aug 2024 17:09:33 -0400 Subject: [PATCH] [NETPATH-175] add maxContexts to pathteststore (#28142) Co-authored-by: Ursula Chen <58821586+urseberry@users.noreply.github.com> --- .../npcollector/npcollectorimpl/config.go | 2 + .../npcollectorimpl/npcollector.go | 5 +- .../npcollectorimpl/npcollector_test.go | 13 +-- .../pathteststore/pathteststore.go | 26 ++++-- .../pathteststore/pathteststore_test.go | 90 ++++++++++++++++++- pkg/config/setup/config.go | 1 + ...-add-max-path-config-91d70b05b0db086a.yaml | 13 +++ 7 files changed, 135 insertions(+), 15 deletions(-) create mode 100644 releasenotes/notes/network-path-add-max-path-config-91d70b05b0db086a.yaml diff --git a/comp/networkpath/npcollector/npcollectorimpl/config.go b/comp/networkpath/npcollector/npcollectorimpl/config.go index d18c31970da85..c752153db7747 100644 --- a/comp/networkpath/npcollector/npcollectorimpl/config.go +++ b/comp/networkpath/npcollector/npcollectorimpl/config.go @@ -16,6 +16,7 @@ type collectorConfigs struct { workers int pathtestInputChanSize int pathtestProcessingChanSize int + pathtestContextsLimit int pathtestTTL time.Duration pathtestInterval time.Duration flushInterval time.Duration @@ -28,6 +29,7 @@ func newConfig(agentConfig config.Component) *collectorConfigs { workers: agentConfig.GetInt("network_path.collector.workers"), pathtestInputChanSize: agentConfig.GetInt("network_path.collector.input_chan_size"), pathtestProcessingChanSize: agentConfig.GetInt("network_path.collector.processing_chan_size"), + pathtestContextsLimit: agentConfig.GetInt("network_path.collector.pathtest_contexts_limit"), pathtestTTL: agentConfig.GetDuration("network_path.collector.pathtest_ttl"), pathtestInterval: agentConfig.GetDuration("network_path.collector.pathtest_interval"), flushInterval: agentConfig.GetDuration("network_path.collector.flush_interval"), 
diff --git a/comp/networkpath/npcollector/npcollectorimpl/npcollector.go b/comp/networkpath/npcollector/npcollectorimpl/npcollector.go index e7cb212833032..7f90b19a70c80 100644 --- a/comp/networkpath/npcollector/npcollectorimpl/npcollector.go +++ b/comp/networkpath/npcollector/npcollectorimpl/npcollector.go @@ -75,10 +75,11 @@ func newNoopNpCollectorImpl() *npCollectorImpl { } func newNpCollectorImpl(epForwarder eventplatform.Forwarder, collectorConfigs *collectorConfigs, logger log.Component, telemetrycomp telemetryComp.Component) *npCollectorImpl { - logger.Infof("New NpCollector (workers=%d input_chan_size=%d processing_chan_size=%d pathtest_ttl=%s pathtest_interval=%s flush_interval=%s)", + logger.Infof("New NpCollector (workers=%d input_chan_size=%d processing_chan_size=%d pathtest_contexts_limit=%d pathtest_ttl=%s pathtest_interval=%s flush_interval=%s)", collectorConfigs.workers, collectorConfigs.pathtestInputChanSize, collectorConfigs.pathtestProcessingChanSize, + collectorConfigs.pathtestContextsLimit, collectorConfigs.pathtestTTL, collectorConfigs.pathtestInterval, collectorConfigs.flushInterval) @@ -88,7 +89,7 @@ func newNpCollectorImpl(epForwarder eventplatform.Forwarder, collectorConfigs *c collectorConfigs: collectorConfigs, logger: logger, - pathtestStore: pathteststore.NewPathtestStore(collectorConfigs.pathtestTTL, collectorConfigs.pathtestInterval, logger), + pathtestStore: pathteststore.NewPathtestStore(collectorConfigs.pathtestTTL, collectorConfigs.pathtestInterval, collectorConfigs.pathtestContextsLimit, logger), pathtestInputChan: make(chan *common.Pathtest, collectorConfigs.pathtestInputChanSize), pathtestProcessingChan: make(chan *pathteststore.PathtestContext, collectorConfigs.pathtestProcessingChanSize), flushInterval: collectorConfigs.flushInterval, diff --git a/comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go b/comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go index 60b6001544ad3..c8c33675b4e34 100644 --- 
a/comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go +++ b/comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go @@ -293,16 +293,18 @@ func Test_newNpCollectorImpl_defaultConfigs(t *testing.T) { assert.Equal(t, 4, npCollector.workers) assert.Equal(t, 1000, cap(npCollector.pathtestInputChan)) assert.Equal(t, 1000, cap(npCollector.pathtestProcessingChan)) + assert.Equal(t, 10000, npCollector.collectorConfigs.pathtestContextsLimit) assert.Equal(t, "default", npCollector.networkDevicesNamespace) } func Test_newNpCollectorImpl_overrideConfigs(t *testing.T) { agentConfigs := map[string]any{ - "network_path.connections_monitoring.enabled": true, - "network_path.collector.workers": 2, - "network_path.collector.input_chan_size": 300, - "network_path.collector.processing_chan_size": 400, - "network_devices.namespace": "ns1", + "network_path.connections_monitoring.enabled": true, + "network_path.collector.workers": 2, + "network_path.collector.input_chan_size": 300, + "network_path.collector.processing_chan_size": 400, + "network_path.collector.pathtest_contexts_limit": 500, + "network_devices.namespace": "ns1", } _, npCollector := newTestNpCollector(t, agentConfigs) @@ -311,6 +313,7 @@ func Test_newNpCollectorImpl_overrideConfigs(t *testing.T) { assert.Equal(t, 2, npCollector.workers) assert.Equal(t, 300, cap(npCollector.pathtestInputChan)) assert.Equal(t, 400, cap(npCollector.pathtestProcessingChan)) + assert.Equal(t, 500, npCollector.collectorConfigs.pathtestContextsLimit) assert.Equal(t, "ns1", npCollector.networkDevicesNamespace) } diff --git a/comp/networkpath/npcollector/npcollectorimpl/pathteststore/pathteststore.go b/comp/networkpath/npcollector/npcollectorimpl/pathteststore/pathteststore.go index 9a7fb594cbcba..4c662c8700dc3 100644 --- a/comp/networkpath/npcollector/npcollectorimpl/pathteststore/pathteststore.go +++ b/comp/networkpath/npcollector/npcollectorimpl/pathteststore/pathteststore.go @@ -46,12 +46,18 @@ type Store struct { // are 
called by different routines. contextsMutex sync.Mutex + // contextsLimit is the maximum number of contexts to keep in the store + contextsLimit int + // interval defines how frequently pathtests should run interval time.Duration // ttl is the duration a Pathtest should run from discovery. // If a Pathtest is added again before the TTL expires, the TTL is reset to this duration. ttl time.Duration + + // lastContextWarning is the last time a warning was logged about the store being full + lastContextWarning time.Time } func newPathtestContext(pt *common.Pathtest, runUntilDuration time.Duration) *PathtestContext { @@ -64,12 +70,13 @@ func newPathtestContext(pt *common.Pathtest, runUntilDuration time.Duration) *Pa } // NewPathtestStore creates a new Store -func NewPathtestStore(pathtestTTL time.Duration, pathtestInterval time.Duration, logger log.Component) *Store { +func NewPathtestStore(pathtestTTL time.Duration, pathtestInterval time.Duration, contextsLimit int, logger log.Component) *Store { return &Store{ - contexts: make(map[uint64]*PathtestContext), - ttl: pathtestTTL, - interval: pathtestInterval, - logger: logger, + contexts: make(map[uint64]*PathtestContext), + ttl: pathtestTTL, + interval: pathtestInterval, + contextsLimit: contextsLimit, + logger: logger, } } @@ -119,6 +126,15 @@ func (f *Store) Add(pathtestToAdd *common.Pathtest) { f.contextsMutex.Lock() defer f.contextsMutex.Unlock() + if len(f.contexts) >= f.contextsLimit { + // only log if it has been 1 minute since the last warning + if time.Since(f.lastContextWarning) >= time.Minute { + f.logger.Warnf("Pathteststore is full, maximum set to: %d, dropping pathtest: %+v", f.contextsLimit, pathtestToAdd) + f.lastContextWarning = time.Now() + } + return + } + hash := pathtestToAdd.GetHash() pathtestCtx, ok := f.contexts[hash] if !ok { diff --git a/comp/networkpath/npcollector/npcollectorimpl/pathteststore/pathteststore_test.go 
b/comp/networkpath/npcollector/npcollectorimpl/pathteststore/pathteststore_test.go index 4fd758f2253d6..a758c0664fa14 100644 --- a/comp/networkpath/npcollector/npcollectorimpl/pathteststore/pathteststore_test.go +++ b/comp/networkpath/npcollector/npcollectorimpl/pathteststore/pathteststore_test.go @@ -6,13 +6,18 @@ package pathteststore import ( + "bufio" + "bytes" + "strings" "testing" "time" + "github.com/cihub/seelog" "github.com/stretchr/testify/assert" logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" "github.com/DataDog/datadog-agent/comp/networkpath/npcollector/npcollectorimpl/common" + utillog "github.com/DataDog/datadog-agent/pkg/util/log" ) // MockTimeNow mocks time.Now @@ -30,10 +35,89 @@ func setMockTimeNow(newTime time.Time) { } func Test_pathtestStore_add(t *testing.T) { + testcases := []struct { + name string + initialSize int + pathtests []*common.Pathtest + expectedSize int + expectedLogCount int + overrideLogTime bool + }{ + { + name: "Store not full", + initialSize: 3, + pathtests: []*common.Pathtest{ + {Hostname: "host1", Port: 53}, + {Hostname: "host2", Port: 53}, + {Hostname: "host3", Port: 53}, + }, + expectedSize: 3, + expectedLogCount: 0, + overrideLogTime: false, + }, + { + name: "Store full, one warning", + initialSize: 2, + pathtests: []*common.Pathtest{ + {Hostname: "host1", Port: 53}, + {Hostname: "host2", Port: 53}, + {Hostname: "host3", Port: 53}, + {Hostname: "host4", Port: 53}, + }, + expectedSize: 2, + expectedLogCount: 1, + overrideLogTime: false, + }, + { + name: "Store full, multiple warnings", + initialSize: 2, + pathtests: []*common.Pathtest{ + {Hostname: "host1", Port: 53}, + {Hostname: "host2", Port: 53}, + {Hostname: "host3", Port: 53}, + {Hostname: "host4", Port: 53}, + }, + expectedSize: 2, + expectedLogCount: 2, + overrideLogTime: true, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + var b bytes.Buffer + w := bufio.NewWriter(&b) + l, err := 
seelog.LoggerFromWriterWithMinLevelAndFormat(w, seelog.DebugLvl, "[%LEVEL] %FuncShort: %Msg") + assert.Nil(t, err) + utillog.SetupLogger(l, "debug") + + store := NewPathtestStore(10*time.Minute, 1*time.Minute, tc.initialSize, l) + + for _, pt := range tc.pathtests { + store.Add(pt) + if tc.overrideLogTime { + store.contextsMutex.Lock() + store.lastContextWarning = MockTimeNow().Add(5 * time.Minute) + store.contextsMutex.Unlock() + } + } + + // TEST START/STOP using logs + l.Close() // We need to first close the logger to avoid a race condition between seelog and our test when calling w.Flush() + w.Flush() + logs := b.String() + + assert.Equal(t, tc.expectedSize, len(store.contexts)) + assert.Equal(t, tc.expectedLogCount, strings.Count(logs, "Pathteststore is full"), logs) + }) + } +} + +func Test_pathtestStore_add_when_full(t *testing.T) { logger := logmock.New(t) // GIVEN - store := NewPathtestStore(10*time.Minute, 1*time.Minute, logger) + store := NewPathtestStore(10*time.Minute, 1*time.Minute, 2, logger) // WHEN pt1 := &common.Pathtest{Hostname: "host1", Port: 53} @@ -44,7 +128,7 @@ func Test_pathtestStore_add(t *testing.T) { store.Add(pt3) // THEN - assert.Equal(t, 3, len(store.contexts)) + assert.Equal(t, 2, len(store.contexts)) pt1Ctx := store.contexts[pt1.GetHash()] pt2Ctx := store.contexts[pt2.GetHash()] @@ -59,7 +143,7 @@ func Test_pathtestStore_flush(t *testing.T) { runInterval := 1 * time.Minute // GIVEN - store := NewPathtestStore(runDurationFromDisc, runInterval, logger) + store := NewPathtestStore(runDurationFromDisc, runInterval, 10, logger) // WHEN pt := &common.Pathtest{Hostname: "host1", Port: 53} diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index 90c4865c76e61..311fb6412d13a 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -422,6 +422,7 @@ func InitConfig(config pkgconfigmodel.Config) { config.BindEnvAndSetDefault("network_path.collector.workers", 4) 
config.BindEnvAndSetDefault("network_path.collector.input_chan_size", 1000) config.BindEnvAndSetDefault("network_path.collector.processing_chan_size", 1000) + config.BindEnvAndSetDefault("network_path.collector.pathtest_contexts_limit", 10000) config.BindEnvAndSetDefault("network_path.collector.pathtest_ttl", "15m") config.BindEnvAndSetDefault("network_path.collector.pathtest_interval", "5m") config.BindEnvAndSetDefault("network_path.collector.flush_interval", "10s") diff --git a/releasenotes/notes/network-path-add-max-path-config-91d70b05b0db086a.yaml b/releasenotes/notes/network-path-add-max-path-config-91d70b05b0db086a.yaml new file mode 100644 index 0000000000000..6bc310f634635 --- /dev/null +++ b/releasenotes/notes/network-path-add-max-path-config-91d70b05b0db086a.yaml @@ -0,0 +1,13 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +enhancements: + - | + Adds a default upper limit of 10000 to the number of network traffic + paths that are captured at a single time. The user can increase or + decrease this limit as needed.