Skip to content

Commit

Permalink
[haagent] Add agent group tag to datadog.agent.running
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandreYang committed Nov 18, 2024
1 parent af96a94 commit e3bd8e5
Show file tree
Hide file tree
Showing 22 changed files with 158 additions and 44 deletions.
2 changes: 2 additions & 0 deletions comp/aggregator/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop"
"github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
)
Expand All @@ -26,5 +27,6 @@ func TestBundleDependencies(t *testing.T) {
orchestratorForwarderImpl.MockModule(),
eventplatformimpl.MockModule(),
nooptagger.Module(),
noophaagent.Module(),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator"
haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
"github.com/DataDog/datadog-agent/comp/serializer/compression"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
Expand All @@ -42,6 +43,7 @@ type dependencies struct {
SharedForwarder defaultforwarder.Component
OrchestratorForwarder orchestratorforwarder.Component
EventPlatformForwarder eventplatform.Component
HaAgent haagent.Component
Compressor compression.Component
Tagger tagger.Component

Expand Down Expand Up @@ -86,6 +88,7 @@ func newDemultiplexer(deps dependencies) (provides, error) {
deps.OrchestratorForwarder,
options,
deps.EventPlatformForwarder,
deps.HaAgent,
deps.Compressor,
deps.Tagger,
hostnameDetected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"bytes"
"testing"

noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop"
"github.com/stretchr/testify/require"
"go.uber.org/fx"

Expand Down Expand Up @@ -61,6 +62,7 @@ func TestStatusOutPut(t *testing.T) {
core.MockBundle(),
compressionimpl.MockModule(),
defaultforwarder.MockModule(),
noophaagent.Module(),
orchestratorimpl.MockModule(),
eventplatformimpl.MockModule(),
fx.Provide(func() tagger.Component {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl"
noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop"
"github.com/DataDog/datadog-agent/comp/serializer/compression"
"github.com/DataDog/datadog-agent/pkg/aggregator"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
Expand Down Expand Up @@ -185,6 +186,6 @@ func initTestAgentDemultiplexerWithFlushInterval(log log.Component, hostname hos
sharedForwarder := defaultforwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), log, sharedForwarderOptions)
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostname))
demux := aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, compressor, noopimpl.NewComponent(), "hostname")
demux := aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, noophaagent.NewNoopHaAgent(), compressor, noopimpl.NewComponent(), "hostname")
return NewTestAgentDemultiplexer(demux)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package diagnosesendermanagerimpl
import (
"context"

noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop"
"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/aggregator/diagnosesendermanager"
Expand Down Expand Up @@ -78,6 +79,7 @@ func (sender *diagnoseSenderManager) LazyGetSenderManager() (sender.SenderManage
orchestratorForwarder,
opts,
eventPlatformForwarder,
noophaagent.NewNoopHaAgent(),
sender.deps.Compressor,
sender.deps.Tagger,
hostnameDetected)
Expand Down
44 changes: 44 additions & 0 deletions comp/haagent/impl-noop/noophaagent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

// Package noophaagent provides a noop haagent component
package noophaagent

import (
"go.uber.org/fx"

log "github.com/DataDog/datadog-agent/comp/core/log/def"
haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
)

type noopHaAgent struct {
Logger log.Component
}

func (m *noopHaAgent) GetGroup() string { return "" }

func (m *noopHaAgent) Enabled() bool { return false }

func (m *noopHaAgent) SetLeader(_ string) {}

func (m *noopHaAgent) IsLeader() bool { return false }

// Provides that defines the output of mocked snmpscan component
type Provides struct {
comp haagent.Component
}

// NewNoopHaAgent returns a new Mock
func NewNoopHaAgent() haagent.Component {
return &noopHaAgent{}
}

// Module defines the fx options for the noopHaAgent component.
func Module() fxutil.Module {
return fxutil.Component(
fx.Provide(NewNoopHaAgent),
)
}
55 changes: 41 additions & 14 deletions comp/haagent/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,64 @@
package mock

import (
"testing"

log "github.com/DataDog/datadog-agent/comp/core/log/def"
haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"go.uber.org/fx"
)

type mock struct {
type mockHaAgent struct {
Logger log.Component

group string
enabled bool
}

func (m *mockHaAgent) GetGroup() string {
return m.group
}

func (m *mock) GetGroup() string {
return "mockGroup01"
func (m *mockHaAgent) Enabled() bool {
return m.enabled
}

func (m *mock) Enabled() bool {
return true
func (m *mockHaAgent) SetLeader(_ string) {
}

func (m *mock) SetLeader(_ string) {
func (m *mockHaAgent) IsLeader() bool { return false }

func (m *mockHaAgent) SetGroup(group string) {
m.group = group
}

func (m *mock) IsLeader() bool { return false }
func (m *mockHaAgent) SetEnabled(enabled bool) {
m.enabled = enabled
}

// MockComponent is the component type.
type MockComponent interface {
haagent.Component

SetGroup(string)
SetEnabled(bool)
}

// Provides that defines the output of mocked snmpscan component
type Provides struct {
comp haagent.Component
comp MockComponent
}

// Mock returns a mock for haagent component.
func Mock(_ *testing.T) Provides {
return Provides{
comp: &mock{},
// NewMockHaAgent returns a new Mock
func NewMockHaAgent() MockComponent {
return &mockHaAgent{
enabled: false,
group: "group01",
}
}

// MockModule defines the fx options for the mockHaAgent component.
func MockModule() fxutil.Module {
return fxutil.Component(
fx.Provide(NewMockHaAgent),
)
}
8 changes: 7 additions & 1 deletion pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
"github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
"github.com/DataDog/datadog-agent/pkg/config/model"
Expand Down Expand Up @@ -252,6 +253,7 @@ type BufferedAggregator struct {
flushMutex sync.Mutex // to start multiple flushes in parallel
serializer serializer.MetricSerializer
eventPlatformForwarder eventplatform.Component
haAgent haagent.Component
hostname string
hostnameUpdate chan string
hostnameUpdateDone chan struct{} // signals that the hostname update is finished
Expand Down Expand Up @@ -283,7 +285,7 @@ func NewFlushAndSerializeInParallel(config model.Config) FlushAndSerializeInPara
}

// NewBufferedAggregator instantiates a BufferedAggregator
func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator {
func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator {
bufferSize := pkgconfigsetup.Datadog().GetInt("aggregator_buffer_size")

agentName := flavor.GetFlavor()
Expand Down Expand Up @@ -326,6 +328,7 @@ func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder
flushInterval: flushInterval,
serializer: s,
eventPlatformForwarder: eventPlatformForwarder,
haAgent: haAgent,
hostname: hostname,
hostnameUpdate: make(chan string),
hostnameUpdateDone: make(chan struct{}),
Expand Down Expand Up @@ -861,6 +864,9 @@ func (agg *BufferedAggregator) tags(withVersion bool) []string {
tags = append(tags, "package_version:"+version.AgentPackageVersion)
}
}
if agg.haAgent.Enabled() {
tags = append(tags, "agent_group:"+agg.haAgent.GetGroup())
}
// nil to empty string
// This is expected by other components/tests
if tags == nil {
Expand Down
28 changes: 22 additions & 6 deletions pkg/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator"
noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop"
mockhaagent "github.com/DataDog/datadog-agent/comp/haagent/mock"
"github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
configmock "github.com/DataDog/datadog-agent/pkg/config/mock"
Expand Down Expand Up @@ -146,7 +148,7 @@ func TestAddServiceCheckDefaultValues(t *testing.T) {

s := &MockSerializerIterableSerie{}
taggerComponent := taggerMock.SetupFakeTagger(t)
agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval)
agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval)

agg.addServiceCheck(servicecheck.ServiceCheck{
// leave Host and Ts fields blank
Expand Down Expand Up @@ -179,7 +181,7 @@ func TestAddEventDefaultValues(t *testing.T) {

s := &MockSerializerIterableSerie{}
taggerComponent := taggerMock.SetupFakeTagger(t)
agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval)
agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval)

agg.addEvent(event.Event{
// only populate required fields
Expand Down Expand Up @@ -229,7 +231,7 @@ func TestDefaultData(t *testing.T) {

s := &MockSerializerIterableSerie{}
taggerComponent := taggerMock.SetupFakeTagger(t)
agg := NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval)
agg := NewBufferedAggregator(s, nil, noophaagent.NewNoopHaAgent(), taggerComponent, "hostname", DefaultFlushInterval)

start := time.Now()

Expand Down Expand Up @@ -512,6 +514,7 @@ func TestTags(t *testing.T) {
agentTags func(types.TagCardinality) ([]string, error)
globalTags func(types.TagCardinality) ([]string, error)
withVersion bool
haAgentEnabled bool
want []string
}{
{
Expand Down Expand Up @@ -577,6 +580,16 @@ func TestTags(t *testing.T) {
withVersion: true,
want: []string{"container_name:agent", "version:" + version.AgentVersion, "kube_cluster_name:foo"},
},
{
name: "tags disabled, without version, ha agent enabled",
hostname: "hostname",
tlmContainerTagsEnabled: false,
agentTags: func(types.TagCardinality) ([]string, error) { return nil, errors.New("disabled") },
globalTags: func(types.TagCardinality) ([]string, error) { return nil, errors.New("disabled") },
withVersion: false,
haAgentEnabled: true,
want: []string{"agent_group:group01"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -585,7 +598,10 @@ func TestTags(t *testing.T) {

taggerComponent := taggerMock.SetupFakeTagger(t)

agg := NewBufferedAggregator(nil, nil, taggerComponent, tt.hostname, time.Second)
mockHaAgent := mockhaagent.NewMockHaAgent()
mockHaAgent.SetEnabled(tt.haAgentEnabled)

agg := NewBufferedAggregator(nil, nil, mockHaAgent, taggerComponent, tt.hostname, time.Second)
agg.agentTags = tt.agentTags
agg.globalTags = tt.globalTags
assert.ElementsMatch(t, tt.want, agg.tags(tt.withVersion))
Expand Down Expand Up @@ -619,7 +635,7 @@ func TestAddDJMRecurrentSeries(t *testing.T) {
s := &MockSerializerIterableSerie{}
// NewBufferedAggregator with DJM enable will create a new recurrentSeries
taggerComponent := taggerMock.SetupFakeTagger(t)
NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval)
NewBufferedAggregator(s, nil, nil, taggerComponent, "hostname", DefaultFlushInterval)

expectedRecurrentSeries := metrics.Series{&metrics.Serie{
Name: "datadog.djm.agent_host",
Expand Down Expand Up @@ -728,7 +744,7 @@ type aggregatorDeps struct {
}

func createAggrDeps(t *testing.T) aggregatorDeps {
deps := fxutil.Test[TestDeps](t, defaultforwarder.MockModule(), core.MockBundle(), compressionimpl.MockModule())
deps := fxutil.Test[TestDeps](t, defaultforwarder.MockModule(), core.MockBundle(), compressionimpl.MockModule(), noophaagent.Module())

opts := demuxTestOptions()
return aggregatorDeps{
Expand Down
3 changes: 2 additions & 1 deletion pkg/aggregator/check_sampler_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl"
noophaagent "github.com/DataDog/datadog-agent/comp/haagent/impl-noop"
"github.com/DataDog/datadog-agent/comp/serializer/compression"

//nolint:revive // TODO(AML) Fix revive linter
Expand Down Expand Up @@ -51,7 +52,7 @@ func benchmarkAddBucket(bucketValue int64, b *testing.B) {
sharedForwarder := forwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), deps.Log, forwarderOpts)
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname))
demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.Compressor, taggerComponent, "hostname")
demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, noophaagent.NewNoopHaAgent(), deps.Compressor, taggerComponent, "hostname")
defer demux.Stop(true)

checkSampler := newCheckSampler(1, true, true, 1000, tags.NewStore(true, "bench"), checkid.ID("hello:world:1234"), taggerComponent)
Expand Down
Loading

0 comments on commit e3bd8e5

Please sign in to comment.