diff --git a/comp/aggregator/bundle_test.go b/comp/aggregator/bundle_test.go index 3a426385bccef..2879750ebf4a2 100644 --- a/comp/aggregator/bundle_test.go +++ b/comp/aggregator/bundle_test.go @@ -14,6 +14,7 @@ import ( "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" + noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop" "github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl" "github.com/DataDog/datadog-agent/pkg/util/fxutil" ) @@ -26,5 +27,6 @@ func TestBundleDependencies(t *testing.T) { orchestratorForwarderImpl.MockModule(), eventplatformimpl.MockModule(), nooptagger.Module(), + noophaagent.Module(), ) } diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go index c50c6528029a8..80a4817d42804 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go @@ -20,6 +20,7 @@ import ( "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/comp/serializer/compression" "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" @@ -42,6 +43,7 @@ type dependencies struct { SharedForwarder defaultforwarder.Component OrchestratorForwarder orchestratorforwarder.Component EventPlatformForwarder eventplatform.Component + HaAgent haagent.Component Compressor compression.Component Tagger tagger.Component @@ -86,6 +88,7 @@ func newDemultiplexer(deps dependencies) (provides, error) { deps.OrchestratorForwarder, options, deps.EventPlatformForwarder, + deps.HaAgent, deps.Compressor, deps.Tagger, hostnameDetected, diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/status_test.go b/comp/aggregator/demultiplexer/demultiplexerimpl/status_test.go index e66c099ac36aa..cfd81d9c8033c 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/status_test.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/status_test.go @@ -10,6 +10,7 @@ import ( "bytes" "testing" + noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop" "github.com/stretchr/testify/require" "go.uber.org/fx" @@ -61,6 +62,7 @@ func TestStatusOutPut(t *testing.T) { core.MockBundle(), compressionimpl.MockModule(), defaultforwarder.MockModule(), + noophaagent.Module(), orchestratorimpl.MockModule(), eventplatformimpl.MockModule(), fx.Provide(func() tagger.Component { diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go b/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go index 4ac27d7d64460..de6248f81c9d2 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go @@ -18,6 +18,7 @@ import ( "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" + noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop" "github.com/DataDog/datadog-agent/comp/serializer/compression" "github.com/DataDog/datadog-agent/pkg/aggregator" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" @@ -185,6 +186,6 @@ func initTestAgentDemultiplexerWithFlushInterval(log log.Component, hostname hos sharedForwarder := defaultforwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), log, sharedForwarderOptions) orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostname)) - demux := aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, compressor, noopimpl.NewComponent(), "hostname") + demux := aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, noophaagent.NewNoopHaAgent(), compressor, noopimpl.NewComponent(), "hostname") return NewTestAgentDemultiplexer(demux) } diff --git a/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go b/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go index 73de2ac99cca7..5561ed840fd49 100644 --- a/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go +++ b/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go @@ -9,6 +9,7 @@ package diagnosesendermanagerimpl import ( "context" + noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop" "go.uber.org/fx" "github.com/DataDog/datadog-agent/comp/aggregator/diagnosesendermanager" @@ -78,6 +79,7 @@ func (sender *diagnoseSenderManager) LazyGetSenderManager() (sender.SenderManage orchestratorForwarder, opts, eventPlatformForwarder, + noophaagent.NewNoopHaAgent(), sender.deps.Compressor, sender.deps.Tagger, hostnameDetected) diff --git a/comp/haagent/impl-noop/noophaagent.go b/comp/haagent/impl-noop/noophaagent.go new file mode 100644 index 0000000000000..f194c62491f68 --- /dev/null +++ b/comp/haagent/impl-noop/noophaagent.go @@ -0,0 +1,44 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +// Package noophaagent provides a noop haagent component +package noophaagent + +import ( + "go.uber.org/fx" + + log "github.com/DataDog/datadog-agent/comp/core/log/def" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" +) + +type noopHaAgent struct { + Logger log.Component +} + +func (m *noopHaAgent) GetGroup() string { return "" } + +func (m *noopHaAgent) Enabled() bool { return false } + +func (m *noopHaAgent) SetLeader(_ string) {} + +func (m *noopHaAgent) IsLeader() bool { return false } + +// Provides that defines the output of mocked snmpscan component +type Provides struct { + comp haagent.Component +} + +// NewNoopHaAgent returns a new Mock +func NewNoopHaAgent() haagent.Component { + return &noopHaAgent{} +} + +// Module defines the fx options for the noopHaAgent component. +func Module() fxutil.Module { + return fxutil.Component( + fx.Provide(NewNoopHaAgent), + ) +} diff --git a/comp/haagent/mock/mock.go b/comp/haagent/mock/mock.go index 6ee0c733361f7..f4bd713137b07 100644 --- a/comp/haagent/mock/mock.go +++ b/comp/haagent/mock/mock.go @@ -9,37 +9,64 @@ package mock import ( - "testing" - log "github.com/DataDog/datadog-agent/comp/core/log/def" haagent "github.com/DataDog/datadog-agent/comp/haagent/def" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" + "go.uber.org/fx" ) -type mock struct { +type mockHaAgent struct { Logger log.Component + + group string + enabled bool +} + +func (m *mockHaAgent) GetGroup() string { + return m.group } -func (m *mock) GetGroup() string { - return "mockGroup01" +func (m *mockHaAgent) Enabled() bool { + return m.enabled } -func (m *mock) Enabled() bool { - return true +func (m *mockHaAgent) SetLeader(_ string) { } -func (m *mock) SetLeader(_ string) { +func (m *mockHaAgent) IsLeader() bool { return false } + +func (m *mockHaAgent) SetGroup(group string) { + m.group = group } -func (m *mock) IsLeader() bool { return false } +func (m *mockHaAgent) SetEnabled(enabled bool) { + m.enabled = enabled +} + +// MockComponent is the component type. +type MockComponent interface { + haagent.Component + + SetGroup(string) + SetEnabled(bool) +} // Provides that defines the output of mocked snmpscan component type Provides struct { - comp haagent.Component + comp MockComponent } -// Mock returns a mock for haagent component. -func Mock(_ *testing.T) Provides { - return Provides{ - comp: &mock{}, +// NewMockHaAgent returns a new Mock +func NewMockHaAgent() MockComponent { + return &mockHaAgent{ + enabled: false, + group: "group01", } } + +// MockModule defines the fx options for the mockHaAgent component. +func MockModule() fxutil.Module { + return fxutil.Component( + fx.Provide(NewMockHaAgent), + ) +} diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index d42ac89d50e47..e8ac4fd425d13 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -16,6 +16,7 @@ import ( tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" "github.com/DataDog/datadog-agent/comp/core/tagger/types" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" "github.com/DataDog/datadog-agent/pkg/config/model" @@ -252,6 +253,7 @@ type BufferedAggregator struct { flushMutex sync.Mutex // to start multiple flushes in parallel serializer serializer.MetricSerializer eventPlatformForwarder eventplatform.Component + haAgent haagent.Component hostname string hostnameUpdate chan string hostnameUpdateDone chan struct{} // signals that the hostname update is finished @@ -283,7 +285,7 @@ func NewFlushAndSerializeInParallel(config model.Config) FlushAndSerializeInPara } // NewBufferedAggregator instantiates a BufferedAggregator -func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator { +func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator { bufferSize := pkgconfigsetup.Datadog().GetInt("aggregator_buffer_size") agentName := flavor.GetFlavor() @@ -326,6 +328,7 @@ func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder flushInterval: flushInterval, serializer: s, eventPlatformForwarder: eventPlatformForwarder, + haAgent: haAgent, hostname: hostname, hostnameUpdate: make(chan string), hostnameUpdateDone: make(chan struct{}), @@ -861,6 +864,9 @@ func (agg *BufferedAggregator) tags(withVersion bool) []string { tags = append(tags, "package_version:"+version.AgentPackageVersion) } } + if agg.haAgent.Enabled() { + tags = append(tags, "agent_group:"+agg.haAgent.GetGroup()) + } // nil to empty string // This is expected by other components/tests if tags == nil { diff --git a/pkg/aggregator/aggregator_test.go b/pkg/aggregator/aggregator_test.go index 64c33496891c7..e043064e4cade 100644 --- a/pkg/aggregator/aggregator_test.go +++ b/pkg/aggregator/aggregator_test.go @@ -27,6 +27,8 @@ import ( "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" + noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop" + mockhaagent "github.com/DataDog/datadog-agent/comp/haagent/mock" "github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" configmock "github.com/DataDog/datadog-agent/pkg/config/mock" @@ -146,7 +148,7 @@ func TestAddServiceCheckDefaultValues(t *testing.T) { s := &MockSerializerIterableSerie{} taggerComponent := taggerMock.SetupFakeTagger(t) - agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) + agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) agg.addServiceCheck(servicecheck.ServiceCheck{ // leave Host and Ts fields blank @@ -179,7 +181,7 @@ func TestAddEventDefaultValues(t *testing.T) { s := &MockSerializerIterableSerie{} taggerComponent := taggerMock.SetupFakeTagger(t) - agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) + agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) agg.addEvent(event.Event{ // only populate required fields @@ -229,7 +231,7 @@ func TestDefaultData(t *testing.T) { s := &MockSerializerIterableSerie{} taggerComponent := taggerMock.SetupFakeTagger(t) - agg := NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval) + agg := NewBufferedAggregator(s, nil, noophaagent.NewNoopHaAgent(), taggerComponent, "hostname", DefaultFlushInterval) start := time.Now() @@ -512,6 +514,7 @@ func TestTags(t *testing.T) { agentTags func(types.TagCardinality) ([]string, error) globalTags func(types.TagCardinality) ([]string, error) withVersion bool + haAgentEnabled bool want []string }{ { @@ -577,6 +580,16 @@ func TestTags(t *testing.T) { withVersion: true, want: []string{"container_name:agent", "version:" + version.AgentVersion, "kube_cluster_name:foo"}, }, + { + name: "tags disabled, without version, ha agent enabled", + hostname: "hostname", + tlmContainerTagsEnabled: false, + agentTags: func(types.TagCardinality) ([]string, error) { return nil, errors.New("disabled") }, + globalTags: func(types.TagCardinality) ([]string, error) { return nil, errors.New("disabled") }, + withVersion: false, + haAgentEnabled: true, + want: []string{"agent_group:group01"}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -585,7 +598,10 @@ func TestTags(t *testing.T) { taggerComponent := taggerMock.SetupFakeTagger(t) - agg := NewBufferedAggregator(nil, nil, taggerComponent, tt.hostname, time.Second) + mockHaAgent := mockhaagent.NewMockHaAgent() + mockHaAgent.SetEnabled(tt.haAgentEnabled) + + agg := NewBufferedAggregator(nil, nil, mockHaAgent, taggerComponent, tt.hostname, time.Second) agg.agentTags = tt.agentTags agg.globalTags = tt.globalTags assert.ElementsMatch(t, tt.want, agg.tags(tt.withVersion)) @@ -619,7 +635,7 @@ func TestAddDJMRecurrentSeries(t *testing.T) { s := &MockSerializerIterableSerie{} // NewBufferedAggregator with DJM enable will create a new recurrentSeries taggerComponent := taggerMock.SetupFakeTagger(t) - NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval) + NewBufferedAggregator(s, nil, nil, taggerComponent, "hostname", DefaultFlushInterval) expectedRecurrentSeries := metrics.Series{&metrics.Serie{ Name: "datadog.djm.agent_host", @@ -728,7 +744,7 @@ type aggregatorDeps struct { } func createAggrDeps(t *testing.T) aggregatorDeps { - deps := fxutil.Test[TestDeps](t, defaultforwarder.MockModule(), core.MockBundle(), compressionimpl.MockModule()) + deps := fxutil.Test[TestDeps](t, defaultforwarder.MockModule(), core.MockBundle(), compressionimpl.MockModule(), noophaagent.Module()) opts := demuxTestOptions() return aggregatorDeps{ diff --git a/pkg/aggregator/check_sampler_bench_test.go b/pkg/aggregator/check_sampler_bench_test.go index dbb6e8b57bf45..720dc5ea70329 100644 --- a/pkg/aggregator/check_sampler_bench_test.go +++ b/pkg/aggregator/check_sampler_bench_test.go @@ -17,6 +17,7 @@ import ( "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" + noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop" "github.com/DataDog/datadog-agent/comp/serializer/compression" //nolint:revive // TODO(AML) Fix revive linter @@ -51,7 +52,7 @@ func benchmarkAddBucket(bucketValue int64, b *testing.B) { sharedForwarder := forwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), deps.Log, forwarderOpts) orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname)) - demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.Compressor, taggerComponent, "hostname") + demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, noophaagent.NewNoopHaAgent(), deps.Compressor, taggerComponent, "hostname") defer demux.Stop(true) checkSampler := newCheckSampler(1, true, true, 1000, tags.NewStore(true, "bench"), checkid.ID("hello:world:1234"), taggerComponent) diff --git a/pkg/aggregator/demultiplexer_agent.go b/pkg/aggregator/demultiplexer_agent.go index a420537fccd25..36528b374b745 100644 --- a/pkg/aggregator/demultiplexer_agent.go +++ b/pkg/aggregator/demultiplexer_agent.go @@ -17,6 +17,7 @@ import ( forwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/comp/serializer/compression" "github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" @@ -125,20 +126,21 @@ func InitAndStartAgentDemultiplexer( orchestratorForwarder orchestratorforwarder.Component, options AgentDemultiplexerOptions, eventPlatformForwarder eventplatform.Component, + haAgent haagent.Component, compressor compression.Component, tagger tagger.Component, hostname string) *AgentDemultiplexer { - demux := initAgentDemultiplexer(log, sharedForwarder, orchestratorForwarder, options, eventPlatformForwarder, compressor, tagger, hostname) + demux := initAgentDemultiplexer(log, sharedForwarder, orchestratorForwarder, options, eventPlatformForwarder, haAgent, compressor, tagger, hostname) go demux.run() return demux } -func initAgentDemultiplexer( - log log.Component, +func initAgentDemultiplexer(log log.Component, sharedForwarder forwarder.Forwarder, orchestratorForwarder orchestratorforwarder.Component, options AgentDemultiplexerOptions, eventPlatformForwarder eventplatform.Component, + haAgent haagent.Component, compressor compression.Component, tagger tagger.Component, hostname string) *AgentDemultiplexer { @@ -157,7 +159,7 @@ func initAgentDemultiplexer( // prepare the embedded aggregator // -- - agg := NewBufferedAggregator(sharedSerializer, eventPlatformForwarder, tagger, hostname, options.FlushInterval) + agg := NewBufferedAggregator(sharedSerializer, eventPlatformForwarder, haAgent, tagger, hostname, options.FlushInterval) // statsd samplers // --------------- diff --git a/pkg/aggregator/demultiplexer_agent_test.go b/pkg/aggregator/demultiplexer_agent_test.go index f2703abc6ec17..b932540e16258 100644 --- a/pkg/aggregator/demultiplexer_agent_test.go +++ b/pkg/aggregator/demultiplexer_agent_test.go @@ -23,6 +23,8 @@ import ( "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" + noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop" "github.com/DataDog/datadog-agent/comp/serializer/compression" "github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -65,7 +67,7 @@ func TestDemuxNoAggOptionDisabled(t *testing.T) { opts := demuxTestOptions() deps := createDemultiplexerAgentTestDeps(t) - demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, deps.Compressor, deps.Tagger, "") + demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, deps.HaAgent, deps.Compressor, deps.Tagger, "") batch := testDemuxSamples(t) @@ -87,7 +89,7 @@ func TestDemuxNoAggOptionEnabled(t *testing.T) { mockSerializer.On("AreSketchesEnabled").Return(true) opts.EnableNoAggregationPipeline = true deps := createDemultiplexerAgentTestDeps(t) - demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, deps.Compressor, deps.Tagger, "") + demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, deps.HaAgent, deps.Compressor, deps.Tagger, "") demux.statsd.noAggStreamWorker.serializer = mockSerializer // the no agg pipeline will use our mocked serializer go demux.run() @@ -112,7 +114,7 @@ func TestDemuxNoAggOptionEnabled(t *testing.T) { func TestDemuxNoAggOptionIsDisabledByDefault(t *testing.T) { opts := demuxTestOptions() - deps := fxutil.Test[TestDeps](t, defaultforwarder.MockModule(), core.MockBundle(), compressionimpl.MockModule()) + deps := fxutil.Test[TestDeps](t, defaultforwarder.MockModule(), core.MockBundle(), compressionimpl.MockModule(), noophaagent.Module()) demux := InitAndStartAgentDemultiplexerForTest(deps, opts, "") require.False(t, demux.Options().EnableNoAggregationPipeline, "the no aggregation pipeline should be disabled by default") @@ -157,6 +159,7 @@ type DemultiplexerAgentTestDeps struct { EventPlatform eventplatform.Component Compressor compression.Component Tagger tagger.Component + HaAgent haagent.Component } func createDemultiplexerAgentTestDeps(t *testing.T) DemultiplexerAgentTestDeps { @@ -168,6 +171,7 @@ func createDemultiplexerAgentTestDeps(t *testing.T) DemultiplexerAgentTestDeps { core.MockBundle(), orchestratorimpl.MockModule(), eventplatformimpl.MockModule(), + noophaagent.Module(), compressionimpl.MockModule(), fx.Provide(func() tagger.Component { return taggerComponent }), ) diff --git a/pkg/aggregator/demultiplexer_mock.go b/pkg/aggregator/demultiplexer_mock.go index 917a788795957..c6511f7eb6890 100644 --- a/pkg/aggregator/demultiplexer_mock.go +++ b/pkg/aggregator/demultiplexer_mock.go @@ -16,6 +16,7 @@ import ( "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/comp/serializer/compression" "github.com/DataDog/datadog-agent/pkg/util/optional" ) @@ -27,11 +28,12 @@ type TestDeps struct { Hostname hostname.Component SharedForwarder defaultforwarder.Component Compressor compression.Component + HaAgent haagent.Component } // InitAndStartAgentDemultiplexerForTest initializes an aggregator for tests. func InitAndStartAgentDemultiplexerForTest(deps TestDeps, options AgentDemultiplexerOptions, hostname string) *AgentDemultiplexer { orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname)) - return InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.Compressor, nooptagger.NewComponent(), hostname) + return InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.HaAgent, deps.Compressor, nooptagger.NewComponent(), hostname) } diff --git a/pkg/aggregator/demultiplexer_test.go b/pkg/aggregator/demultiplexer_test.go index 6c3dd77976bdf..6ffce51e72404 100644 --- a/pkg/aggregator/demultiplexer_test.go +++ b/pkg/aggregator/demultiplexer_test.go @@ -173,7 +173,7 @@ func TestDemuxFlushAggregatorToSerializer(t *testing.T) { opts := demuxTestOptions() opts.FlushInterval = time.Hour deps := createDemuxDeps(t, opts, eventplatformimpl.NewDefaultParams()) - demux := initAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorFwd, opts, deps.EventPlatformFwd, deps.Compressor, nooptagger.NewComponent(), "") + demux := initAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorFwd, opts, deps.EventPlatformFwd, deps.HaAgent, deps.Compressor, nooptagger.NewComponent(), "") demux.Aggregator().tlmContainerTagsEnabled = false require.NotNil(demux) require.NotNil(demux.aggregator) @@ -300,7 +300,7 @@ func createDemuxDepsWithOrchestratorFwd( return aggregatorDeps{ TestDeps: deps.TestDeps, - Demultiplexer: InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorForwarder, opts, deps.Eventplatform, deps.Compressor, nooptagger.NewComponent(), ""), + Demultiplexer: InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorForwarder, opts, deps.Eventplatform, deps.HaAgent, deps.Compressor, nooptagger.NewComponent(), ""), OrchestratorFwd: deps.OrchestratorForwarder, EventPlatformFwd: deps.Eventplatform, } diff --git a/pkg/aggregator/mocksender/mocksender.go b/pkg/aggregator/mocksender/mocksender.go index e4403bdc352ef..21c9bf7e92a4b 100644 --- a/pkg/aggregator/mocksender/mocksender.go +++ b/pkg/aggregator/mocksender/mocksender.go @@ -10,6 +10,7 @@ package mocksender import ( "time" + noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop" "github.com/stretchr/testify/mock" "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" @@ -43,7 +44,7 @@ func CreateDefaultDemultiplexer() *aggregator.AgentDemultiplexer { orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostnameimpl.NewHostnameService())) taggerComponent := nooptagger.NewComponent() - return aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, compressionimpl.NewMockCompressor(), taggerComponent, "") + return aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, noophaagent.NewNoopHaAgent(), compressionimpl.NewMockCompressor(), taggerComponent, "") } diff --git a/pkg/aggregator/sender_test.go b/pkg/aggregator/sender_test.go index 17713653a59bc..ce2e182bb51e1 100644 --- a/pkg/aggregator/sender_test.go +++ b/pkg/aggregator/sender_test.go @@ -23,6 +23,7 @@ import ( "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" + noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop" "github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -58,7 +59,7 @@ func testDemux(log log.Component, hostname hostname.Component) *AgentDemultiplex opts.DontStartForwarders = true orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostname)) - demux := initAgentDemultiplexer(log, NewForwarderTest(log), &orchestratorForwarder, opts, eventPlatformForwarder, compressionimpl.NewMockCompressor(), nooptagger.NewComponent(), defaultHostname) + demux := initAgentDemultiplexer(log, NewForwarderTest(log), &orchestratorForwarder, opts, eventPlatformForwarder, noophaagent.NewNoopHaAgent(), compressionimpl.NewMockCompressor(), nooptagger.NewComponent(), defaultHostname) return demux } diff --git a/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go b/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go index 720f4f713a35b..6e7c55f230fcc 100644 --- a/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go +++ b/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go @@ -25,7 +25,7 @@ import ( func TestProfileBundleJsonZip(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "zipprofiles.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) diff --git a/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go b/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go index 06b55c7df2180..68e82058528dd 100644 --- a/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go +++ b/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go @@ -32,7 +32,7 @@ import ( func TestProfileMetadata_f5(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) diff --git a/pkg/collector/corechecks/snmp/integration_topology_test.go b/pkg/collector/corechecks/snmp/integration_topology_test.go index 0f5ccfbc2e5f4..b4f0f42ceb6d0 100644 --- a/pkg/collector/corechecks/snmp/integration_topology_test.go +++ b/pkg/collector/corechecks/snmp/integration_topology_test.go @@ -32,7 +32,7 @@ import ( func TestTopologyPayload_LLDP(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) @@ -734,7 +734,7 @@ profiles: func TestTopologyPayload_CDP(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) @@ -1427,7 +1427,7 @@ profiles: // we have different data for LLDP and CDP to test that we're only using LLDP to build the links func TestTopologyPayload_LLDP_CDP(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) diff --git a/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go b/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go index 997aadd210390..2dc78dd76b547 100644 --- a/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go +++ b/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go @@ -25,7 +25,7 @@ import ( func TestConfigurations(t *testing.T) { profile.SetConfdPathAndCleanProfiles() - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) // language=yaml rawInstanceConfig := []byte(` @@ -326,7 +326,7 @@ profiles: func TestInlineProfileConfiguration(t *testing.T) { profile.SetConfdPathAndCleanProfiles() - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) // language=yaml rawInstanceConfig := []byte(` diff --git a/pkg/collector/corechecks/systemd/systemd_test.go b/pkg/collector/corechecks/systemd/systemd_test.go index 6f91dc4edd2fe..7fb9d67df981a 100644 --- a/pkg/collector/corechecks/systemd/systemd_test.go +++ b/pkg/collector/corechecks/systemd/systemd_test.go @@ -1087,7 +1087,7 @@ unit_names: func TestCheckID(t *testing.T) { check1 := newCheck() check2 := newCheck() - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour) // language=yaml rawInstanceConfig1 := []byte(` diff --git a/test/benchmarks/kubernetes_state/main.go b/test/benchmarks/kubernetes_state/main.go index 08b83d358855e..a1ea844898a9b 100644 --- a/test/benchmarks/kubernetes_state/main.go +++ b/test/benchmarks/kubernetes_state/main.go @@ -207,7 +207,7 @@ func main() { * As it has a `nil` serializer, it will panic if it tries to flush the metrics. * That’s why we need a big enough flush interval */ - aggregator.NewBufferedAggregator(nil, "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, "", nil, 1*time.Hour) /* * Wait for informers to get populated