Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[usm] reliability, http, use DDSketch pool in StatKeeper. #31983

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/network/protocols/http/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats {
}
}

// ReleaseStats release objects back to pool.
func (p *protocol) ReleaseStats() {
p.statkeeper.ReleaseSketches()
}

// IsBuildModeSupported returns always true, as http module is supported by all modes.
func (*protocol) IsBuildModeSupported(buildmode.Type) bool {
return true
Expand Down
27 changes: 27 additions & 0 deletions pkg/network/protocols/http/statkeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/util/log"
ddsync "github.com/DataDog/datadog-agent/pkg/util/sync"
"github.com/DataDog/sketches-go/ddsketch"
)

// StatKeeper is responsible for aggregating HTTP stats.
Expand All @@ -34,6 +36,9 @@ type StatKeeper struct {
buffer []byte

oversizedLogLimit *log.Limit

// pool of 'DDSketch' objects
sketches *ddsync.TypedPool[ddsketch.DDSketch]
}

// NewStatkeeper returns a new StatKeeper.
Expand All @@ -59,6 +64,7 @@ func NewStatkeeper(c *config.Config, telemetry *Telemetry, incompleteBuffer Inco
buffer: make([]byte, getPathBufferSize(c)),
telemetry: telemetry,
oversizedLogLimit: log.NewLogLimit(10, time.Minute*10),
sketches: newSketchPool(),
}
}

Expand Down Expand Up @@ -107,6 +113,7 @@ func (h *StatKeeper) GetAndResetAllStats() (stats map[Key]*RequestStats) {
// Close closes the stat keeper.
func (h *StatKeeper) Close() {
h.oversizedLogLimit.Close()
h.ReleaseSketches()
}

func (h *StatKeeper) add(tx Transaction) {
Expand Down Expand Up @@ -157,6 +164,7 @@ func (h *StatKeeper) add(tx Transaction) {
}
h.telemetry.aggregations.Add(1)
stats = NewRequestStats()
stats.Sketches = h.sketches
h.stats[key] = stats
}

Expand Down Expand Up @@ -217,3 +225,22 @@ func (h *StatKeeper) clearEphemeralPorts(aggregator *utils.ConnectionAggregator,
stats[key] = aggregation
}
}

// newSketchPool creates new pool of 'DDSketch' objects.
func newSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] {
sketchPool := ddsync.NewTypedPool(func() *ddsketch.DDSketch {
sketch, err := ddsketch.NewDefaultDDSketch(RelativeAccuracy)
if err != nil {
log.Debugf("http stats, could not create new ddsketch for pool, error: %v", err)
}
return sketch
})
return sketchPool
}

// ReleaseSketches puts 'DDSketch' objects back to pool.
func (h *StatKeeper) ReleaseSketches() {
for _, stats := range h.stats {
stats.PutSketches()
}
}
57 changes: 57 additions & 0 deletions pkg/network/protocols/http/statkeeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func TestProcessHTTPTransactions(t *testing.T) {
cfg.MaxHTTPStatsBuffered = 1000
tel := NewTelemetry("http")
sk := NewStatkeeper(cfg, tel, NewIncompleteBuffer(cfg, tel))
t.Cleanup(func() {
sk.ReleaseSketches()
})

srcString := "1.1.1.1"
dstString := "2.2.2.2"
Expand Down Expand Up @@ -53,6 +56,7 @@ func TestProcessHTTPTransactions(t *testing.T) {
for i := 0; i < 5; i++ {
s := stats.Data[uint16((i+1)*100)]
require.NotNil(t, s)
require.NotNil(t, s.Latencies)
assert.Equal(t, 2, s.Count)
assert.Equal(t, 2.0, s.Latencies.GetCount())

Expand All @@ -64,6 +68,7 @@ func TestProcessHTTPTransactions(t *testing.T) {
assert.True(t, p50 >= expectedLatency-acceptableError)
assert.True(t, p50 <= expectedLatency+acceptableError)
}
sk.ReleaseSketches()
}
}

Expand Down Expand Up @@ -322,3 +327,55 @@ func TestHTTPCorrectness(t *testing.T) {
require.Len(t, stats, 0)
})
}

func makeStatkeeper() *StatKeeper {
cfg := config.New()
cfg.MaxHTTPStatsBuffered = 100000
tel := NewTelemetry("http")
return NewStatkeeper(cfg, tel, NewIncompleteBuffer(cfg, tel))
}

func benchmarkHTTPStatkeeper(b *testing.B, sk *StatKeeper) {
sourceIP := util.AddressFromString("1.1.1.1")
sourcePort := 1234
destIP := util.AddressFromString("2.2.2.2")
destPort := 8080

const numPaths = 10000
const uniqPaths = 50
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
sk.GetAndResetAllStats()
for p := 0; p < numPaths; p++ {
b.StopTimer()
//we use subset of unique endpoints, but those will occur over and over again like in regular target application
path := "/testpath/blablabla/dsadas/isdaasd/asdasadsadasd" + strconv.Itoa(p%uniqPaths)
//we simulate different conn tuples by increasing the port number
newSourcePort := sourcePort + (p % 30)
statusCode := (i%5 + 1) * 100
latency := time.Duration(i%5+1) * time.Millisecond
tx := generateIPv4HTTPTransaction(sourceIP, destIP, newSourcePort, destPort, path, statusCode, latency)
b.StartTimer()
sk.Process(tx)
}
sk.ReleaseSketches()
}
b.StopTimer()
}

// BenchmarkHTTPStatkeeperWithPool benchmark allocations with pool of 'DDSketch' objects
func BenchmarkHTTPStatkeeperWithPool(b *testing.B) {
sk := makeStatkeeper()

benchmarkHTTPStatkeeper(b, sk)
}

// BenchmarkHTTPStatkeeperNoPool benchmark allocations without pool of 'DDSketch' objects
func BenchmarkHTTPStatkeeperNoPool(b *testing.B) {
sk := makeStatkeeper()
// disable pool of 'DDSketch' objects
sk.sketches = nil

benchmarkHTTPStatkeeper(b, sk)
}
21 changes: 17 additions & 4 deletions pkg/network/protocols/http/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
package http

import (
"github.com/DataDog/sketches-go/ddsketch"

"github.com/DataDog/datadog-agent/pkg/network/types"
"github.com/DataDog/datadog-agent/pkg/process/util"
"github.com/DataDog/datadog-agent/pkg/util/intern"
"github.com/DataDog/datadog-agent/pkg/util/log"
ddsync "github.com/DataDog/datadog-agent/pkg/util/sync"
"github.com/DataDog/sketches-go/ddsketch"
)

// Interner is used to intern strings to save memory allocations.
Expand Down Expand Up @@ -138,7 +138,8 @@ func (r *RequestStat) initSketch() (err error) {

// RequestStats stores HTTP request statistics.
type RequestStats struct {
Data map[uint16]*RequestStat
Data map[uint16]*RequestStat
Sketches *ddsync.TypedPool[ddsketch.DDSketch]
}

// NewRequestStats creates a new RequestStats object.
Expand Down Expand Up @@ -221,7 +222,9 @@ func (r *RequestStats) AddRequest(statusCode uint16, latency float64, staticTags
}

if stats.Latencies == nil {
if err := stats.initSketch(); err != nil {
if r.Sketches != nil {
stats.Latencies = r.Sketches.Get()
} else if err := stats.initSketch(); err != nil {
return
}

Expand All @@ -245,3 +248,13 @@ func (r *RequestStats) HalfAllCounts() {
}
}
}

// PutSketches returns all obtained 'DDSketch' objects to the pool.
func (r *RequestStats) PutSketches() {
if r.Sketches != nil {
for _, stats := range r.Data {
r.Sketches.Put(stats.Latencies)
stats.Latencies = nil
}
}
}
5 changes: 5 additions & 0 deletions pkg/network/protocols/http2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,11 @@ func (p *Protocol) GetStats() *protocols.ProtocolStats {
}
}

// ReleaseStats release objects back to pool, not implemented yet, to satisfy protocol.
func (p *Protocol) ReleaseStats() {
return
}

// IsBuildModeSupported returns always true, as http2 module is supported by all modes.
func (*Protocol) IsBuildModeSupported(buildmode.Type) bool {
return true
Expand Down
5 changes: 5 additions & 0 deletions pkg/network/protocols/kafka/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats {
}
}

// ReleaseStats release objects back to pool, not implemented yet, to satisfy protocol.
func (p *protocol) ReleaseStats() {
return
}

// IsBuildModeSupported returns always true, as kafka module is supported by all modes.
func (*protocol) IsBuildModeSupported(buildmode.Type) bool {
return true
Expand Down
5 changes: 5 additions & 0 deletions pkg/network/protocols/postgres/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats {
}
}

// ReleaseStats release objects back to pool, not implemented yet, to satisfy protocol.
func (p *protocol) ReleaseStats() {
return
}

// IsBuildModeSupported returns always true, as postgres module is supported by all modes.
func (*protocol) IsBuildModeSupported(buildmode.Type) bool {
return true
Expand Down
3 changes: 3 additions & 0 deletions pkg/network/protocols/protocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type Protocol interface {
// implementation.
GetStats() *ProtocolStats

// ReleaseStats puts back stats related objects to pool.
ReleaseStats()

// IsBuildModeSupported return true is the given build mode is supported by this protocol.
IsBuildModeSupported(buildmode.Type) bool
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/network/protocols/redis/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats {
}
}

// ReleaseStats release objects back to pool, not implemented yet, to satisfy protocol.
func (p *protocol) ReleaseStats() {
return
}

// IsBuildModeSupported returns always true, as Redis module is supported by all modes.
func (*protocol) IsBuildModeSupported(buildmode.Type) bool {
return true
Expand Down
3 changes: 3 additions & 0 deletions pkg/network/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,9 @@ func (t *Tracer) GetActiveConnections(clientID string) (*network.Connections, er

delta := t.state.GetDelta(clientID, latestTime, active, t.reverseDNS.GetDNSStats(), t.usmMonitor.GetProtocolStats())

// release stats objects back to pool
t.usmMonitor.ReleaseStats()

ips := make(map[util.Address]struct{}, len(delta.Conns)/2)
var udpConns, tcpConns int
for i := range delta.Conns {
Expand Down
5 changes: 5 additions & 0 deletions pkg/network/usm/ebpf_gotls.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@ func (p *goTLSProgram) GetStats() *protocols.ProtocolStats {
return nil
}

// ReleaseStats is a no-op.
func (p *goTLSProgram) ReleaseStats() {
return
}

// Stop terminates goTLS main goroutine.
func (p *goTLSProgram) Stop(*manager.Manager) {
close(p.done)
Expand Down
6 changes: 6 additions & 0 deletions pkg/network/usm/ebpf_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,12 @@ func (e *ebpfProgram) getProtocolStats() map[protocols.ProtocolType]interface{}
return ret
}

func (e *ebpfProgram) releaseStats() {
for _, protocol := range e.enabledProtocols {
protocol.Instance.ReleaseStats()
}
}

// executePerProtocol runs the given callback (`cb`) for every protocol in the given list (`protocolList`).
// If the callback failed, then we call the error callback (`errorCb`). Eventually returning a list of protocols which
// successfully executed the callback.
Expand Down
5 changes: 5 additions & 0 deletions pkg/network/usm/ebpf_ssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,11 @@ func (o *sslProgram) GetStats() *protocols.ProtocolStats {
return nil
}

// ReleaseStats is a no-op.
func (p *sslProgram) ReleaseStats() {
return
}

const (
// Defined in https://man7.org/linux/man-pages/man5/proc.5.html.
taskCommLen = 16
Expand Down
5 changes: 5 additions & 0 deletions pkg/network/usm/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ func (m *Monitor) GetProtocolStats() map[protocols.ProtocolType]interface{} {
return m.ebpfProgram.getProtocolStats()
}

// ReleaseStats puts back stats related objects to pool.
func (m *Monitor) ReleaseStats() {
m.ebpfProgram.releaseStats()
}

// Stop HTTP monitoring
func (m *Monitor) Stop() {
if m == nil {
Expand Down
Loading