Skip to content

Commit

Permalink
[NETPATH-175] add maxContexts to pathteststore (#28142)
Browse files Browse the repository at this point in the history
Co-authored-by: Ursula Chen <58821586+urseberry@users.noreply.github.com>
  • Loading branch information
ken-schneider and urseberry authored Aug 6, 2024
1 parent 838c8e5 commit 8bb0b00
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 15 deletions.
2 changes: 2 additions & 0 deletions comp/networkpath/npcollector/npcollectorimpl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type collectorConfigs struct {
workers int
pathtestInputChanSize int
pathtestProcessingChanSize int
pathtestContextsLimit int
pathtestTTL time.Duration
pathtestInterval time.Duration
flushInterval time.Duration
Expand All @@ -28,6 +29,7 @@ func newConfig(agentConfig config.Component) *collectorConfigs {
workers: agentConfig.GetInt("network_path.collector.workers"),
pathtestInputChanSize: agentConfig.GetInt("network_path.collector.input_chan_size"),
pathtestProcessingChanSize: agentConfig.GetInt("network_path.collector.processing_chan_size"),
pathtestContextsLimit: agentConfig.GetInt("network_path.collector.pathtest_contexts_limit"),
pathtestTTL: agentConfig.GetDuration("network_path.collector.pathtest_ttl"),
pathtestInterval: agentConfig.GetDuration("network_path.collector.pathtest_interval"),
flushInterval: agentConfig.GetDuration("network_path.collector.flush_interval"),
Expand Down
5 changes: 3 additions & 2 deletions comp/networkpath/npcollector/npcollectorimpl/npcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ func newNoopNpCollectorImpl() *npCollectorImpl {
}

func newNpCollectorImpl(epForwarder eventplatform.Forwarder, collectorConfigs *collectorConfigs, logger log.Component, telemetrycomp telemetryComp.Component) *npCollectorImpl {
logger.Infof("New NpCollector (workers=%d input_chan_size=%d processing_chan_size=%d pathtest_ttl=%s pathtest_interval=%s flush_interval=%s)",
logger.Infof("New NpCollector (workers=%d input_chan_size=%d processing_chan_size=%d pathtest_contexts_limit=%d pathtest_ttl=%s pathtest_interval=%s flush_interval=%s)",
collectorConfigs.workers,
collectorConfigs.pathtestInputChanSize,
collectorConfigs.pathtestProcessingChanSize,
collectorConfigs.pathtestContextsLimit,
collectorConfigs.pathtestTTL,
collectorConfigs.pathtestInterval,
collectorConfigs.flushInterval)
Expand All @@ -88,7 +89,7 @@ func newNpCollectorImpl(epForwarder eventplatform.Forwarder, collectorConfigs *c
collectorConfigs: collectorConfigs,
logger: logger,

pathtestStore: pathteststore.NewPathtestStore(collectorConfigs.pathtestTTL, collectorConfigs.pathtestInterval, logger),
pathtestStore: pathteststore.NewPathtestStore(collectorConfigs.pathtestTTL, collectorConfigs.pathtestInterval, collectorConfigs.pathtestContextsLimit, logger),
pathtestInputChan: make(chan *common.Pathtest, collectorConfigs.pathtestInputChanSize),
pathtestProcessingChan: make(chan *pathteststore.PathtestContext, collectorConfigs.pathtestProcessingChanSize),
flushInterval: collectorConfigs.flushInterval,
Expand Down
13 changes: 8 additions & 5 deletions comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,16 +293,18 @@ func Test_newNpCollectorImpl_defaultConfigs(t *testing.T) {
assert.Equal(t, 4, npCollector.workers)
assert.Equal(t, 1000, cap(npCollector.pathtestInputChan))
assert.Equal(t, 1000, cap(npCollector.pathtestProcessingChan))
assert.Equal(t, 10000, npCollector.collectorConfigs.pathtestContextsLimit)
assert.Equal(t, "default", npCollector.networkDevicesNamespace)
}

func Test_newNpCollectorImpl_overrideConfigs(t *testing.T) {
agentConfigs := map[string]any{
"network_path.connections_monitoring.enabled": true,
"network_path.collector.workers": 2,
"network_path.collector.input_chan_size": 300,
"network_path.collector.processing_chan_size": 400,
"network_devices.namespace": "ns1",
"network_path.connections_monitoring.enabled": true,
"network_path.collector.workers": 2,
"network_path.collector.input_chan_size": 300,
"network_path.collector.processing_chan_size": 400,
"network_path.collector.pathtest_contexts_limit": 500,
"network_devices.namespace": "ns1",
}

_, npCollector := newTestNpCollector(t, agentConfigs)
Expand All @@ -311,6 +313,7 @@ func Test_newNpCollectorImpl_overrideConfigs(t *testing.T) {
assert.Equal(t, 2, npCollector.workers)
assert.Equal(t, 300, cap(npCollector.pathtestInputChan))
assert.Equal(t, 400, cap(npCollector.pathtestProcessingChan))
assert.Equal(t, 500, npCollector.collectorConfigs.pathtestContextsLimit)
assert.Equal(t, "ns1", npCollector.networkDevicesNamespace)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ type Store struct {
// are called by different routines.
contextsMutex sync.Mutex

// contextsLimit is the maximum number of contexts to keep in the store
contextsLimit int

// interval defines how frequently pathtests should run
interval time.Duration

// ttl is the duration a Pathtest should run from discovery.
// If a Pathtest is added again before the TTL expires, the TTL is reset to this duration.
ttl time.Duration

// lastContextWarning is the last time a warning was logged about the store being full
lastContextWarning time.Time
}

func newPathtestContext(pt *common.Pathtest, runUntilDuration time.Duration) *PathtestContext {
Expand All @@ -64,12 +70,13 @@ func newPathtestContext(pt *common.Pathtest, runUntilDuration time.Duration) *Pa
}

// NewPathtestStore creates a new Store
func NewPathtestStore(pathtestTTL time.Duration, pathtestInterval time.Duration, logger log.Component) *Store {
func NewPathtestStore(pathtestTTL time.Duration, pathtestInterval time.Duration, contextsLimit int, logger log.Component) *Store {
return &Store{
contexts: make(map[uint64]*PathtestContext),
ttl: pathtestTTL,
interval: pathtestInterval,
logger: logger,
contexts: make(map[uint64]*PathtestContext),
ttl: pathtestTTL,
interval: pathtestInterval,
contextsLimit: contextsLimit,
logger: logger,
}
}

Expand Down Expand Up @@ -119,6 +126,15 @@ func (f *Store) Add(pathtestToAdd *common.Pathtest) {
f.contextsMutex.Lock()
defer f.contextsMutex.Unlock()

if len(f.contexts) >= f.contextsLimit {
// only log if it has been 1 minute since the last warning
if time.Since(f.lastContextWarning) >= time.Minute {
f.logger.Warnf("Pathteststore is full, maximum set to: %d, dropping pathtest: %+v", f.contextsLimit, pathtestToAdd)
f.lastContextWarning = time.Now()
}
return
}

hash := pathtestToAdd.GetHash()
pathtestCtx, ok := f.contexts[hash]
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@
package pathteststore

import (
"bufio"
"bytes"
"strings"
"testing"
"time"

"github.com/cihub/seelog"
"github.com/stretchr/testify/assert"

logmock "github.com/DataDog/datadog-agent/comp/core/log/mock"
"github.com/DataDog/datadog-agent/comp/networkpath/npcollector/npcollectorimpl/common"
utillog "github.com/DataDog/datadog-agent/pkg/util/log"
)

// MockTimeNow mocks time.Now
Expand All @@ -30,10 +35,89 @@ func setMockTimeNow(newTime time.Time) {
}

func Test_pathtestStore_add(t *testing.T) {
testcases := []struct {
name string
initialSize int
pathtests []*common.Pathtest
expectedSize int
expectedLogCount int
overrideLogTime bool
}{
{
name: "Store not full",
initialSize: 3,
pathtests: []*common.Pathtest{
{Hostname: "host1", Port: 53},
{Hostname: "host2", Port: 53},
{Hostname: "host3", Port: 53},
},
expectedSize: 3,
expectedLogCount: 0,
overrideLogTime: false,
},
{
name: "Store full, one warning",
initialSize: 2,
pathtests: []*common.Pathtest{
{Hostname: "host1", Port: 53},
{Hostname: "host2", Port: 53},
{Hostname: "host3", Port: 53},
{Hostname: "host4", Port: 53},
},
expectedSize: 2,
expectedLogCount: 1,
overrideLogTime: false,
},
{
name: "Store full, multiple warnings",
initialSize: 2,
pathtests: []*common.Pathtest{
{Hostname: "host1", Port: 53},
{Hostname: "host2", Port: 53},
{Hostname: "host3", Port: 53},
{Hostname: "host4", Port: 53},
},
expectedSize: 2,
expectedLogCount: 2,
overrideLogTime: true,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
var b bytes.Buffer
w := bufio.NewWriter(&b)
l, err := seelog.LoggerFromWriterWithMinLevelAndFormat(w, seelog.DebugLvl, "[%LEVEL] %FuncShort: %Msg")
assert.Nil(t, err)
utillog.SetupLogger(l, "debug")

store := NewPathtestStore(10*time.Minute, 1*time.Minute, tc.initialSize, l)

for _, pt := range tc.pathtests {
store.Add(pt)
if tc.overrideLogTime {
store.contextsMutex.Lock()
store.lastContextWarning = MockTimeNow().Add(5 * time.Minute)
store.contextsMutex.Unlock()
}
}

// TEST START/STOP using logs
l.Close() // We need to first close the logger to avoid a race-cond between seelog and out test when calling w.Flush()
w.Flush()
logs := b.String()

assert.Equal(t, tc.expectedSize, len(store.contexts))
assert.Equal(t, tc.expectedLogCount, strings.Count(logs, "Pathteststore is full"), logs)
})
}
}

func Test_pathtestStore_add_when_full(t *testing.T) {
logger := logmock.New(t)

// GIVEN
store := NewPathtestStore(10*time.Minute, 1*time.Minute, logger)
store := NewPathtestStore(10*time.Minute, 1*time.Minute, 2, logger)

// WHEN
pt1 := &common.Pathtest{Hostname: "host1", Port: 53}
Expand All @@ -44,7 +128,7 @@ func Test_pathtestStore_add(t *testing.T) {
store.Add(pt3)

// THEN
assert.Equal(t, 3, len(store.contexts))
assert.Equal(t, 2, len(store.contexts))

pt1Ctx := store.contexts[pt1.GetHash()]
pt2Ctx := store.contexts[pt2.GetHash()]
Expand All @@ -59,7 +143,7 @@ func Test_pathtestStore_flush(t *testing.T) {
runInterval := 1 * time.Minute

// GIVEN
store := NewPathtestStore(runDurationFromDisc, runInterval, logger)
store := NewPathtestStore(runDurationFromDisc, runInterval, 10, logger)

// WHEN
pt := &common.Pathtest{Hostname: "host1", Port: 53}
Expand Down
1 change: 1 addition & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func InitConfig(config pkgconfigmodel.Config) {
config.BindEnvAndSetDefault("network_path.collector.workers", 4)
config.BindEnvAndSetDefault("network_path.collector.input_chan_size", 1000)
config.BindEnvAndSetDefault("network_path.collector.processing_chan_size", 1000)
config.BindEnvAndSetDefault("network_path.collector.pathtest_contexts_limit", 10000)
config.BindEnvAndSetDefault("network_path.collector.pathtest_ttl", "15m")
config.BindEnvAndSetDefault("network_path.collector.pathtest_interval", "5m")
config.BindEnvAndSetDefault("network_path.collector.flush_interval", "10s")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
enhancements:
- |
Adds a default upper limit of 10000 to the number of network traffic
paths that are captured at a single time. The user can increase or
decrease this limit as needed.

0 comments on commit 8bb0b00

Please sign in to comment.