From de46c0a9eafab1d877919f183b63dc03ec624834 Mon Sep 17 00:00:00 2001 From: Brian Floersch Date: Fri, 15 Nov 2024 10:13:20 -0500 Subject: [PATCH] Add logs agent pipeline performance telemetry (#30744) Co-authored-by: blt --- cmd/serverless/dependencies_linux_amd64.txt | 1 + cmd/serverless/dependencies_linux_arm64.txt | 1 + .../eventplatformimpl/epforwarder.go | 18 ++- comp/logs/agent/config/constants.go | 4 +- comp/otelcol/ddflareextension/impl/go.mod | 2 + comp/otelcol/logsagentpipeline/go.mod | 2 + .../logsagentpipelineimpl/go.mod | 2 + .../exporter/datadogexporter/go.mod | 2 + go.mod | 2 + modules.yml | 2 + pkg/collector/worker/worker.go | 12 +- pkg/config/setup/config.go | 3 + pkg/logs/auditor/auditor.go | 4 +- pkg/logs/auditor/go.mod | 2 +- pkg/logs/client/destination.go | 7 +- pkg/logs/client/destination_metadata.go | 54 +++++++++ pkg/logs/client/go.mod | 3 + pkg/logs/client/go.sum | 2 + pkg/logs/client/http/destination.go | 45 ++++--- pkg/logs/client/http/destination_test.go | 3 +- pkg/logs/client/http/sync_destination.go | 13 +- pkg/logs/client/http/test_utils.go | 3 +- pkg/logs/client/tcp/destination.go | 5 + pkg/logs/diagnostic/go.mod | 2 +- pkg/logs/diagnostic/message_receiver.go | 8 +- pkg/logs/launchers/file/launcher.go | 19 +-- pkg/logs/message/message.go | 29 ++++- pkg/logs/metrics/capacity_monitor.go | 81 +++++++++++++ pkg/logs/metrics/capacity_monitor_test.go | 56 +++++++++ pkg/logs/metrics/go.mod | 3 + pkg/logs/metrics/go.sum | 2 + pkg/logs/metrics/metrics.go | 10 ++ pkg/logs/metrics/pipeline_monitor.go | 112 +++++++++++++++++ pkg/logs/metrics/pipeline_monitor_test.go | 46 +++++++ pkg/logs/metrics/utilization_monitor.go | 113 ++++++++++++++++++ pkg/logs/metrics/utilization_monitor_test.go | 19 +++ pkg/logs/pipeline/go.mod | 6 +- pkg/logs/pipeline/mock/mock.go | 6 + pkg/logs/pipeline/pipeline.go | 65 +++++----- pkg/logs/pipeline/provider.go | 13 ++ pkg/logs/processor/go.mod | 3 + pkg/logs/processor/go.sum | 2 + pkg/logs/processor/processor.go | 19 ++- pkg/logs/processor/processor_test.go | 6 +- pkg/logs/sds/scanner.go | 10 +- pkg/logs/sds/scanner_nosds.go | 2 +- pkg/logs/sds/scanner_test.go | 13 +- pkg/logs/sender/batch_strategy.go | 24 +++- pkg/logs/sender/batch_strategy_test.go | 13 +- pkg/logs/sender/destination_sender_test.go | 5 + pkg/logs/sender/go.mod | 4 +- pkg/logs/sender/sender.go | 17 ++- pkg/logs/sender/sender_test.go | 15 +-- pkg/logs/tailers/file/tailer.go | 44 ++++--- pkg/logs/tailers/file/tailer_nix.go | 3 +- pkg/logs/tailers/file/tailer_test.go | 89 +++++++------- pkg/util/utilizationtracker/doc.go | 7 ++ pkg/util/utilizationtracker/go.mod | 14 +++ pkg/util/utilizationtracker/go.sum | 12 ++ .../utilization_tracker.go | 69 +++++------ .../utilization_tracker_test.go | 17 +-- test/otel/go.mod | 2 + .../experiment.yaml | 2 +- .../experiment.yaml | 6 + .../experiment.yaml | 2 +- .../experiment.yaml | 6 + .../experiment.yaml | 6 + 67 files changed, 968 insertions(+), 226 deletions(-) create mode 100644 pkg/logs/client/destination_metadata.go create mode 100644 pkg/logs/metrics/capacity_monitor.go create mode 100644 pkg/logs/metrics/capacity_monitor_test.go create mode 100644 pkg/logs/metrics/pipeline_monitor.go create mode 100644 pkg/logs/metrics/pipeline_monitor_test.go create mode 100644 pkg/logs/metrics/utilization_monitor.go create mode 100644 pkg/logs/metrics/utilization_monitor_test.go create mode 100644 pkg/util/utilizationtracker/doc.go create mode 100644 pkg/util/utilizationtracker/go.mod create mode 100644 pkg/util/utilizationtracker/go.sum rename pkg/{collector/worker => util/utilizationtracker}/utilization_tracker.go (61%) rename pkg/{collector/worker => util/utilizationtracker}/utilization_tracker_test.go (94%) diff --git a/cmd/serverless/dependencies_linux_amd64.txt b/cmd/serverless/dependencies_linux_amd64.txt index f8ea17aaec60d..e4d51a4419dd3 100644 --- a/cmd/serverless/dependencies_linux_amd64.txt +++ b/cmd/serverless/dependencies_linux_amd64.txt @@ -301,6 +301,7 @@ github.com/DataDog/datadog-agent/pkg/util/sync github.com/DataDog/datadog-agent/pkg/util/system github.com/DataDog/datadog-agent/pkg/util/system/socket github.com/DataDog/datadog-agent/pkg/util/tmplvar +github.com/DataDog/datadog-agent/pkg/util/utilizationtracker github.com/DataDog/datadog-agent/pkg/version github.com/DataDog/datadog-api-client-go/v2 github.com/DataDog/datadog-api-client-go/v2/api/datadog diff --git a/cmd/serverless/dependencies_linux_arm64.txt b/cmd/serverless/dependencies_linux_arm64.txt index 7e155bda7e040..69eb12f7ebaaf 100644 --- a/cmd/serverless/dependencies_linux_arm64.txt +++ b/cmd/serverless/dependencies_linux_arm64.txt @@ -301,6 +301,7 @@ github.com/DataDog/datadog-agent/pkg/util/sync github.com/DataDog/datadog-agent/pkg/util/system github.com/DataDog/datadog-agent/pkg/util/system/socket github.com/DataDog/datadog-agent/pkg/util/tmplvar +github.com/DataDog/datadog-agent/pkg/util/utilizationtracker github.com/DataDog/datadog-agent/pkg/version github.com/DataDog/datadog-api-client-go/v2 github.com/DataDog/datadog-api-client-go/v2/api/datadog diff --git a/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go index 7e0f51f509689..9c4fb0d451ed5 100644 --- a/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go +++ b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go @@ -9,6 +9,7 @@ package eventplatformimpl import ( "context" "fmt" + "strconv" "strings" "sync" @@ -27,6 +28,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/client" logshttp "github.com/DataDog/datadog-agent/pkg/logs/client/http" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/sender" "github.com/DataDog/datadog-agent/pkg/util/fxutil" "github.com/DataDog/datadog-agent/pkg/util/log" @@ -393,15 +395,18 @@ func newHTTPPassthroughPipeline(coreConfig model.Reader, eventPlatformReceiver e if endpoints.InputChanSize <= pkgconfigsetup.DefaultInputChanSize { endpoints.InputChanSize = desc.defaultInputChanSize } + + pipelineMonitor := metrics.NewNoopPipelineMonitor(strconv.Itoa(pipelineID)) + reliable := []client.Destination{} for i, endpoint := range endpoints.GetReliableEndpoints() { - telemetryName := fmt.Sprintf("%s_%d_reliable_%d", desc.eventType, pipelineID, i) - reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, telemetryName, pkgconfigsetup.Datadog())) + destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "reliable", strconv.Itoa(i)) + reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor)) } additionals := []client.Destination{} for i, endpoint := range endpoints.GetUnReliableEndpoints() { - telemetryName := fmt.Sprintf("%s_%d_unreliable_%d", desc.eventType, pipelineID, i) - additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, telemetryName, pkgconfigsetup.Datadog())) + destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "unreliable", strconv.Itoa(i)) + additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor)) } destinations := client.NewDestinations(reliable, additionals) inputChan := make(chan *message.Message, endpoints.InputChanSize) @@ -426,14 +431,15 @@ func newHTTPPassthroughPipeline(coreConfig model.Reader, eventPlatformReceiver e endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, desc.eventType, - encoder) + encoder, + pipelineMonitor) } a := auditor.NewNullAuditor() log.Debugf("Initialized event platform forwarder pipeline. eventType=%s mainHosts=%s additionalHosts=%s batch_max_concurrent_send=%d batch_max_content_size=%d batch_max_size=%d, input_chan_size=%d", desc.eventType, joinHosts(endpoints.GetReliableEndpoints()), joinHosts(endpoints.GetUnReliableEndpoints()), endpoints.BatchMaxConcurrentSend, endpoints.BatchMaxContentSize, endpoints.BatchMaxSize, endpoints.InputChanSize) return &passthroughPipeline{ - sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil), + sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil, pipelineMonitor), strategy: strategy, in: inputChan, auditor: a, diff --git a/comp/logs/agent/config/constants.go b/comp/logs/agent/config/constants.go index 2fd39527b3b90..ae9a0d74680f0 100644 --- a/comp/logs/agent/config/constants.go +++ b/comp/logs/agent/config/constants.go @@ -7,9 +7,7 @@ package config // Pipeline constraints const ( - ChanSize = 100 - DestinationPayloadChanSize = 10 - NumberOfPipelines = 4 + NumberOfPipelines = 4 ) const ( diff --git a/comp/otelcol/ddflareextension/impl/go.mod b/comp/otelcol/ddflareextension/impl/go.mod index 9f67392c10ece..180da88e89fc3 100644 --- a/comp/otelcol/ddflareextension/impl/go.mod +++ b/comp/otelcol/ddflareextension/impl/go.mod @@ -97,6 +97,7 @@ replace ( github.com/DataDog/datadog-agent/pkg/util/system => ../../../../pkg/util/system github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../../../pkg/util/system/socket github.com/DataDog/datadog-agent/pkg/util/testutil => ../../../../pkg/util/testutil + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../../../pkg/util/utilizationtracker github.com/DataDog/datadog-agent/pkg/util/winutil => ../../../../pkg/util/winutil github.com/DataDog/datadog-agent/pkg/version => ../../../../pkg/version github.com/coreos/go-systemd => github.com/coreos/go-systemd v0.0.0-20180202092358-40e2722dffea @@ -239,6 +240,7 @@ require ( github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/system v0.57.1 // indirect github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.1 // indirect + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect github.com/DataDog/datadog-api-client-go/v2 v2.26.0 // indirect github.com/DataDog/datadog-go/v5 v5.5.0 // indirect diff --git a/comp/otelcol/logsagentpipeline/go.mod b/comp/otelcol/logsagentpipeline/go.mod index 1859b4075ba35..38c20efb7ef24 100644 --- a/comp/otelcol/logsagentpipeline/go.mod +++ b/comp/otelcol/logsagentpipeline/go.mod @@ -55,6 +55,7 @@ replace ( github.com/DataDog/datadog-agent/pkg/util/system => ../../../pkg/util/system github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../../pkg/util/system/socket github.com/DataDog/datadog-agent/pkg/util/testutil => ../../../pkg/util/testutil + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../../pkg/util/utilizationtracker github.com/DataDog/datadog-agent/pkg/util/winutil => ../../../pkg/util/winutil github.com/DataDog/datadog-agent/pkg/version => ../../../pkg/version ) @@ -103,6 +104,7 @@ require ( github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/system v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.0 // indirect + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect github.com/DataDog/datadog-agent/pkg/version v0.56.0-rc.3 // indirect github.com/DataDog/dd-sensitive-data-scanner/sds-go/go v0.0.0-20240816154533-f7f9beb53a42 // indirect diff --git a/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/go.mod b/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/go.mod index a39b31a1e35e8..3a7f3981ce689 100644 --- a/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/go.mod +++ b/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/go.mod @@ -56,6 +56,7 @@ replace ( github.com/DataDog/datadog-agent/pkg/util/system => ../../../../pkg/util/system github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../../../pkg/util/system/socket github.com/DataDog/datadog-agent/pkg/util/testutil => ../../../../pkg/util/testutil + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../../../pkg/util/utilizationtracker github.com/DataDog/datadog-agent/pkg/util/winutil => ../../../../pkg/util/winutil github.com/DataDog/datadog-agent/pkg/version => ../../../../pkg/version ) @@ -118,6 +119,7 @@ require ( github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/system v0.57.1 // indirect github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.1 // indirect + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect github.com/DataDog/datadog-agent/pkg/version v0.56.0-rc.3 // indirect github.com/DataDog/dd-sensitive-data-scanner/sds-go/go v0.0.0-20240816154533-f7f9beb53a42 // indirect diff --git a/comp/otelcol/otlp/components/exporter/datadogexporter/go.mod b/comp/otelcol/otlp/components/exporter/datadogexporter/go.mod index 9254ddbb724e6..0908f2867a79b 100644 --- a/comp/otelcol/otlp/components/exporter/datadogexporter/go.mod +++ b/comp/otelcol/otlp/components/exporter/datadogexporter/go.mod @@ -86,6 +86,7 @@ replace ( github.com/DataDog/datadog-agent/pkg/util/system => ../../../../../../pkg/util/system/ github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../../../../../pkg/util/system/socket/ github.com/DataDog/datadog-agent/pkg/util/testutil => ../../../../../../pkg/util/testutil/ + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../../../../../pkg/util/utilizationtracker/ github.com/DataDog/datadog-agent/pkg/util/winutil => ../../../../../../pkg/util/winutil/ github.com/DataDog/datadog-agent/pkg/version => ../../../../../../pkg/version ) @@ -189,6 +190,7 @@ require ( github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/system v0.57.1 // indirect github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.1 // indirect + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect github.com/DataDog/datadog-agent/pkg/version v0.57.1 // indirect github.com/DataDog/datadog-api-client-go/v2 v2.26.0 // indirect diff --git a/go.mod b/go.mod index c915b3d7e4f06..e181eeb8896bb 100644 --- a/go.mod +++ b/go.mod @@ -138,6 +138,7 @@ replace ( github.com/DataDog/datadog-agent/pkg/util/system => ./pkg/util/system github.com/DataDog/datadog-agent/pkg/util/system/socket => ./pkg/util/system/socket/ github.com/DataDog/datadog-agent/pkg/util/testutil => ./pkg/util/testutil + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ./pkg/util/utilizationtracker github.com/DataDog/datadog-agent/pkg/util/uuid => ./pkg/util/uuid github.com/DataDog/datadog-agent/pkg/util/winutil => ./pkg/util/winutil/ github.com/DataDog/datadog-agent/pkg/version => ./pkg/version @@ -605,6 +606,7 @@ require ( github.com/DataDog/datadog-agent/comp/otelcol/ddflareextension/impl v0.0.0-00010101000000-000000000000 github.com/DataDog/datadog-agent/pkg/config/structure v0.60.0-devel github.com/DataDog/datadog-agent/pkg/util/defaultpaths v0.0.0-00010101000000-000000000000 + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 github.com/NVIDIA/go-nvml v0.12.4-0 github.com/containerd/containerd/api v1.8.0 github.com/containerd/errdefs v1.0.0 diff --git a/modules.yml b/modules.yml index e60721fc1dce0..76f89c8ba8211 100644 --- a/modules.yml +++ b/modules.yml @@ -252,6 +252,8 @@ modules: used_by_otel: true pkg/util/testutil: used_by_otel: true + pkg/util/utilizationtracker: + used_by_otel: true pkg/util/uuid: default pkg/util/winutil: used_by_otel: true diff --git a/pkg/collector/worker/worker.go b/pkg/collector/worker/worker.go index 7a366434e317a..00a0b40668135 100644 --- a/pkg/collector/worker/worker.go +++ b/pkg/collector/worker/worker.go @@ -20,6 +20,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/hostname" "github.com/DataDog/datadog-agent/pkg/util/log" + "github.com/DataDog/datadog-agent/pkg/util/utilizationtracker" ) const ( @@ -122,7 +123,8 @@ func newWorkerWithOptions( func (w *Worker) Run() { log.Debugf("Runner %d, worker %d: Ready to process checks...", w.runnerID, w.ID) - utilizationTracker := NewUtilizationTracker(w.Name, w.utilizationTickInterval) + alpha := 0.25 // converges to 99.98% of constant input in 30 iterations. + utilizationTracker := utilizationtracker.NewUtilizationTracker(w.utilizationTickInterval, alpha) defer utilizationTracker.Stop() startUtilizationUpdater(w.Name, utilizationTracker) @@ -146,12 +148,12 @@ func (w *Worker) Run() { expvars.AddRunningCheckCount(1) expvars.SetRunningStats(check.ID(), checkStartTime) - utilizationTracker.CheckStarted() + utilizationTracker.Started() // Run the check checkErr := check.Run() - utilizationTracker.CheckFinished() + utilizationTracker.Finished() expvars.DeleteRunningStats(check.ID()) @@ -210,7 +212,7 @@ func (w *Worker) Run() { log.Debugf("Runner %d, worker %d: Finished processing checks.", w.runnerID, w.ID) } -func startUtilizationUpdater(name string, ut *UtilizationTracker) { +func startUtilizationUpdater(name string, ut *utilizationtracker.UtilizationTracker) { expvars.SetWorkerStats(name, &expvars.WorkerStats{ Utilization: 0.0, }) @@ -229,7 +231,7 @@ func startUtilizationUpdater(name string, ut *UtilizationTracker) { }() } -func startTrackerTicker(ut *UtilizationTracker, interval time.Duration) func() { +func startTrackerTicker(ut *utilizationtracker.UtilizationTracker, interval time.Duration) func() { ticker := time.NewTicker(interval) cancel := make(chan struct{}, 1) done := make(chan struct{}) diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index f368c4272bdb2..8a392b4d5c789 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -1526,6 +1526,9 @@ func logsagent(config pkgconfigmodel.Setup) { config.BindEnvAndSetDefault("logs_config.dev_mode_use_proto", true) config.BindEnvAndSetDefault("logs_config.dd_url_443", "agent-443-intake.logs.datadoghq.com") config.BindEnvAndSetDefault("logs_config.stop_grace_period", 30) + config.BindEnvAndSetDefault("logs_config.message_channel_size", 100) + config.BindEnvAndSetDefault("logs_config.payload_channel_size", 10) + // maximum time that the unix tailer will hold a log file open after it has been rotated config.BindEnvAndSetDefault("logs_config.close_timeout", 60) // maximum time that the windows tailer will hold a log file open, while waiting for diff --git a/pkg/logs/auditor/auditor.go b/pkg/logs/auditor/auditor.go index cad651a7c7d27..05a196125cf03 100644 --- a/pkg/logs/auditor/auditor.go +++ b/pkg/logs/auditor/auditor.go @@ -16,7 +16,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/status/health" "github.com/DataDog/datadog-agent/pkg/util/log" - "github.com/DataDog/datadog-agent/comp/logs/agent/config" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/logs/message" ) @@ -104,7 +104,7 @@ func (a *RegistryAuditor) Stop() { func (a *RegistryAuditor) createChannels() { a.chansMutex.Lock() defer a.chansMutex.Unlock() - a.inputChan = make(chan *message.Payload, config.ChanSize) + a.inputChan = make(chan *message.Payload, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) a.done = make(chan struct{}) } diff --git a/pkg/logs/auditor/go.mod b/pkg/logs/auditor/go.mod index 350a008e3e91c..59d61c086e1ef 100644 --- a/pkg/logs/auditor/go.mod +++ b/pkg/logs/auditor/go.mod @@ -43,6 +43,7 @@ replace ( require ( github.com/DataDog/datadog-agent/comp/logs/agent/config v0.56.0-rc.3 + github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 github.com/DataDog/datadog-agent/pkg/logs/message v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/sources v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/status/health v0.56.0-rc.3 @@ -56,7 +57,6 @@ require ( github.com/DataDog/datadog-agent/pkg/config/env v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/model v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/nodetreemodel v0.0.0-00010101000000-000000000000 // indirect - github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/structure v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/teeconfig v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/utils v0.56.0-rc.3 // indirect diff --git a/pkg/logs/client/destination.go b/pkg/logs/client/destination.go index b1bfa151bff9c..affb2bc6b7651 100644 --- a/pkg/logs/client/destination.go +++ b/pkg/logs/client/destination.go @@ -6,7 +6,9 @@ //nolint:revive // TODO(AML) Fix revive linter package client -import "github.com/DataDog/datadog-agent/pkg/logs/message" +import ( + "github.com/DataDog/datadog-agent/pkg/logs/message" +) // Destination sends a payload to a specific endpoint over a given network protocol. type Destination interface { @@ -16,6 +18,9 @@ type Destination interface { // Destination target (e.g. https://agent-intake.logs.datadoghq.com) Target() string + // Metadata returns the metadata of the destination + Metadata() *DestinationMetadata + // Start starts the destination send loop. close the intput to stop listening for payloads. stopChan is // signaled when the destination has fully shutdown and all buffered payloads have been flushed. isRetrying is // signaled when the retry state changes. isRetrying can be nil if you don't need to handle retries. diff --git a/pkg/logs/client/destination_metadata.go b/pkg/logs/client/destination_metadata.go new file mode 100644 index 0000000000000..1c4eaa429a559 --- /dev/null +++ b/pkg/logs/client/destination_metadata.go @@ -0,0 +1,54 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//nolint:revive // TODO(AML) Fix revive linter +package client + +import ( + "fmt" +) + +// DestinationMetadata contains metadata about a destination +type DestinationMetadata struct { + componentName string + instanceID string + kind string + endpointId string + ReportingEnabled bool +} + +// NewDestinationMetadata returns a new DestinationMetadata +func NewDestinationMetadata(componentName, instanceID, kind, endpointId string) *DestinationMetadata { + return &DestinationMetadata{ + componentName: componentName, + instanceID: instanceID, + kind: kind, + endpointId: endpointId, + ReportingEnabled: true, + } +} + +// NewNoopDestinationMetadata returns a new DestinationMetadata with reporting disabled +func NewNoopDestinationMetadata() *DestinationMetadata { + return &DestinationMetadata{ + ReportingEnabled: false, + } +} + +// TelemetryName returns the telemetry name for the destination +func (d *DestinationMetadata) TelemetryName() string { + if !d.ReportingEnabled { + return "" + } + return fmt.Sprintf("%s_%s_%s_%s", d.componentName, d.instanceID, d.kind, d.endpointId) +} + +// MonitorTag returns the monitor tag for the destination +func (d *DestinationMetadata) MonitorTag() string { + if !d.ReportingEnabled { + return "" + } + return fmt.Sprintf("destination_%s_%s", d.kind, d.endpointId) +} diff --git a/pkg/logs/client/go.mod b/pkg/logs/client/go.mod index 58bfdf330a8cb..c22dea009faf2 100644 --- a/pkg/logs/client/go.mod +++ b/pkg/logs/client/go.mod @@ -43,6 +43,7 @@ replace ( github.com/DataDog/datadog-agent/pkg/util/system => ../../util/system github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../util/system/socket github.com/DataDog/datadog-agent/pkg/util/testutil => ../../util/testutil + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../util/utilizationtracker github.com/DataDog/datadog-agent/pkg/util/winutil => ../../util/winutil github.com/DataDog/datadog-agent/pkg/version => ../../version ) @@ -87,9 +88,11 @@ require ( github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/system v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.0 // indirect + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect github.com/DataDog/viper v1.13.5 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect diff --git a/pkg/logs/client/go.sum b/pkg/logs/client/go.sum index 447b5e01ec8da..c486d982207c3 100644 --- a/pkg/logs/client/go.sum +++ b/pkg/logs/client/go.sum @@ -12,6 +12,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/pkg/logs/client/http/destination.go b/pkg/logs/client/http/destination.go index 954397f9882e7..7553b7e91d913 100644 --- a/pkg/logs/client/http/destination.go +++ b/pkg/logs/client/http/destination.go @@ -81,8 +81,10 @@ type Destination struct { lastRetryError error // Telemetry - expVars *expvar.Map - telemetryName string + expVars *expvar.Map + destMeta *client.DestinationMetadata + pipelineMonitor metrics.PipelineMonitor + utilization metrics.UtilizationMonitor } // NewDestination returns a new Destination. @@ -94,8 +96,9 @@ func NewDestination(endpoint config.Endpoint, destinationsContext *client.DestinationsContext, maxConcurrentBackgroundSends int, shouldRetry bool, - telemetryName string, - cfg pkgconfigmodel.Reader) *Destination { + destMeta *client.DestinationMetadata, + cfg pkgconfigmodel.Reader, + pipelineMonitor metrics.PipelineMonitor) *Destination { return newDestination(endpoint, contentType, @@ -103,8 +106,9 @@ func NewDestination(endpoint config.Endpoint, time.Second*10, maxConcurrentBackgroundSends, shouldRetry, - telemetryName, - cfg) + destMeta, + cfg, + pipelineMonitor) } func newDestination(endpoint config.Endpoint, @@ -113,8 +117,9 @@ func newDestination(endpoint config.Endpoint, timeout time.Duration, maxConcurrentBackgroundSends int, shouldRetry bool, - telemetryName string, - cfg pkgconfigmodel.Reader) *Destination { + destMeta *client.DestinationMetadata, + cfg pkgconfigmodel.Reader, + pipelineMonitor metrics.PipelineMonitor) *Destination { if maxConcurrentBackgroundSends <= 0 { maxConcurrentBackgroundSends = 1 @@ -130,8 +135,9 @@ func newDestination(endpoint config.Endpoint, expVars := &expvar.Map{} expVars.AddFloat(expVarIdleMsMapKey, 0) expVars.AddFloat(expVarInUseMsMapKey, 0) - if telemetryName != "" { - metrics.DestinationExpVars.Set(telemetryName, expVars) + + if destMeta.ReportingEnabled { + metrics.DestinationExpVars.Set(destMeta.TelemetryName(), expVars) } return &Destination{ @@ -150,8 +156,10 @@ func newDestination(endpoint config.Endpoint, retryLock: sync.Mutex{}, shouldRetry: shouldRetry, expVars: expVars, - telemetryName: telemetryName, + destMeta: destMeta, isMRF: endpoint.IsMRF, + pipelineMonitor: pipelineMonitor, + utilization: pipelineMonitor.MakeUtilizationMonitor(destMeta.MonitorTag()), } } @@ -175,6 +183,11 @@ func (d *Destination) Target() string { return d.url } +// Metadata returns the metadata of the destination +func (d *Destination) Metadata() *client.DestinationMetadata { + return d.destMeta +} + // Start starts reading the input channel func (d *Destination) Start(input chan *message.Payload, output chan *message.Payload, isRetrying chan bool) (stopChan <-chan struct{}) { stop := make(chan struct{}) @@ -186,22 +199,25 @@ func (d *Destination) run(input chan *message.Payload, output chan *message.Payl var startIdle = time.Now() for p := range input { + d.utilization.Start() idle := float64(time.Since(startIdle) / time.Millisecond) d.expVars.AddFloat(expVarIdleMsMapKey, idle) - tlmIdle.Add(idle, d.telemetryName) + tlmIdle.Add(idle, d.destMeta.TelemetryName()) var startInUse = time.Now() d.sendConcurrent(p, output, isRetrying) inUse := float64(time.Since(startInUse) / time.Millisecond) d.expVars.AddFloat(expVarInUseMsMapKey, inUse) - tlmInUse.Add(inUse, d.telemetryName) + tlmInUse.Add(inUse, d.destMeta.TelemetryName()) startIdle = time.Now() + d.utilization.Stop() } // Wait for any pending concurrent sends to finish or terminate d.wg.Wait() d.updateRetryState(nil, isRetrying) + d.utilization.Cancel() stopChan <- struct{}{} } @@ -348,6 +364,7 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) { // internal error. We should retry these requests. return client.NewRetryableError(errServer) } else { + d.pipelineMonitor.ReportComponentEgress(payload, d.destMeta.MonitorTag()) return nil } } @@ -422,7 +439,7 @@ func getMessageTimestamp(messages []*message.Message) int64 { func prepareCheckConnectivity(endpoint config.Endpoint, cfg pkgconfigmodel.Reader) (*client.DestinationsContext, *Destination) { ctx := client.NewDestinationsContext() // Lower the timeout to 5s because HTTP connectivity test is done synchronously during the agent bootstrap sequence - destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, 0, false, "", cfg) + destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, 0, false, client.NewNoopDestinationMetadata(), cfg, metrics.NewNoopPipelineMonitor("")) return ctx, destination } diff --git a/pkg/logs/client/http/destination_test.go b/pkg/logs/client/http/destination_test.go index 085845ff8f2ed..6adf3e7d3148f 100644 --- a/pkg/logs/client/http/destination_test.go +++ b/pkg/logs/client/http/destination_test.go @@ -16,6 +16,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/client" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/comp/logs/agent/config" configmock "github.com/DataDog/datadog-agent/pkg/config/mock" @@ -360,7 +361,7 @@ func TestDestinationHA(t *testing.T) { } isEndpointMRF := endpoint.IsMRF - dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), 1, false, "test", configmock.New(t)) + dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), 1, false, client.NewNoopDestinationMetadata(), configmock.New(t), metrics.NewNoopPipelineMonitor("")) isDestMRF := dest.IsMRF() assert.Equal(t, isEndpointMRF, isDestMRF) diff --git a/pkg/logs/client/http/sync_destination.go b/pkg/logs/client/http/sync_destination.go index 62625e6da611b..ed134f6896e8c 100644 --- a/pkg/logs/client/http/sync_destination.go +++ b/pkg/logs/client/http/sync_destination.go @@ -30,11 +30,11 @@ func NewSyncDestination(endpoint config.Endpoint, contentType string, destinationsContext *client.DestinationsContext, senderDoneChan chan *sync.WaitGroup, - telemetryName string, + destMeta *client.DestinationMetadata, cfg pkgconfigmodel.Reader) *SyncDestination { return &SyncDestination{ - destination: newDestination(endpoint, contentType, destinationsContext, time.Second*10, 1, false, telemetryName, cfg), + destination: newDestination(endpoint, contentType, destinationsContext, time.Second*10, 1, false, destMeta, cfg, metrics.NewNoopPipelineMonitor("0")), senderDoneChan: senderDoneChan, } } @@ -49,6 +49,11 @@ func (d *SyncDestination) Target() string { return d.destination.url } +// Metadata returns the metadata of the destination +func (d *SyncDestination) Metadata() *client.DestinationMetadata { + return d.destination.destMeta +} + // Start starts reading the input channel func (d *SyncDestination) Start(input chan *message.Payload, output chan *message.Payload, _ chan bool) (stopChan <-chan struct{}) { stop := make(chan struct{}) @@ -62,7 +67,7 @@ func (d *SyncDestination) run(input chan *message.Payload, output chan *message. for p := range input { idle := float64(time.Since(startIdle) / time.Millisecond) d.destination.expVars.AddFloat(expVarIdleMsMapKey, idle) - tlmIdle.Add(idle, d.destination.telemetryName) + tlmIdle.Add(idle, d.destination.destMeta.TelemetryName()) var startInUse = time.Now() err := d.destination.unconditionalSend(p) @@ -84,7 +89,7 @@ func (d *SyncDestination) run(input chan *message.Payload, output chan *message. inUse := float64(time.Since(startInUse) / time.Millisecond) d.destination.expVars.AddFloat(expVarInUseMsMapKey, inUse) - tlmInUse.Add(inUse, d.destination.telemetryName) + tlmInUse.Add(inUse, d.destination.destMeta.TelemetryName()) startIdle = time.Now() } diff --git a/pkg/logs/client/http/test_utils.go b/pkg/logs/client/http/test_utils.go index 98dea192077fb..c082ec06ed47a 100644 --- a/pkg/logs/client/http/test_utils.go +++ b/pkg/logs/client/http/test_utils.go @@ -15,6 +15,7 @@ import ( "github.com/DataDog/datadog-agent/comp/logs/agent/config" pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" "github.com/DataDog/datadog-agent/pkg/logs/client" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" ) // StatusCodeContainer is a lock around the status code to return @@ -79,7 +80,7 @@ func NewTestServerWithOptions(statusCode int, senders int, retryDestination bool endpoint.BackoffMax = 10 endpoint.RecoveryInterval = 1 - dest := NewDestination(endpoint, JSONContentType, destCtx, senders, retryDestination, "test", cfg) + dest := NewDestination(endpoint, JSONContentType, destCtx, senders, retryDestination, client.NewNoopDestinationMetadata(), cfg, metrics.NewNoopPipelineMonitor("")) return &TestServer{ httpServer: ts, DestCtx: destCtx, diff --git a/pkg/logs/client/tcp/destination.go b/pkg/logs/client/tcp/destination.go index f0ec9c1520649..1934ea2b3c930 100644 --- a/pkg/logs/client/tcp/destination.go +++ b/pkg/logs/client/tcp/destination.go @@ -58,6 +58,11 @@ func (d *Destination) Target() string { return d.connManager.address() } +// Metadata is not supported for TCP destinations +func (d *Destination) Metadata() *client.DestinationMetadata { + return client.NewNoopDestinationMetadata() +} + // Start reads from the input, transforms a message into a frame and sends it to a remote server, func (d *Destination) Start(input chan *message.Payload, output chan *message.Payload, isRetrying chan bool) (stopChan <-chan struct{}) { stop := make(chan struct{}) diff --git a/pkg/logs/diagnostic/go.mod b/pkg/logs/diagnostic/go.mod index 3a16868bf29c8..0ea147ef97297 100644 --- a/pkg/logs/diagnostic/go.mod +++ b/pkg/logs/diagnostic/go.mod @@ -46,6 +46,7 @@ replace ( require ( github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface v0.56.0-rc.3 github.com/DataDog/datadog-agent/comp/logs/agent/config v0.56.0-rc.3 + github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 github.com/DataDog/datadog-agent/pkg/logs/message v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/sources v0.56.0-rc.3 github.com/stretchr/testify v1.9.0 @@ -58,7 +59,6 @@ require ( github.com/DataDog/datadog-agent/pkg/config/env v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/model v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/nodetreemodel v0.0.0-00010101000000-000000000000 // indirect - github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/structure v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/teeconfig v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/utils v0.56.0-rc.3 // indirect diff --git a/pkg/logs/diagnostic/message_receiver.go b/pkg/logs/diagnostic/message_receiver.go index 6a08dddc229d1..3559130757c07 100644 --- a/pkg/logs/diagnostic/message_receiver.go +++ b/pkg/logs/diagnostic/message_receiver.go @@ -9,7 +9,7 @@ import ( "sync" "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface" - "github.com/DataDog/datadog-agent/comp/logs/agent/config" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/logs/message" ) @@ -49,14 +49,14 @@ func NewBufferedMessageReceiver(f Formatter, hostname hostnameinterface.Componen } } return &BufferedMessageReceiver{ - inputChan: make(chan messagePair, config.ChanSize), + inputChan: make(chan messagePair, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")), formatter: f, } } // Start opens new input channel func (b *BufferedMessageReceiver) Start() { - b.inputChan = make(chan messagePair, config.ChanSize) + b.inputChan = make(chan messagePair, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) } // Stop closes the input channel @@ -109,7 +109,7 @@ func (b *BufferedMessageReceiver) HandleMessage(m *message.Message, rendered []b // Filter writes the buffered events from the input channel formatted as a string to the output channel func (b *BufferedMessageReceiver) Filter(filters *Filters, done <-chan struct{}) <-chan string { - out := make(chan string, config.ChanSize) + out := make(chan string, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) go func() { defer close(out) for { diff --git a/pkg/logs/launchers/file/launcher.go b/pkg/logs/launchers/file/launcher.go index 8b8e5075619e4..834749587af49 100644 --- a/pkg/logs/launchers/file/launcher.go +++ b/pkg/logs/launchers/file/launcher.go @@ -21,6 +21,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/launchers" fileprovider "github.com/DataDog/datadog-agent/pkg/logs/launchers/file/provider" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/pipeline" "github.com/DataDog/datadog-agent/pkg/logs/sources" status "github.com/DataDog/datadog-agent/pkg/logs/status/utils" @@ -311,7 +312,8 @@ func (s *Launcher) startNewTailer(file *tailer.File, m config.TailingMode) bool return false } - tailer := s.createTailer(file, s.pipelineProvider.NextPipelineChan()) + channel, monitor := s.pipelineProvider.NextPipelineChanWithMonitor() + tailer := s.createTailer(file, channel, monitor) var offset int64 var whence int @@ -382,16 +384,17 @@ func (s *Launcher) restartTailerAfterFileRotation(oldTailer *tailer.Tailer, file } // createTailer returns a new initialized tailer -func (s *Launcher) createTailer(file *tailer.File, outputChan chan *message.Message) *tailer.Tailer { +func (s *Launcher) createTailer(file *tailer.File, outputChan chan *message.Message, pipelineMonitor metrics.PipelineMonitor) *tailer.Tailer { tailerInfo := status.NewInfoRegistry() tailerOptions := &tailer.TailerOptions{ - OutputChan: outputChan, - File: file, - SleepDuration: s.tailerSleepDuration, - Decoder: decoder.NewDecoderFromSource(file.Source, tailerInfo), - Info: tailerInfo, - TagAdder: s.tagger, + OutputChan: outputChan, + File: file, + SleepDuration: s.tailerSleepDuration, + Decoder: decoder.NewDecoderFromSource(file.Source, tailerInfo), + Info: tailerInfo, + TagAdder: s.tagger, + PipelineMonitor: pipelineMonitor, } return tailer.NewTailer(tailerOptions) diff --git a/pkg/logs/message/message.go b/pkg/logs/message/message.go index e852a97d70ae2..0ad1f53a74b07 100644 --- a/pkg/logs/message/message.go +++ b/pkg/logs/message/message.go @@ -43,6 +43,20 @@ type Payload struct { UnencodedSize int } +// Count returns the number of messages +func (m *Payload) Count() int64 { + return int64(len(m.Messages)) +} + +// Size returns the size of the message. +func (m *Payload) Size() int64 { + var size int64 = 0 + for _, m := range m.Messages { + size += m.Size() + } + return size +} + // Message represents a log line sent to datadog, with its metadata type Message struct { MessageContent @@ -51,7 +65,9 @@ type Message struct { Status string IngestionTimestamp int64 // RawDataLen tracks the original size of the message content before any trimming/transformation. - // This is used when calculating the tailer offset - so this will NOT always be equal to `len(Content)`. + // This is used when calculating the tailer offset - so this will NOT always be equal to `len(Content)` + // This is also used to track the original content size before the message is processed and encoded later + // in the pipeline. RawDataLen int // Tags added on processing ProcessingTags []string @@ -210,6 +226,7 @@ func NewMessage(content []byte, origin *Origin, status string, ingestionTimestam }, Origin: origin, Status: status, + RawDataLen: len(content), IngestionTimestamp: ingestionTimestamp, } } @@ -355,6 +372,16 @@ func (m *Message) TagsToString() string { return m.Origin.TagsToString(m.ProcessingTags) } +// Count returns the number of messages +func (m *Message) Count() int64 { + return 1 +} + +// Size returns the size of the message. +func (m *Message) Size() int64 { + return int64(m.RawDataLen) +} + // TruncatedReasonTag returns a tag with the reason for truncation. func TruncatedReasonTag(reason string) string { return fmt.Sprintf("truncated:%s", reason) diff --git a/pkg/logs/metrics/capacity_monitor.go b/pkg/logs/metrics/capacity_monitor.go new file mode 100644 index 0000000000000..3952a1bef9b67 --- /dev/null +++ b/pkg/logs/metrics/capacity_monitor.go @@ -0,0 +1,81 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package metrics + +import ( + "sync" + "time" +) + +// CapacityMonitor samples the average capacity of a component over a given interval. +// Capacity is calculated as the difference between the ingress and egress of a payload. +// Because data moves very quickly through components, we need to sample and aggregate this value over time. +type CapacityMonitor struct { + sync.Mutex + ingress int64 + ingressBytes int64 + egress int64 + egressBytes int64 + avgItems float64 + avgBytes float64 + name string + instance string + tickChan <-chan time.Time +} + +// NewCapacityMonitor creates a new CapacityMonitor +func NewCapacityMonitor(name, instance string) *CapacityMonitor { + return newCapacityMonitorWithTick(name, instance, time.NewTicker(1*time.Second).C) +} + +// newCapacityMonitorWithTick is used for testing. +func newCapacityMonitorWithTick(name, instance string, tickChan <-chan time.Time) *CapacityMonitor { + return &CapacityMonitor{ + name: name, + instance: instance, + avgItems: 0, + avgBytes: 0, + tickChan: tickChan, + } +} + +// AddIngress records the ingress of a payload +func (i *CapacityMonitor) AddIngress(pl MeasurablePayload) { + i.Lock() + defer i.Unlock() + i.ingress += pl.Count() + i.ingressBytes += pl.Size() + i.sample() +} + +// AddEgress records the egress of a payload +func (i *CapacityMonitor) AddEgress(pl MeasurablePayload) { + i.Lock() + defer i.Unlock() + i.egress += pl.Count() + i.egressBytes += pl.Size() + i.sample() + +} + +func (i *CapacityMonitor) sample() { + select { + case <-i.tickChan: + i.avgItems = ewma(float64(i.ingress-i.egress), i.avgItems) + i.avgBytes = ewma(float64(i.ingressBytes-i.egressBytes), i.avgBytes) + i.report() + default: + } +} + +func ewma(newValue float64, oldValue float64) float64 { + return newValue*ewmaAlpha + (oldValue * (1 - ewmaAlpha)) +} + +func (i *CapacityMonitor) report() { + TlmUtilizationItems.Set(i.avgItems, i.name, i.instance) + TlmUtilizationBytes.Set(i.avgBytes, i.name, i.instance) +} diff --git a/pkg/logs/metrics/capacity_monitor_test.go b/pkg/logs/metrics/capacity_monitor_test.go new file mode 100644 index 0000000000000..939383cd9bbaf --- /dev/null +++ b/pkg/logs/metrics/capacity_monitor_test.go @@ -0,0 +1,56 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package metrics + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type mockPayload struct { + count int64 + size int64 +} + +func (m mockPayload) Size() int64 { + return m.size +} +func (m mockPayload) Count() int64 { + return m.count +} + +func TestCapacityMonitor(t *testing.T) { + + tickChan := make(chan time.Time, 1) + m := newCapacityMonitorWithTick("test", "test", tickChan) + + assert.Equal(t, m.avgItems, 0.0) + assert.Equal(t, m.avgBytes, 0.0) + + // Tick before ingress - causing sample and flush. + // Should converge on 10 + for i := 0; i < 60; i++ { + tickChan <- time.Now() + m.AddIngress(mockPayload{count: 10, size: 10}) + m.AddEgress(mockPayload{count: 10, size: 10}) + } + assert.Greater(t, m.avgItems, 9.0) + assert.Greater(t, m.avgBytes, 9.0) + + // Tick before egress - causing sample and flush. + // Should converge on 0 + for i := 0; i < 60; i++ { + m.AddIngress(mockPayload{count: 10, size: 10}) + tickChan <- time.Now() + m.AddEgress(mockPayload{count: 10, size: 10}) + } + + assert.Less(t, m.avgItems, 1.0) + assert.Less(t, m.avgBytes, 1.0) + +} diff --git a/pkg/logs/metrics/go.mod b/pkg/logs/metrics/go.mod index 993d59e4c15c0..967a7650c564f 100644 --- a/pkg/logs/metrics/go.mod +++ b/pkg/logs/metrics/go.mod @@ -8,10 +8,12 @@ replace ( github.com/DataDog/datadog-agent/pkg/telemetry => ../../telemetry github.com/DataDog/datadog-agent/pkg/util/fxutil => ../../util/fxutil github.com/DataDog/datadog-agent/pkg/util/optional => ../../util/optional + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../util/utilizationtracker ) require ( github.com/DataDog/datadog-agent/pkg/telemetry v0.56.0-rc.3 + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 github.com/stretchr/testify v1.9.0 ) @@ -20,6 +22,7 @@ require ( github.com/DataDog/datadog-agent/comp/def v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/fxutil v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/optional v0.55.0 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/pkg/logs/metrics/go.sum b/pkg/logs/metrics/go.sum index c930aa256d0e3..1e86541fbcdff 100644 --- a/pkg/logs/metrics/go.sum +++ b/pkg/logs/metrics/go.sum @@ -1,3 +1,5 @@ +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= diff --git a/pkg/logs/metrics/metrics.go b/pkg/logs/metrics/metrics.go index 49a6b30bbd597..0f8aaf35e2838 100644 --- a/pkg/logs/metrics/metrics.go +++ b/pkg/logs/metrics/metrics.go @@ -81,6 +81,16 @@ var ( // TlmLogsDiscardedFromSDSBuffer how many messages were dropped when waiting for an SDS configuration because the buffer is full TlmLogsDiscardedFromSDSBuffer = telemetry.NewCounter("logs", "sds__dropped_from_buffer", nil, "Count of messages dropped from the buffer while waiting for an SDS configuration") + + // TlmUtilizationRatio is the utilization ratio of a component. + // Utilization ratio is calculated as the ratio of time spent in use to the total time. + // This metric is internally sampled and exposed as an ewma in order to produce a useable value. + TlmUtilizationRatio = telemetry.NewGauge("logs_component_utilization", "ratio", []string{"name", "instance"}, "Gauge of the utilization ratio of a component") + // TlmUtilizationItems is the capacity of a component by number of elements + // Both the number of items and the number of bytes are aggregated and exposed as a ewma. + TlmUtilizationItems = telemetry.NewGauge("logs_component_utilization", "items", []string{"name", "instance"}, "Gauge of the number of items currently held in a component and it's bufferes") + // TlmUtilizationBytes is the capacity of a component by number of bytes + TlmUtilizationBytes = telemetry.NewGauge("logs_component_utilization", "bytes", []string{"name", "instance"}, "Gauge of the number of bytes currently held in a component and it's bufferes") ) func init() { diff --git a/pkg/logs/metrics/pipeline_monitor.go b/pkg/logs/metrics/pipeline_monitor.go new file mode 100644 index 0000000000000..e98f23c403733 --- /dev/null +++ b/pkg/logs/metrics/pipeline_monitor.go @@ -0,0 +1,112 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package metrics + +import ( + "sync" +) + +const ewmaAlpha = 2 / (float64(30) + 1) // ~ 0.0645 for a 30s window + +// MeasurablePayload represents a payload that can be measured in bytes and count +type MeasurablePayload interface { + Size() int64 + Count() int64 +} + +// PipelineMonitor is an interface for monitoring the capacity of a pipeline. +// Pipeline monitors are used to measure both capacity and utilization of components. +type PipelineMonitor interface { + ID() string + ReportComponentIngress(size MeasurablePayload, name string) + ReportComponentEgress(size MeasurablePayload, name string) + MakeUtilizationMonitor(name string) UtilizationMonitor +} + +// NoopPipelineMonitor is a no-op implementation of PipelineMonitor. +// Some instances of logs components do not need to report capacity metrics and +// should use this implementation. +type NoopPipelineMonitor struct { + instanceID string +} + +// NewNoopPipelineMonitor creates a new no-op pipeline monitor +func NewNoopPipelineMonitor(id string) *NoopPipelineMonitor { + return &NoopPipelineMonitor{ + instanceID: id, + } +} + +// ID returns the instance id of the monitor +func (n *NoopPipelineMonitor) ID() string { + return n.instanceID +} + +// ReportComponentIngress does nothing. +func (n *NoopPipelineMonitor) ReportComponentIngress(_ MeasurablePayload, _ string) {} + +// ReportComponentEgress does nothing. +func (n *NoopPipelineMonitor) ReportComponentEgress(_ MeasurablePayload, _ string) {} + +// MakeUtilizationMonitor returns a no-op utilization monitor. +func (n *NoopPipelineMonitor) MakeUtilizationMonitor(_ string) UtilizationMonitor { + return &NoopUtilizationMonitor{} +} + +// TelemetryPipelineMonitor is a PipelineMonitor that reports capacity metrics to telemetry +type TelemetryPipelineMonitor struct { + monitors map[string]*CapacityMonitor + instanceID string + lock sync.RWMutex +} + +// NewTelemetryPipelineMonitor creates a new pipeline monitort that reports capacity and utiilization metrics as telemetry +func NewTelemetryPipelineMonitor(instanceID string) *TelemetryPipelineMonitor { + return &TelemetryPipelineMonitor{ + monitors: make(map[string]*CapacityMonitor), + instanceID: instanceID, + lock: sync.RWMutex{}, + } +} + +func (c *TelemetryPipelineMonitor) getMonitor(name string) *CapacityMonitor { + key := name + c.instanceID + + c.lock.RLock() + monitor, exists := c.monitors[key] + c.lock.RUnlock() + + if !exists { + c.lock.Lock() + if c.monitors[key] == nil { + c.monitors[key] = NewCapacityMonitor(name, c.instanceID) + } + monitor = c.monitors[key] + c.lock.Unlock() + } + + return monitor +} + +// ID returns the instance id of the monitor +func (c *TelemetryPipelineMonitor) ID() string { + return c.instanceID +} + +// MakeUtilizationMonitor creates a new utilization monitor for a component. +func (c *TelemetryPipelineMonitor) MakeUtilizationMonitor(name string) UtilizationMonitor { + return NewTelemetryUtilizationMonitor(name, c.instanceID) +} + +// ReportComponentIngress reports the ingress of a payload to a component. +func (c *TelemetryPipelineMonitor) ReportComponentIngress(pl MeasurablePayload, name string) { + c.getMonitor(name).AddIngress(pl) +} + +// ReportComponentEgress reports the egress of a payload from a component. +func (c *TelemetryPipelineMonitor) ReportComponentEgress(pl MeasurablePayload, name string) { + c.getMonitor(name).AddEgress(pl) +} diff --git a/pkg/logs/metrics/pipeline_monitor_test.go b/pkg/logs/metrics/pipeline_monitor_test.go new file mode 100644 index 0000000000000..2f96f05c0d7be --- /dev/null +++ b/pkg/logs/metrics/pipeline_monitor_test.go @@ -0,0 +1,46 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package metrics + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPipelineMonitorTracksCorrectCapacity(t *testing.T) { + pm := NewTelemetryPipelineMonitor("test") + + pm.ReportComponentIngress(mockPayload{count: 1, size: 1}, "1") + pm.ReportComponentIngress(mockPayload{count: 5, size: 5}, "5") + pm.ReportComponentIngress(mockPayload{count: 10, size: 10}, "10") + + assert.Equal(t, pm.getMonitor("1").ingress, int64(1)) + assert.Equal(t, pm.getMonitor("1").ingressBytes, int64(1)) + + assert.Equal(t, pm.getMonitor("5").ingress, int64(5)) + assert.Equal(t, pm.getMonitor("5").ingressBytes, int64(5)) + + assert.Equal(t, pm.getMonitor("10").ingress, int64(10)) + assert.Equal(t, pm.getMonitor("10").ingressBytes, int64(10)) + + pm.ReportComponentEgress(mockPayload{count: 1, size: 1}, "1") + pm.ReportComponentEgress(mockPayload{count: 5, size: 5}, "5") + pm.ReportComponentEgress(mockPayload{count: 10, size: 10}, "10") + + assert.Equal(t, pm.getMonitor("1").egress, int64(1)) + assert.Equal(t, pm.getMonitor("1").egressBytes, int64(1)) + + assert.Equal(t, pm.getMonitor("5").egress, int64(5)) + assert.Equal(t, pm.getMonitor("5").egressBytes, int64(5)) + + assert.Equal(t, pm.getMonitor("10").egress, int64(10)) + assert.Equal(t, pm.getMonitor("10").egressBytes, int64(10)) + + assert.Equal(t, pm.getMonitor("1").ingress-pm.getMonitor("1").egress, int64(0)) + assert.Equal(t, pm.getMonitor("5").ingress-pm.getMonitor("5").egress, int64(0)) + assert.Equal(t, pm.getMonitor("10").ingress-pm.getMonitor("10").egress, int64(0)) +} diff --git a/pkg/logs/metrics/utilization_monitor.go b/pkg/logs/metrics/utilization_monitor.go new file mode 100644 index 0000000000000..704681d784f10 --- /dev/null +++ b/pkg/logs/metrics/utilization_monitor.go @@ -0,0 +1,113 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package metrics + +import ( + "time" + + "github.com/DataDog/datadog-agent/pkg/util/utilizationtracker" +) + +// UtilizationMonitor is an interface for monitoring the utilization of a component. +type UtilizationMonitor interface { + Start() + Stop() + Cancel() +} + +// NoopUtilizationMonitor is a no-op implementation of UtilizationMonitor. +type NoopUtilizationMonitor struct{} + +// Start does nothing. +func (n *NoopUtilizationMonitor) Start() {} + +// Stop does nothing. +func (n *NoopUtilizationMonitor) Stop() {} + +// Cancel does nothing. +func (n *NoopUtilizationMonitor) Cancel() {} + +// TelemetryUtilizationMonitor is a UtilizationMonitor that reports utilization metrics as telemetry. +type TelemetryUtilizationMonitor struct { + name string + instance string + started bool + ut *utilizationtracker.UtilizationTracker + cancel func() +} + +// NewTelemetryUtilizationMonitor creates a new TelemetryUtilizationMonitor. +func NewTelemetryUtilizationMonitor(name, instance string) *TelemetryUtilizationMonitor { + + utilizationTracker := utilizationtracker.NewUtilizationTracker(1*time.Second, ewmaAlpha) + cancel := startTrackerTicker(utilizationTracker, 1*time.Second) + + t := &TelemetryUtilizationMonitor{ + name: name, + instance: instance, + started: false, + ut: utilizationTracker, + cancel: cancel, + } + t.startUtilizationUpdater() + return t +} + +// Start tracks a start event in the utilization tracker. +func (u *TelemetryUtilizationMonitor) Start() { + if u.started { + return + } + u.started = true + u.ut.Started() +} + +// Stop tracks a finish event in the utilization tracker. +func (u *TelemetryUtilizationMonitor) Stop() { + if !u.started { + return + } + u.started = false + u.ut.Finished() +} + +// Cancel stops the monitor. +func (u *TelemetryUtilizationMonitor) Cancel() { + u.cancel() + u.ut.Stop() +} + +func startTrackerTicker(ut *utilizationtracker.UtilizationTracker, interval time.Duration) func() { + ticker := time.NewTicker(interval) + cancel := make(chan struct{}, 1) + done := make(chan struct{}) + go func() { + defer ticker.Stop() + defer close(done) + for { + select { + case <-ticker.C: + ut.Tick() + case <-cancel: + return + } + } + }() + + return func() { + cancel <- struct{}{} + <-done // make sure Tick will not be called after we return. + } +} + +func (u *TelemetryUtilizationMonitor) startUtilizationUpdater() { + TlmUtilizationRatio.Set(0, u.name, u.instance) + go func() { + for value := range u.ut.Output { + TlmUtilizationRatio.Set(value, u.name, u.instance) + } + }() +} diff --git a/pkg/logs/metrics/utilization_monitor_test.go b/pkg/logs/metrics/utilization_monitor_test.go new file mode 100644 index 0000000000000..c549cfaaab55a --- /dev/null +++ b/pkg/logs/metrics/utilization_monitor_test.go @@ -0,0 +1,19 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package metrics + +import ( + "testing" +) + +func TestUtilizationMonitorLifecycle(_ *testing.T) { + // The core logic of the UtilizationMonitor is tested in the utilizationtracker package. + // This test just ensures the lifecycle methods don't block. + um := NewTelemetryUtilizationMonitor("", "") + um.Start() + um.Stop() + um.Cancel() +} diff --git a/pkg/logs/pipeline/go.mod b/pkg/logs/pipeline/go.mod index bcabc871490b5..fdb27aa451753 100644 --- a/pkg/logs/pipeline/go.mod +++ b/pkg/logs/pipeline/go.mod @@ -51,6 +51,7 @@ replace ( github.com/DataDog/datadog-agent/pkg/util/system => ../../util/system github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../util/system/socket github.com/DataDog/datadog-agent/pkg/util/testutil => ../../util/testutil + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../util/utilizationtracker github.com/DataDog/datadog-agent/pkg/util/winutil => ../../util/winutil github.com/DataDog/datadog-agent/pkg/version => ../../version ) @@ -59,10 +60,12 @@ require ( github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface v0.56.0-rc.3 github.com/DataDog/datadog-agent/comp/logs/agent/config v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/config/model v0.57.0 + github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 github.com/DataDog/datadog-agent/pkg/logs/auditor v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/client v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/diagnostic v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/message v0.56.0-rc.3 + github.com/DataDog/datadog-agent/pkg/logs/metrics v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/processor v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/sds v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/sender v0.56.0-rc.3 @@ -83,11 +86,9 @@ require ( github.com/DataDog/datadog-agent/pkg/collector/check/defaults v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/env v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/nodetreemodel v0.0.0-00010101000000-000000000000 // indirect - github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/structure v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/teeconfig v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/utils v0.56.0-rc.3 // indirect - github.com/DataDog/datadog-agent/pkg/logs/metrics v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/logs/sources v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/telemetry v0.56.0-rc.3 // indirect @@ -103,6 +104,7 @@ require ( github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/system v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.0 // indirect + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect github.com/DataDog/datadog-agent/pkg/version v0.56.0-rc.3 // indirect github.com/DataDog/dd-sensitive-data-scanner/sds-go/go v0.0.0-20240816154533-f7f9beb53a42 // indirect diff --git a/pkg/logs/pipeline/mock/mock.go b/pkg/logs/pipeline/mock/mock.go index 3d07560754a79..448ea1fb2416f 100644 --- a/pkg/logs/pipeline/mock/mock.go +++ b/pkg/logs/pipeline/mock/mock.go @@ -10,6 +10,7 @@ import ( "context" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/pipeline" ) @@ -52,3 +53,8 @@ func (p *mockProvider) Flush(_ context.Context) {} func (p *mockProvider) NextPipelineChan() chan *message.Message { return p.msgChan } + +// NextPipelineChanWithInstance returns the next pipeline +func (p *mockProvider) NextPipelineChanWithMonitor() (chan *message.Message, metrics.PipelineMonitor) { + return p.msgChan, metrics.NewNoopPipelineMonitor("") +} diff --git a/pkg/logs/pipeline/pipeline.go b/pkg/logs/pipeline/pipeline.go index 0a050d38481ad..b0136dac860d9 100644 --- a/pkg/logs/pipeline/pipeline.go +++ b/pkg/logs/pipeline/pipeline.go @@ -8,17 +8,19 @@ package pipeline import ( "context" - "fmt" + "strconv" "sync" "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface" "github.com/DataDog/datadog-agent/comp/logs/agent/config" pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/logs/client" "github.com/DataDog/datadog-agent/pkg/logs/client/http" "github.com/DataDog/datadog-agent/pkg/logs/client/tcp" "github.com/DataDog/datadog-agent/pkg/logs/diagnostic" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/processor" "github.com/DataDog/datadog-agent/pkg/logs/sender" "github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface" @@ -26,13 +28,14 @@ import ( // Pipeline processes and sends messages to the backend type Pipeline struct { - InputChan chan *message.Message - flushChan chan struct{} - processor *processor.Processor - strategy sender.Strategy - sender *sender.Sender - serverless bool - flushWg *sync.WaitGroup + InputChan chan *message.Message + flushChan chan struct{} + processor *processor.Processor + strategy sender.Strategy + sender *sender.Sender + serverless bool + flushWg *sync.WaitGroup + pipelineMonitor metrics.PipelineMonitor } // NewPipeline returns a new Pipeline @@ -53,10 +56,11 @@ func NewPipeline(outputChan chan *message.Payload, senderDoneChan = make(chan *sync.WaitGroup) flushWg = &sync.WaitGroup{} } + pipelineMonitor := metrics.NewTelemetryPipelineMonitor(strconv.Itoa(pipelineID)) - mainDestinations := getDestinations(endpoints, destinationsContext, pipelineID, serverless, senderDoneChan, status, cfg) + mainDestinations := getDestinations(endpoints, destinationsContext, pipelineMonitor, serverless, senderDoneChan, status, cfg) - strategyInput := make(chan *message.Message, config.ChanSize) + strategyInput := make(chan *message.Message, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) senderInput := make(chan *message.Payload, 1) // Only buffer 1 message since payloads can be large flushChan := make(chan struct{}) @@ -73,22 +77,23 @@ func NewPipeline(outputChan chan *message.Payload, encoder = processor.RawEncoder } - strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, flushWg, pipelineID) - logsSender = sender.NewSender(cfg, senderInput, outputChan, mainDestinations, config.DestinationPayloadChanSize, senderDoneChan, flushWg) + strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, flushWg, pipelineMonitor) + logsSender = sender.NewSender(cfg, senderInput, outputChan, mainDestinations, pkgconfigsetup.Datadog().GetInt("logs_config.payload_channel_size"), senderDoneChan, flushWg, pipelineMonitor) - inputChan := make(chan *message.Message, config.ChanSize) + inputChan := make(chan *message.Message, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) processor := processor.New(cfg, inputChan, strategyInput, processingRules, - encoder, diagnosticMessageReceiver, hostname, pipelineID) + encoder, diagnosticMessageReceiver, hostname, pipelineMonitor) return &Pipeline{ - InputChan: inputChan, - flushChan: flushChan, - processor: processor, - strategy: strategy, - sender: logsSender, - serverless: serverless, - flushWg: flushWg, + InputChan: inputChan, + flushChan: flushChan, + processor: processor, + strategy: strategy, + sender: logsSender, + serverless: serverless, + flushWg: flushWg, + pipelineMonitor: pipelineMonitor, } } @@ -117,25 +122,25 @@ func (p *Pipeline) Flush(ctx context.Context) { } } -func getDestinations(endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, pipelineID int, serverless bool, senderDoneChan chan *sync.WaitGroup, status statusinterface.Status, cfg pkgconfigmodel.Reader) *client.Destinations { +func getDestinations(endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, pipelineMonitor metrics.PipelineMonitor, serverless bool, senderDoneChan chan *sync.WaitGroup, status statusinterface.Status, cfg pkgconfigmodel.Reader) *client.Destinations { reliable := []client.Destination{} additionals := []client.Destination{} if endpoints.UseHTTP { for i, endpoint := range endpoints.GetReliableEndpoints() { - telemetryName := fmt.Sprintf("logs_%d_reliable_%d", pipelineID, i) + destMeta := client.NewDestinationMetadata("logs", pipelineMonitor.ID(), "reliable", strconv.Itoa(i)) if serverless { - reliable = append(reliable, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, telemetryName, cfg)) + reliable = append(reliable, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, destMeta, cfg)) } else { - reliable = append(reliable, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, telemetryName, cfg)) + reliable = append(reliable, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, destMeta, cfg, pipelineMonitor)) } } for i, endpoint := range endpoints.GetUnReliableEndpoints() { - telemetryName := fmt.Sprintf("logs_%d_unreliable_%d", pipelineID, i) + destMeta := client.NewDestinationMetadata("logs", pipelineMonitor.ID(), "unreliable", strconv.Itoa(i)) if serverless { - additionals = append(additionals, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, telemetryName, cfg)) + additionals = append(additionals, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, destMeta, cfg)) } else { - additionals = append(additionals, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, telemetryName, cfg)) + additionals = append(additionals, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, destMeta, cfg, pipelineMonitor)) } } return client.NewDestinations(reliable, additionals) @@ -151,13 +156,13 @@ func getDestinations(endpoints *config.Endpoints, destinationsContext *client.De } //nolint:revive // TODO(AML) Fix revive linter -func getStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, flushChan chan struct{}, endpoints *config.Endpoints, serverless bool, flushWg *sync.WaitGroup, _ int) sender.Strategy { +func getStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, flushChan chan struct{}, endpoints *config.Endpoints, serverless bool, flushWg *sync.WaitGroup, pipelineMonitor metrics.PipelineMonitor) sender.Strategy { if endpoints.UseHTTP || serverless { encoder := sender.IdentityContentType if endpoints.Main.UseCompression { encoder = sender.NewGzipContentEncoding(endpoints.Main.CompressionLevel) } - return sender.NewBatchStrategy(inputChan, outputChan, flushChan, serverless, flushWg, sender.ArraySerializer, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder) + return sender.NewBatchStrategy(inputChan, outputChan, flushChan, serverless, flushWg, sender.ArraySerializer, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder, pipelineMonitor) } return sender.NewStreamStrategy(inputChan, outputChan, sender.IdentityContentType) } diff --git a/pkg/logs/pipeline/provider.go b/pkg/logs/pipeline/provider.go index 54d3b947a1313..9ee6ec8a5dfa0 100644 --- a/pkg/logs/pipeline/provider.go +++ b/pkg/logs/pipeline/provider.go @@ -18,6 +18,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/client" "github.com/DataDog/datadog-agent/pkg/logs/diagnostic" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/sds" "github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface" "github.com/DataDog/datadog-agent/pkg/util/log" @@ -32,6 +33,7 @@ type Provider interface { ReconfigureSDSAgentConfig(config []byte) (bool, error) StopSDSProcessing() error NextPipelineChan() chan *message.Message + NextPipelineChanWithMonitor() (chan *message.Message, metrics.PipelineMonitor) // Flush flushes all pipeline contained in this Provider Flush(ctx context.Context) } @@ -181,6 +183,17 @@ func (p *provider) NextPipelineChan() chan *message.Message { return nextPipeline.InputChan } +// NextPipelineChanWithMonitor returns the next pipeline input channel with it's monitor. +func (p *provider) NextPipelineChanWithMonitor() (chan *message.Message, metrics.PipelineMonitor) { + pipelinesLen := len(p.pipelines) + if pipelinesLen == 0 { + return nil, nil + } + index := p.currentPipelineIndex.Inc() % uint32(pipelinesLen) + nextPipeline := p.pipelines[index] + return nextPipeline.InputChan, nextPipeline.pipelineMonitor +} + // Flush flushes synchronously all the contained pipeline of this provider. func (p *provider) Flush(ctx context.Context) { for _, p := range p.pipelines { diff --git a/pkg/logs/processor/go.mod b/pkg/logs/processor/go.mod index aed531f08a410..6822a25264eb9 100644 --- a/pkg/logs/processor/go.mod +++ b/pkg/logs/processor/go.mod @@ -42,6 +42,7 @@ replace ( github.com/DataDog/datadog-agent/pkg/util/system => ../../util/system github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../util/system/socket github.com/DataDog/datadog-agent/pkg/util/testutil => ../../util/testutil + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../util/utilizationtracker github.com/DataDog/datadog-agent/pkg/util/winutil => ../../util/winutil github.com/DataDog/datadog-agent/pkg/version => ../../version ) @@ -83,11 +84,13 @@ require ( github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/system v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.0 // indirect + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect github.com/DataDog/datadog-agent/pkg/version v0.56.0-rc.3 // indirect github.com/DataDog/dd-sensitive-data-scanner/sds-go/go v0.0.0-20240816154533-f7f9beb53a42 // indirect github.com/DataDog/viper v1.13.5 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect diff --git a/pkg/logs/processor/go.sum b/pkg/logs/processor/go.sum index 3bb0e66aa4a99..8cd52742efd7b 100644 --- a/pkg/logs/processor/go.sum +++ b/pkg/logs/processor/go.sum @@ -16,6 +16,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/pkg/logs/processor/processor.go b/pkg/logs/processor/processor.go index f66a8c0c48a4d..186b022fc0572 100644 --- a/pkg/logs/processor/processor.go +++ b/pkg/logs/processor/processor.go @@ -26,7 +26,6 @@ const UnstructuredProcessingMetricName = "datadog.logs_agent.tailer.unstructured // A Processor updates messages from an inputChan and pushes // in an outputChan. type Processor struct { - pipelineID int inputChan chan *message.Message outputChan chan *message.Message // strategy input // ReconfigChan transports rules to use in order to reconfigure @@ -40,6 +39,10 @@ type Processor struct { hostname hostnameinterface.Component sds sdsProcessor + + // Telemetry + pipelineMonitor metrics.PipelineMonitor + utilization metrics.UtilizationMonitor } type sdsProcessor struct { @@ -58,13 +61,12 @@ type sdsProcessor struct { // New returns an initialized Processor. func New(cfg pkgconfigmodel.Reader, inputChan, outputChan chan *message.Message, processingRules []*config.ProcessingRule, encoder Encoder, diagnosticMessageReceiver diagnostic.MessageReceiver, hostname hostnameinterface.Component, - pipelineID int) *Processor { + pipelineMonitor metrics.PipelineMonitor) *Processor { waitForSDSConfig := sds.ShouldBufferUntilSDSConfiguration(cfg) maxBufferSize := sds.WaitForConfigurationBufferMaxSize(cfg) return &Processor{ - pipelineID: pipelineID, inputChan: inputChan, outputChan: outputChan, // strategy input ReconfigChan: make(chan sds.ReconfigureOrder), @@ -73,12 +75,14 @@ func New(cfg pkgconfigmodel.Reader, inputChan, outputChan chan *message.Message, done: make(chan struct{}), diagnosticMessageReceiver: diagnosticMessageReceiver, hostname: hostname, + pipelineMonitor: pipelineMonitor, + utilization: pipelineMonitor.MakeUtilizationMonitor("processor"), sds: sdsProcessor{ // will immediately starts buffering if it has been configured as so buffering: waitForSDSConfig, maxBufferSize: maxBufferSize, - scanner: sds.CreateScanner(pipelineID), + scanner: sds.CreateScanner(pipelineMonitor.ID()), }, } } @@ -124,6 +128,7 @@ func (p *Processor) Flush(ctx context.Context) { func (p *Processor) run() { defer func() { p.done <- struct{}{} + p.utilization.Cancel() }() for { @@ -217,6 +222,9 @@ func (s *sdsProcessor) resetBuffer() { } func (p *Processor) processMessage(msg *message.Message) { + p.utilization.Start() + defer p.utilization.Stop() + defer p.pipelineMonitor.ReportComponentEgress(msg, "processor") metrics.LogsDecoded.Add(1) metrics.TlmLogsDecoded.Inc() @@ -241,8 +249,11 @@ func (p *Processor) processMessage(msg *message.Message) { return } + p.utilization.Stop() // Explicitly call stop here to avoid counting writing on the output channel as processing time p.outputChan <- msg + p.pipelineMonitor.ReportComponentIngress(msg, "strategy") } + } // applyRedactingRules returns given a message if we should process it or not, diff --git a/pkg/logs/processor/processor_test.go b/pkg/logs/processor/processor_test.go index bb2ff56b02461..236246c174c14 100644 --- a/pkg/logs/processor/processor_test.go +++ b/pkg/logs/processor/processor_test.go @@ -17,6 +17,7 @@ import ( "github.com/DataDog/datadog-agent/comp/logs/agent/config" "github.com/DataDog/datadog-agent/pkg/logs/diagnostic" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/sds" "github.com/DataDog/datadog-agent/pkg/logs/sources" ) @@ -314,6 +315,7 @@ func TestBuffering(t *testing.T) { } hostnameComponent, _ := hostnameinterface.NewMock("testHostnameFromEnvVar") + pm := metrics.NewNoopPipelineMonitor("") p := &Processor{ encoder: JSONEncoder, @@ -326,8 +328,10 @@ func TestBuffering(t *testing.T) { sds: sdsProcessor{ maxBufferSize: len("hello1world") + len("hello2world") + len("hello3world") + 1, buffering: true, - scanner: sds.CreateScanner(42), + scanner: sds.CreateScanner("42"), }, + pipelineMonitor: pm, + utilization: pm.MakeUtilizationMonitor("processor"), } var processedMessages atomic.Int32 diff --git a/pkg/logs/sds/scanner.go b/pkg/logs/sds/scanner.go index 581fe810a7fbb..b0caf689efbfd 100644 --- a/pkg/logs/sds/scanner.go +++ b/pkg/logs/sds/scanner.go @@ -11,15 +11,15 @@ package sds import ( "encoding/json" "fmt" - "strconv" "strings" "sync" "time" + sds "github.com/DataDog/dd-sensitive-data-scanner/sds-go/go" + "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" - sds "github.com/DataDog/dd-sensitive-data-scanner/sds-go/go" ) const ScannedTag = "sds_agent:true" @@ -34,7 +34,7 @@ var ( tlmSDSReconfigSuccess = telemetry.NewCounterWithOpts("sds", "reconfiguration_success", []string{"pipeline", "type"}, "Count of SDS reconfiguration success.", telemetry.Options{DefaultMetric: true}) tlmSDSProcessingLatency = telemetry.NewSimpleHistogram("sds", "processing_latency", "Processing latency histogram", - []float64{10, 250, 500, 2000, 5000, 10000}) // unit: us + []float64{10, 250, 500, 2000, 5000, 10000}) // unit: us ) // Scanner wraps an SDS Scanner implementation, adds reconfiguration @@ -63,8 +63,8 @@ type Scanner struct { // CreateScanner creates an SDS scanner. // Use `Reconfigure` to configure it manually. -func CreateScanner(pipelineID int) *Scanner { - scanner := &Scanner{pipelineID: strconv.Itoa(pipelineID)} +func CreateScanner(pipelineID string) *Scanner { + scanner := &Scanner{pipelineID: pipelineID} log.Debugf("creating a new SDS scanner (internal id: %p)", scanner) return scanner } diff --git a/pkg/logs/sds/scanner_nosds.go b/pkg/logs/sds/scanner_nosds.go index 0f1d256f6917a..c1db02cdea4b7 100644 --- a/pkg/logs/sds/scanner_nosds.go +++ b/pkg/logs/sds/scanner_nosds.go @@ -24,7 +24,7 @@ type Match struct { } // CreateScanner creates a scanner for unsupported platforms/architectures. -func CreateScanner(_ int) *Scanner { +func CreateScanner(_ string) *Scanner { return nil } diff --git a/pkg/logs/sds/scanner_test.go b/pkg/logs/sds/scanner_test.go index bf27ea97ae8e0..4e099d2aec7cb 100644 --- a/pkg/logs/sds/scanner_test.go +++ b/pkg/logs/sds/scanner_test.go @@ -13,9 +13,10 @@ import ( "testing" "time" - "github.com/DataDog/datadog-agent/pkg/logs/message" sds "github.com/DataDog/dd-sensitive-data-scanner/sds-go/go" "github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/pkg/logs/message" ) func TestCreateScanner(t *testing.T) { @@ -68,7 +69,7 @@ func TestCreateScanner(t *testing.T) { // scanner creation // ----- - s := CreateScanner(0) + s := CreateScanner("") require.NotNil(s, "the scanner should not be nil after a creation") @@ -245,7 +246,7 @@ func TestEmptyConfiguration(t *testing.T) { ]} `) - s := CreateScanner(0) + s := CreateScanner("") require.NotNil(s, "the scanner should not be nil after a creation") @@ -350,7 +351,7 @@ func TestIsReady(t *testing.T) { // scanner creation // ----- - s := CreateScanner(0) + s := CreateScanner("") require.NotNil(s, "the scanner should not be nil after a creation") require.False(s.IsReady(), "at this stage, the scanner should not be considered ready, no definitions received") @@ -420,7 +421,7 @@ func TestScan(t *testing.T) { // scanner creation // ----- - s := CreateScanner(0) + s := CreateScanner("") require.NotNil(s, "the returned scanner should not be nil") isActive, _ := s.Reconfigure(ReconfigureOrder{ @@ -509,7 +510,7 @@ func TestCloseCycleScan(t *testing.T) { // ----- for i := 0; i < 10; i++ { - s := CreateScanner(0) + s := CreateScanner("") require.NotNil(s, "the returned scanner should not be nil") _, _ = s.Reconfigure(ReconfigureOrder{ diff --git a/pkg/logs/sender/batch_strategy.go b/pkg/logs/sender/batch_strategy.go index 4949f4a4e708f..cfb2ef8655d82 100644 --- a/pkg/logs/sender/batch_strategy.go +++ b/pkg/logs/sender/batch_strategy.go @@ -13,6 +13,7 @@ import ( "github.com/benbjohnson/clock" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -36,6 +37,10 @@ type batchStrategy struct { contentEncoding ContentEncoding stopChan chan struct{} // closed when the goroutine has finished clock clock.Clock + + // Telemtry + pipelineMonitor metrics.PipelineMonitor + utilization metrics.UtilizationMonitor } // NewBatchStrategy returns a new batch concurrent strategy with the specified batch & content size limits @@ -49,8 +54,9 @@ func NewBatchStrategy(inputChan chan *message.Message, maxBatchSize int, maxContentSize int, pipelineName string, - contentEncoding ContentEncoding) Strategy { - return newBatchStrategyWithClock(inputChan, outputChan, flushChan, serverless, flushWg, serializer, batchWait, maxBatchSize, maxContentSize, pipelineName, clock.New(), contentEncoding) + contentEncoding ContentEncoding, + pipelineMonitor metrics.PipelineMonitor) Strategy { + return newBatchStrategyWithClock(inputChan, outputChan, flushChan, serverless, flushWg, serializer, batchWait, maxBatchSize, maxContentSize, pipelineName, clock.New(), contentEncoding, pipelineMonitor) } func newBatchStrategyWithClock(inputChan chan *message.Message, @@ -64,7 +70,8 @@ func newBatchStrategyWithClock(inputChan chan *message.Message, maxContentSize int, pipelineName string, clock clock.Clock, - contentEncoding ContentEncoding) Strategy { + contentEncoding ContentEncoding, + pipelineMonitor metrics.PipelineMonitor) Strategy { return &batchStrategy{ inputChan: inputChan, @@ -79,6 +86,8 @@ func newBatchStrategyWithClock(inputChan chan *message.Message, stopChan: make(chan struct{}), pipelineName: pipelineName, clock: clock, + pipelineMonitor: pipelineMonitor, + utilization: pipelineMonitor.MakeUtilizationMonitor("strategy"), } } @@ -98,6 +107,7 @@ func (s *batchStrategy) Start() { defer func() { s.flushBuffer(s.outputChan) flushTicker.Stop() + s.utilization.Cancel() close(s.stopChan) }() for { @@ -144,6 +154,7 @@ func (s *batchStrategy) flushBuffer(outputChan chan *message.Payload) { if s.buffer.IsEmpty() { return } + s.utilization.Start() messages := s.buffer.GetMessages() s.buffer.Clear() // Logging specifically for DBM pipelines, which seem to fail to send more often than other pipelines. @@ -161,6 +172,7 @@ func (s *batchStrategy) sendMessages(messages []*message.Message, outputChan cha encodedPayload, err := s.contentEncoding.encode(serializedMessage) if err != nil { log.Warn("Encoding failed - dropping payload", err) + s.utilization.Stop() return } @@ -169,10 +181,14 @@ func (s *batchStrategy) sendMessages(messages []*message.Message, outputChan cha s.flushWg.Add(1) } - outputChan <- &message.Payload{ + p := &message.Payload{ Messages: messages, Encoded: encodedPayload, Encoding: s.contentEncoding.name(), UnencodedSize: len(serializedMessage), } + s.utilization.Stop() + outputChan <- p + s.pipelineMonitor.ReportComponentEgress(p, "strategy") + s.pipelineMonitor.ReportComponentIngress(p, "sender") } diff --git a/pkg/logs/sender/batch_strategy_test.go b/pkg/logs/sender/batch_strategy_test.go index ff1f6bae1b107..34cb6be7aa4e9 100644 --- a/pkg/logs/sender/batch_strategy_test.go +++ b/pkg/logs/sender/batch_strategy_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" ) func TestBatchStrategySendsPayloadWhenBufferIsFull(t *testing.T) { @@ -20,7 +21,7 @@ func TestBatchStrategySendsPayloadWhenBufferIsFull(t *testing.T) { output := make(chan *message.Payload) flushChan := make(chan struct{}) - s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{}) + s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{}, metrics.NewNoopPipelineMonitor("")) s.Start() message1 := message.NewMessage([]byte("a"), nil, "", 0) @@ -52,7 +53,7 @@ func TestBatchStrategySendsPayloadWhenBufferIsOutdated(t *testing.T) { timerInterval := 100 * time.Millisecond clk := clock.NewMock() - s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, timerInterval, 100, 100, "test", clk, &identityContentType{}) + s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, timerInterval, 100, 100, "test", clk, &identityContentType{}, metrics.NewNoopPipelineMonitor("")) s.Start() for round := 0; round < 3; round++ { @@ -77,7 +78,7 @@ func TestBatchStrategySendsPayloadWhenClosingInput(t *testing.T) { flushChan := make(chan struct{}) clk := clock.NewMock() - s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", clk, &identityContentType{}) + s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", clk, &identityContentType{}, metrics.NewNoopPipelineMonitor("")) s.Start() message := message.NewMessage([]byte("a"), nil, "", 0) @@ -102,7 +103,7 @@ func TestBatchStrategyShouldNotBlockWhenStoppingGracefully(t *testing.T) { output := make(chan *message.Payload) flushChan := make(chan struct{}) - s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{}) + s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{}, metrics.NewNoopPipelineMonitor("")) s.Start() message := message.NewMessage([]byte{}, nil, "", 0) @@ -126,7 +127,7 @@ func TestBatchStrategySynchronousFlush(t *testing.T) { // batch size is large so it will not flush until we trigger it manually // flush time is large so it won't automatically trigger during this test - strategy := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{}) + strategy := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{}, metrics.NewNoopPipelineMonitor("")) strategy.Start() // all of these messages will get buffered @@ -171,7 +172,7 @@ func TestBatchStrategyFlushChannel(t *testing.T) { // batch size is large so it will not flush until we trigger it manually // flush time is large so it won't automatically trigger during this test - strategy := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{}) + strategy := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{}, metrics.NewNoopPipelineMonitor("")) strategy.Start() // all of these messages will get buffered diff --git a/pkg/logs/sender/destination_sender_test.go b/pkg/logs/sender/destination_sender_test.go index d2ab54715a4f0..3aa930e437e54 100644 --- a/pkg/logs/sender/destination_sender_test.go +++ b/pkg/logs/sender/destination_sender_test.go @@ -13,6 +13,7 @@ import ( configmock "github.com/DataDog/datadog-agent/pkg/config/mock" pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" + "github.com/DataDog/datadog-agent/pkg/logs/client" "github.com/DataDog/datadog-agent/pkg/logs/message" ) @@ -32,6 +33,10 @@ func (m *mockDestination) Target() string { return "mock-dest" } +func (m *mockDestination) Metadata() *client.DestinationMetadata { + return client.NewNoopDestinationMetadata() +} + func (m *mockDestination) Start(input chan *message.Payload, output chan *message.Payload, isRetrying chan bool) (stopChan <-chan struct{}) { m.input = input m.output = output diff --git a/pkg/logs/sender/go.mod b/pkg/logs/sender/go.mod index 750b501605d92..8a1b84cfc1339 100644 --- a/pkg/logs/sender/go.mod +++ b/pkg/logs/sender/go.mod @@ -44,6 +44,7 @@ replace ( github.com/DataDog/datadog-agent/pkg/util/system => ../../util/system github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../util/system/socket github.com/DataDog/datadog-agent/pkg/util/testutil => ../../util/testutil + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../util/utilizationtracker github.com/DataDog/datadog-agent/pkg/util/winutil => ../../util/winutil github.com/DataDog/datadog-agent/pkg/version => ../../version ) @@ -54,6 +55,7 @@ require ( github.com/DataDog/datadog-agent/pkg/config/model v0.57.0 github.com/DataDog/datadog-agent/pkg/logs/client v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/message v0.56.0-rc.3 + github.com/DataDog/datadog-agent/pkg/logs/metrics v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/sources v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/telemetry v0.56.0-rc.3 @@ -73,7 +75,6 @@ require ( github.com/DataDog/datadog-agent/pkg/config/structure v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/teeconfig v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/utils v0.56.0-rc.3 // indirect - github.com/DataDog/datadog-agent/pkg/logs/metrics v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/backoff v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/executable v0.57.1 // indirect @@ -87,6 +88,7 @@ require ( github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/system v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.0 // indirect + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect github.com/DataDog/datadog-agent/pkg/version v0.56.0-rc.3 // indirect github.com/DataDog/viper v1.13.5 // indirect diff --git a/pkg/logs/sender/sender.go b/pkg/logs/sender/sender.go index 31fa4db0bb382..48e9c6b22d936 100644 --- a/pkg/logs/sender/sender.go +++ b/pkg/logs/sender/sender.go @@ -13,6 +13,7 @@ import ( pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" "github.com/DataDog/datadog-agent/pkg/logs/client" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/telemetry" ) @@ -38,10 +39,13 @@ type Sender struct { bufferSize int senderDoneChan chan *sync.WaitGroup flushWg *sync.WaitGroup + + pipelineMonitor metrics.PipelineMonitor + utilization metrics.UtilizationMonitor } // NewSender returns a new sender. -func NewSender(config pkgconfigmodel.Reader, inputChan chan *message.Payload, outputChan chan *message.Payload, destinations *client.Destinations, bufferSize int, senderDoneChan chan *sync.WaitGroup, flushWg *sync.WaitGroup) *Sender { +func NewSender(config pkgconfigmodel.Reader, inputChan chan *message.Payload, outputChan chan *message.Payload, destinations *client.Destinations, bufferSize int, senderDoneChan chan *sync.WaitGroup, flushWg *sync.WaitGroup, pipelineMonitor metrics.PipelineMonitor) *Sender { return &Sender{ config: config, inputChan: inputChan, @@ -51,6 +55,10 @@ func NewSender(config pkgconfigmodel.Reader, inputChan chan *message.Payload, ou bufferSize: bufferSize, senderDoneChan: senderDoneChan, flushWg: flushWg, + + // Telemetry + pipelineMonitor: pipelineMonitor, + utilization: pipelineMonitor.MakeUtilizationMonitor("sender"), } } @@ -73,6 +81,7 @@ func (s *Sender) run() { unreliableDestinations := buildDestinationSenders(s.config, s.destinations.Unreliable, sink, s.bufferSize) for payload := range s.inputChan { + s.utilization.Start() var startInUse = time.Now() senderDoneWg := &sync.WaitGroup{} @@ -80,6 +89,9 @@ func (s *Sender) run() { for !sent { for _, destSender := range reliableDestinations { if destSender.Send(payload) { + if destSender.destination.Metadata().ReportingEnabled { + s.pipelineMonitor.ReportComponentIngress(payload, destSender.destination.Metadata().MonitorTag()) + } sent = true if s.senderDoneChan != nil { senderDoneWg.Add(1) @@ -121,6 +133,7 @@ func (s *Sender) run() { inUse := float64(time.Since(startInUse) / time.Millisecond) tlmSendWaitTime.Add(inUse) + s.utilization.Stop() if s.senderDoneChan != nil && s.flushWg != nil { // Wait for all destinations to finish sending the payload @@ -128,6 +141,7 @@ func (s *Sender) run() { // Decrement the wait group when this payload has been sent s.flushWg.Done() } + s.pipelineMonitor.ReportComponentEgress(payload, "sender") } // Cleanup the destinations @@ -138,6 +152,7 @@ func (s *Sender) run() { destSender.Stop() } close(sink) + s.utilization.Cancel() s.done <- struct{}{} } diff --git a/pkg/logs/sender/sender_test.go b/pkg/logs/sender/sender_test.go index 5fd09caf501d4..4f35558a46974 100644 --- a/pkg/logs/sender/sender_test.go +++ b/pkg/logs/sender/sender_test.go @@ -17,6 +17,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/client/mock" "github.com/DataDog/datadog-agent/pkg/logs/client/tcp" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/sources" "github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface" ) @@ -45,7 +46,7 @@ func TestSender(t *testing.T) { destinations := client.NewDestinations([]client.Destination{destination}, nil) cfg := configmock.New(t) - sender := NewSender(cfg, input, output, destinations, 0, nil, nil) + sender := NewSender(cfg, input, output, destinations, 0, nil, nil, metrics.NewNoopPipelineMonitor("")) sender.Start() expectedMessage := newMessage([]byte("fake line"), source, "") @@ -73,7 +74,7 @@ func TestSenderSingleDestination(t *testing.T) { destinations := client.NewDestinations([]client.Destination{server.Destination}, nil) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil) + sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) sender.Start() input <- &message.Payload{} @@ -103,7 +104,7 @@ func TestSenderDualReliableDestination(t *testing.T) { destinations := client.NewDestinations([]client.Destination{server1.Destination, server2.Destination}, nil) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil) + sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) sender.Start() input <- &message.Payload{} @@ -138,7 +139,7 @@ func TestSenderUnreliableAdditionalDestination(t *testing.T) { destinations := client.NewDestinations([]client.Destination{server1.Destination}, []client.Destination{server2.Destination}) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil) + sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) sender.Start() input <- &message.Payload{} @@ -170,7 +171,7 @@ func TestSenderUnreliableStopsWhenMainFails(t *testing.T) { destinations := client.NewDestinations([]client.Destination{reliableServer.Destination}, []client.Destination{unreliableServer.Destination}) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil) + sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) sender.Start() input <- &message.Payload{} @@ -219,7 +220,7 @@ func TestSenderReliableContinuseWhenOneFails(t *testing.T) { destinations := client.NewDestinations([]client.Destination{reliableServer1.Destination, reliableServer2.Destination}, nil) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil) + sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) sender.Start() input <- &message.Payload{} @@ -265,7 +266,7 @@ func TestSenderReliableWhenOneFailsAndRecovers(t *testing.T) { destinations := client.NewDestinations([]client.Destination{reliableServer1.Destination, reliableServer2.Destination}, nil) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil) + sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) sender.Start() input <- &message.Payload{} diff --git a/pkg/logs/tailers/file/tailer.go b/pkg/logs/tailers/file/tailer.go index b9d96b982fcf8..5bf78c627b04c 100644 --- a/pkg/logs/tailers/file/tailer.go +++ b/pkg/logs/tailers/file/tailer.go @@ -116,20 +116,22 @@ type Tailer struct { // blocked sending to the tailer's outputChan. stopForward context.CancelFunc - info *status.InfoRegistry - bytesRead *status.CountInfo - movingSum *util.MovingSum + info *status.InfoRegistry + bytesRead *status.CountInfo + movingSum *util.MovingSum + PipelineMonitor metrics.PipelineMonitor } // TailerOptions holds all possible parameters that NewTailer requires in addition to optional parameters that can be optionally passed into. This can be used for more optional parameters if required in future type TailerOptions struct { - OutputChan chan *message.Message // Required - File *File // Required - SleepDuration time.Duration // Required - Decoder *decoder.Decoder // Required - Info *status.InfoRegistry // Required - Rotated bool // Optional - TagAdder tag.EntityTagAdder // Required + OutputChan chan *message.Message // Required + File *File // Required + SleepDuration time.Duration // Required + Decoder *decoder.Decoder // Required + Info *status.InfoRegistry // Required + Rotated bool // Optional + TagAdder tag.EntityTagAdder // Required + PipelineMonitor metrics.PipelineMonitor // Required } // NewTailer returns an initialized Tailer, read to be started. @@ -182,6 +184,7 @@ func NewTailer(opts *TailerOptions) *Tailer { info: opts.Info, bytesRead: bytesRead, movingSum: movingSum, + PipelineMonitor: opts.PipelineMonitor, } if fileRotated { @@ -202,13 +205,14 @@ func addToTailerInfo(k, m string, tailerInfo *status.InfoRegistry) { // messages to the same channel but using an updated file and decoder. func (t *Tailer) NewRotatedTailer(file *File, decoder *decoder.Decoder, info *status.InfoRegistry, tagAdder tag.EntityTagAdder) *Tailer { options := &TailerOptions{ - OutputChan: t.outputChan, - File: file, - SleepDuration: t.sleepDuration, - Decoder: decoder, - Info: info, - Rotated: true, - TagAdder: tagAdder, + OutputChan: t.outputChan, + File: file, + SleepDuration: t.sleepDuration, + Decoder: decoder, + Info: info, + Rotated: true, + TagAdder: tagAdder, + PipelineMonitor: t.PipelineMonitor, } return NewTailer(options) @@ -359,13 +363,15 @@ func (t *Tailer) forwardMessages() { if len(output.GetContent()) == 0 { continue } + + msg := message.NewMessage(output.GetContent(), origin, output.Status, output.IngestionTimestamp) // Make the write to the output chan cancellable to be able to stop the tailer // after a file rotation when it is stuck on it. // We don't return directly to keep the same shutdown sequence that in the // normal case. select { - // XXX(remy): is it ok recreating a message like this here? - case t.outputChan <- message.NewMessage(output.GetContent(), origin, output.Status, output.IngestionTimestamp): + case t.outputChan <- msg: + t.PipelineMonitor.ReportComponentIngress(msg, "processor") case <-t.forwardContext.Done(): } } diff --git a/pkg/logs/tailers/file/tailer_nix.go b/pkg/logs/tailers/file/tailer_nix.go index a4af026781133..681396e03fbca 100644 --- a/pkg/logs/tailers/file/tailer_nix.go +++ b/pkg/logs/tailers/file/tailer_nix.go @@ -56,6 +56,7 @@ func (t *Tailer) read() (int, error) { return 0, nil } t.lastReadOffset.Add(int64(n)) - t.decoder.InputChan <- decoder.NewInput(inBuf[:n]) + msg := decoder.NewInput(inBuf[:n]) + t.decoder.InputChan <- msg return n, nil } diff --git a/pkg/logs/tailers/file/tailer_test.go b/pkg/logs/tailers/file/tailer_test.go index d26c17d25224c..794a6df4557f9 100644 --- a/pkg/logs/tailers/file/tailer_test.go +++ b/pkg/logs/tailers/file/tailer_test.go @@ -21,6 +21,7 @@ import ( pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/logs/internal/decoder" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/sources" status "github.com/DataDog/datadog-agent/pkg/logs/status/utils" ) @@ -57,11 +58,12 @@ func (suite *TailerTestSuite) SetupTest() { info := status.NewInfoRegistry() tailerOptions := &TailerOptions{ - OutputChan: suite.outputChan, - File: NewFile(suite.testPath, suite.source.UnderlyingSource(), false), - SleepDuration: sleepDuration, - Decoder: decoder.NewDecoderFromSource(suite.source, info), - Info: info, + OutputChan: suite.outputChan, + File: NewFile(suite.testPath, suite.source.UnderlyingSource(), false), + SleepDuration: sleepDuration, + Decoder: decoder.NewDecoderFromSource(suite.source, info), + Info: info, + PipelineMonitor: metrics.NewNoopPipelineMonitor(""), } suite.tailer = NewTailer(tailerOptions) @@ -111,11 +113,12 @@ func (suite *TailerTestSuite) TestTailerTimeDurationConfig() { info := status.NewInfoRegistry() tailerOptions := &TailerOptions{ - OutputChan: suite.outputChan, - File: NewFile(suite.testPath, suite.source.UnderlyingSource(), false), - SleepDuration: sleepDuration, - Decoder: decoder.NewDecoderFromSource(suite.source, info), - Info: info, + OutputChan: suite.outputChan, + File: NewFile(suite.testPath, suite.source.UnderlyingSource(), false), + SleepDuration: sleepDuration, + Decoder: decoder.NewDecoderFromSource(suite.source, info), + Info: info, + PipelineMonitor: metrics.NewNoopPipelineMonitor(""), } tailer := NewTailer(tailerOptions) @@ -278,11 +281,12 @@ func (suite *TailerTestSuite) TestDirTagWhenTailingFiles() { info := status.NewInfoRegistry() tailerOptions := &TailerOptions{ - OutputChan: suite.outputChan, - File: NewFile(suite.testPath, dirTaggedSource, true), - SleepDuration: sleepDuration, - Decoder: decoder.NewDecoderFromSource(suite.source, info), - Info: info, + OutputChan: suite.outputChan, + File: NewFile(suite.testPath, dirTaggedSource, true), + SleepDuration: sleepDuration, + Decoder: decoder.NewDecoderFromSource(suite.source, info), + Info: info, + PipelineMonitor: metrics.NewNoopPipelineMonitor(""), } suite.tailer = NewTailer(tailerOptions) @@ -308,11 +312,12 @@ func (suite *TailerTestSuite) TestBuildTagsFileOnly() { info := status.NewInfoRegistry() tailerOptions := &TailerOptions{ - OutputChan: suite.outputChan, - File: NewFile(suite.testPath, dirTaggedSource, false), - SleepDuration: sleepDuration, - Decoder: decoder.NewDecoderFromSource(suite.source, info), - Info: info, + OutputChan: suite.outputChan, + File: NewFile(suite.testPath, dirTaggedSource, false), + SleepDuration: sleepDuration, + Decoder: decoder.NewDecoderFromSource(suite.source, info), + Info: info, + PipelineMonitor: metrics.NewNoopPipelineMonitor(""), } suite.tailer = NewTailer(tailerOptions) @@ -335,11 +340,12 @@ func (suite *TailerTestSuite) TestBuildTagsFileDir() { info := status.NewInfoRegistry() tailerOptions := &TailerOptions{ - OutputChan: suite.outputChan, - File: NewFile(suite.testPath, dirTaggedSource, true), - SleepDuration: sleepDuration, - Decoder: decoder.NewDecoderFromSource(suite.source, info), - Info: info, + OutputChan: suite.outputChan, + File: NewFile(suite.testPath, dirTaggedSource, true), + SleepDuration: sleepDuration, + Decoder: decoder.NewDecoderFromSource(suite.source, info), + Info: info, + PipelineMonitor: metrics.NewNoopPipelineMonitor(""), } suite.tailer = NewTailer(tailerOptions) @@ -366,11 +372,12 @@ func (suite *TailerTestSuite) TestTruncatedTag() { info := status.NewInfoRegistry() tailerOptions := &TailerOptions{ - OutputChan: suite.outputChan, - File: NewFile(suite.testPath, source, true), - SleepDuration: sleepDuration, - Decoder: decoder.NewDecoderFromSource(suite.source, info), - Info: info, + OutputChan: suite.outputChan, + File: NewFile(suite.testPath, source, true), + SleepDuration: sleepDuration, + Decoder: decoder.NewDecoderFromSource(suite.source, info), + Info: info, + PipelineMonitor: metrics.NewNoopPipelineMonitor(""), } suite.tailer = NewTailer(tailerOptions) @@ -398,11 +405,12 @@ func (suite *TailerTestSuite) TestMutliLineAutoDetect() { info := status.NewInfoRegistry() tailerOptions := &TailerOptions{ - OutputChan: suite.outputChan, - File: NewFile(suite.testPath, suite.source.UnderlyingSource(), true), - SleepDuration: sleepDuration, - Decoder: decoder.NewDecoderFromSource(suite.source, info), - Info: info, + OutputChan: suite.outputChan, + File: NewFile(suite.testPath, suite.source.UnderlyingSource(), true), + SleepDuration: sleepDuration, + Decoder: decoder.NewDecoderFromSource(suite.source, info), + Info: info, + PipelineMonitor: metrics.NewNoopPipelineMonitor(""), } suite.tailer = NewTailer(tailerOptions) @@ -433,11 +441,12 @@ func (suite *TailerTestSuite) TestDidRotateNilFullpath() { info := status.NewInfoRegistry() tailerOptions := &TailerOptions{ - OutputChan: suite.outputChan, - File: NewFile(suite.testPath, suite.source.UnderlyingSource(), false), - SleepDuration: sleepDuration, - Decoder: decoder.NewDecoderFromSource(suite.source, info), - Info: info, + OutputChan: suite.outputChan, + File: NewFile(suite.testPath, suite.source.UnderlyingSource(), false), + SleepDuration: sleepDuration, + Decoder: decoder.NewDecoderFromSource(suite.source, info), + Info: info, + PipelineMonitor: metrics.NewNoopPipelineMonitor(""), } tailer := NewTailer(tailerOptions) diff --git a/pkg/util/utilizationtracker/doc.go b/pkg/util/utilizationtracker/doc.go new file mode 100644 index 0000000000000..f039c62deb991 --- /dev/null +++ b/pkg/util/utilizationtracker/doc.go @@ -0,0 +1,7 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +// Package utilizationtracker provides a utility to track the utilization of a component. +package utilizationtracker diff --git a/pkg/util/utilizationtracker/go.mod b/pkg/util/utilizationtracker/go.mod new file mode 100644 index 0000000000000..fd269ca0657a7 --- /dev/null +++ b/pkg/util/utilizationtracker/go.mod @@ -0,0 +1,14 @@ +module github.com/DataDog/datadog-agent/pkg/util/utilizationtracker + +go 1.22.0 + +require ( + github.com/benbjohnson/clock v1.3.5 + github.com/stretchr/testify v1.9.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/pkg/util/utilizationtracker/go.sum b/pkg/util/utilizationtracker/go.sum new file mode 100644 index 0000000000000..29fbd520c2821 --- /dev/null +++ b/pkg/util/utilizationtracker/go.sum @@ -0,0 +1,12 @@ +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/collector/worker/utilization_tracker.go b/pkg/util/utilizationtracker/utilization_tracker.go similarity index 61% rename from pkg/collector/worker/utilization_tracker.go rename to pkg/util/utilizationtracker/utilization_tracker.go index 24127081983d4..b57d14b4b4778 100644 --- a/pkg/collector/worker/utilization_tracker.go +++ b/pkg/util/utilizationtracker/utilization_tracker.go @@ -3,7 +3,8 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016-present Datadog, Inc. -package worker +// Package utilizationtracker provides a utility to track the utilization of a component. +package utilizationtracker import ( "time" @@ -14,12 +15,12 @@ import ( type trackerEvent int const ( - checkStarted trackerEvent = iota - checkStopped + started trackerEvent = iota + stopped trackerTick ) -//nolint:revive // TODO(AML) Fix revive linter +// UtilizationTracker tracks the utilization of a component. type UtilizationTracker struct { Output chan float64 @@ -32,9 +33,9 @@ type UtilizationTracker struct { // alpha is the ewma smoothing factor. alpha float64 - checkStarted time.Time - nextTick time.Time - interval time.Duration + started time.Time + nextTick time.Time + interval time.Duration clock clock.Clock } @@ -42,23 +43,20 @@ type UtilizationTracker struct { // NewUtilizationTracker instantiates and configures a utilization tracker that // calculates the values and publishes them to expvars func NewUtilizationTracker( - workerName string, interval time.Duration, + alpha float64, ) *UtilizationTracker { return newUtilizationTrackerWithClock( - workerName, interval, clock.New(), + alpha, ) } // newUtilizationTrackerWithClock is primarely used for testing. -// // Does not start the background goroutines, so that the tests can call update() to get // deterministic results. -// -//nolint:revive // TODO(AML) Fix revive linter -func newUtilizationTrackerWithClock(_ string, interval time.Duration, clk clock.Clock) *UtilizationTracker { +func newUtilizationTrackerWithClock(interval time.Duration, clk clock.Clock, alpha float64) *UtilizationTracker { ut := &UtilizationTracker{ clock: clk, @@ -66,9 +64,8 @@ func newUtilizationTrackerWithClock(_ string, interval time.Duration, clk clock. nextTick: clk.Now(), interval: interval, - alpha: 0.25, // converges to 99.98% of constant input in 30 iterations. - - Output: make(chan float64, 1), + alpha: alpha, + Output: make(chan float64, 1), } go ut.run() @@ -86,12 +83,12 @@ func (ut *UtilizationTracker) run() { // invariant: ut.nextTick > now switch ev { - case checkStarted: - // invariant: ut.nextTick > ut.checkStarted - ut.checkStarted = now - case checkStopped: - ut.busy += now.Sub(ut.checkStarted) - ut.checkStarted = time.Time{} + case started: + // invariant: ut.nextTick > ut.started + ut.started = now + case stopped: + ut.busy += now.Sub(ut.started) + ut.started = time.Time{} case trackerTick: // nothing, just tick } @@ -100,10 +97,10 @@ func (ut *UtilizationTracker) run() { func (ut *UtilizationTracker) update(now time.Time) { for ut.nextTick.Before(now) { - if !ut.checkStarted.IsZero() { - // invariant: ut.nextTick > ut.checkStarted - ut.busy += ut.nextTick.Sub(ut.checkStarted) - ut.checkStarted = ut.nextTick + if !ut.started.IsZero() { + // invariant: ut.nextTick > ut.started + ut.busy += ut.nextTick.Sub(ut.started) + ut.started = ut.nextTick } update := float64(ut.busy) / float64(ut.interval) @@ -116,32 +113,32 @@ func (ut *UtilizationTracker) update(now time.Time) { ut.Output <- ut.value } -// Stop should be invoked when a worker is about to exit -// so that we can remove the instance's expvars +// Stop should be invoked when a component is about to exit +// so that we can clean up the instances resources. func (ut *UtilizationTracker) Stop() { // The user will not send anything anymore close(ut.eventsChan) } -// Tick updates to the utilization during intervals where no check were started or stopped. +// Tick updates to the utilization during intervals where no component were started or stopped. // // Produces one value on the Output channel. func (ut *UtilizationTracker) Tick() { ut.eventsChan <- trackerTick } -// CheckStarted should be invoked when a worker's check is about to run so that we can track the +// Started should be invoked when a compnent's work is about to being so that we can track the // start time and the utilization. // // Produces one value on the Output channel. -func (ut *UtilizationTracker) CheckStarted() { - ut.eventsChan <- checkStarted +func (ut *UtilizationTracker) Started() { + ut.eventsChan <- started } -// CheckFinished should be invoked when a worker's check is complete so that we can calculate the -// utilization of the linked worker. +// Finished should be invoked when a compnent's work is complete so that we can calculate the +// utilization of the compoennt. // // Produces one value on the Output channel. -func (ut *UtilizationTracker) CheckFinished() { - ut.eventsChan <- checkStopped +func (ut *UtilizationTracker) Finished() { + ut.eventsChan <- stopped } diff --git a/pkg/collector/worker/utilization_tracker_test.go b/pkg/util/utilizationtracker/utilization_tracker_test.go similarity index 94% rename from pkg/collector/worker/utilization_tracker_test.go rename to pkg/util/utilizationtracker/utilization_tracker_test.go index 9fef376d2c6b4..52af4667d6fdb 100644 --- a/pkg/collector/worker/utilization_tracker_test.go +++ b/pkg/util/utilizationtracker/utilization_tracker_test.go @@ -3,7 +3,8 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016-present Datadog, Inc. -package worker +// Package utilizationtracker provides a utility to track the utilization of a component. +package utilizationtracker import ( "math/rand" @@ -21,9 +22,9 @@ import ( func newTracker(_ *testing.T) (*UtilizationTracker, *clock.Mock) { clk := clock.NewMock() ut := newUtilizationTrackerWithClock( - "worker", 100*time.Millisecond, clk, + 0.25, ) return ut, clk @@ -49,7 +50,7 @@ func TestUtilizationTracker(t *testing.T) { clk.Add(300 * time.Millisecond) // Ramp up the expected utilization - ut.CheckStarted() + ut.Started() //nolint:revive // TODO(AML) Fix revive linter old, new = new, <-ut.Output require.Equal(t, old, new) @@ -67,7 +68,7 @@ func TestUtilizationTracker(t *testing.T) { require.Greater(t, new, old) // Ramp down the expected utilization - ut.CheckFinished() + ut.Finished() //nolint:revive // TODO(AML) Fix revive linter old, new = new, <-ut.Output require.Equal(t, old, new) //no time have passed @@ -99,7 +100,7 @@ func TestUtilizationTrackerCheckLifecycle(t *testing.T) { for idx := 0; idx < 3; idx++ { // Ramp up utilization - ut.CheckStarted() + ut.Started() //nolint:revive // TODO(AML) Fix revive linter old, new = new, <-ut.Output assert.Equal(t, old, new) @@ -117,7 +118,7 @@ func TestUtilizationTrackerCheckLifecycle(t *testing.T) { assert.Greater(t, new, old) // Ramp down utilization - ut.CheckFinished() + ut.Finished() //nolint:revive // TODO(AML) Fix revive linter old, new = new, <-ut.Output assert.Equal(t, new, old) @@ -151,13 +152,13 @@ func TestUtilizationTrackerAccuracy(t *testing.T) { totalMs := r.Int31n(100) + 100 runtimeMs := (totalMs * 30) / 100 - ut.CheckStarted() + ut.Started() <-ut.Output runtimeDuration := time.Duration(runtimeMs) * time.Millisecond clk.Add(runtimeDuration) - ut.CheckFinished() + ut.Finished() val = <-ut.Output idleDuration := time.Duration(totalMs-runtimeMs) * time.Millisecond diff --git a/test/otel/go.mod b/test/otel/go.mod index 74eae7bb38d3f..c946c20d60843 100644 --- a/test/otel/go.mod +++ b/test/otel/go.mod @@ -88,6 +88,7 @@ replace ( github.com/DataDog/datadog-agent/pkg/util/system => ./../../pkg/util/system github.com/DataDog/datadog-agent/pkg/util/system/socket => ./../../pkg/util/system/socket github.com/DataDog/datadog-agent/pkg/util/testutil => ./../../pkg/util/testutil + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ./../../pkg/util/utilizationtracker github.com/DataDog/datadog-agent/pkg/util/winutil => ./../../pkg/util/winutil github.com/DataDog/datadog-agent/pkg/version => ./../../pkg/version ) @@ -174,6 +175,7 @@ require ( github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/util/system v0.57.1 // indirect github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.1 // indirect + github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect github.com/DataDog/datadog-agent/pkg/version v0.57.1 // indirect github.com/DataDog/datadog-api-client-go/v2 v2.26.0 // indirect diff --git a/test/regression/cases/file_to_blackhole_0ms_latency/experiment.yaml b/test/regression/cases/file_to_blackhole_0ms_latency/experiment.yaml index 4884b1e7a2964..b445834c940d6 100644 --- a/test/regression/cases/file_to_blackhole_0ms_latency/experiment.yaml +++ b/test/regression/cases/file_to_blackhole_0ms_latency/experiment.yaml @@ -36,4 +36,4 @@ checks: description: "Available bytes not polled by log Agent" bounds: series: lost_bytes - upper_bound: 0KB + upper_bound: 0KiB diff --git a/test/regression/cases/file_to_blackhole_1000ms_latency/experiment.yaml b/test/regression/cases/file_to_blackhole_1000ms_latency/experiment.yaml index 215dcfa86cc48..e19b8685b4079 100644 --- a/test/regression/cases/file_to_blackhole_1000ms_latency/experiment.yaml +++ b/test/regression/cases/file_to_blackhole_1000ms_latency/experiment.yaml @@ -31,3 +31,9 @@ checks: series: total_rss_bytes # The machine has 12GiB free. upper_bound: 1.2GiB + + - name: lost_bytes + description: "Allowable bytes not polled by log Agent" + bounds: + series: lost_bytes + upper_bound: 0KiB diff --git a/test/regression/cases/file_to_blackhole_100ms_latency/experiment.yaml b/test/regression/cases/file_to_blackhole_100ms_latency/experiment.yaml index 64fe41015db6f..e19b8685b4079 100644 --- a/test/regression/cases/file_to_blackhole_100ms_latency/experiment.yaml +++ b/test/regression/cases/file_to_blackhole_100ms_latency/experiment.yaml @@ -36,4 +36,4 @@ checks: description: "Allowable bytes not polled by log Agent" bounds: series: lost_bytes - upper_bound: 0KB + upper_bound: 0KiB diff --git a/test/regression/cases/file_to_blackhole_300ms_latency/experiment.yaml b/test/regression/cases/file_to_blackhole_300ms_latency/experiment.yaml index 215dcfa86cc48..e19b8685b4079 100644 --- a/test/regression/cases/file_to_blackhole_300ms_latency/experiment.yaml +++ b/test/regression/cases/file_to_blackhole_300ms_latency/experiment.yaml @@ -31,3 +31,9 @@ checks: series: total_rss_bytes # The machine has 12GiB free. upper_bound: 1.2GiB + + - name: lost_bytes + description: "Allowable bytes not polled by log Agent" + bounds: + series: lost_bytes + upper_bound: 0KiB diff --git a/test/regression/cases/file_to_blackhole_500ms_latency/experiment.yaml b/test/regression/cases/file_to_blackhole_500ms_latency/experiment.yaml index 215dcfa86cc48..e19b8685b4079 100644 --- a/test/regression/cases/file_to_blackhole_500ms_latency/experiment.yaml +++ b/test/regression/cases/file_to_blackhole_500ms_latency/experiment.yaml @@ -31,3 +31,9 @@ checks: series: total_rss_bytes # The machine has 12GiB free. upper_bound: 1.2GiB + + - name: lost_bytes + description: "Allowable bytes not polled by log Agent" + bounds: + series: lost_bytes + upper_bound: 0KiB