From 72c02da8b81d4ebb4dcfe0e77a7139fbc6593ca4 Mon Sep 17 00:00:00 2001 From: "yuri.lipnesh" Date: Tue, 10 Dec 2024 14:34:20 -0500 Subject: [PATCH 1/2] [usm] reliability, http, use DDSketch pool in StatKeeper. --- pkg/network/protocols/http/statkeeper.go | 47 +++++++++++++++- pkg/network/protocols/http/statkeeper_test.go | 55 +++++++++++++++++++ pkg/network/protocols/http/stats.go | 18 +++++- 3 files changed, 115 insertions(+), 5 deletions(-) diff --git a/pkg/network/protocols/http/statkeeper.go b/pkg/network/protocols/http/statkeeper.go index 6cb428c4722bf..356b33f479930 100644 --- a/pkg/network/protocols/http/statkeeper.go +++ b/pkg/network/protocols/http/statkeeper.go @@ -12,15 +12,19 @@ import ( "sync" "time" + "github.com/DataDog/sketches-go/ddsketch" + "github.com/DataDog/datadog-agent/pkg/network/config" "github.com/DataDog/datadog-agent/pkg/network/usm/utils" "github.com/DataDog/datadog-agent/pkg/util/log" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" ) // StatKeeper is responsible for aggregating HTTP stats. type StatKeeper struct { mux sync.Mutex stats map[Key]*RequestStats + prevStats map[Key]*RequestStats incomplete IncompleteBuffer maxEntries int quantizer *URLQuantizer @@ -34,6 +38,9 @@ type StatKeeper struct { buffer []byte oversizedLogLimit *log.Limit + + // pool of 'DDSketch' objects + sketches *ddsync.TypedPool[ddsketch.DDSketch] } // NewStatkeeper returns a new StatKeeper. 
@@ -59,6 +66,7 @@ func NewStatkeeper(c *config.Config, telemetry *Telemetry, incompleteBuffer Inco buffer: make([]byte, getPathBufferSize(c)), telemetry: telemetry, oversizedLogLimit: log.NewLogLimit(10, time.Minute*10), + sketches: newSketchPool(), } } @@ -86,8 +94,11 @@ func (h *StatKeeper) GetAndResetAllStats() (stats map[Key]*RequestStats) { h.add(tx) } + // put back 'DDSketch' objects to pool + h.releasePreviousSketches() + // Rotate stats - stats = h.stats + h.prevStats = h.stats h.stats = make(map[Key]*RequestStats) // Rotate ConnectionAggregator @@ -100,13 +111,14 @@ func (h *StatKeeper) GetAndResetAllStats() (stats map[Key]*RequestStats) { h.connectionAggregator = utils.NewConnectionAggregator() }() - h.clearEphemeralPorts(previousAggregationState, stats) - return stats + h.clearEphemeralPorts(previousAggregationState, h.prevStats) + return h.prevStats } // Close closes the stat keeper. func (h *StatKeeper) Close() { h.oversizedLogLimit.Close() + h.releaseAllSketches() } func (h *StatKeeper) add(tx Transaction) { @@ -157,6 +169,7 @@ func (h *StatKeeper) add(tx Transaction) { } h.telemetry.aggregations.Add(1) stats = NewRequestStats() + stats.Sketches = h.sketches h.stats[key] = stats } @@ -217,3 +230,31 @@ func (h *StatKeeper) clearEphemeralPorts(aggregator *utils.ConnectionAggregator, stats[key] = aggregation } } + +// newSketchPool creates new pool of 'DDSketch' objects. +func newSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] { + sketchPool := ddsync.NewTypedPool(func() *ddsketch.DDSketch { + sketch, err := ddsketch.NewDefaultDDSketch(RelativeAccuracy) + if err != nil { + log.Debugf("http stats, could not create new ddsketch for pool, error: %v", err) + } + return sketch + }) + return sketchPool +} + +// releasePreviousSketches puts 'DDSketch' objects from previous cycle back to pool. 
+func (h *StatKeeper) releasePreviousSketches() { + for _, stats := range h.prevStats { + stats.PutSketches() + } + h.prevStats = nil +} + +// releaseAllSketches puts 'DDSketch' objects from previous and current cycle back to pool. +func (h *StatKeeper) releaseAllSketches() { + h.releasePreviousSketches() + for _, stats := range h.stats { + stats.PutSketches() + } +} diff --git a/pkg/network/protocols/http/statkeeper_test.go b/pkg/network/protocols/http/statkeeper_test.go index 8ae60da8b5859..4e44fa6844abd 100644 --- a/pkg/network/protocols/http/statkeeper_test.go +++ b/pkg/network/protocols/http/statkeeper_test.go @@ -25,6 +25,9 @@ func TestProcessHTTPTransactions(t *testing.T) { cfg.MaxHTTPStatsBuffered = 1000 tel := NewTelemetry("http") sk := NewStatkeeper(cfg, tel, NewIncompleteBuffer(cfg, tel)) + t.Cleanup(func() { + sk.releaseAllSketches() + }) srcString := "1.1.1.1" dstString := "2.2.2.2" @@ -53,6 +56,7 @@ func TestProcessHTTPTransactions(t *testing.T) { for i := 0; i < 5; i++ { s := stats.Data[uint16((i+1)*100)] require.NotNil(t, s) + require.NotNil(t, s.Latencies) assert.Equal(t, 2, s.Count) assert.Equal(t, 2.0, s.Latencies.GetCount()) @@ -322,3 +326,54 @@ func TestHTTPCorrectness(t *testing.T) { require.Len(t, stats, 0) }) } + +func makeStatkeeper() *StatKeeper { + cfg := config.New() + cfg.MaxHTTPStatsBuffered = 100000 + tel := NewTelemetry("http") + return NewStatkeeper(cfg, tel, NewIncompleteBuffer(cfg, tel)) +} + +func benchmarkHTTPStatkeeper(b *testing.B, sk *StatKeeper) { + sourceIP := util.AddressFromString("1.1.1.1") + sourcePort := 1234 + destIP := util.AddressFromString("2.2.2.2") + destPort := 8080 + + const numPaths = 10000 + const uniqPaths = 50 + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + sk.GetAndResetAllStats() + for p := 0; p < numPaths; p++ { + b.StopTimer() + //we use subset of unique endpoints, but those will occur over and over again like in regular target application + path := 
"/testpath/blablabla/dsadas/isdaasd/asdasadsadasd" + strconv.Itoa(p%uniqPaths) + //we simulate different conn tuples by increasing the port number + newSourcePort := sourcePort + (p % 30) + statusCode := (i%5 + 1) * 100 + latency := time.Duration(i%5+1) * time.Millisecond + tx := generateIPv4HTTPTransaction(sourceIP, destIP, newSourcePort, destPort, path, statusCode, latency) + b.StartTimer() + sk.Process(tx) + } + } + b.StopTimer() +} + +// BenchmarkHTTPStatkeeperWithPool benchmark allocations with pool of 'DDSketch' objects +func BenchmarkHTTPStatkeeperWithPool(b *testing.B) { + sk := makeStatkeeper() + + benchmarkHTTPStatkeeper(b, sk) +} + +// BenchmarkHTTPStatkeeperNoPool benchmark allocations without pool of 'DDSketch' objects +func BenchmarkHTTPStatkeeperNoPool(b *testing.B) { + sk := makeStatkeeper() + // disable pool of 'DDSketch' objects + sk.sketches = nil + + benchmarkHTTPStatkeeper(b, sk) +} diff --git a/pkg/network/protocols/http/stats.go b/pkg/network/protocols/http/stats.go index 43c86354f4bcb..7c1f3a65b9ae4 100644 --- a/pkg/network/protocols/http/stats.go +++ b/pkg/network/protocols/http/stats.go @@ -6,6 +6,7 @@ package http import ( + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" "github.com/DataDog/sketches-go/ddsketch" "github.com/DataDog/datadog-agent/pkg/network/types" @@ -138,7 +139,8 @@ func (r *RequestStat) initSketch() (err error) { // RequestStats stores HTTP request statistics. type RequestStats struct { - Data map[uint16]*RequestStat + Data map[uint16]*RequestStat + Sketches *ddsync.TypedPool[ddsketch.DDSketch] } // NewRequestStats creates a new RequestStats object. 
@@ -221,7 +223,9 @@ func (r *RequestStats) AddRequest(statusCode uint16, latency float64, staticTags } if stats.Latencies == nil { - if err := stats.initSketch(); err != nil { + if r.Sketches != nil { + stats.Latencies = r.Sketches.Get() + } else if err := stats.initSketch(); err != nil { return } @@ -245,3 +249,13 @@ func (r *RequestStats) HalfAllCounts() { } } } + +// PutSketches returns all obtained 'DDSketch' objects to the pool. +func (r *RequestStats) PutSketches() { + if r.Sketches != nil { + for _, stats := range r.Data { + r.Sketches.Put(stats.Latencies) + stats.Latencies = nil + } + } +} From 1dd7af59b896a9ca92c48e3772be5c3a3856eae2 Mon Sep 17 00:00:00 2001 From: "yuri.lipnesh" Date: Wed, 11 Dec 2024 15:12:40 -0500 Subject: [PATCH 2/2] [usm] reliability, http, add ReleaseStats() to Protocol, put DDSketch objects to pool. --- pkg/network/protocols/http/protocol.go | 5 ++++ pkg/network/protocols/http/statkeeper.go | 28 +++++-------------- pkg/network/protocols/http/statkeeper_test.go | 4 ++- pkg/network/protocols/http/stats.go | 5 ++-- pkg/network/protocols/http2/protocol.go | 5 ++++ pkg/network/protocols/kafka/protocol.go | 5 ++++ pkg/network/protocols/postgres/protocol.go | 5 ++++ pkg/network/protocols/protocols.go | 3 ++ pkg/network/protocols/redis/protocol.go | 5 ++++ pkg/network/tracer/tracer.go | 3 ++ pkg/network/usm/ebpf_gotls.go | 5 ++++ pkg/network/usm/ebpf_main.go | 6 ++++ pkg/network/usm/ebpf_ssl.go | 5 ++++ pkg/network/usm/monitor.go | 5 ++++ 14 files changed, 64 insertions(+), 25 deletions(-) diff --git a/pkg/network/protocols/http/protocol.go b/pkg/network/protocols/http/protocol.go index 0c1c4e8b2bc06..aa64c99e16fe3 100644 --- a/pkg/network/protocols/http/protocol.go +++ b/pkg/network/protocols/http/protocol.go @@ -225,6 +225,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats { } } +// ReleaseStats releases objects back to the pool.
+func (p *protocol) ReleaseStats() { + p.statkeeper.ReleaseSketches() +} + // IsBuildModeSupported returns always true, as http module is supported by all modes. func (*protocol) IsBuildModeSupported(buildmode.Type) bool { return true diff --git a/pkg/network/protocols/http/statkeeper.go b/pkg/network/protocols/http/statkeeper.go index 356b33f479930..28358e637b862 100644 --- a/pkg/network/protocols/http/statkeeper.go +++ b/pkg/network/protocols/http/statkeeper.go @@ -12,19 +12,17 @@ import ( "sync" "time" - "github.com/DataDog/sketches-go/ddsketch" - "github.com/DataDog/datadog-agent/pkg/network/config" "github.com/DataDog/datadog-agent/pkg/network/usm/utils" "github.com/DataDog/datadog-agent/pkg/util/log" ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" + "github.com/DataDog/sketches-go/ddsketch" ) // StatKeeper is responsible for aggregating HTTP stats. type StatKeeper struct { mux sync.Mutex stats map[Key]*RequestStats - prevStats map[Key]*RequestStats incomplete IncompleteBuffer maxEntries int quantizer *URLQuantizer @@ -94,11 +92,8 @@ func (h *StatKeeper) GetAndResetAllStats() (stats map[Key]*RequestStats) { h.add(tx) } - // put back 'DDSketch' objects to pool - h.releasePreviousSketches() - // Rotate stats - h.prevStats = h.stats + stats = h.stats h.stats = make(map[Key]*RequestStats) // Rotate ConnectionAggregator @@ -111,14 +106,14 @@ func (h *StatKeeper) GetAndResetAllStats() (stats map[Key]*RequestStats) { h.connectionAggregator = utils.NewConnectionAggregator() }() - h.clearEphemeralPorts(previousAggregationState, h.prevStats) - return h.prevStats + h.clearEphemeralPorts(previousAggregationState, stats) + return stats } // Close closes the stat keeper. 
func (h *StatKeeper) Close() { h.oversizedLogLimit.Close() - h.releaseAllSketches() + h.ReleaseSketches() } func (h *StatKeeper) add(tx Transaction) { @@ -243,17 +238,8 @@ func newSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] { return sketchPool } -// releasePreviousSketches puts 'DDSketch' objects from previous cycle back to pool. -func (h *StatKeeper) releasePreviousSketches() { - for _, stats := range h.prevStats { - stats.PutSketches() - } - h.prevStats = nil -} - -// releaseAllSketches puts 'DDSketch' objects from previous and current cycle back to pool. -func (h *StatKeeper) releaseAllSketches() { - h.releasePreviousSketches() +// ReleaseSketches puts 'DDSketch' objects back to pool. +func (h *StatKeeper) ReleaseSketches() { for _, stats := range h.stats { stats.PutSketches() } diff --git a/pkg/network/protocols/http/statkeeper_test.go b/pkg/network/protocols/http/statkeeper_test.go index 4e44fa6844abd..79e996a46e31c 100644 --- a/pkg/network/protocols/http/statkeeper_test.go +++ b/pkg/network/protocols/http/statkeeper_test.go @@ -26,7 +26,7 @@ func TestProcessHTTPTransactions(t *testing.T) { tel := NewTelemetry("http") sk := NewStatkeeper(cfg, tel, NewIncompleteBuffer(cfg, tel)) t.Cleanup(func() { - sk.releaseAllSketches() + sk.ReleaseSketches() }) srcString := "1.1.1.1" @@ -68,6 +68,7 @@ func TestProcessHTTPTransactions(t *testing.T) { assert.True(t, p50 >= expectedLatency-acceptableError) assert.True(t, p50 <= expectedLatency+acceptableError) } + sk.ReleaseSketches() } } @@ -358,6 +359,7 @@ func benchmarkHTTPStatkeeper(b *testing.B, sk *StatKeeper) { b.StartTimer() sk.Process(tx) } + sk.ReleaseSketches() } b.StopTimer() } diff --git a/pkg/network/protocols/http/stats.go b/pkg/network/protocols/http/stats.go index 7c1f3a65b9ae4..949b69ed7961f 100644 --- a/pkg/network/protocols/http/stats.go +++ b/pkg/network/protocols/http/stats.go @@ -6,13 +6,12 @@ package http import ( - ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" - 
"github.com/DataDog/sketches-go/ddsketch" - "github.com/DataDog/datadog-agent/pkg/network/types" "github.com/DataDog/datadog-agent/pkg/process/util" "github.com/DataDog/datadog-agent/pkg/util/intern" "github.com/DataDog/datadog-agent/pkg/util/log" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" + "github.com/DataDog/sketches-go/ddsketch" ) // Interner is used to intern strings to save memory allocations. diff --git a/pkg/network/protocols/http2/protocol.go b/pkg/network/protocols/http2/protocol.go index 433f55e15715e..a8fd1375a480b 100644 --- a/pkg/network/protocols/http2/protocol.go +++ b/pkg/network/protocols/http2/protocol.go @@ -431,6 +431,11 @@ func (p *Protocol) GetStats() *protocols.ProtocolStats { } } +// ReleaseStats release objects back to pool, not implemented yet, to satisfy protocol. +func (p *Protocol) ReleaseStats() { + return +} + // IsBuildModeSupported returns always true, as http2 module is supported by all modes. func (*Protocol) IsBuildModeSupported(buildmode.Type) bool { return true diff --git a/pkg/network/protocols/kafka/protocol.go b/pkg/network/protocols/kafka/protocol.go index 54bcb67ff5fdf..772af679ae1ac 100644 --- a/pkg/network/protocols/kafka/protocol.go +++ b/pkg/network/protocols/kafka/protocol.go @@ -361,6 +361,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats { } } +// ReleaseStats release objects back to pool, not implemented yet, to satisfy protocol. +func (p *protocol) ReleaseStats() { + return +} + // IsBuildModeSupported returns always true, as kafka module is supported by all modes. 
func (*protocol) IsBuildModeSupported(buildmode.Type) bool { return true diff --git a/pkg/network/protocols/postgres/protocol.go b/pkg/network/protocols/postgres/protocol.go index c0d7aa37a6747..2d850e9b99714 100644 --- a/pkg/network/protocols/postgres/protocol.go +++ b/pkg/network/protocols/postgres/protocol.go @@ -239,6 +239,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats { } } +// ReleaseStats releases objects back to the pool. Not implemented yet; it exists only to satisfy the Protocol interface. +func (p *protocol) ReleaseStats() { + return +} + // IsBuildModeSupported returns always true, as postgres module is supported by all modes. func (*protocol) IsBuildModeSupported(buildmode.Type) bool { return true diff --git a/pkg/network/protocols/protocols.go b/pkg/network/protocols/protocols.go index acec71ad05d33..50dc7bc420d1d 100644 --- a/pkg/network/protocols/protocols.go +++ b/pkg/network/protocols/protocols.go @@ -61,6 +61,9 @@ type Protocol interface { // implementation. GetStats() *ProtocolStats + // ReleaseStats returns stats-related objects to the pool. + ReleaseStats() + // IsBuildModeSupported return true is the given build mode is supported by this protocol. IsBuildModeSupported(buildmode.Type) bool } diff --git a/pkg/network/protocols/redis/protocol.go b/pkg/network/protocols/redis/protocol.go index 831d019b30416..47456184d970a 100644 --- a/pkg/network/protocols/redis/protocol.go +++ b/pkg/network/protocols/redis/protocol.go @@ -155,6 +155,11 @@ func (p *protocol) GetStats() *protocols.ProtocolStats { } } +// ReleaseStats releases objects back to the pool. Not implemented yet; it exists only to satisfy the Protocol interface. +func (p *protocol) ReleaseStats() { + return +} + // IsBuildModeSupported returns always true, as Redis module is supported by all modes.
func (*protocol) IsBuildModeSupported(buildmode.Type) bool { return true diff --git a/pkg/network/tracer/tracer.go b/pkg/network/tracer/tracer.go index 20e97a49ac3e6..e75c6b9b3a123 100644 --- a/pkg/network/tracer/tracer.go +++ b/pkg/network/tracer/tracer.go @@ -426,6 +426,9 @@ func (t *Tracer) GetActiveConnections(clientID string) (*network.Connections, er delta := t.state.GetDelta(clientID, latestTime, active, t.reverseDNS.GetDNSStats(), t.usmMonitor.GetProtocolStats()) + // release stats objects back to pool + t.usmMonitor.ReleaseStats() + ips := make(map[util.Address]struct{}, len(delta.Conns)/2) var udpConns, tcpConns int for i := range delta.Conns { diff --git a/pkg/network/usm/ebpf_gotls.go b/pkg/network/usm/ebpf_gotls.go index e12871aa619bf..fcf7c02691f7d 100644 --- a/pkg/network/usm/ebpf_gotls.go +++ b/pkg/network/usm/ebpf_gotls.go @@ -290,6 +290,11 @@ func (p *goTLSProgram) GetStats() *protocols.ProtocolStats { return nil } +// ReleaseStats is a no-op. +func (p *goTLSProgram) ReleaseStats() { + return +} + // Stop terminates goTLS main goroutine. func (p *goTLSProgram) Stop(*manager.Manager) { close(p.done) diff --git a/pkg/network/usm/ebpf_main.go b/pkg/network/usm/ebpf_main.go index 85f9f7123a593..4ff4221d7790d 100644 --- a/pkg/network/usm/ebpf_main.go +++ b/pkg/network/usm/ebpf_main.go @@ -544,6 +544,12 @@ func (e *ebpfProgram) getProtocolStats() map[protocols.ProtocolType]interface{} return ret } +func (e *ebpfProgram) releaseStats() { + for _, protocol := range e.enabledProtocols { + protocol.Instance.ReleaseStats() + } +} + // executePerProtocol runs the given callback (`cb`) for every protocol in the given list (`protocolList`). // If the callback failed, then we call the error callback (`errorCb`). Eventually returning a list of protocols which // successfully executed the callback. 
diff --git a/pkg/network/usm/ebpf_ssl.go b/pkg/network/usm/ebpf_ssl.go index af4fa84364568..1939e05839772 100644 --- a/pkg/network/usm/ebpf_ssl.go +++ b/pkg/network/usm/ebpf_ssl.go @@ -564,6 +564,11 @@ func (o *sslProgram) GetStats() *protocols.ProtocolStats { return nil } +// ReleaseStats is a no-op. +func (p *sslProgram) ReleaseStats() { + return +} + const ( // Defined in https://man7.org/linux/man-pages/man5/proc.5.html. taskCommLen = 16 diff --git a/pkg/network/usm/monitor.go b/pkg/network/usm/monitor.go index 4ee3d2e535d68..2602ed9408f43 100644 --- a/pkg/network/usm/monitor.go +++ b/pkg/network/usm/monitor.go @@ -196,6 +196,11 @@ func (m *Monitor) GetProtocolStats() map[protocols.ProtocolType]interface{} { return m.ebpfProgram.getProtocolStats() } +// ReleaseStats returns stats-related objects to the pool. +func (m *Monitor) ReleaseStats() { + m.ebpfProgram.releaseStats() +} + // Stop HTTP monitoring func (m *Monitor) Stop() { if m == nil {