diff --git a/pkg/network/protocols/http/protocol.go b/pkg/network/protocols/http/protocol.go index 0c1c4e8b2bc06..aa64c99e16fe3 100644 --- a/pkg/network/protocols/http/protocol.go +++ b/pkg/network/protocols/http/protocol.go @@ -225,6 +225,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats { } } +// ReleaseStats release objects back to pool. +func (p *protocol) ReleaseStats() { + p.statkeeper.ReleaseSketches() +} + // IsBuildModeSupported returns always true, as http module is supported by all modes. func (*protocol) IsBuildModeSupported(buildmode.Type) bool { return true diff --git a/pkg/network/protocols/http/statkeeper.go b/pkg/network/protocols/http/statkeeper.go index 6cb428c4722bf..28358e637b862 100644 --- a/pkg/network/protocols/http/statkeeper.go +++ b/pkg/network/protocols/http/statkeeper.go @@ -15,6 +15,8 @@ import ( "github.com/DataDog/datadog-agent/pkg/network/config" "github.com/DataDog/datadog-agent/pkg/network/usm/utils" "github.com/DataDog/datadog-agent/pkg/util/log" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" + "github.com/DataDog/sketches-go/ddsketch" ) // StatKeeper is responsible for aggregating HTTP stats. @@ -34,6 +36,9 @@ type StatKeeper struct { buffer []byte oversizedLogLimit *log.Limit + + // pool of 'DDSketch' objects + sketches *ddsync.TypedPool[ddsketch.DDSketch] } // NewStatkeeper returns a new StatKeeper. @@ -59,6 +64,7 @@ func NewStatkeeper(c *config.Config, telemetry *Telemetry, incompleteBuffer Inco buffer: make([]byte, getPathBufferSize(c)), telemetry: telemetry, oversizedLogLimit: log.NewLogLimit(10, time.Minute*10), + sketches: newSketchPool(), } } @@ -107,6 +113,7 @@ func (h *StatKeeper) GetAndResetAllStats() (stats map[Key]*RequestStats) { // Close closes the stat keeper. func (h *StatKeeper) Close() { h.oversizedLogLimit.Close() + h.ReleaseSketches() } func (h *StatKeeper) add(tx Transaction) { @@ -157,6 +164,7 @@ func (h *StatKeeper) add(tx Transaction) { } h.telemetry.aggregations.Add(1) stats = NewRequestStats() + stats.Sketches = h.sketches h.stats[key] = stats } @@ -217,3 +225,22 @@ func (h *StatKeeper) clearEphemeralPorts(aggregator *utils.ConnectionAggregator, stats[key] = aggregation } } + +// newSketchPool creates new pool of 'DDSketch' objects. +func newSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] { + sketchPool := ddsync.NewTypedPool(func() *ddsketch.DDSketch { + sketch, err := ddsketch.NewDefaultDDSketch(RelativeAccuracy) + if err != nil { + log.Debugf("http stats, could not create new ddsketch for pool, error: %v", err) + } + return sketch + }) + return sketchPool +} + +// ReleaseSketches puts 'DDSketch' objects back to pool. +func (h *StatKeeper) ReleaseSketches() { + for _, stats := range h.stats { + stats.PutSketches() + } +} diff --git a/pkg/network/protocols/http/statkeeper_test.go b/pkg/network/protocols/http/statkeeper_test.go index 8ae60da8b5859..79e996a46e31c 100644 --- a/pkg/network/protocols/http/statkeeper_test.go +++ b/pkg/network/protocols/http/statkeeper_test.go @@ -25,6 +25,9 @@ func TestProcessHTTPTransactions(t *testing.T) { cfg.MaxHTTPStatsBuffered = 1000 tel := NewTelemetry("http") sk := NewStatkeeper(cfg, tel, NewIncompleteBuffer(cfg, tel)) + t.Cleanup(func() { + sk.ReleaseSketches() + }) srcString := "1.1.1.1" dstString := "2.2.2.2" @@ -53,6 +56,7 @@ func TestProcessHTTPTransactions(t *testing.T) { for i := 0; i < 5; i++ { s := stats.Data[uint16((i+1)*100)] require.NotNil(t, s) + require.NotNil(t, s.Latencies) assert.Equal(t, 2, s.Count) assert.Equal(t, 2.0, s.Latencies.GetCount()) @@ -64,6 +68,7 @@ func TestProcessHTTPTransactions(t *testing.T) { assert.True(t, p50 >= expectedLatency-acceptableError) assert.True(t, p50 <= expectedLatency+acceptableError) } + sk.ReleaseSketches() } } @@ -322,3 +327,55 @@ func TestHTTPCorrectness(t *testing.T) { require.Len(t, stats, 0) }) } + +func makeStatkeeper() *StatKeeper { + cfg := config.New() + cfg.MaxHTTPStatsBuffered = 100000 + tel := NewTelemetry("http") + return NewStatkeeper(cfg, tel, NewIncompleteBuffer(cfg, tel)) +} + +func benchmarkHTTPStatkeeper(b *testing.B, sk *StatKeeper) { + sourceIP := util.AddressFromString("1.1.1.1") + sourcePort := 1234 + destIP := util.AddressFromString("2.2.2.2") + destPort := 8080 + + const numPaths = 10000 + const uniqPaths = 50 + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + sk.GetAndResetAllStats() + for p := 0; p < numPaths; p++ { + b.StopTimer() + //we use subset of unique endpoints, but those will occur over and over again like in regular target application + path := "/testpath/blablabla/dsadas/isdaasd/asdasadsadasd" + strconv.Itoa(p%uniqPaths) + //we simulate different conn tuples by increasing the port number + newSourcePort := sourcePort + (p % 30) + statusCode := (i%5 + 1) * 100 + latency := time.Duration(i%5+1) * time.Millisecond + tx := generateIPv4HTTPTransaction(sourceIP, destIP, newSourcePort, destPort, path, statusCode, latency) + b.StartTimer() + sk.Process(tx) + } + sk.ReleaseSketches() + } + b.StopTimer() +} + +// BenchmarkHTTPStatkeeperWithPool benchmark allocations with pool of 'DDSketch' objects +func BenchmarkHTTPStatkeeperWithPool(b *testing.B) { + sk := makeStatkeeper() + + benchmarkHTTPStatkeeper(b, sk) +} + +// BenchmarkHTTPStatkeeperNoPool benchmark allocations without pool of 'DDSketch' objects +func BenchmarkHTTPStatkeeperNoPool(b *testing.B) { + sk := makeStatkeeper() + // disable pool of 'DDSketch' objects + sk.sketches = nil + + benchmarkHTTPStatkeeper(b, sk) +} diff --git a/pkg/network/protocols/http/stats.go b/pkg/network/protocols/http/stats.go index 43c86354f4bcb..949b69ed7961f 100644 --- a/pkg/network/protocols/http/stats.go +++ b/pkg/network/protocols/http/stats.go @@ -6,12 +6,12 @@ package http import ( - "github.com/DataDog/sketches-go/ddsketch" - "github.com/DataDog/datadog-agent/pkg/network/types" "github.com/DataDog/datadog-agent/pkg/process/util" "github.com/DataDog/datadog-agent/pkg/util/intern" "github.com/DataDog/datadog-agent/pkg/util/log" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" + "github.com/DataDog/sketches-go/ddsketch" ) // Interner is used to intern strings to save memory allocations. @@ -138,7 +138,8 @@ func (r *RequestStat) initSketch() (err error) { // RequestStats stores HTTP request statistics. type RequestStats struct { - Data map[uint16]*RequestStat + Data map[uint16]*RequestStat + Sketches *ddsync.TypedPool[ddsketch.DDSketch] } // NewRequestStats creates a new RequestStats object. @@ -221,7 +222,9 @@ func (r *RequestStats) AddRequest(statusCode uint16, latency float64, staticTags } if stats.Latencies == nil { - if err := stats.initSketch(); err != nil { + if r.Sketches != nil { + stats.Latencies = r.Sketches.Get() + } else if err := stats.initSketch(); err != nil { return } @@ -245,3 +248,13 @@ func (r *RequestStats) HalfAllCounts() { } } } + +// PutSketches returns all obtained 'DDSketch' objects to the pool. +func (r *RequestStats) PutSketches() { + if r.Sketches != nil { + for _, stats := range r.Data { + r.Sketches.Put(stats.Latencies) + stats.Latencies = nil + } + } +} diff --git a/pkg/network/protocols/http2/protocol.go b/pkg/network/protocols/http2/protocol.go index 433f55e15715e..a8fd1375a480b 100644 --- a/pkg/network/protocols/http2/protocol.go +++ b/pkg/network/protocols/http2/protocol.go @@ -431,6 +431,11 @@ func (p *Protocol) GetStats() *protocols.ProtocolStats { } } +// ReleaseStats release objects back to pool, not implemented yet, to satisfy protocol. +func (p *Protocol) ReleaseStats() { + return +} + // IsBuildModeSupported returns always true, as http2 module is supported by all modes. func (*Protocol) IsBuildModeSupported(buildmode.Type) bool { return true diff --git a/pkg/network/protocols/kafka/protocol.go b/pkg/network/protocols/kafka/protocol.go index 54bcb67ff5fdf..772af679ae1ac 100644 --- a/pkg/network/protocols/kafka/protocol.go +++ b/pkg/network/protocols/kafka/protocol.go @@ -361,6 +361,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats { } } +// ReleaseStats release objects back to pool, not implemented yet, to satisfy protocol. +func (p *protocol) ReleaseStats() { + return +} + // IsBuildModeSupported returns always true, as kafka module is supported by all modes. func (*protocol) IsBuildModeSupported(buildmode.Type) bool { return true diff --git a/pkg/network/protocols/postgres/protocol.go b/pkg/network/protocols/postgres/protocol.go index c0d7aa37a6747..2d850e9b99714 100644 --- a/pkg/network/protocols/postgres/protocol.go +++ b/pkg/network/protocols/postgres/protocol.go @@ -239,6 +239,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats { } } +// ReleaseStats release objects back to pool, not implemented yet, to satisfy protocol. +func (p *protocol) ReleaseStats() { + return +} + // IsBuildModeSupported returns always true, as postgres module is supported by all modes. func (*protocol) IsBuildModeSupported(buildmode.Type) bool { return true diff --git a/pkg/network/protocols/protocols.go b/pkg/network/protocols/protocols.go index acec71ad05d33..50dc7bc420d1d 100644 --- a/pkg/network/protocols/protocols.go +++ b/pkg/network/protocols/protocols.go @@ -61,6 +61,9 @@ type Protocol interface { // implementation. GetStats() *ProtocolStats + // ReleaseStats puts back stats related objects to pool. + ReleaseStats() + // IsBuildModeSupported return true is the given build mode is supported by this protocol. IsBuildModeSupported(buildmode.Type) bool } diff --git a/pkg/network/protocols/redis/protocol.go b/pkg/network/protocols/redis/protocol.go index 831d019b30416..47456184d970a 100644 --- a/pkg/network/protocols/redis/protocol.go +++ b/pkg/network/protocols/redis/protocol.go @@ -155,6 +155,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats { } } +// ReleaseStats release objects back to pool, not implemented yet, to satisfy protocol. +func (p *protocol) ReleaseStats() { + return +} + // IsBuildModeSupported returns always true, as Redis module is supported by all modes. func (*protocol) IsBuildModeSupported(buildmode.Type) bool { return true diff --git a/pkg/network/tracer/tracer.go b/pkg/network/tracer/tracer.go index 20e97a49ac3e6..e75c6b9b3a123 100644 --- a/pkg/network/tracer/tracer.go +++ b/pkg/network/tracer/tracer.go @@ -426,6 +426,9 @@ func (t *Tracer) GetActiveConnections(clientID string) (*network.Connections, er delta := t.state.GetDelta(clientID, latestTime, active, t.reverseDNS.GetDNSStats(), t.usmMonitor.GetProtocolStats()) + // release stats objects back to pool + t.usmMonitor.ReleaseStats() + ips := make(map[util.Address]struct{}, len(delta.Conns)/2) var udpConns, tcpConns int for i := range delta.Conns { diff --git a/pkg/network/usm/ebpf_gotls.go b/pkg/network/usm/ebpf_gotls.go index e12871aa619bf..fcf7c02691f7d 100644 --- a/pkg/network/usm/ebpf_gotls.go +++ b/pkg/network/usm/ebpf_gotls.go @@ -290,6 +290,11 @@ func (p *goTLSProgram) GetStats() *protocols.ProtocolStats { return nil } +// ReleaseStats is a no-op. +func (p *goTLSProgram) ReleaseStats() { + return +} + // Stop terminates goTLS main goroutine. func (p *goTLSProgram) Stop(*manager.Manager) { close(p.done) diff --git a/pkg/network/usm/ebpf_main.go b/pkg/network/usm/ebpf_main.go index 85f9f7123a593..4ff4221d7790d 100644 --- a/pkg/network/usm/ebpf_main.go +++ b/pkg/network/usm/ebpf_main.go @@ -544,6 +544,12 @@ func (e *ebpfProgram) getProtocolStats() map[protocols.ProtocolType]interface{} return ret } +func (e *ebpfProgram) releaseStats() { + for _, protocol := range e.enabledProtocols { + protocol.Instance.ReleaseStats() + } +} + // executePerProtocol runs the given callback (`cb`) for every protocol in the given list (`protocolList`). // If the callback failed, then we call the error callback (`errorCb`). Eventually returning a list of protocols which // successfully executed the callback. diff --git a/pkg/network/usm/ebpf_ssl.go b/pkg/network/usm/ebpf_ssl.go index af4fa84364568..1939e05839772 100644 --- a/pkg/network/usm/ebpf_ssl.go +++ b/pkg/network/usm/ebpf_ssl.go @@ -564,6 +564,11 @@ func (o *sslProgram) GetStats() *protocols.ProtocolStats { return nil } +// ReleaseStats is a no-op. +func (p *sslProgram) ReleaseStats() { + return +} + const ( // Defined in https://man7.org/linux/man-pages/man5/proc.5.html. taskCommLen = 16 diff --git a/pkg/network/usm/monitor.go b/pkg/network/usm/monitor.go index 4ee3d2e535d68..2602ed9408f43 100644 --- a/pkg/network/usm/monitor.go +++ b/pkg/network/usm/monitor.go @@ -196,6 +196,11 @@ func (m *Monitor) GetProtocolStats() map[protocols.ProtocolType]interface{} { return m.ebpfProgram.getProtocolStats() } +// ReleaseStats puts back stats related objects to pool. +func (m *Monitor) ReleaseStats() { + m.ebpfProgram.releaseStats() +} + // Stop HTTP monitoring func (m *Monitor) Stop() { if m == nil {