From 9b31710f2fa852724f3688a26a1af641ff8e0d8d Mon Sep 17 00:00:00 2001 From: Alexandre Yang Date: Sun, 17 Nov 2024 20:09:15 +0100 Subject: [PATCH] add comp --- .../demultiplexerimpl/demultiplexer.go | 3 +++ .../test_agent_demultiplexer.go | 2 +- .../sendermanager.go | 10 +------- pkg/aggregator/aggregator.go | 9 ++++--- pkg/aggregator/aggregator_test.go | 10 ++++---- pkg/aggregator/check_sampler_bench_test.go | 2 +- pkg/aggregator/demultiplexer_agent.go | 25 ++++--------------- pkg/aggregator/demultiplexer_agent_test.go | 4 +-- pkg/aggregator/demultiplexer_mock.go | 2 +- pkg/aggregator/demultiplexer_test.go | 4 +-- pkg/aggregator/mocksender/mocksender.go | 2 +- pkg/aggregator/sender_test.go | 2 +- .../snmp/integration_profile_bundle_test.go | 2 +- .../snmp/integration_profile_metadata_test.go | 2 +- .../snmp/integration_topology_test.go | 6 ++--- .../snmp/internal/checkconfig/config_test.go | 4 +-- .../corechecks/systemd/systemd_test.go | 2 +- test/benchmarks/kubernetes_state/main.go | 2 +- 18 files changed, 38 insertions(+), 55 deletions(-) diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go index c50c6528029a80..b029f3f546f9a6 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go @@ -9,6 +9,7 @@ package demultiplexerimpl import ( "context" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "go.uber.org/fx" demultiplexerComp "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" @@ -42,6 +43,7 @@ type dependencies struct { SharedForwarder defaultforwarder.Component OrchestratorForwarder orchestratorforwarder.Component EventPlatformForwarder eventplatform.Component + HaAgent haagent.Component Compressor compression.Component Tagger tagger.Component @@ -86,6 +88,7 @@ func newDemultiplexer(deps dependencies) (provides, error) { deps.OrchestratorForwarder, options, deps.EventPlatformForwarder, + deps.HaAgent, deps.Compressor, deps.Tagger, hostnameDetected, diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go b/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go index 4ac27d7d644600..019f6e6331af3f 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go @@ -185,6 +185,6 @@ func initTestAgentDemultiplexerWithFlushInterval(log log.Component, hostname hos sharedForwarder := defaultforwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), log, sharedForwarderOptions) orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostname)) - demux := aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, compressor, noopimpl.NewComponent(), "hostname") + demux := aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, nil, compressor, noopimpl.NewComponent(), "hostname") return NewTestAgentDemultiplexer(demux) } diff --git a/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go b/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go index 73de2ac99cca78..d4cb2080430602 100644 --- a/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go +++ b/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go @@ -72,15 +72,7 @@ func (sender *diagnoseSenderManager) LazyGetSenderManager() (sender.SenderManage forwarder := defaultforwarder.NewDefaultForwarder(config, log, defaultforwarder.NewOptions(config, log, nil)) orchestratorForwarder := optional.NewOptionPtr[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(sender.deps.Hostname)) - senderManager = aggregator.InitAndStartAgentDemultiplexer( - log, - forwarder, - orchestratorForwarder, - opts, - eventPlatformForwarder, - sender.deps.Compressor, - sender.deps.Tagger, - hostnameDetected) + senderManager = aggregator.InitAndStartAgentDemultiplexer(log, forwarder, orchestratorForwarder, opts, eventPlatformForwarder, nil, sender.deps.Compressor, sender.deps.Tagger, hostnameDetected) sender.senderManager.Set(senderManager) return senderManager, nil diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index a2b2e8406b00ff..becf0338970422 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -16,6 +16,7 @@ import ( tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" "github.com/DataDog/datadog-agent/comp/core/tagger/types" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" "github.com/DataDog/datadog-agent/pkg/config/model" @@ -252,6 +253,7 @@ type BufferedAggregator struct { flushMutex sync.Mutex // to start multiple flushes in parallel serializer serializer.MetricSerializer eventPlatformForwarder eventplatform.Component + haAgent haagent.Component hostname string hostnameUpdate chan string hostnameUpdateDone chan struct{} // signals that the hostname update is finished @@ -283,7 +285,7 @@ func NewFlushAndSerializeInParallel(config model.Config) FlushAndSerializeInPara } // NewBufferedAggregator instantiates a BufferedAggregator -func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator { +func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator { bufferSize := pkgconfigsetup.Datadog().GetInt("aggregator_buffer_size") agentName := flavor.GetFlavor() @@ -326,6 +328,7 @@ func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder flushInterval: flushInterval, serializer: s, eventPlatformForwarder: eventPlatformForwarder, + haAgent: haAgent, hostname: hostname, hostnameUpdate: make(chan string), hostnameUpdateDone: make(chan struct{}), @@ -861,8 +864,8 @@ func (agg *BufferedAggregator) tags(withVersion bool) []string { tags = append(tags, "package_version:"+version.AgentPackageVersion) } } - if haagent.IsEnabled() { - tags = append(tags, "ha_agent_group:"+haagent.GetGroup()) + if agg.haAgent != nil && agg.haAgent.Enabled() { + tags = append(tags, "ha_agent_group:"+agg.haAgent.GetGroup()) } // nil to empty string // This is expected by other components/tests diff --git a/pkg/aggregator/aggregator_test.go b/pkg/aggregator/aggregator_test.go index 64c33496891c7c..7c27d7fb9a320f 100644 --- a/pkg/aggregator/aggregator_test.go +++ b/pkg/aggregator/aggregator_test.go @@ -146,7 +146,7 @@ func TestAddServiceCheckDefaultValues(t *testing.T) { s := &MockSerializerIterableSerie{} taggerComponent := taggerMock.SetupFakeTagger(t) - agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) + agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) agg.addServiceCheck(servicecheck.ServiceCheck{ // leave Host and Ts fields blank @@ -179,7 +179,7 @@ func TestAddEventDefaultValues(t *testing.T) { s := &MockSerializerIterableSerie{} taggerComponent := taggerMock.SetupFakeTagger(t) - agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) + agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) agg.addEvent(event.Event{ // only populate required fields @@ -229,7 +229,7 @@ func TestDefaultData(t *testing.T) { s := &MockSerializerIterableSerie{} taggerComponent := taggerMock.SetupFakeTagger(t) - agg := NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval) + agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "hostname", DefaultFlushInterval) start := time.Now() @@ -585,7 +585,7 @@ func TestTags(t *testing.T) { taggerComponent := taggerMock.SetupFakeTagger(t) - agg := NewBufferedAggregator(nil, nil, taggerComponent, tt.hostname, time.Second) + agg := NewBufferedAggregator(nil, nil, nil, taggerComponent, tt.hostname, time.Second) agg.agentTags = tt.agentTags agg.globalTags = tt.globalTags assert.ElementsMatch(t, tt.want, agg.tags(tt.withVersion)) @@ -619,7 +619,7 @@ func TestAddDJMRecurrentSeries(t *testing.T) { s := &MockSerializerIterableSerie{} // NewBufferedAggregator with DJM enable will create a new recurrentSeries taggerComponent := taggerMock.SetupFakeTagger(t) - NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval) + NewBufferedAggregator(s, nil, nil, taggerComponent, "hostname", DefaultFlushInterval) expectedRecurrentSeries := metrics.Series{&metrics.Serie{ Name: "datadog.djm.agent_host", diff --git a/pkg/aggregator/check_sampler_bench_test.go b/pkg/aggregator/check_sampler_bench_test.go index dbb6e8b57bf450..d77198b5847823 100644 --- a/pkg/aggregator/check_sampler_bench_test.go +++ b/pkg/aggregator/check_sampler_bench_test.go @@ -51,7 +51,7 @@ func benchmarkAddBucket(bucketValue int64, b *testing.B) { sharedForwarder := forwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), deps.Log, forwarderOpts) orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname)) - demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.Compressor, taggerComponent, "hostname") + demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, nil, deps.Compressor, taggerComponent, "hostname") defer demux.Stop(true) checkSampler := newCheckSampler(1, true, true, 1000, tags.NewStore(true, "bench"), checkid.ID("hello:world:1234"), taggerComponent) diff --git a/pkg/aggregator/demultiplexer_agent.go b/pkg/aggregator/demultiplexer_agent.go index a420537fccd254..37792ef7159ee8 100644 --- a/pkg/aggregator/demultiplexer_agent.go +++ b/pkg/aggregator/demultiplexer_agent.go @@ -17,6 +17,7 @@ import ( forwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/comp/serializer/compression" "github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" @@ -119,29 +120,13 @@ type dataOutputs struct { // InitAndStartAgentDemultiplexer creates a new Demultiplexer and runs what's necessary // in goroutines. As of today, only the embedded BufferedAggregator needs a separate goroutine. // In the future, goroutines will be started for the event platform forwarder and/or orchestrator forwarder. -func InitAndStartAgentDemultiplexer( - log log.Component, - sharedForwarder forwarder.Forwarder, - orchestratorForwarder orchestratorforwarder.Component, - options AgentDemultiplexerOptions, - eventPlatformForwarder eventplatform.Component, - compressor compression.Component, - tagger tagger.Component, - hostname string) *AgentDemultiplexer { - demux := initAgentDemultiplexer(log, sharedForwarder, orchestratorForwarder, options, eventPlatformForwarder, compressor, tagger, hostname) +func InitAndStartAgentDemultiplexer(log log.Component, sharedForwarder forwarder.Forwarder, orchestratorForwarder orchestratorforwarder.Component, options AgentDemultiplexerOptions, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, compressor compression.Component, tagger tagger.Component, hostname string) *AgentDemultiplexer { + demux := initAgentDemultiplexer(log, sharedForwarder, orchestratorForwarder, options, eventPlatformForwarder, haAgent, compressor, tagger, hostname) go demux.run() return demux } -func initAgentDemultiplexer( - log log.Component, - sharedForwarder forwarder.Forwarder, - orchestratorForwarder orchestratorforwarder.Component, - options AgentDemultiplexerOptions, - eventPlatformForwarder eventplatform.Component, - compressor compression.Component, - tagger tagger.Component, - hostname string) *AgentDemultiplexer { +func initAgentDemultiplexer(log log.Component, sharedForwarder forwarder.Forwarder, orchestratorForwarder orchestratorforwarder.Component, options AgentDemultiplexerOptions, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, compressor compression.Component, tagger tagger.Component, hostname string) *AgentDemultiplexer { // prepare the multiple forwarders // ------------------------------- if pkgconfigsetup.Datadog().GetBool("telemetry.enabled") && pkgconfigsetup.Datadog().GetBool("telemetry.dogstatsd_origin") && !pkgconfigsetup.Datadog().GetBool("aggregator_use_tags_store") { @@ -157,7 +142,7 @@ func initAgentDemultiplexer( // prepare the embedded aggregator // -- - agg := NewBufferedAggregator(sharedSerializer, eventPlatformForwarder, tagger, hostname, options.FlushInterval) + agg := NewBufferedAggregator(sharedSerializer, eventPlatformForwarder, haAgent, tagger, hostname, options.FlushInterval) // statsd samplers // --------------- diff --git a/pkg/aggregator/demultiplexer_agent_test.go b/pkg/aggregator/demultiplexer_agent_test.go index f2703abc6ec174..a14edcb65e0b0e 100644 --- a/pkg/aggregator/demultiplexer_agent_test.go +++ b/pkg/aggregator/demultiplexer_agent_test.go @@ -65,7 +65,7 @@ func TestDemuxNoAggOptionDisabled(t *testing.T) { opts := demuxTestOptions() deps := createDemultiplexerAgentTestDeps(t) - demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, deps.Compressor, deps.Tagger, "") + demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, nil, deps.Compressor, deps.Tagger, "") batch := testDemuxSamples(t) @@ -87,7 +87,7 @@ func TestDemuxNoAggOptionEnabled(t *testing.T) { mockSerializer.On("AreSketchesEnabled").Return(true) opts.EnableNoAggregationPipeline = true deps := createDemultiplexerAgentTestDeps(t) - demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, deps.Compressor, deps.Tagger, "") + demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, nil, deps.Compressor, deps.Tagger, "") demux.statsd.noAggStreamWorker.serializer = mockSerializer // the no agg pipeline will use our mocked serializer go demux.run() diff --git a/pkg/aggregator/demultiplexer_mock.go b/pkg/aggregator/demultiplexer_mock.go index 917a7887959578..4299e9e4c97300 100644 --- a/pkg/aggregator/demultiplexer_mock.go +++ b/pkg/aggregator/demultiplexer_mock.go @@ -33,5 +33,5 @@ type TestDeps struct { func InitAndStartAgentDemultiplexerForTest(deps TestDeps, options AgentDemultiplexerOptions, hostname string) *AgentDemultiplexer { orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname)) - return InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.Compressor, nooptagger.NewComponent(), hostname) + return InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, nil, deps.Compressor, nooptagger.NewComponent(), hostname) } diff --git a/pkg/aggregator/demultiplexer_test.go b/pkg/aggregator/demultiplexer_test.go index 6c3dd77976bdff..7c1ced19fa84f5 100644 --- a/pkg/aggregator/demultiplexer_test.go +++ b/pkg/aggregator/demultiplexer_test.go @@ -173,7 +173,7 @@ func TestDemuxFlushAggregatorToSerializer(t *testing.T) { opts := demuxTestOptions() opts.FlushInterval = time.Hour deps := createDemuxDeps(t, opts, eventplatformimpl.NewDefaultParams()) - demux := initAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorFwd, opts, deps.EventPlatformFwd, deps.Compressor, nooptagger.NewComponent(), "") + demux := initAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorFwd, opts, deps.EventPlatformFwd, nil, deps.Compressor, nooptagger.NewComponent(), "") demux.Aggregator().tlmContainerTagsEnabled = false require.NotNil(demux) require.NotNil(demux.aggregator) @@ -300,7 +300,7 @@ func createDemuxDepsWithOrchestratorFwd( return aggregatorDeps{ TestDeps: deps.TestDeps, - Demultiplexer: InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorForwarder, opts, deps.Eventplatform, deps.Compressor, nooptagger.NewComponent(), ""), + Demultiplexer: InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorForwarder, opts, deps.Eventplatform, nil, deps.Compressor, nooptagger.NewComponent(), ""), OrchestratorFwd: deps.OrchestratorForwarder, EventPlatformFwd: deps.Eventplatform, } diff --git a/pkg/aggregator/mocksender/mocksender.go b/pkg/aggregator/mocksender/mocksender.go index e4403bdc352ef6..5a19a4c9dec8a5 100644 --- a/pkg/aggregator/mocksender/mocksender.go +++ b/pkg/aggregator/mocksender/mocksender.go @@ -43,7 +43,7 @@ func CreateDefaultDemultiplexer() *aggregator.AgentDemultiplexer { orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostnameimpl.NewHostnameService())) taggerComponent := nooptagger.NewComponent() - return aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, compressionimpl.NewMockCompressor(), taggerComponent, "") + return aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, nil, compressionimpl.NewMockCompressor(), taggerComponent, "") } diff --git a/pkg/aggregator/sender_test.go b/pkg/aggregator/sender_test.go index 17713653a59bcc..2b5fff4ab521db 100644 --- a/pkg/aggregator/sender_test.go +++ b/pkg/aggregator/sender_test.go @@ -58,7 +58,7 @@ func testDemux(log log.Component, hostname hostname.Component) *AgentDemultiplex opts.DontStartForwarders = true orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostname)) - demux := initAgentDemultiplexer(log, NewForwarderTest(log), &orchestratorForwarder, opts, eventPlatformForwarder, compressionimpl.NewMockCompressor(), nooptagger.NewComponent(), defaultHostname) + demux := initAgentDemultiplexer(log, NewForwarderTest(log), &orchestratorForwarder, opts, eventPlatformForwarder, nil, compressionimpl.NewMockCompressor(), nooptagger.NewComponent(), defaultHostname) return demux } diff --git a/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go b/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go index 720f4f713a35bc..6e7c55f230fcc9 100644 --- a/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go +++ b/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go @@ -25,7 +25,7 @@ import ( func TestProfileBundleJsonZip(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "zipprofiles.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) diff --git a/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go b/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go index 06b55c7df2180a..68e82058528ddb 100644 --- a/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go +++ b/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go @@ -32,7 +32,7 @@ import ( func TestProfileMetadata_f5(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) diff --git a/pkg/collector/corechecks/snmp/integration_topology_test.go b/pkg/collector/corechecks/snmp/integration_topology_test.go index 0f5ccfbc2e5f4f..b4f0f42ceb6d0d 100644 --- a/pkg/collector/corechecks/snmp/integration_topology_test.go +++ b/pkg/collector/corechecks/snmp/integration_topology_test.go @@ -32,7 +32,7 @@ import ( func TestTopologyPayload_LLDP(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) @@ -734,7 +734,7 @@ profiles: func TestTopologyPayload_CDP(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) @@ -1427,7 +1427,7 @@ profiles: // we have different data for LLDP and CDP to test that we're only using LLDP to build the links func TestTopologyPayload_LLDP_CDP(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) diff --git a/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go b/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go index 997aadd2103908..2dc78dd76b5477 100644 --- a/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go +++ b/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go @@ -25,7 +25,7 @@ import ( func TestConfigurations(t *testing.T) { profile.SetConfdPathAndCleanProfiles() - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) // language=yaml rawInstanceConfig := []byte(` @@ -326,7 +326,7 @@ profiles: func TestInlineProfileConfiguration(t *testing.T) { profile.SetConfdPathAndCleanProfiles() - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) // language=yaml rawInstanceConfig := []byte(` diff --git a/pkg/collector/corechecks/systemd/systemd_test.go b/pkg/collector/corechecks/systemd/systemd_test.go index 6f91dc4edd2fea..7fb9d67df981ab 100644 --- a/pkg/collector/corechecks/systemd/systemd_test.go +++ b/pkg/collector/corechecks/systemd/systemd_test.go @@ -1087,7 +1087,7 @@ unit_names: func TestCheckID(t *testing.T) { check1 := newCheck() check2 := newCheck() - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) // language=yaml rawInstanceConfig1 := []byte(` diff --git a/test/benchmarks/kubernetes_state/main.go b/test/benchmarks/kubernetes_state/main.go index 08b83d358855e7..a1ea844898a9ba 100644 --- a/test/benchmarks/kubernetes_state/main.go +++ b/test/benchmarks/kubernetes_state/main.go @@ -207,7 +207,7 @@ func main() { * As it has a `nil` serializer, it will panic if it tries to flush the metrics. * That’s why we need a big enough flush interval */ - aggregator.NewBufferedAggregator(nil, "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, "", nil, 1*time.Hour) /* * Wait for informers to get populated