Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ASCII-1081] Remove Stop method from demultiplexer interface #22069

Merged
merged 9 commits into from
Jan 26, 2024
12 changes: 4 additions & 8 deletions cmd/agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func run(log log.Component,
agentAPI internalAPI.Component,
) error {
defer func() {
stopAgent(cliParams, server, demultiplexer, agentAPI)
stopAgent(cliParams, server, agentAPI)
}()

// prepare go runtime
Expand Down Expand Up @@ -612,12 +612,12 @@ func startAgent(
}

// StopAgentWithDefaults is a temporary way for other packages to use stopAgent.
func StopAgentWithDefaults(server dogstatsdServer.Component, demultiplexer demultiplexer.Component, agentAPI internalAPI.Component) {
stopAgent(&cliParams{GlobalParams: &command.GlobalParams{}}, server, demultiplexer, agentAPI)
func StopAgentWithDefaults(server dogstatsdServer.Component, agentAPI internalAPI.Component) {
stopAgent(&cliParams{GlobalParams: &command.GlobalParams{}}, server, agentAPI)
}

// stopAgent Tears down the agent process
func stopAgent(cliParams *cliParams, server dogstatsdServer.Component, demultiplexer demultiplexer.Component, agentAPI internalAPI.Component) {
func stopAgent(cliParams *cliParams, server dogstatsdServer.Component, agentAPI internalAPI.Component) {
// retrieve the agent health before stopping the components
// GetReadyNonBlocking has a 100ms timeout to avoid blocking
health, err := health.GetReadyNonBlocking()
Expand All @@ -644,10 +644,6 @@ func stopAgent(cliParams *cliParams, server dogstatsdServer.Component, demultipl
clcrunnerapi.StopCLCRunnerServer()
jmx.StopJmxfetch()

if demultiplexer != nil {
demultiplexer.Stop(true)
}

gui.StopGUIServer()
profiler.Stop()

Expand Down
1 change: 0 additions & 1 deletion cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ func start(log log.Component, config config.Component, telemetry telemetry.Compo

close(stopCh)

demultiplexer.Stop(true)
if err := metricsServer.Shutdown(context.Background()); err != nil {
pkglog.Errorf("Error shutdowning metrics server on port %d: %v", metricsPort, err)
}
Expand Down
3 changes: 3 additions & 0 deletions comp/aggregator/demultiplexer/component_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
// Mock implements mock-specific methods.
type Mock interface {
SetDefaultSender(sender.Sender)
Stop(bool)
Component
}

Expand All @@ -32,4 +33,6 @@ type FakeSamplerMock interface {
Reset()

GetAgentDemultiplexer() *aggregator.AgentDemultiplexer

Stop(bool)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func Module() fxutil.Module {

type dependencies struct {
fx.In
Lc fx.Lifecycle
Log log.Component
SharedForwarder defaultforwarder.Component
OrchestratorForwarder orchestratorforwarder.Component
Expand Down Expand Up @@ -74,6 +75,10 @@ func newDemultiplexer(deps dependencies) (provides, error) {
demultiplexer := demultiplexer{
AgentDemultiplexer: agentDemultiplexer,
}
deps.Lc.Append(fx.Hook{OnStop: func(ctx context.Context) error {
agentDemultiplexer.Stop(true)
return nil
}})
Comment on lines +78 to +81
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it change the order in which the components are stopped, similarly to the bug we had a few months ago ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, indeed, that is why I added QA instructions and carefully check it is OK to stop the aggregator later.


return provides{
Comp: demultiplexer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package demultiplexerimpl

import (
"context"
"time"

demultiplexerComp "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
Expand All @@ -25,20 +26,36 @@ func FakeSamplerMockModule() fxutil.Module {

type fakeSamplerMockDependencies struct {
fx.In
Lc fx.Lifecycle
Log log.Component
}

type fakeSamplerMock struct {
*TestAgentDemultiplexer
stopped bool
}

func (f fakeSamplerMock) GetAgentDemultiplexer() *aggregator.AgentDemultiplexer {
func (f *fakeSamplerMock) GetAgentDemultiplexer() *aggregator.AgentDemultiplexer {
return f.TestAgentDemultiplexer.AgentDemultiplexer
}

func (f *fakeSamplerMock) Stop(flush bool) {
if !f.stopped {
f.TestAgentDemultiplexer.Stop(flush)
f.stopped = true
}
}

func newFakeSamplerMock(deps fakeSamplerMockDependencies) demultiplexerComp.FakeSamplerMock {
demux := initTestAgentDemultiplexerWithFlushInterval(deps.Log, time.Hour)
return fakeSamplerMock{
mock := &fakeSamplerMock{
TestAgentDemultiplexer: demux,
}

deps.Lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
mock.Stop(false)
return nil
}})
return mock
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func MockModule() fxutil.Module {
}

type mock struct {
demultiplexerComp.Component
*aggregator.AgentDemultiplexer
sender *sender.Sender
}

Expand All @@ -36,7 +36,7 @@ func (m *mock) GetDefaultSender() (sender.Sender, error) {
if m.sender != nil {
return *m.sender, nil
}
return m.Component.GetDefaultSender()
return m.AgentDemultiplexer.GetDefaultSender()
}

func (m *mock) LazyGetSenderManager() (sender.SenderManager, error) {
Expand All @@ -56,10 +56,7 @@ func newMock(deps mockDependencies) (demultiplexerComp.Component, demultiplexerC
Log: deps.Log,
SharedForwarder: defaultforwarder.NoopForwarder{},
}
demultiplexer := demultiplexer{
AgentDemultiplexer: aggregator.InitAndStartAgentDemultiplexerForTest(aggDeps, opts, ""),
}

instance := &mock{Component: demultiplexer}
instance := &mock{AgentDemultiplexer: aggregator.InitAndStartAgentDemultiplexerForTest(aggDeps, opts, "")}
return instance, instance
}
1 change: 0 additions & 1 deletion comp/dogstatsd/server/server_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func benchParsePackets(b *testing.B, rawPacket []byte) {
pkgconfig.SetupLogger("", "off", "", "", false, true, false)

demux := deps.Demultiplexer
defer demux.Stop(false)
_ = s.Start(demux)
defer s.Stop()

Expand Down
10 changes: 0 additions & 10 deletions comp/dogstatsd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func TestNewServer(t *testing.T) {

deps := fulfillDepsWithConfigOverride(t, cfg)
demux := createDemultiplexer(t)
defer demux.Stop(false)
requireStart(t, deps.Server, demux)

deps.Server.Stop()
Expand All @@ -101,7 +100,6 @@ func TestStopServer(t *testing.T) {
deps := fulfillDepsWithConfigOverride(t, cfg)

demux := createDemultiplexer(t)
defer demux.Stop(false)
requireStart(t, deps.Server, demux)
deps.Server.Stop()

Expand Down Expand Up @@ -154,7 +152,6 @@ func TestUDPReceive(t *testing.T) {

deps := fulfillDepsWithConfigOverride(t, cfg)
demux := deps.Demultiplexer
defer demux.Stop(false)
requireStart(t, deps.Server, demux)
defer deps.Server.Stop()

Expand Down Expand Up @@ -437,7 +434,6 @@ func TestUDPForward(t *testing.T) {
defer pc.Close()

demux := deps.Demultiplexer
defer demux.Stop(false)
requireStart(t, deps.Server, demux)
defer deps.Server.Stop()

Expand Down Expand Up @@ -469,7 +465,6 @@ func TestHistToDist(t *testing.T) {
deps := fulfillDepsWithConfigOverride(t, cfg)

demux := createDemultiplexer(t)
defer demux.Stop(false)
requireStart(t, deps.Server, demux)
defer deps.Server.Stop()

Expand Down Expand Up @@ -589,7 +584,6 @@ func TestE2EParsing(t *testing.T) {
assert.Equal(t, 0, len(timedSamples))
deps.Server.Stop()
demux.Reset()
demux.Stop(false)
}

func TestExtraTags(t *testing.T) {
Expand Down Expand Up @@ -665,7 +659,6 @@ func TestNoMappingsConfig(t *testing.T) {
samples := []metrics.MetricSample{}

demux := deps.Demultiplexer
defer demux.Stop(false)
requireStart(t, s, demux)

assert.Nil(t, s.mapper)
Expand Down Expand Up @@ -777,7 +770,6 @@ dogstatsd_mapper_profiles:
cw.SetWithoutSource("dogstatsd_port", listeners.RandomPortName)

demux := deps.Demultiplexer
defer demux.Stop(false)
requireStart(t, s, demux)

assert.Equal(t, deps.Config.Get("dogstatsd_mapper_cache_size"), scenario.expectedCacheSize, "Case `%s` failed. cache_size `%s` should be `%s`", scenario.name, deps.Config.Get("dogstatsd_mapper_cache_size"), scenario.expectedCacheSize)
Expand Down Expand Up @@ -837,7 +829,6 @@ func TestNewServerExtraTags(t *testing.T) {
require.Equal(s.extraTags[0], "extra:tags", "the tag extra:tags should be set")
require.Equal(s.extraTags[1], "hello:world", "the tag hello:world should be set")
s.Stop()
demux.Stop(false)
}

func TestProcessedMetricsOrigin(t *testing.T) {
Expand All @@ -851,7 +842,6 @@ func TestProcessedMetricsOrigin(t *testing.T) {
assert := assert.New(t)

demux := deps.Demultiplexer
defer demux.Stop(false)
requireStart(t, s, demux)

s.Stop()
Expand Down
6 changes: 0 additions & 6 deletions pkg/aggregator/demultiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ import (
type Demultiplexer interface {
// General
// --

// Run runs all demultiplexer parts
Run()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the Run method used anywhere ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Run is called at https://github.com/DataDog/datadog-agent/blob/7.50.0/pkg/aggregator/demultiplexer_agent.go#L130 and doesn't need to be exported. I have created a commit to not export the function.

// Stop stops the demultiplexer.
// Resources are released, the instance should not be used after a call to `Stop()`.
Stop(flush bool)
// Serializer returns the serializer used by the Demultiplexer instance.
Serializer() serializer.MetricSerializer

Expand Down
6 changes: 3 additions & 3 deletions pkg/aggregator/demultiplexer_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type dataOutputs struct {
// In the future, goroutines will be started for the event platform forwarder and/or orchestrator forwarder.
func InitAndStartAgentDemultiplexer(log log.Component, sharedForwarder forwarder.Forwarder, orchestratorForwarder orchestratorforwarder.Component, options AgentDemultiplexerOptions, hostname string) *AgentDemultiplexer {
demux := initAgentDemultiplexer(log, sharedForwarder, orchestratorForwarder, options, hostname)
go demux.Run()
go demux.run()
return demux
}

Expand Down Expand Up @@ -261,8 +261,8 @@ func (d *AgentDemultiplexer) AddAgentStartupTelemetry(agentVersion string) {
}
}

// Run runs all demultiplexer parts
func (d *AgentDemultiplexer) Run() {
// run runs all demultiplexer parts
func (d *AgentDemultiplexer) run() {
if !d.options.DontStartForwarders {
d.log.Debugf("Starting forwarders")

Expand Down
2 changes: 1 addition & 1 deletion pkg/aggregator/demultiplexer_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestDemuxNoAggOptionEnabled(t *testing.T) {
demux := initAgentDemultiplexer(log, NewForwarderTest(log), &orchestratorForwarder, opts, "")
demux.statsd.noAggStreamWorker.serializer = mockSerializer // the no agg pipeline will use our mocked serializer

go demux.Run()
go demux.run()

batch := testDemuxSamples(t)

Expand Down
2 changes: 1 addition & 1 deletion pkg/metadata/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func TestTriggerAndResetCollectorTimer(t *testing.T) {

type deps struct {
fx.In
Demultiplexer demultiplexer.Component
Demultiplexer demultiplexer.Mock
}

func buildDeps(t *testing.T) deps {
Expand Down
4 changes: 0 additions & 4 deletions pkg/serverless/invocationlifecycle/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,6 @@ func TestTriggerTypesLifecycleEventForAPIGateway5xxResponse(t *testing.T) {
tracePayload = payload
}
demux := createDemultiplexer(t)
defer demux.Stop(false)

testProcessor := &LifecycleProcessor{
ExtraTags: extraTags,
Expand Down Expand Up @@ -508,7 +507,6 @@ func TestTriggerTypesLifecycleEventForAPIGatewayNonProxy5xxResponse(t *testing.T
tracePayload = payload
}
demux := createDemultiplexer(t)
defer demux.Stop(false)

testProcessor := &LifecycleProcessor{
ExtraTags: extraTags,
Expand Down Expand Up @@ -597,7 +595,6 @@ func TestTriggerTypesLifecycleEventForAPIGatewayWebsocket5xxResponse(t *testing.
tracePayload = payload
}
demux := createDemultiplexer(t)
defer demux.Stop(false)

testProcessor := &LifecycleProcessor{
ExtraTags: extraTags,
Expand Down Expand Up @@ -683,7 +680,6 @@ func TestTriggerTypesLifecycleEventForALB5xxResponse(t *testing.T) {
tracePayload = payload
}
demux := createDemultiplexer(t)
defer demux.Stop(false)

testProcessor := &LifecycleProcessor{
ExtraTags: extraTags,
Expand Down
Loading
Loading