Skip to content

Commit

Permalink
add host_id into v4 integrations (#1021)
Browse files Browse the repository at this point in the history
  • Loading branch information
brushknight authored Mar 18, 2022
1 parent 2daec54 commit 6b63975
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 45 deletions.
2 changes: 1 addition & 1 deletion cmd/newrelic-infra/newrelic-infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error {
tracker := track.NewTracker(dmEmitter)

integrationEmitter := emitter.NewIntegrationEmittor(agt, dmEmitter, ffManager)
integrationManager := v4.NewManager(integrationCfg, integrationEmitter, il, definitionQ, terminateDefinitionQ, configEntryQ, tracker)
integrationManager := v4.NewManager(integrationCfg, integrationEmitter, il, definitionQ, terminateDefinitionQ, configEntryQ, tracker, agt.Context.IDLookup())

// Command channel handlers
backoffSecsC := make(chan int, 1) // 1 won't block on initial cmd-channel fetch
Expand Down
1 change: 1 addition & 0 deletions internal/integrations/v4/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
package constants

const EnableVerbose = "enable_verbose"
const HostID = "host_id"
6 changes: 6 additions & 0 deletions internal/integrations/v4/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ func (r *Executor) buildCommand(ctx context.Context) *exec.Cmd {
cmd.Env = append(cmd.Env, "VERBOSE=1")
}

hostID, ok := ctx.Value(constants.HostID).(string)

if ok && hostID != "" {
cmd.Env = append(cmd.Env, "NRI_HOST_ID="+hostID)
}

cmd.Dir = r.Cfg.Directory
return cmd
}
Expand Down
6 changes: 5 additions & 1 deletion internal/integrations/v4/runner/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package runner

import (
"context"
"github.com/newrelic/infrastructure-agent/pkg/entity/host"

"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/pkg/databind/pkg/databind"
Expand All @@ -29,6 +30,7 @@ type Group struct {
cmdReqHandle cmdrequest.HandleFn
configHandle configrequest.HandleFn
terminateDefinitionQ chan string
idLookup host.IDLookup
}

type runnerErrorHandler func(ctx context.Context, errs <-chan error)
Expand All @@ -44,6 +46,7 @@ func NewGroup(
configHandle configrequest.HandleFn,
cfgPath string,
terminateDefinitionQ chan string,
idLookup host.IDLookup,
) (g Group, c FeaturesCache, err error) {

g, c, err = loadFn(il, passthroughEnv, cfgPath, cmdReqHandle, configHandle, terminateDefinitionQ)
Expand All @@ -52,6 +55,7 @@ func NewGroup(
}

g.emitter = emitter
g.idLookup = idLookup

return
}
Expand All @@ -60,7 +64,7 @@ func NewGroup(
// provided context
func (g *Group) Run(ctx context.Context) (hasStartedAnyOHI bool) {
for _, integr := range g.integrations {
go NewRunner(integr, g.emitter, g.dSources, g.handleErrorsProvide, g.cmdReqHandle, g.configHandle, g.terminateDefinitionQ).Run(ctx, nil, nil)
go NewRunner(integr, g.emitter, g.dSources, g.handleErrorsProvide, g.cmdReqHandle, g.configHandle, g.terminateDefinitionQ, g.idLookup).Run(ctx, nil, nil)
hasStartedAnyOHI = true
}

Expand Down
13 changes: 7 additions & 6 deletions internal/integrations/v4/runner/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package runner

import (
"context"
"github.com/newrelic/infrastructure-agent/pkg/entity/host"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -43,7 +44,7 @@ func TestGroup_Run(t *testing.T) {
{InstanceName: "saygoodbye", Exec: testhelp.Command(fixtures.IntegrationScript, "bye")},
},
}, nil)
gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue)
gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{})
require.NoError(t, err)

// WHEN the Group executes all the integrations
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestGroup_Run_Inventory(t *testing.T) {
Labels: map[string]string{"foo": "bar", "ou": "yea"}},
},
}, nil)
gr, _, err := NewGroup(loader, integration.InstancesLookup{}, passthroughEnv, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue)
gr, _, err := NewGroup(loader, integration.InstancesLookup{}, passthroughEnv, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{})
require.NoError(t, err)

// WHEN the integration is executed
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestGroup_Run_Inventory_OverridePrefix(t *testing.T) {
InventorySource: "custom/inventory"},
},
}, nil)
gr, _, err := NewGroup(loader, integration.InstancesLookup{}, passthroughEnv, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue)
gr, _, err := NewGroup(loader, integration.InstancesLookup{}, passthroughEnv, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{})
require.NoError(t, err)

// WHEN the integration is executed
Expand All @@ -156,7 +157,7 @@ func TestGroup_Run_Timeout(t *testing.T) {
{InstanceName: "Hello", Exec: testhelp.Command(fixtures.BlockedCmd), Timeout: &to},
},
}, nil)
gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue)
gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{})
require.NoError(t, err)
errs := interceptGroupErrors(&gr)

Expand Down Expand Up @@ -247,7 +248,7 @@ func TestGroup_Run_ConfigPathUpdated(t *testing.T) {
Config: "hello",
}},
}, nil)
group, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue)
group, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{})
require.NoError(t, err)
// shortening the interval to avoid long tests
group.integrations[0].Interval = 100 * time.Millisecond
Expand Down Expand Up @@ -324,7 +325,7 @@ func TestGroup_Run_IntegrationScriptPrintsErrorsAndReturnCodeIsZero(t *testing.T
{InstanceName: "log_errors", Exec: testhelp.Command(fixtures.IntegrationPrintsErr, "bye")},
},
}, nil)
gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue)
gr, _, err := NewGroup(loader, integration.InstancesLookup{}, nil, te, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, "", terminatedQueue, host.IDLookup{})
require.NoError(t, err)

// WHEN we add a hook to the log to capture the "error" and "fatal" levels
Expand Down
19 changes: 19 additions & 0 deletions internal/integrations/v4/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"bytes"
"context"
"github.com/newrelic/infrastructure-agent/internal/agent/instrumentation"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/constants"
"github.com/newrelic/infrastructure-agent/pkg/entity/host"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -60,6 +62,7 @@ type runner struct {
heartBeatMutex sync.RWMutex
cache cache.Cache
terminateQueue chan<- string
idLookup host.IDLookup
}

// NewRunner creates an integration runner instance.
Expand All @@ -72,6 +75,7 @@ func NewRunner(
cmdReqHandle cmdrequest.HandleFn,
configHandle configrequest.HandleFn,
terminateQ chan<- string,
idLookup host.IDLookup,
) *runner {
r := &runner{
emitter: emitter,
Expand All @@ -83,6 +87,7 @@ func NewRunner(
stderrParser: parseLogrusFields,
terminateQueue: terminateQ,
cache: cache.CreateCache(),
idLookup: idLookup,
}
if handleErrorsProvide != nil {
r.handleErrors = handleErrorsProvide()
Expand Down Expand Up @@ -210,6 +215,16 @@ func (r *runner) execute(ctx context.Context, matches *databind.Values, pidWCh,
r.setHeartBeat(act.HeartBeat)
}

// add hostID in the context to fetch and set in executor
hostID, err := r.idLookup.AgentShortEntityName()

if err == nil {
ctx = contextWithHostID(ctx, hostID)
} else {
txn.NoticeError(err)
r.log.WithError(err).Error("can't fetch host ID")
}

// Runs all the matching integration instances
outputs, err := r.definition.Run(ctx, matches, pidWCh, exitCodeCh)
if err != nil {
Expand Down Expand Up @@ -362,6 +377,10 @@ func (r *runner) handleLines(stdout <-chan []byte, extraLabels data.Map, entityR
}
}

func contextWithHostID(ctx context.Context, hostID string) context.Context {
return context.WithValue(ctx, constants.HostID, hostID)
}

func isHeartBeat(line []byte) bool {
return bytes.Equal(bytes.Trim(line, " "), heartBeatJSON)
}
Expand Down
9 changes: 5 additions & 4 deletions internal/integrations/v4/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package runner

import (
"context"
"github.com/newrelic/infrastructure-agent/pkg/entity/host"
"io/ioutil"
"os"
"runtime"
Expand Down Expand Up @@ -50,7 +51,7 @@ func Test_runner_Run(t *testing.T) {
require.NoError(t, err)

e := &testemit.RecordEmitter{}
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, nil)
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, nil, host.IDLookup{})

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
Expand Down Expand Up @@ -84,7 +85,7 @@ func Test_runner_Run_noHandleForCfgProtocol(t *testing.T) {
require.NoError(t, err)

e := &testemit.RecordEmitter{}
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, nil, nil)
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, nil, nil, host.IDLookup{})

// WHEN the runner executes the binary and handle the payload.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
Expand Down Expand Up @@ -127,7 +128,7 @@ func Test_runner_Run_failToUnMarshallCfgProtocol(t *testing.T) {
require.NoError(t, err)

e := &testemit.RecordEmitter{}
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, nil)
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, configrequest.NoopHandleFn, nil, host.IDLookup{})

// WHEN the runner executes the binary and handle the payload.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
Expand Down Expand Up @@ -166,7 +167,7 @@ func Test_runner_Run_handlesCfgProtocol(t *testing.T) {
atomic.AddUint32(&called, 1)
}
e := &testemit.RecordEmitter{}
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, mockHandleFn, nil)
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, mockHandleFn, nil, host.IDLookup{})

// WHEN the runner executes the binary and handle the payload.
ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond)
Expand Down
10 changes: 7 additions & 3 deletions pkg/integrations/v4/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package v4
import (
"context"
"errors"
"github.com/newrelic/infrastructure-agent/pkg/entity/host"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -113,6 +114,7 @@ type Manager struct {
handleCmdReq cmdrequest.HandleFn
handleConfig configrequest.HandleFn
tracker *track.Tracker
idLookup host.IDLookup
}

// groupContext pairs a runner.Group with its cancellation context
Expand Down Expand Up @@ -191,6 +193,7 @@ func NewManager(
terminateDefinitionQ chan string,
configEntryQ chan configrequest.Entry,
tracker *track.Tracker,
idLookup host.IDLookup,
) *Manager {
watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand All @@ -210,6 +213,7 @@ func NewManager(
handleCmdReq: cmdrequest.NewHandleFn(definitionQ, il, illog),
handleConfig: configrequest.NewHandleFn(configEntryQ, terminateDefinitionQ, il, illog),
tracker: tracker,
idLookup: idLookup,
}

// Loads all the configuration files in the passed configFolders
Expand Down Expand Up @@ -307,7 +311,7 @@ func (mgr *Manager) loadEnabledRunnerGroups(cfgs map[string]config2.YAML) {
func (mgr *Manager) loadRunnerGroup(path string, cfg config2.YAML, cmdFF *runner.CmdFF) (*groupContext, error) {
f := runner.NewFeatures(mgr.config.AgentFeatures, cmdFF)
loader := runner.NewLoadFn(cfg, f)
gr, fc, err := runner.NewGroup(loader, mgr.lookup, mgr.config.PassthroughEnvironment, mgr.emitter, mgr.handleCmdReq, mgr.handleConfig, path, mgr.terminateDefinitionQueue)
gr, fc, err := runner.NewGroup(loader, mgr.lookup, mgr.config.PassthroughEnvironment, mgr.emitter, mgr.handleCmdReq, mgr.handleConfig, path, mgr.terminateDefinitionQueue, mgr.idLookup)
if err != nil {
return nil, err
}
Expand All @@ -324,7 +328,7 @@ func (mgr *Manager) handleRequestsQueue(ctx context.Context) {
return

case def := <-mgr.definitionQueue:
r := runner.NewRunner(def, mgr.emitter, nil, nil, mgr.handleCmdReq, nil, mgr.terminateDefinitionQueue)
r := runner.NewRunner(def, mgr.emitter, nil, nil, mgr.handleCmdReq, nil, mgr.terminateDefinitionQueue, mgr.idLookup)
if def.CmdChanReq != nil {
// tracking so cmd requests can be stopped by hash
runCtx, pidWCh := mgr.tracker.Track(ctx, def.CmdChanReq.CmdChannelCmdHash, &def)
Expand All @@ -339,7 +343,7 @@ func (mgr *Manager) handleRequestsQueue(ctx context.Context) {
}
case entry := <-mgr.configEntryQueue:
ds, _ := entry.Databind.DataSources()
r := runner.NewRunner(entry.Definition, mgr.emitter, ds, nil, nil, nil, mgr.terminateDefinitionQueue)
r := runner.NewRunner(entry.Definition, mgr.emitter, ds, nil, nil, nil, mgr.terminateDefinitionQueue, mgr.idLookup)
runCtx, pidWCh := mgr.tracker.Track(ctx, entry.Definition.Hash(), &entry.Definition)
go r.Run(runCtx, pidWCh, nil)

Expand Down
Loading

0 comments on commit 6b63975

Please sign in to comment.