Skip to content

Commit

Permalink
filebeat,packetbeat,winlogbeat - Register input metrics diagnostic ho…
Browse files Browse the repository at this point in the history
…ok (#35798)

Register an elastic-agent diagnostics hook to return the input metrics (encoded to JSON)
in agent diagnostic dumps.
  • Loading branch information
andrewkroh authored Jun 30, 2023
1 parent 59447f6 commit e9272ad
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add metrics for gcp-pubsub input. {pull}35614[35614]
- [GCS] Added scheduler debug logs and improved the context passing mechanism by removing them from struct params and passing them as function arguments. {pull}35674[35674]
- Allow non-AWS endpoints for awss3 input. {issue}35496[35496] {pull}35520[35520]
- Under elastic-agent the input metrics will now be included in agent diagnostics dumps. {pull}35798[35798]
- Add Okta input package for entity analytics. {pull}35611[35611]
- Expose harvester metrics from filestream input {pull}35835[35835] {issue}33771[33771]
- Add device support for Azure AD entity analytics. {pull}35807[35807]
Expand Down Expand Up @@ -378,6 +379,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415

- Added `packetbeat.interfaces.fanout_group` to allow a Packetbeat sniffer to join an AF_PACKET fanout group. {issue}35451[35451] {pull}35453[35453]
- Add AF_PACKET metrics. {issue}35428[35428] {pull}35489[35489]
- Under elastic-agent the input metrics will now be included in agent diagnostics dumps. {pull}35798[35798]
- Add support for multiple regions in GCP {pull}32964[32964]

*Packetbeat*
Expand All @@ -393,6 +395,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415

- Set `host.os.type` and `host.os.family` to "windows" if not already set. {pull}35435[35435]
- Handle empty DNS answer data in QueryResults for the Sysmon Pipeline {pull}35207[35207]
- Under elastic-agent the input metrics will now be included in agent diagnostics dumps. {pull}35798[35798]


*Elastic Log Driver*
Expand Down
12 changes: 12 additions & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
}
}

if b.Manager != nil {
b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.",
"input_metrics.json", "application/json", func() []byte {
data, err := inputmon.MetricSnapshotJSON()
if err != nil {
logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err)
return []byte(err.Error())
}
return data
})
}

// Add inputs created by the modules
config.Inputs = append(config.Inputs, moduleInputs...)

Expand Down
13 changes: 9 additions & 4 deletions libbeat/monitoring/inputmon/httphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,14 @@ func (h *handler) allInputs(w http.ResponseWriter, req *http.Request) {
return
}

metrics := monitoring.CollectStructSnapshot(h.registry, monitoring.Full, false)
filtered := filteredSnapshot(h.registry, requestedType)

w.Header().Set(contentType, applicationJSON)
serveJSON(w, filtered, requestedPretty)
}

func filteredSnapshot(r *monitoring.Registry, requestedType string) []map[string]any {
metrics := monitoring.CollectStructSnapshot(r, monitoring.Full, false)

filtered := make([]map[string]any, 0, len(metrics))
for _, ifc := range metrics {
Expand All @@ -84,9 +91,7 @@ func (h *handler) allInputs(w http.ResponseWriter, req *http.Request) {

filtered = append(filtered, m)
}

w.Header().Set(contentType, applicationJSON)
serveJSON(w, filtered, requestedPretty)
return filtered
}

func serveJSON(w http.ResponseWriter, value any, pretty bool) {
Expand Down
7 changes: 7 additions & 0 deletions libbeat/monitoring/inputmon/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package inputmon

import (
"encoding/json"
"strings"

"github.com/google/uuid"
Expand Down Expand Up @@ -78,3 +79,9 @@ func sanitizeID(id string) string {
func globalRegistry() *monitoring.Registry {
return monitoring.GetNamespace("dataset").GetRegistry()
}

// MetricSnapshotJSON returns a snapshot of the input metric values from the
// global 'dataset' monitoring namespace encoded as a JSON array (pretty formatted).
func MetricSnapshotJSON() ([]byte, error) {
return json.MarshalIndent(filteredSnapshot(globalRegistry(), ""), "", " ")
}
25 changes: 25 additions & 0 deletions libbeat/monitoring/inputmon/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/monitoring"
)
Expand Down Expand Up @@ -79,3 +80,27 @@ func TestNewInputMonitor(t *testing.T) {
})
}
}

func TestMetricSnapshotJSON(t *testing.T) {
require.NoError(t, globalRegistry().Clear())
t.Cleanup(func() {
require.NoError(t, globalRegistry().Clear())
})

r, cancel := NewInputRegistry("test", "my-id", nil)
defer cancel()
monitoring.NewInt(r, "foo_total").Set(100)

jsonBytes, err := MetricSnapshotJSON()
require.NoError(t, err)

const expected = `[
{
"foo_total": 100,
"id": "my-id",
"input": "test"
}
]`

assert.Equal(t, expected, string(jsonBytes))
}
12 changes: 12 additions & 0 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,18 @@ func (pb *packetbeat) Run(b *beat.Beat) error {
}
}

if b.Manager != nil {
b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.",
"input_metrics.json", "application/json", func() []byte {
data, err := inputmon.MetricSnapshotJSON()
if err != nil {
logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err)
return []byte(err.Error())
}
return data
})
}

if !b.Manager.Enabled() {
return pb.runStatic(b, pb.factory)
}
Expand Down
12 changes: 12 additions & 0 deletions winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ func (eb *Winlogbeat) Run(b *beat.Beat) error {
}
}

if b.Manager != nil {
b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.",
"input_metrics.json", "application/json", func() []byte {
data, err := inputmon.MetricSnapshotJSON()
if err != nil {
logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err)
return []byte(err.Error())
}
return data
})
}

var wg sync.WaitGroup
for _, log := range eb.eventLogs {
state := persistedState[log.source.Name()]
Expand Down

0 comments on commit e9272ad

Please sign in to comment.