[libbeat] Add a metrics observer to the queue #39774

Merged: 40 commits, Jun 11, 2024
Commits (changes shown from all commits):
3fcb875
remove drop on cancel option
faec May 29, 2024
47c07a6
producer.Cancel -> producer.Close
faec May 29, 2024
f9d4c39
remove OnDrop callbacks
faec May 29, 2024
d398b46
remove internal cancellation helpers
faec May 29, 2024
bad2498
remove the queue's shipper metrics hook
faec May 29, 2024
eca723b
remove unused fields and producer cancel tests
faec May 29, 2024
c79cc3e
Merge branch 'remove-producer-cancel' into queue-metrics
faec May 29, 2024
bffd70d
fix merge
faec May 29, 2024
7409be1
moving metric ownership around
faec May 29, 2024
75ac0f4
plumbing for queue metrics
faec May 29, 2024
ca949b8
flesh out queue observer internals
faec May 29, 2024
a279862
update queue filled percent
faec May 29, 2024
d0f1939
Merge branch 'main' into queue-metrics
faec May 30, 2024
517ffe1
clean up shipper metric hooks
faec May 30, 2024
4f6d02c
use the metrics observer from the memqueue
faec May 30, 2024
fd18f4e
configure gauges
faec May 30, 2024
2f6ba9b
report queue metrics from the disk queue
faec May 30, 2024
ce2c287
fix disk queue initialization
faec May 30, 2024
e70c13b
outputObserver -> retryObserver
faec May 30, 2024
e6dbb2d
move queue draining logic into the queue
faec May 30, 2024
afa3793
shadow acked var the simple way
faec May 30, 2024
f674a9f
Merge branch 'main' of github.com:elastic/beats into queue-metrics
faec May 31, 2024
076326a
fix active events, metric paths
faec May 31, 2024
729ff87
add memory queue observer tests
faec Jun 5, 2024
91bfeef
add disk queue tests
faec Jun 5, 2024
1d0b0a8
Merge branch 'main' of github.com:elastic/beats into queue-metrics
faec Jun 6, 2024
73aded1
add nil observer checks during queue creation
faec Jun 6, 2024
983e8af
make check
faec Jun 6, 2024
8c1d66a
adjust flaky test
faec Jun 6, 2024
1278fab
Merge branch 'main' of github.com:elastic/beats into queue-metrics
faec Jun 6, 2024
2981307
fix producer queue shutdown handling
faec Jun 6, 2024
0a114c4
Merge branch 'main' into queue-metrics
pierrehilbert Jun 7, 2024
0f06ff0
fix masked panic in some unit tests
faec Jun 7, 2024
e4a1ee6
fix ack handler initialization
faec Jun 7, 2024
d49def3
Merge branch 'queue-metrics' of github.com:faec/beats into queue-metrics
faec Jun 7, 2024
7621c1f
fix goroutine checker in test
faec Jun 7, 2024
2df11e4
add metrics nil check
faec Jun 7, 2024
714678c
fix tests
faec Jun 10, 2024
b3a728e
document queue metrics
faec Jun 11, 2024
3b5f068
hopefully decrease test flakiness
faec Jun 11, 2024
4 changes: 2 additions & 2 deletions filebeat/tests/system/test_reload_inputs.py
@@ -91,8 +91,6 @@ def test_start_stop(self):
inputs=False,
)

proc = self.start_beat()

os.mkdir(self.working_dir + "/logs/")
logfile = self.working_dir + "/logs/test.log"
os.mkdir(self.working_dir + "/configs/")
@@ -103,6 +101,8 @@ def test_start_stop(self):
with open(logfile, 'w') as f:
f.write("Hello world\n")

proc = self.start_beat()

self.wait_until(lambda: self.output_lines() == 1)

# Remove input by moving the file
49 changes: 44 additions & 5 deletions libbeat/docs/metrics-in-logs.asciidoc
@@ -2,11 +2,11 @@

Every 30 seconds (by default), {beatname_uc} collects a _snapshot_ of metrics about itself. From this snapshot, {beatname_uc} computes a _delta snapshot_; this delta snapshot contains any metrics that have _changed_ since the last snapshot. Note that the values of the metrics are the values when the snapshot is taken, _NOT_ the _difference_ in values from the last snapshot.

If this delta snapshot contains _any_ metrics (indicating at least one metric that has changed since the last snapshot), this delta snapshot is serialized as JSON and emitted in {beatname_uc}'s logs at the `INFO` log level. Here is an example of such a log entry:
If this delta snapshot contains _any_ metrics (indicating at least one metric that has changed since the last snapshot), this delta snapshot is serialized as JSON and emitted in {beatname_uc}'s logs at the `INFO` log level. Most snapshot fields report the change in the metric since the last snapshot; however, some fields are _gauges_, which always report the current value. Here is an example of such a log entry:

[source,json]
----
{"log.level":"info","@timestamp":"2023-07-14T12:50:36.811Z","log.logger":"monitoring","log.origin":{"file.name":"log/log.go","file.line":187},"message":"Non-zero metrics in the last 30s","service.name":"filebeat","monitoring":{"metrics":{"beat":{"cgroup":{"memory":{"mem":{"usage":{"bytes":0}}}},"cpu":{"system":{"ticks":692690,"time":{"ms":60}},"total":{"ticks":3167250,"time":{"ms":150},"value":3167250},"user":{"ticks":2474560,"time":{"ms":90}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":32},"info":{"ephemeral_id":"2bab8688-34c0-4522-80af-db86948d547d","uptime":{"ms":617670096},"version":"8.6.2"},"memstats":{"gc_next":57189272,"memory_alloc":43589824,"memory_total":275281335792,"rss":183574528},"runtime":{"goroutines":212}},"filebeat":{"events":{"active":5,"added":52,"done":49},"harvester":{"open_files":6,"running":6,"started":1}},"libbeat":{"config":{"module":{"running":15}},"output":{"events":{"acked":48,"active":0,"batches":6,"total":48},"read":{"bytes":210},"write":{"bytes":26923}},"pipeline":{"clients":15,"events":{"active":5,"filtered":1,"published":51,"total":52},"queue":{"acked":48}}},"registrar":{"states":{"current":14,"update":49},"writes":{"success":6,"total":6}},"system":{"load":{"1":0.91,"15":0.37,"5":0.4,"norm":{"1":0.1138,"15":0.0463,"5":0.05}}}},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2023-07-14T12:50:36.811Z","log.logger":"monitoring","log.origin":{"file.name":"log/log.go","file.line":187},"message":"Non-zero metrics in the last 30s","service.name":"filebeat","monitoring":{"metrics":{"beat":{"cgroup":{"memory":{"mem":{"usage":{"bytes":0}}}},"cpu":{"system":{"ticks":692690,"time":{"ms":60}},"total":{"ticks":3167250,"time":{"ms":150},"value":3167250},"user":{"ticks":2474560,"time":{"ms":90}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":32},"info":{"ephemeral_id":"2bab8688-34c0-4522-80af-db86948d547d","uptime":{"ms":617670096},"version":"8.6.2"},"memstats":{"gc_next":57189272,"memory_alloc":43589824,"memory_total":275281335792,"rss":183574528},"runtime":{"goroutines":212}},"filebeat":{"events":{"active":5,"added":52,"done":49},"harvester":{"open_files":6,"running":6,"started":1}},"libbeat":{"config":{"module":{"running":15}},"output":{"events":{"acked":48,"active":0,"batches":6,"total":48},"read":{"bytes":210},"write":{"bytes":26923}},"pipeline":{"clients":15,"events":{"active":5,"filtered":1,"published":51,"total":52},"queue":{"max_events":3500,"filled":{"events":5,"bytes":6425,"pct":0.0014},"added":{"events":52,"bytes":65702},"consumed":{"events":52,"bytes":65702},"removed":{"events":48,"bytes":59277},"acked":48}}},"registrar":{"states":{"current":14,"update":49},"writes":{"success":6,"total":6}},"system":{"load":{"1":0.91,"15":0.37,"5":0.4,"norm":{"1":0.1138,"15":0.0463,"5":0.05}}}},"ecs.version":"1.6.0"}}
----
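To make the counter/gauge distinction concrete, here is a minimal sketch of how such a delta snapshot could be computed. This is an illustration only, not libbeat's actual reporter code; the flat metric map and the `isGauge` callback are assumptions made for the sketch.

[source,go]
----
package main

import "fmt"

// deltaSnapshot sketches how a delta snapshot could be built from two flat
// metric snapshots: counters report the change since the previous snapshot,
// gauges report their current value whenever it changes.
func deltaSnapshot(prev, curr map[string]float64, isGauge func(string) bool) map[string]float64 {
	delta := map[string]float64{}
	for key, val := range curr {
		if isGauge(key) {
			if val != prev[key] {
				delta[key] = val // gauges: current value
			}
		} else if change := val - prev[key]; change != 0 {
			delta[key] = change // counters: change over the reporting interval
		}
	}
	return delta
}

func main() {
	isGauge := func(key string) bool { return key == "libbeat.pipeline.queue.filled.pct" }
	prev := map[string]float64{"libbeat.output.events.acked": 100, "libbeat.pipeline.queue.filled.pct": 0.2}
	curr := map[string]float64{"libbeat.output.events.acked": 148, "libbeat.pipeline.queue.filled.pct": 0.2}
	fmt.Println(deltaSnapshot(prev, curr, isGauge)) // map[libbeat.output.events.acked:48]
}
----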

[discrete]
@@ -113,6 +113,24 @@ Focussing on the `.monitoring.metrics` field, and formatting the JSON, it's valu
"total": 52
},
"queue": {
"max_events": 3500,
"filled": {
"events": 5,
"bytes": 6425,
"pct": 0.0014
},
"added": {
"events": 52,
"bytes": 65702
},
"consumed": {
"events": 52,
"bytes": 65702
},
"removed": {
"events": 48,
"bytes": 59277
},
"acked": 48
}
}
@@ -130,12 +148,12 @@ Focussing on the `.monitoring.metrics` field, and formatting the JSON, it's valu
"system": {
"load": {
"1": 0.91,
"5": 0.4,
"15": 0.37,
"5": 0.4,
"norm": {
"1": 0.1138,
"5": 0.05,
"15": 0.0463
"15": 0.0463,
"5": 0.05
}
}
}
@@ -170,9 +188,30 @@ endif::[]
| `.output.events.total` | Integer | Number of events currently being processed by the output. | If this number grows over time, it may indicate that the output destination (e.g. {ls} pipeline or {es} cluster) is not able to accept events at the same or faster rate than what {beatname_uc} is sending to it.
| `.output.events.acked` | Integer | Number of events acknowledged by the output destination. | Generally, we want this number to be the same as `.output.events.total` as this indicates that the output destination has reliably received all the events sent to it.
| `.output.events.failed` | Integer | Number of events that {beatname_uc} tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it's useful to check {beatname_uc}'s logs right before this log entry's `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later.
| `.output.events.dropped` | Integer | Number of events that {beatname_uc} gave up sending to the output destination because of a permanent (non-retryable) error. |
| `.output.events.dead_letter` | Integer | Number of events that {beatname_uc} successfully sent to a configured dead letter index after they failed to ingest in the primary index. |
| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs. |
|===

[cols="1,1,2,2"]
|===
| Field path (relative to `.monitoring.metrics.libbeat.pipeline`) | Type | Meaning | Troubleshooting hints

| `.queue.max_events` | Integer (gauge) | The queue's maximum event count if it has one, otherwise zero. |
| `.queue.max_bytes` | Integer (gauge) | The queue's maximum byte count if it has one, otherwise zero. |
| `.queue.filled.events` | Integer (gauge) | Number of events currently stored by the queue. |
| `.queue.filled.bytes` | Integer (gauge) | Number of bytes currently stored by the queue. |
| `.queue.filled.pct` | Float (gauge) | How full the queue is relative to its maximum size, as a fraction from 0 to 1. | Low throughput while `queue.filled.pct` is low means congestion in the input. Low throughput while `queue.filled.pct` is high means congestion in the output.
| `.queue.added.events` | Integer | Number of events added to the queue by input workers. |
| `.queue.added.bytes` | Integer | Number of bytes added to the queue by input workers. |
| `.queue.consumed.events` | Integer | Number of events sent to output workers. |
| `.queue.consumed.bytes` | Integer | Number of bytes sent to output workers. |
| `.queue.removed.events` | Integer | Number of events removed from the queue after being processed by output workers. |
| `.queue.removed.bytes` | Integer | Number of bytes removed from the queue after being processed by output workers. |
|===

When using the memory queue, byte metrics are only set if the output supports them. Currently only the Elasticsearch output supports byte metrics.
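As a troubleshooting aid, the `queue.filled.pct` gauge can be watched by extracting it from the monitoring log lines. The sketch below assumes the JSON layout shown in the example above and is not an official tool.

[source,go]
----
// Sketch: read Beat log lines on stdin and print the queue fill level.
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"strings"
)

// monitoringLine models only the fields this sketch needs, following the
// structure of the example log entry shown earlier on this page.
type monitoringLine struct {
	Message    string `json:"message"`
	Monitoring struct {
		Metrics struct {
			Libbeat struct {
				Pipeline struct {
					Queue struct {
						Filled struct {
							Pct float64 `json:"pct"`
						} `json:"filled"`
					} `json:"queue"`
				} `json:"pipeline"`
			} `json:"libbeat"`
		} `json:"metrics"`
	} `json:"monitoring"`
}

func main() {
	scanner := bufio.NewScanner(os.Stdin)
	scanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024) // allow long log lines
	for scanner.Scan() {
		var entry monitoringLine
		if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
			continue // skip lines that are not JSON
		}
		if !strings.HasPrefix(entry.Message, "Non-zero metrics") {
			continue // only the periodic metrics snapshots are of interest
		}
		pct := entry.Monitoring.Metrics.Libbeat.Pipeline.Queue.Filled.Pct
		fmt.Printf("queue filled: %.2f%%\n", pct*100)
	}
}
----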

ifeval::["{beatname_lc}"=="filebeat"]
[cols="1,1,2,2"]
|===
63 changes: 33 additions & 30 deletions libbeat/monitoring/report/log/log.go
@@ -37,36 +37,39 @@ import (
// TODO: Replace this with a proper solution that uses the metric type from
// where it is defined. See: https://github.com/elastic/beats/issues/5433
var gauges = map[string]bool{
"libbeat.output.events.active": true,
"libbeat.pipeline.events.active": true,
"libbeat.pipeline.clients": true,
"libbeat.pipeline.queue.max_events": true,
"libbeat.pipeline.queue.filled.pct.events": true,
"libbeat.config.module.running": true,
"registrar.states.current": true,
"filebeat.events.active": true,
"filebeat.harvester.running": true,
"filebeat.harvester.open_files": true,
"beat.memstats.memory_total": true,
"beat.memstats.memory_alloc": true,
"beat.memstats.rss": true,
"beat.memstats.gc_next": true,
"beat.info.uptime.ms": true,
"beat.cgroup.memory.mem.usage.bytes": true,
"beat.cpu.user.ticks": true,
"beat.cpu.system.ticks": true,
"beat.cpu.total.value": true,
"beat.cpu.total.ticks": true,
"beat.handles.open": true,
"beat.handles.limit.hard": true,
"beat.handles.limit.soft": true,
"beat.runtime.goroutines": true,
"system.load.1": true,
"system.load.5": true,
"system.load.15": true,
"system.load.norm.1": true,
"system.load.norm.5": true,
"system.load.norm.15": true,
"libbeat.output.events.active": true,
"libbeat.pipeline.events.active": true,
"libbeat.pipeline.clients": true,
"libbeat.pipeline.queue.max_events": true,
"libbeat.pipeline.queue.max_bytes": true,
"libbeat.pipeline.queue.filled.events": true,
"libbeat.pipeline.queue.filled.bytes": true,
"libbeat.pipeline.queue.filled.pct": true,
"libbeat.config.module.running": true,
"registrar.states.current": true,
"filebeat.events.active": true,
"filebeat.harvester.running": true,
"filebeat.harvester.open_files": true,
"beat.memstats.memory_total": true,
"beat.memstats.memory_alloc": true,
"beat.memstats.rss": true,
"beat.memstats.gc_next": true,
"beat.info.uptime.ms": true,
"beat.cgroup.memory.mem.usage.bytes": true,
"beat.cpu.user.ticks": true,
"beat.cpu.system.ticks": true,
"beat.cpu.total.value": true,
"beat.cpu.total.ticks": true,
"beat.handles.open": true,
"beat.handles.limit.hard": true,
"beat.handles.limit.soft": true,
"beat.runtime.goroutines": true,
"system.load.1": true,
"system.load.5": true,
"system.load.15": true,
"system.load.norm.1": true,
"system.load.norm.5": true,
"system.load.norm.15": true,
}

// isGauge returns true when the given metric key name represents a gauge value.
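A table-driven test is one way to guard the new entries (an illustrative sketch, not part of this PR; it assumes `isGauge` has the signature implied by its doc comment, `func isGauge(name string) bool`, and that the file's package is named `log`):

package log

import "testing"

// TestQueueGauges checks that the queue metrics added in this change are
// reported as gauges rather than as deltas by the log reporter.
func TestQueueGauges(t *testing.T) {
	keys := []string{
		"libbeat.pipeline.queue.max_events",
		"libbeat.pipeline.queue.max_bytes",
		"libbeat.pipeline.queue.filled.events",
		"libbeat.pipeline.queue.filled.bytes",
		"libbeat.pipeline.queue.filled.pct",
	}
	for _, key := range keys {
		if !isGauge(key) {
			t.Errorf("expected %q to be treated as a gauge", key)
		}
	}
}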
16 changes: 5 additions & 11 deletions libbeat/publisher/pipeline/client.go
@@ -37,13 +37,12 @@
mutex sync.Mutex
waiter *clientCloseWaiter

eventFlags publisher.EventFlags
canDrop bool
eventWaitGroup *sync.WaitGroup
eventFlags publisher.EventFlags
canDrop bool

// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once

Check failure on line 45 in libbeat/publisher/pipeline/client.go (GitHub Actions / lint (windows)): field `closeOnce` is unused (unused)

observer observer
eventListener beat.EventListener
@@ -132,10 +131,8 @@
}

func (c *client) Close() error {
// first stop ack handling. ACK handler might block on wait (with timeout), waiting
// for pending events to be ACKed.
c.closeOnce.Do(func() {
c.isOpen.Store(false)
if c.isOpen.Swap(false) {
// Only do shutdown handling the first time Close is called
c.onClosing()

c.logger.Debug("client: closing acker")
@@ -158,7 +155,7 @@
}
c.logger.Debug("client: done closing processors")
}
})
}
return nil
}

@@ -180,9 +177,6 @@
}

func (c *client) onPublished() {
if c.eventWaitGroup != nil {
c.eventWaitGroup.Add(1)
}
c.observer.publishedEvent()
if c.clientListener != nil {
c.clientListener.Published()
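The change above replaces the `sync.Once` guard with `atomic.Bool.Swap`, which both marks the client closed and tells the caller whether it was the first to do so. A stripped-down sketch of the pattern in isolation (a generic illustration, not the pipeline client itself):

package main

import (
	"fmt"
	"sync/atomic"
)

type closer struct {
	isOpen atomic.Bool
}

func newCloser() *closer {
	c := &closer{}
	c.isOpen.Store(true)
	return c
}

// Close is safe to call multiple times: Swap(false) returns the previous
// value, so only the first call runs the shutdown sequence.
func (c *closer) Close() error {
	if c.isOpen.Swap(false) {
		fmt.Println("running shutdown sequence")
	}
	return nil
}

func main() {
	c := newCloser()
	_ = c.Close() // performs shutdown
	_ = c.Close() // no-op
}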
75 changes: 21 additions & 54 deletions libbeat/publisher/pipeline/client_test.go
@@ -60,59 +60,27 @@ func TestClient(t *testing.T) {
// Note: no asserts. If closing fails we have a deadlock, because Publish
// would block forever

cases := map[string]struct {
context bool
close func(client beat.Client, cancel func())
}{
"close unblocks client without context": {
context: false,
close: func(client beat.Client, _ func()) {
client.Close()
},
},
"close unblocks client with context": {
context: true,
close: func(client beat.Client, _ func()) {
client.Close()
},
},
"context cancel unblocks client": {
context: true,
close: func(client beat.Client, cancel func()) {
cancel()
},
},
}

logp.TestingSetup()
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

for name, test := range cases {
t.Run(name, func(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)
pipeline := makePipeline(t, Settings{}, makeTestQueue())
defer pipeline.Close()

pipeline := makePipeline(t, Settings{}, makeTestQueue())
defer pipeline.Close()

client, err := pipeline.ConnectWith(beat.ClientConfig{})
if err != nil {
t.Fatal(err)
}
defer client.Close()
client, err := pipeline.ConnectWith(beat.ClientConfig{})
if err != nil {
t.Fatal(err)
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
client.Publish(beat.Event{})
}()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
client.Publish(beat.Event{})
}()

test.close(client, func() {
client.Close()
})
wg.Wait()
})
}
client.Close()
wg.Wait()
})

t.Run("no infinite loop when processing fails", func(t *testing.T) {
@@ -216,9 +184,6 @@ func TestClient(t *testing.T) {
}

func TestClientWaitClose(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
p, err := New(beat.Info{},
Monitors{},
@@ -241,6 +206,9 @@ func TestClientWaitClose(t *testing.T) {
defer pipeline.Close()

t.Run("WaitClose blocks", func(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitClose: 500 * time.Millisecond,
})
@@ -272,6 +240,8 @@
})

t.Run("ACKing events unblocks WaitClose", func(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)
client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitClose: time.Minute,
})
@@ -344,9 +314,6 @@ func TestMonitoring(t *testing.T) {
require.NoError(t, err)
defer pipeline.Close()

metricsSnapshot := monitoring.CollectFlatSnapshot(metrics, monitoring.Full, true)
assert.Equal(t, int64(maxEvents), metricsSnapshot.Ints["pipeline.queue.max_events"])

telemetrySnapshot := monitoring.CollectFlatSnapshot(telemetry, monitoring.Full, true)
assert.Equal(t, "output_name", telemetrySnapshot.Strings["output.name"])
assert.Equal(t, int64(batchSize), telemetrySnapshot.Ints["output.batch_size"])
16 changes: 8 additions & 8 deletions libbeat/publisher/pipeline/consumer.go
@@ -31,8 +31,8 @@ import (
type eventConsumer struct {
logger *logp.Logger

// eventConsumer calls the observer methods eventsRetry and eventsDropped.
observer outputObserver
// eventConsumer calls the retryObserver methods eventsRetry and eventsDropped.
retryObserver retryObserver

// When the output changes, the new target is sent to the worker routine
// on this channel. Clients should call eventConsumer.setTarget().
@@ -73,12 +73,12 @@ type retryRequest struct {

func newEventConsumer(
log *logp.Logger,
observer outputObserver,
observer retryObserver,
) *eventConsumer {
c := &eventConsumer{
logger: log,
observer: observer,
queueReader: makeQueueReader(),
logger: log,
retryObserver: observer,
queueReader: makeQueueReader(),

targetChan: make(chan consumerTarget),
retryChan: make(chan retryRequest),
@@ -163,7 +163,7 @@ outerLoop:
// Successfully sent a batch to the output workers
if len(retryBatches) > 0 {
// This was a retry, report it to the observer
c.observer.eventsRetry(len(active.Events()))
c.retryObserver.eventsRetry(len(active.Events()))
retryBatches = retryBatches[1:]
} else {
// This was directly from the queue, clear the value so we can
@@ -183,7 +183,7 @@ outerLoop:
alive := req.batch.reduceTTL()

countDropped := countFailed - len(req.batch.Events())
c.observer.eventsDropped(countDropped)
c.retryObserver.eventsDropped(countDropped)

if !alive {
log.Info("Drop batch")
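For reference, the narrowed observer needs only the two methods called in this file. Inferred from the call sites above, its likely shape is roughly the following (a sketch, not copied from the PR):

package pipeline

// retryObserver is the subset of pipeline observability that eventConsumer
// needs: reporting retried and dropped events. Method names and parameters
// are inferred from the call sites in this diff.
type retryObserver interface {
	eventsRetry(count int)   // events sent back to the pipeline for another attempt
	eventsDropped(count int) // events dropped after their retry TTL is exhausted
}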