Skip to content

Commit

Permalink
Add rdnsquerier config, rate limiter, resolver and internal telemetry. (
Browse files Browse the repository at this point in the history
  • Loading branch information
jmw51798 authored Jul 17, 2024
1 parent 3480368 commit 6ebdfec
Show file tree
Hide file tree
Showing 13 changed files with 864 additions and 59 deletions.
11 changes: 9 additions & 2 deletions comp/netflow/flowaggregator/flowaccumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (f *flowAccumulator) add(flowToAdd *common.Flow) {
}

func (f *flowAccumulator) addRDNSEnrichment(aggHash uint64, srcAddr []byte, dstAddr []byte) {
f.rdnsQuerier.GetHostnameAsync(
err := f.rdnsQuerier.GetHostnameAsync(
srcAddr,
func(hostname string) {
f.flowsMutex.Lock()
Expand All @@ -174,7 +174,11 @@ func (f *flowAccumulator) addRDNSEnrichment(aggHash uint64, srcAddr []byte, dstA
}
},
)
f.rdnsQuerier.GetHostnameAsync(
if err != nil {
f.logger.Debugf("Error requesting reverse DNS enrichment for source IP address: %v error: %v", srcAddr, err)
}

err = f.rdnsQuerier.GetHostnameAsync(
dstAddr,
func(hostname string) {
f.flowsMutex.Lock()
Expand All @@ -186,6 +190,9 @@ func (f *flowAccumulator) addRDNSEnrichment(aggHash uint64, srcAddr []byte, dstA
}
},
)
if err != nil {
f.logger.Debugf("Error requesting reverse DNS enrichment for destination IP address: %v error: %v", dstAddr, err)
}
}

func (f *flowAccumulator) getFlowContextCount() int {
Expand Down
4 changes: 2 additions & 2 deletions comp/netflow/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func newServer(lc fx.Lifecycle, deps dependencies) (provides, error) {
// it with a noop implementation.
rdnsQuerier := deps.RDNSQuerier
if conf.ReverseDNSEnrichmentEnabled {
deps.Logger.Debugf("NetFlow Reverse DNS Enrichment enabled")
deps.Logger.Infof("Reverse DNS Enrichment is enabled for NDM NetFlow")
} else {
rdnsQuerier = rdnsquerierimplnone.NewNone().Comp
deps.Logger.Debugf("NetFlow Reverse DNS Enrichment disabled")
deps.Logger.Infof("Reverse DNS Enrichment is disabled for NDM NetFlow")
}

flowAgg := flowaggregator.NewFlowAggregator(sender, deps.Forwarder, conf, deps.Hostname.GetSafe(context.Background()), deps.Logger, rdnsQuerier)
Expand Down
2 changes: 1 addition & 1 deletion comp/rdnsquerier/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ package rdnsquerier

// Component is the component type.
type Component interface {
GetHostnameAsync([]byte, func(string))
GetHostnameAsync([]byte, func(string)) error
}
3 changes: 2 additions & 1 deletion comp/rdnsquerier/impl-none/none.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func NewNone() Provides {
}

// GetHostnameAsync does nothing for the noop rdnsquerier implementation
func (q *rdnsQuerierImplNone) GetHostnameAsync(_ []byte, _ func(string)) {
func (q *rdnsQuerierImplNone) GetHostnameAsync(_ []byte, _ func(string)) error {
// noop
return nil
}
91 changes: 91 additions & 0 deletions comp/rdnsquerier/impl/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

package rdnsquerierimpl

import (
"time"

"github.com/DataDog/datadog-agent/comp/core/config"
)

type rdnsQuerierConfig struct {
enabled bool
workers int
chanSize int

rateLimiterEnabled bool
rateLimitPerSec int

cacheEnabled bool
cacheEntryTTL time.Duration
cacheCleanInterval time.Duration
cachePersistInterval time.Duration
}

const (
defaultWorkers = 10
defaultChanSize = 1000

defaultRateLimitPerSec = 1000

defaultCacheEntryTTL = time.Hour
defaultCacheCleanInterval = 30 * time.Minute
defaultCachePersistInterval = 30 * time.Minute
)

func newConfig(agentConfig config.Component) *rdnsQuerierConfig {
netflowRDNSEnrichmentEnabled := agentConfig.GetBool("network_devices.netflow.reverse_dns_enrichment_enabled")

c := &rdnsQuerierConfig{
enabled: netflowRDNSEnrichmentEnabled,
workers: agentConfig.GetInt("reverse_dns_enrichment.workers"),
chanSize: agentConfig.GetInt("reverse_dns_enrichment.chan_size"),

rateLimiterEnabled: agentConfig.GetBool("reverse_dns_enrichment.rate_limiter.enabled"),
rateLimitPerSec: agentConfig.GetInt("reverse_dns_enrichment.rate_limiter.limit_per_sec"),

cacheEnabled: agentConfig.GetBool("reverse_dns_enrichment.cache.enabled"),
cacheEntryTTL: agentConfig.GetDuration("reverse_dns_enrichment.cache.entry_ttl"),
cacheCleanInterval: agentConfig.GetDuration("reverse_dns_enrichment.cache.clean_interval"),
cachePersistInterval: agentConfig.GetDuration("reverse_dns_enrichment.cache.persist_interval"),
}

c.setDefaults()

return c
}

func (c *rdnsQuerierConfig) setDefaults() {
if !c.enabled {
return
}

if c.workers <= 0 {
c.workers = defaultWorkers
}

if c.chanSize <= 0 {
c.chanSize = defaultChanSize
}

if c.rateLimiterEnabled {
if c.rateLimitPerSec <= 0 {
c.rateLimitPerSec = defaultRateLimitPerSec
}
}

if c.cacheEnabled {
if c.cacheEntryTTL <= 0 {
c.cacheEntryTTL = defaultCacheEntryTTL
}
if c.cacheCleanInterval <= 0 {
c.cacheCleanInterval = defaultCacheCleanInterval
}
if c.cachePersistInterval <= 0 {
c.cachePersistInterval = defaultCachePersistInterval
}
}
}
177 changes: 177 additions & 0 deletions comp/rdnsquerier/impl/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

package rdnsquerierimpl

import (
"testing"
"time"

pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"

"github.com/stretchr/testify/assert"
)

func TestConfig(t *testing.T) {
var tests = []struct {
name string
configYaml string
expectedConfig rdnsQuerierConfig
}{
{
name: "disabled by default",
configYaml: ``,
expectedConfig: rdnsQuerierConfig{
enabled: false,
workers: 0,
chanSize: 0,
rateLimiterEnabled: true,
rateLimitPerSec: 0,
cacheEnabled: true,
cacheEntryTTL: 0,
cacheCleanInterval: 0,
cachePersistInterval: 0,
},
},
{
name: "default config when enabled",
configYaml: `
network_devices:
netflow:
reverse_dns_enrichment_enabled: true
`,
expectedConfig: rdnsQuerierConfig{
enabled: true,
workers: defaultWorkers,
chanSize: defaultChanSize,
rateLimiterEnabled: true,
rateLimitPerSec: defaultRateLimitPerSec,
cacheEnabled: true,
cacheEntryTTL: defaultCacheEntryTTL,
cacheCleanInterval: defaultCacheCleanInterval,
cachePersistInterval: defaultCachePersistInterval,
},
},
{
name: "use defaults for invalid values",
configYaml: `
network_devices:
netflow:
reverse_dns_enrichment_enabled: true
reverse_dns_enrichment:
workers: 0
chan_size: 0
rate_limiter:
enabled: true
limit_per_sec: 0
cache:
enabled: true
entry_ttl: 0
clean_interval: 0
persist_interval: 0
`,
expectedConfig: rdnsQuerierConfig{
enabled: true,
workers: defaultWorkers,
chanSize: defaultChanSize,
rateLimiterEnabled: true,
rateLimitPerSec: defaultRateLimitPerSec,
cacheEnabled: true,
cacheEntryTTL: defaultCacheEntryTTL,
cacheCleanInterval: defaultCacheCleanInterval,
cachePersistInterval: defaultCachePersistInterval,
},
},
{
name: "specific config",
configYaml: `
network_devices:
netflow:
reverse_dns_enrichment_enabled: true
reverse_dns_enrichment:
workers: 25
chan_size: 999
rate_limiter:
enabled: true
limit_per_sec: 111
cache:
enabled: true
entry_ttl: 24h
clean_interval: 30m
persist_interval: 2h
`,
expectedConfig: rdnsQuerierConfig{
enabled: true,
workers: 25,
chanSize: 999,
rateLimiterEnabled: true,
rateLimitPerSec: 111,
cacheEnabled: true,
cacheEntryTTL: 24 * time.Hour,
cacheCleanInterval: 30 * time.Minute,
cachePersistInterval: 2 * time.Hour,
},
},
{
name: "specific config with defaults when rate limiter and cache are enabled",
configYaml: `
network_devices:
netflow:
reverse_dns_enrichment_enabled: true
reverse_dns_enrichment:
workers: 25
chan_size: 999
rate_limiter:
enabled: true
cache:
enabled: true
`,
expectedConfig: rdnsQuerierConfig{
enabled: true,
workers: 25,
chanSize: 999,
rateLimiterEnabled: true,
rateLimitPerSec: defaultRateLimitPerSec,
cacheEnabled: true,
cacheEntryTTL: defaultCacheEntryTTL,
cacheCleanInterval: defaultCacheCleanInterval,
cachePersistInterval: defaultCachePersistInterval,
},
},
{
name: "specific config with rate limiter and cache disabled",
configYaml: `
network_devices:
netflow:
reverse_dns_enrichment_enabled: true
reverse_dns_enrichment:
workers: 25
chan_size: 999
rate_limiter:
enabled: false
cache:
enabled: false
`,
expectedConfig: rdnsQuerierConfig{
enabled: true,
workers: 25,
chanSize: 999,
rateLimiterEnabled: false,
rateLimitPerSec: 0,
cacheEnabled: false,
cacheEntryTTL: 0,
cacheCleanInterval: 0,
cachePersistInterval: 0,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockConfig := pkgconfigsetup.ConfFromYAML(tt.configYaml)
testConfig := newConfig(mockConfig)
assert.Equal(t, tt.expectedConfig, *testConfig)
})
}
}
43 changes: 43 additions & 0 deletions comp/rdnsquerier/impl/ratelimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

package rdnsquerierimpl

import (
"context"

"golang.org/x/time/rate"
)

const rateLimiterBurst = 1 // burst of 1 is sufficient since wait() requests permission to perform a single operation

type rateLimiter interface {
wait(context.Context) error
}

func newRateLimiter(config *rdnsQuerierConfig) rateLimiter {
if !config.rateLimiterEnabled {
return &rateLimiterNone{}
}
return &rateLimiterImpl{
limiter: rate.NewLimiter(rate.Limit(config.rateLimitPerSec), rateLimiterBurst),
}
}

// Rate limiter implementation for when rdnsquerier rate limiting is enabled
type rateLimiterImpl struct {
limiter *rate.Limiter
}

func (r *rateLimiterImpl) wait(ctx context.Context) error {
return r.limiter.Wait(ctx)
}

// No limit rate limiter for when rdnsquerier rate limiting is disabled
type rateLimiterNone struct{}

func (r *rateLimiterNone) wait(_ context.Context) error {
return nil
}
Loading

0 comments on commit 6ebdfec

Please sign in to comment.