diff --git a/cmd/newrelic-infra/newrelic-infra.go b/cmd/newrelic-infra/newrelic-infra.go index 0b8286497..35502e2cc 100644 --- a/cmd/newrelic-infra/newrelic-infra.go +++ b/cmd/newrelic-infra/newrelic-infra.go @@ -364,7 +364,7 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error { tracker := track.NewTracker(dmEmitter) integrationEmitter := emitter.NewIntegrationEmittor(agt, dmEmitter, ffManager) - integrationManager := v4.NewManager(integrationCfg, integrationEmitter, il, definitionQ, terminateDefinitionQ, configEntryQ, tracker) + integrationManager := v4.NewManager(integrationCfg, integrationEmitter, il, definitionQ, terminateDefinitionQ, configEntryQ, tracker, agt.Context.IDLookup()) // Command channel handlers backoffSecsC := make(chan int, 1) // 1 won't block on initial cmd-channel fetch diff --git a/internal/integrations/v4/constants/constants.go b/internal/integrations/v4/constants/constants.go index 09047e6ce..c28a9939d 100644 --- a/internal/integrations/v4/constants/constants.go +++ b/internal/integrations/v4/constants/constants.go @@ -3,3 +3,4 @@ package constants const EnableVerbose = "enable_verbose" +const HostID = "host_id" diff --git a/internal/integrations/v4/executor/executor.go b/internal/integrations/v4/executor/executor.go index c677ba9f5..f9d9ec2ab 100644 --- a/internal/integrations/v4/executor/executor.go +++ b/internal/integrations/v4/executor/executor.go @@ -151,6 +151,12 @@ func (r *Executor) buildCommand(ctx context.Context) *exec.Cmd { cmd.Env = append(cmd.Env, "VERBOSE=1") } + hostID, ok := ctx.Value(constants.HostID).(string) + + if ok && hostID != "" { + cmd.Env = append(cmd.Env, "NRI_HOST_ID="+hostID) + } + cmd.Dir = r.Cfg.Directory return cmd } diff --git a/internal/integrations/v4/runner/group.go b/internal/integrations/v4/runner/group.go index b426cfcf4..573dbc03a 100644 --- a/internal/integrations/v4/runner/group.go +++ b/internal/integrations/v4/runner/group.go @@ -4,6 +4,7 @@ package runner import ( "context" + "github.com/newrelic/infrastructure-agent/pkg/entity/host" "github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration" "github.com/newrelic/infrastructure-agent/pkg/databind/pkg/databind" @@ -29,6 +30,7 @@ type Group struct { cmdReqHandle cmdrequest.HandleFn configHandle configrequest.HandleFn terminateDefinitionQ chan string + idLookup host.IDLookup } type runnerErrorHandler func(ctx context.Context, errs <-chan error) @@ -44,6 +46,7 @@ func NewGroup( configHandle configrequest.HandleFn, cfgPath string, terminateDefinitionQ chan string, + idLookup host.IDLookup, ) (g Group, c FeaturesCache, err error) { g, c, err = loadFn(il, passthroughEnv, cfgPath, cmdReqHandle, configHandle, terminateDefinitionQ) @@ -52,6 +55,7 @@ func NewGroup( } g.emitter = emitter + g.idLookup = idLookup return } @@ -60,7 +64,7 @@ func NewGroup( // provided context func (g *Group) Run(ctx context.Context) (hasStartedAnyOHI bool) { for _, integr := range g.integrations { - go NewRunner(integr, g.emitter, g.dSources, g.handleErrorsProvide, g.cmdReqHandle, g.configHandle, g.terminateDefinitionQ).Run(ctx, nil, nil) + go NewRunner(integr, g.emitter, g.dSources, g.handleErrorsProvide, g.cmdReqHandle, g.configHandle, g.terminateDefinitionQ, g.idLookup).Run(ctx, nil, nil) hasStartedAnyOHI = true } diff --git a/internal/integrations/v4/runner/group_test.go b/internal/integrations/v4/runner/group_test.go index 1ae151b3c..0e355bf57 100644 --- a/internal/integrations/v4/runner/group_test.go +++ b/internal/integrations/v4/runner/group_test.go @@ -4,6 +4,7 @@ package runner import ( "context" + "github.com/newrelic/infrastructure-agent/pkg/entity/host" "strings" "sync" "testing" @@ -43,7 +44,7 @@ func TestGroup_Run(t *testing.T) { {InstanceName: "saygoodbye", Exec: testhelp.Command(fixtures.IntegrationScript, "bye")}, }, }, nil) - gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue) + gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{}) require.NoError(t, err) // WHEN the Group executes all the integrations @@ -81,7 +82,7 @@ func TestGroup_Run_Inventory(t *testing.T) { Labels: map[string]string{"foo": "bar", "ou": "yea"}}, }, }, nil) - gr, _, err := NewGroup(loader, integration.InstancesLookup{}, passthroughEnv, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue) + gr, _, err := NewGroup(loader, integration.InstancesLookup{}, passthroughEnv, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{}) require.NoError(t, err) // WHEN the integration is executed @@ -130,7 +131,7 @@ func TestGroup_Run_Inventory_OverridePrefix(t *testing.T) { InventorySource: "custom/inventory"}, }, }, nil) - gr, _, err := NewGroup(loader, integration.InstancesLookup{}, passthroughEnv, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue) + gr, _, err := NewGroup(loader, integration.InstancesLookup{}, passthroughEnv, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{}) require.NoError(t, err) // WHEN the integration is executed @@ -156,7 +157,7 @@ func TestGroup_Run_Timeout(t *testing.T) { {InstanceName: "Hello", Exec: testhelp.Command(fixtures.BlockedCmd), Timeout: &to}, }, }, nil) - gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue) + gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{}) require.NoError(t, err) errs := interceptGroupErrors(&gr) @@ -247,7 +248,7 @@ func TestGroup_Run_ConfigPathUpdated(t *testing.T) { Config: "hello", }}, }, nil) - group, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue) + group, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{}) require.NoError(t, err) // shortening the interval to avoid long tests group.integrations[0].Interval = 100 * time.Millisecond @@ -324,7 +325,7 @@ func TestGroup_Run_IntegrationScriptPrintsErrorsAndReturnCodeIsZero(t *testing.T {InstanceName: "log_errors", Exec: testhelp.Command(fixtures.IntegrationPrintsErr, "bye")}, }, }, nil) - gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue) + gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{}) require.NoError(t, err) // WHEN we add a hook to the log to capture the "error" and "fatal" levels diff --git a/internal/integrations/v4/runner/runner.go b/internal/integrations/v4/runner/runner.go index d6001b62e..7c2a2883d 100644 --- a/internal/integrations/v4/runner/runner.go +++ b/internal/integrations/v4/runner/runner.go @@ -6,6 +6,8 @@ import ( "bytes" "context" "github.com/newrelic/infrastructure-agent/internal/agent/instrumentation" + "github.com/newrelic/infrastructure-agent/internal/integrations/v4/constants" + "github.com/newrelic/infrastructure-agent/pkg/entity/host" "regexp" "strings" "sync" @@ -60,6 +62,7 @@ type runner struct { heartBeatMutex sync.RWMutex cache cache.Cache terminateQueue chan<- string + idLookup host.IDLookup } // NewRunner creates an integration runner instance. @@ -72,6 +75,7 @@ func NewRunner( cmdReqHandle cmdrequest.HandleFn, configHandle configrequest.HandleFn, terminateQ chan<- string, + idLookup host.IDLookup, ) *runner { r := &runner{ emitter: emitter, @@ -83,6 +87,7 @@ func NewRunner( stderrParser: parseLogrusFields, terminateQueue: terminateQ, cache: cache.CreateCache(), + idLookup: idLookup, } if handleErrorsProvide != nil { r.handleErrors = handleErrorsProvide() @@ -210,6 +215,16 @@ func (r *runner) execute(ctx context.Context, matches *databind.Values, pidWCh, r.setHeartBeat(act.HeartBeat) } + // add hostID in the context to fetch and set in executor + hostID, err := r.idLookup.AgentShortEntityName() + + if err == nil { + ctx = contextWithHostID(ctx, hostID) + } else { + txn.NoticeError(err) + r.log.WithError(err).Error("can't fetch host ID") + } + // Runs all the matching integration instances outputs, err := r.definition.Run(ctx, matches, pidWCh, exitCodeCh) if err != nil { @@ -362,6 +377,10 @@ func (r *runner) handleLines(stdout <-chan []byte, extraLabels data.Map, entityR } } +func contextWithHostID(ctx context.Context, hostID string) context.Context { + return context.WithValue(ctx, constants.HostID, hostID) +} + func isHeartBeat(line []byte) bool { return bytes.Equal(bytes.Trim(line, " "), heartBeatJSON) } diff --git a/internal/integrations/v4/runner/runner_test.go b/internal/integrations/v4/runner/runner_test.go index 831e2a77c..6a58a1ee3 100644 --- a/internal/integrations/v4/runner/runner_test.go +++ b/internal/integrations/v4/runner/runner_test.go @@ -2,6 +2,7 @@ package runner import ( "context" + "github.com/newrelic/infrastructure-agent/pkg/entity/host" "io/ioutil" "os" "runtime" @@ -50,7 +51,7 @@ func Test_runner_Run(t *testing.T) { require.NoError(t, err) e := &testemit.RecordEmitter{} - r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, nil) + r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, nil, host.IDLookup{}) ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() @@ -84,7 +85,7 @@ func Test_runner_Run_noHandleForCfgProtocol(t *testing.T) { require.NoError(t, err) e := &testemit.RecordEmitter{} - r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, nil, nil) + r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, nil, nil, host.IDLookup{}) // WHEN the runner executes the binary and handle the payload. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) @@ -127,7 +128,7 @@ func Test_runner_Run_failToUnMarshallCfgProtocol(t *testing.T) { require.NoError(t, err) e := &testemit.RecordEmitter{} - r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, nil) + r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, nil, host.IDLookup{}) // WHEN the runner executes the binary and handle the payload. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) @@ -166,7 +167,7 @@ func Test_runner_Run_handlesCfgProtocol(t *testing.T) { atomic.AddUint32(&called, 1) } e := &testemit.RecordEmitter{} - r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, mockHandleFn, nil) + r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, mockHandleFn, nil, host.IDLookup{}) // WHEN the runner executes the binary and handle the payload. ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond) diff --git a/pkg/integrations/v4/manager.go b/pkg/integrations/v4/manager.go index e7539407f..538e2c1c8 100644 --- a/pkg/integrations/v4/manager.go +++ b/pkg/integrations/v4/manager.go @@ -5,6 +5,7 @@ package v4 import ( "context" "errors" + "github.com/newrelic/infrastructure-agent/pkg/entity/host" "io/ioutil" "os" "path/filepath" @@ -113,6 +114,7 @@ type Manager struct { handleCmdReq cmdrequest.HandleFn handleConfig configrequest.HandleFn tracker *track.Tracker + idLookup host.IDLookup } // groupContext pairs a runner.Group with its cancellation context @@ -191,6 +193,7 @@ func NewManager( terminateDefinitionQ chan string, configEntryQ chan configrequest.Entry, tracker *track.Tracker, + idLookup host.IDLookup, ) *Manager { watcher, err := fsnotify.NewWatcher() if err != nil { @@ -210,6 +213,7 @@ func NewManager( handleCmdReq: cmdrequest.NewHandleFn(definitionQ, il, illog), handleConfig: configrequest.NewHandleFn(configEntryQ, terminateDefinitionQ, il, illog), tracker: tracker, + idLookup: idLookup, } // Loads all the configuration files in the passed configFolders @@ -307,7 +311,7 @@ func (mgr *Manager) loadEnabledRunnerGroups(cfgs map[string]config2.YAML) { func (mgr *Manager) loadRunnerGroup(path string, cfg config2.YAML, cmdFF *runner.CmdFF) (*groupContext, error) { f := runner.NewFeatures(mgr.config.AgentFeatures, cmdFF) loader := runner.NewLoadFn(cfg, f) - gr, fc, err := runner.NewGroup(loader, mgr.lookup, mgr.config.PassthroughEnvironment, mgr.emitter, mgr.handleCmdReq, mgr.handleConfig, path, mgr.terminateDefinitionQueue) + gr, fc, err := runner.NewGroup(loader, mgr.lookup, mgr.config.PassthroughEnvironment, mgr.emitter, mgr.handleCmdReq, mgr.handleConfig, path, mgr.terminateDefinitionQueue, mgr.idLookup) if err != nil { return nil, err } @@ -324,7 +328,7 @@ func (mgr *Manager) handleRequestsQueue(ctx context.Context) { return case def := <-mgr.definitionQueue: - r := runner.NewRunner(def, mgr.emitter, nil, nil, mgr.handleCmdReq, nil, mgr.terminateDefinitionQueue) + r := runner.NewRunner(def, mgr.emitter, nil, nil, mgr.handleCmdReq, nil, mgr.terminateDefinitionQueue, mgr.idLookup) if def.CmdChanReq != nil { // tracking so cmd requests can be stopped by hash runCtx, pidWCh := mgr.tracker.Track(ctx, def.CmdChanReq.CmdChannelCmdHash, &def) @@ -339,7 +343,7 @@ func (mgr *Manager) handleRequestsQueue(ctx context.Context) { } case entry := <-mgr.configEntryQueue: ds, _ := entry.Databind.DataSources() - r := runner.NewRunner(entry.Definition, mgr.emitter, ds, nil, nil, nil, mgr.terminateDefinitionQueue) + r := runner.NewRunner(entry.Definition, mgr.emitter, ds, nil, nil, nil, mgr.terminateDefinitionQueue, mgr.idLookup) runCtx, pidWCh := mgr.tracker.Track(ctx, entry.Definition.Hash(), &entry.Definition) go r.Run(runCtx, pidWCh, nil) diff --git a/pkg/integrations/v4/manager_test.go b/pkg/integrations/v4/manager_test.go index e8d505d94..e6a48b1e9 100644 --- a/pkg/integrations/v4/manager_test.go +++ b/pkg/integrations/v4/manager_test.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "github.com/newrelic/infrastructure-agent/pkg/entity/host" "io/ioutil" "os" "path/filepath" @@ -163,7 +164,7 @@ func TestManager_StartIntegrations(t *testing.T) { // AND an integrations manager emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // WHEN the manager loads and executes the integrations in the folder ctx, cancel := context.WithCancel(context.Background()) @@ -198,7 +199,7 @@ integrations: // AND an integrations manager emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // WHEN the manager loads and executes the integration ctx, cancel := context.WithCancel(context.Background()) @@ -223,7 +224,7 @@ integrations: // AND an integrations manager emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // WHEN the manager loads and executes the integration ctx, cancel := context.WithCancel(context.Background()) @@ -258,7 +259,7 @@ func TestManager_SkipLoadingV3IntegrationsWithNoWarnings(t *testing.T) { // AND an integrations manager emitter := &testemit.RecordEmitter{} - _ = NewManager(Configuration{ConfigFolders: []string{dir}}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + _ = NewManager(Configuration{ConfigFolders: []string{dir}}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // THEN no long entries found for i := range hook.AllEntries() { @@ -281,7 +282,7 @@ func TestManager_LogWarningForInvalidYaml(t *testing.T) { // AND an integrations manager emitter := &testemit.RecordEmitter{} - _ = NewManager(Configuration{ConfigFolders: []string{dir}}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + _ = NewManager(Configuration{ConfigFolders: []string{dir}}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // THEN one long entry found require.NotEmpty(t, hook.AllEntries()) @@ -300,7 +301,7 @@ func TestManager_Config_EmbeddedYAML(t *testing.T) { // AND an integrations manager emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // WHEN the manager loads and executes the integrations in the folder ctx, cancel := context.WithCancel(context.Background()) @@ -325,7 +326,7 @@ func TestManager_HotReload_Add(t *testing.T) { defer removeTempFiles(t, dir) emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mgr.Start(ctx) @@ -359,7 +360,7 @@ func TestManager_HotReload_Modify(t *testing.T) { defer removeTempFiles(t, dir) emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mgr.Start(ctx) @@ -406,7 +407,7 @@ func TestManager_HotReload_ModifyLinkFile(t *testing.T) { require.NoError(t, err) emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mgr.Start(ctx) @@ -455,7 +456,7 @@ func TestManager_HotReload_Delete(t *testing.T) { defer removeTempFiles(t, dir) emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mgr.Start(ctx) @@ -505,7 +506,7 @@ integrations: ConfigFolders: []string{configDir}, DefinitionFolders: []string{niDir}, PassthroughEnvironment: []string{"VALUE"}, - }, emitter, instancesLookupReturning(execPath), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, emitter, instancesLookupReturning(execPath), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mgr.Start(ctx) @@ -542,7 +543,7 @@ integrations: ConfigFolders: []string{configDir}, DefinitionFolders: []string{niDir}, PassthroughEnvironment: []string{"VALUE"}, - }, emitter, instancesLookupReturning(execPath), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, emitter, instancesLookupReturning(execPath), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mgr.Start(ctx) @@ -586,7 +587,7 @@ integrations: mgr := NewManager(Configuration{ ConfigFolders: []string{configDir}, DefinitionFolders: []string{definitionsDir}, - }, emitter, instancesLookupLegacy(definitionsDir), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, emitter, instancesLookupLegacy(definitionsDir), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mgr.Start(ctx) @@ -634,7 +635,7 @@ integrations: ConfigFolders: []string{configDir}, DefinitionFolders: []string{definitionsDir}, PassthroughEnvironment: []string{"VALUE"}, - }, emitter, instancesLookupLegacy(definitionsDir), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, emitter, instancesLookupLegacy(definitionsDir), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mgr.Start(ctx) @@ -677,7 +678,7 @@ integrations: mgr := NewManager(Configuration{ ConfigFolders: []string{configDir}, DefinitionFolders: []string{niDir, ciDir, "unexisting-dir"}, - }, emitter, instancesLookupReturning(execPath1, execPath2), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, emitter, instancesLookupReturning(execPath1, execPath2), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mgr.Start(ctx) @@ -717,7 +718,7 @@ integrations: mgr := NewManager(Configuration{ ConfigFolders: []string{configDir}, DefinitionFolders: []string{niDir}, - }, emitter, instancesLookupReturning(execPath), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, emitter, instancesLookupReturning(execPath), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mgr.Start(ctx) @@ -742,7 +743,7 @@ func TestManager_EnableFeature_WhenFeatureOnOHICfgAndAgentCfgIsDisabledAndEnable ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv, //AgentFeatures: map[string]bool{"docker_enabled": false}, - }, e, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, e, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // AND the manager loads and executes the integrations in the folder ctx, cancel := context.WithCancel(context.Background()) @@ -774,7 +775,7 @@ func TestManager_EnableFeatureFromAgentConfig(t *testing.T) { ConfigFolders: []string{dir}, AgentFeatures: map[string]bool{"docker_enabled": true}, PassthroughEnvironment: passthroughEnv, - }, e, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, e, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // AND the manager starts ctx, cancel := context.WithCancel(context.Background()) @@ -801,7 +802,7 @@ func TestManager_CCDisablesAgentEnabledFeature(t *testing.T) { ConfigFolders: []string{dir}, AgentFeatures: map[string]bool{"docker_enabled": true}, PassthroughEnvironment: passthroughEnv, - }, e, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, e, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // AND manager loads and executes the integrations in the folder ctx, cancel := context.WithCancel(context.Background()) @@ -836,7 +837,7 @@ func TestManager_CCDisablesPreviouslyEnabledFeature(t *testing.T) { mgr := NewManager(Configuration{ ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv, - }, e, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, e, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // AND manager loads and executes the integrations in the folder ctx, cancel := context.WithCancel(context.Background()) @@ -879,7 +880,7 @@ func TestManager_WhenFileExists(t *testing.T) { defer removeTempFiles(t, dir) emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // WHEN the manager loads and executes the integrations in the folder ctx, cancel := context.WithCancel(context.Background()) @@ -901,7 +902,7 @@ func TestManager_WhenFileDoesNotExist(t *testing.T) { defer removeTempFiles(t, dir) emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // WHEN the manager loads and executes the integrations in the folder ctx, cancel := context.WithCancel(context.Background()) @@ -926,7 +927,7 @@ func TestManager_StartWithVerbose(t *testing.T) { ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv, Verbose: 1, - }, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // AND the manager starts ctx, cancel := context.WithCancel(context.Background()) @@ -958,7 +959,7 @@ func TestManager_StartWithVerboseFalse(t *testing.T) { ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv, Verbose: 0, - }, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // AND the manager starts ctx, cancel := context.WithCancel(context.Background()) @@ -1011,7 +1012,7 @@ func TestManager_anIntegrationCanSpawnAnotherOne(t *testing.T) { // AND an integrations manager emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // WHEN the manager executes the integration ctx, cancel := context.WithCancel(context.Background()) @@ -1033,7 +1034,7 @@ func TestManager_cfgProtocolSpawnIntegrationV3Payload(t *testing.T) { // AND an integrations manager emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // WHEN the manager executes the integration ctx, cancel := context.WithCancel(context.Background()) @@ -1055,7 +1056,7 @@ func TestManager_cfgProtocolSpawnIntegrationV4Payload(t *testing.T) { // AND an integrations manager emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // WHEN the manager executes the integration ctx, cancel := context.WithCancel(context.Background()) @@ -1080,7 +1081,7 @@ func TestManager_cfgProtocolSpawnedIntegrationCannotSpawnIntegration(t *testing. // AND an integrations manager emitter := &testemit.RecordEmitter{} - mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + mgr := NewManager(Configuration{ConfigFolders: []string{dir}, PassthroughEnvironment: passthroughEnv}, emitter, integration.ErrLookup, definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) // WHEN the manager executes the integration ctx, cancel := context.WithCancel(context.Background()) @@ -1130,7 +1131,7 @@ integrations: mgr := NewManager(Configuration{ ConfigFolders: []string{configDir}, DefinitionFolders: []string{niDir}, - }, emitter, instancesLookupReturning(execPath), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil)) + }, emitter, instancesLookupReturning(execPath), definitionQ, terminateDefinitionQ, configEntryQ, track.NewTracker(nil), host.IDLookup{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mgr.Start(ctx) diff --git a/test/cfgprotocol/agent/emulator.go b/test/cfgprotocol/agent/emulator.go index 9781d7f81..9afb3f409 100644 --- a/test/cfgprotocol/agent/emulator.go +++ b/test/cfgprotocol/agent/emulator.go @@ -116,7 +116,7 @@ func (ae *Emulator) RunAgent() error { tracker := track.NewTracker(dmEmitter) il := newInstancesLookup(ae.integrationCfg) integrationEmitter := emitter.NewIntegrationEmittor(ae.agent, dmEmitter, ffManager) - integrationManager := v4.NewManager(ae.integrationCfg, integrationEmitter, il, definitionQ, terminateDefinitionQ, configEntryQ, tracker) + integrationManager := v4.NewManager(ae.integrationCfg, integrationEmitter, il, definitionQ, terminateDefinitionQ, configEntryQ, tracker, ae.agent.Context.IDLookup()) // Start all plugins we want the agent to run. if err = plugins.RegisterPlugins(ae.agent); err != nil {