Skip to content

Commit

Permalink
add comp
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandreYang committed Nov 17, 2024
1 parent e5d0250 commit 9b31710
Show file tree
Hide file tree
Showing 18 changed files with 38 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package demultiplexerimpl
import (
"context"

haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
"go.uber.org/fx"

demultiplexerComp "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
Expand Down Expand Up @@ -42,6 +43,7 @@ type dependencies struct {
SharedForwarder defaultforwarder.Component
OrchestratorForwarder orchestratorforwarder.Component
EventPlatformForwarder eventplatform.Component
HaAgent haagent.Component
Compressor compression.Component
Tagger tagger.Component

Expand Down Expand Up @@ -86,6 +88,7 @@ func newDemultiplexer(deps dependencies) (provides, error) {
deps.OrchestratorForwarder,
options,
deps.EventPlatformForwarder,
deps.HaAgent,
deps.Compressor,
deps.Tagger,
hostnameDetected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,6 @@ func initTestAgentDemultiplexerWithFlushInterval(log log.Component, hostname hos
sharedForwarder := defaultforwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), log, sharedForwarderOptions)
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostname))
demux := aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, compressor, noopimpl.NewComponent(), "hostname")
demux := aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, nil, compressor, noopimpl.NewComponent(), "hostname")
return NewTestAgentDemultiplexer(demux)
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,7 @@ func (sender *diagnoseSenderManager) LazyGetSenderManager() (sender.SenderManage
forwarder := defaultforwarder.NewDefaultForwarder(config, log, defaultforwarder.NewOptions(config, log, nil))
orchestratorForwarder := optional.NewOptionPtr[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(sender.deps.Hostname))
senderManager = aggregator.InitAndStartAgentDemultiplexer(
log,
forwarder,
orchestratorForwarder,
opts,
eventPlatformForwarder,
sender.deps.Compressor,
sender.deps.Tagger,
hostnameDetected)
senderManager = aggregator.InitAndStartAgentDemultiplexer(log, forwarder, orchestratorForwarder, opts, eventPlatformForwarder, nil, sender.deps.Compressor, sender.deps.Tagger, hostnameDetected)

sender.senderManager.Set(senderManager)
return senderManager, nil
Expand Down
9 changes: 6 additions & 3 deletions pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
"github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
"github.com/DataDog/datadog-agent/pkg/config/model"
Expand Down Expand Up @@ -252,6 +253,7 @@ type BufferedAggregator struct {
flushMutex sync.Mutex // to start multiple flushes in parallel
serializer serializer.MetricSerializer
eventPlatformForwarder eventplatform.Component
haAgent haagent.Component
hostname string
hostnameUpdate chan string
hostnameUpdateDone chan struct{} // signals that the hostname update is finished
Expand Down Expand Up @@ -283,7 +285,7 @@ func NewFlushAndSerializeInParallel(config model.Config) FlushAndSerializeInPara
}

// NewBufferedAggregator instantiates a BufferedAggregator
func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator {
func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator {
bufferSize := pkgconfigsetup.Datadog().GetInt("aggregator_buffer_size")

agentName := flavor.GetFlavor()
Expand Down Expand Up @@ -326,6 +328,7 @@ func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder
flushInterval: flushInterval,
serializer: s,
eventPlatformForwarder: eventPlatformForwarder,
haAgent: haAgent,
hostname: hostname,
hostnameUpdate: make(chan string),
hostnameUpdateDone: make(chan struct{}),
Expand Down Expand Up @@ -861,8 +864,8 @@ func (agg *BufferedAggregator) tags(withVersion bool) []string {
tags = append(tags, "package_version:"+version.AgentPackageVersion)
}
}
if haagent.IsEnabled() {
tags = append(tags, "ha_agent_group:"+haagent.GetGroup())
if agg.haAgent != nil && agg.haAgent.Enabled() {
tags = append(tags, "ha_agent_group:"+agg.haAgent.GetGroup())
}
// nil to empty string
// This is expected by other components/tests
Expand Down
10 changes: 5 additions & 5 deletions pkg/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestAddServiceCheckDefaultValues(t *testing.T) {

s := &MockSerializerIterableSerie{}
taggerComponent := taggerMock.SetupFakeTagger(t)
agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval)
agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval)

agg.addServiceCheck(servicecheck.ServiceCheck{
// leave Host and Ts fields blank
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestAddEventDefaultValues(t *testing.T) {

s := &MockSerializerIterableSerie{}
taggerComponent := taggerMock.SetupFakeTagger(t)
agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval)
agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval)

agg.addEvent(event.Event{
// only populate required fields
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestDefaultData(t *testing.T) {

s := &MockSerializerIterableSerie{}
taggerComponent := taggerMock.SetupFakeTagger(t)
agg := NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval)
agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "hostname", DefaultFlushInterval)

start := time.Now()

Expand Down Expand Up @@ -585,7 +585,7 @@ func TestTags(t *testing.T) {

taggerComponent := taggerMock.SetupFakeTagger(t)

agg := NewBufferedAggregator(nil, nil, taggerComponent, tt.hostname, time.Second)
agg := NewBufferedAggregator(nil, nil, nil, taggerComponent, tt.hostname, time.Second)
agg.agentTags = tt.agentTags
agg.globalTags = tt.globalTags
assert.ElementsMatch(t, tt.want, agg.tags(tt.withVersion))
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestAddDJMRecurrentSeries(t *testing.T) {
s := &MockSerializerIterableSerie{}
// NewBufferedAggregator with DJM enable will create a new recurrentSeries
taggerComponent := taggerMock.SetupFakeTagger(t)
NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval)
NewBufferedAggregator(s, nil, nil, taggerComponent, "hostname", DefaultFlushInterval)

expectedRecurrentSeries := metrics.Series{&metrics.Serie{
Name: "datadog.djm.agent_host",
Expand Down
2 changes: 1 addition & 1 deletion pkg/aggregator/check_sampler_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func benchmarkAddBucket(bucketValue int64, b *testing.B) {
sharedForwarder := forwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), deps.Log, forwarderOpts)
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname))
demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.Compressor, taggerComponent, "hostname")
demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, nil, deps.Compressor, taggerComponent, "hostname")
defer demux.Stop(true)

checkSampler := newCheckSampler(1, true, true, 1000, tags.NewStore(true, "bench"), checkid.ID("hello:world:1234"), taggerComponent)
Expand Down
25 changes: 5 additions & 20 deletions pkg/aggregator/demultiplexer_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
forwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator"
haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
"github.com/DataDog/datadog-agent/comp/serializer/compression"
"github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
Expand Down Expand Up @@ -119,29 +120,13 @@ type dataOutputs struct {
// InitAndStartAgentDemultiplexer creates a new Demultiplexer and runs what's necessary
// in goroutines. As of today, only the embedded BufferedAggregator needs a separate goroutine.
// In the future, goroutines will be started for the event platform forwarder and/or orchestrator forwarder.
func InitAndStartAgentDemultiplexer(
log log.Component,
sharedForwarder forwarder.Forwarder,
orchestratorForwarder orchestratorforwarder.Component,
options AgentDemultiplexerOptions,
eventPlatformForwarder eventplatform.Component,
compressor compression.Component,
tagger tagger.Component,
hostname string) *AgentDemultiplexer {
demux := initAgentDemultiplexer(log, sharedForwarder, orchestratorForwarder, options, eventPlatformForwarder, compressor, tagger, hostname)
func InitAndStartAgentDemultiplexer(log log.Component, sharedForwarder forwarder.Forwarder, orchestratorForwarder orchestratorforwarder.Component, options AgentDemultiplexerOptions, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, compressor compression.Component, tagger tagger.Component, hostname string) *AgentDemultiplexer {
demux := initAgentDemultiplexer(log, sharedForwarder, orchestratorForwarder, options, eventPlatformForwarder, haAgent, compressor, tagger, hostname)
go demux.run()
return demux
}

func initAgentDemultiplexer(
log log.Component,
sharedForwarder forwarder.Forwarder,
orchestratorForwarder orchestratorforwarder.Component,
options AgentDemultiplexerOptions,
eventPlatformForwarder eventplatform.Component,
compressor compression.Component,
tagger tagger.Component,
hostname string) *AgentDemultiplexer {
func initAgentDemultiplexer(log log.Component, sharedForwarder forwarder.Forwarder, orchestratorForwarder orchestratorforwarder.Component, options AgentDemultiplexerOptions, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, compressor compression.Component, tagger tagger.Component, hostname string) *AgentDemultiplexer {
// prepare the multiple forwarders
// -------------------------------
if pkgconfigsetup.Datadog().GetBool("telemetry.enabled") && pkgconfigsetup.Datadog().GetBool("telemetry.dogstatsd_origin") && !pkgconfigsetup.Datadog().GetBool("aggregator_use_tags_store") {
Expand All @@ -157,7 +142,7 @@ func initAgentDemultiplexer(
// prepare the embedded aggregator
// --

agg := NewBufferedAggregator(sharedSerializer, eventPlatformForwarder, tagger, hostname, options.FlushInterval)
agg := NewBufferedAggregator(sharedSerializer, eventPlatformForwarder, haAgent, tagger, hostname, options.FlushInterval)

// statsd samplers
// ---------------
Expand Down
4 changes: 2 additions & 2 deletions pkg/aggregator/demultiplexer_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestDemuxNoAggOptionDisabled(t *testing.T) {
opts := demuxTestOptions()
deps := createDemultiplexerAgentTestDeps(t)

demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, deps.Compressor, deps.Tagger, "")
demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, nil, deps.Compressor, deps.Tagger, "")

batch := testDemuxSamples(t)

Expand All @@ -87,7 +87,7 @@ func TestDemuxNoAggOptionEnabled(t *testing.T) {
mockSerializer.On("AreSketchesEnabled").Return(true)
opts.EnableNoAggregationPipeline = true
deps := createDemultiplexerAgentTestDeps(t)
demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, deps.Compressor, deps.Tagger, "")
demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, nil, deps.Compressor, deps.Tagger, "")
demux.statsd.noAggStreamWorker.serializer = mockSerializer // the no agg pipeline will use our mocked serializer

go demux.run()
Expand Down
2 changes: 1 addition & 1 deletion pkg/aggregator/demultiplexer_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ type TestDeps struct {
func InitAndStartAgentDemultiplexerForTest(deps TestDeps, options AgentDemultiplexerOptions, hostname string) *AgentDemultiplexer {
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname))
return InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.Compressor, nooptagger.NewComponent(), hostname)
return InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, nil, deps.Compressor, nooptagger.NewComponent(), hostname)
}
4 changes: 2 additions & 2 deletions pkg/aggregator/demultiplexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestDemuxFlushAggregatorToSerializer(t *testing.T) {
opts := demuxTestOptions()
opts.FlushInterval = time.Hour
deps := createDemuxDeps(t, opts, eventplatformimpl.NewDefaultParams())
demux := initAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorFwd, opts, deps.EventPlatformFwd, deps.Compressor, nooptagger.NewComponent(), "")
demux := initAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorFwd, opts, deps.EventPlatformFwd, nil, deps.Compressor, nooptagger.NewComponent(), "")
demux.Aggregator().tlmContainerTagsEnabled = false
require.NotNil(demux)
require.NotNil(demux.aggregator)
Expand Down Expand Up @@ -300,7 +300,7 @@ func createDemuxDepsWithOrchestratorFwd(

return aggregatorDeps{
TestDeps: deps.TestDeps,
Demultiplexer: InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorForwarder, opts, deps.Eventplatform, deps.Compressor, nooptagger.NewComponent(), ""),
Demultiplexer: InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorForwarder, opts, deps.Eventplatform, nil, deps.Compressor, nooptagger.NewComponent(), ""),
OrchestratorFwd: deps.OrchestratorForwarder,
EventPlatformFwd: deps.Eventplatform,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/aggregator/mocksender/mocksender.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func CreateDefaultDemultiplexer() *aggregator.AgentDemultiplexer {
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostnameimpl.NewHostnameService()))
taggerComponent := nooptagger.NewComponent()
return aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, compressionimpl.NewMockCompressor(), taggerComponent, "")
return aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, nil, compressionimpl.NewMockCompressor(), taggerComponent, "")

}

Expand Down
2 changes: 1 addition & 1 deletion pkg/aggregator/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func testDemux(log log.Component, hostname hostname.Component) *AgentDemultiplex
opts.DontStartForwarders = true
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostname))
demux := initAgentDemultiplexer(log, NewForwarderTest(log), &orchestratorForwarder, opts, eventPlatformForwarder, compressionimpl.NewMockCompressor(), nooptagger.NewComponent(), defaultHostname)
demux := initAgentDemultiplexer(log, NewForwarderTest(log), &orchestratorForwarder, opts, eventPlatformForwarder, nil, compressionimpl.NewMockCompressor(), nooptagger.NewComponent(), defaultHostname)
return demux
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

func TestProfileBundleJsonZip(t *testing.T) {
timeNow = common.MockTimeNow
aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "zipprofiles.d"))
pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

func TestProfileMetadata_f5(t *testing.T) {
timeNow = common.MockTimeNow
aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d"))
pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath)

Expand Down
6 changes: 3 additions & 3 deletions pkg/collector/corechecks/snmp/integration_topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

func TestTopologyPayload_LLDP(t *testing.T) {
timeNow = common.MockTimeNow
aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d"))
pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath)

Expand Down Expand Up @@ -734,7 +734,7 @@ profiles:

func TestTopologyPayload_CDP(t *testing.T) {
timeNow = common.MockTimeNow
aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d"))
pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath)

Expand Down Expand Up @@ -1427,7 +1427,7 @@ profiles:
// we have different data for LLDP and CDP to test that we're only using LLDP to build the links
func TestTopologyPayload_LLDP_CDP(t *testing.T) {
timeNow = common.MockTimeNow
aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d"))
pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

func TestConfigurations(t *testing.T) {
profile.SetConfdPathAndCleanProfiles()
aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)

// language=yaml
rawInstanceConfig := []byte(`
Expand Down Expand Up @@ -326,7 +326,7 @@ profiles:

func TestInlineProfileConfiguration(t *testing.T) {
profile.SetConfdPathAndCleanProfiles()
aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)

// language=yaml
rawInstanceConfig := []byte(`
Expand Down
2 changes: 1 addition & 1 deletion pkg/collector/corechecks/systemd/systemd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ unit_names:
func TestCheckID(t *testing.T) {
check1 := newCheck()
check2 := newCheck()
aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)
aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewComponent(), "", 1*time.Hour)

// language=yaml
rawInstanceConfig1 := []byte(`
Expand Down
2 changes: 1 addition & 1 deletion test/benchmarks/kubernetes_state/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func main() {
* As it has a `nil` serializer, it will panic if it tries to flush the metrics.
* That’s why we need a big enough flush interval
*/
aggregator.NewBufferedAggregator(nil, "", 1*time.Hour)
aggregator.NewBufferedAggregator(nil, "", nil, 1*time.Hour)

/*
* Wait for informers to get populated
Expand Down

0 comments on commit 9b31710

Please sign in to comment.