diff --git a/filebeat/tests/system/test_reload_inputs.py b/filebeat/tests/system/test_reload_inputs.py index 53644837c2c..6f380bb6d4e 100644 --- a/filebeat/tests/system/test_reload_inputs.py +++ b/filebeat/tests/system/test_reload_inputs.py @@ -91,8 +91,6 @@ def test_start_stop(self): inputs=False, ) - proc = self.start_beat() - os.mkdir(self.working_dir + "/logs/") logfile = self.working_dir + "/logs/test.log" os.mkdir(self.working_dir + "/configs/") @@ -103,6 +101,8 @@ def test_start_stop(self): with open(logfile, 'w') as f: f.write("Hello world\n") + proc = self.start_beat() + self.wait_until(lambda: self.output_lines() == 1) # Remove input by moving the file diff --git a/libbeat/docs/metrics-in-logs.asciidoc b/libbeat/docs/metrics-in-logs.asciidoc index 97aac4f3a30..27d27ef9d43 100644 --- a/libbeat/docs/metrics-in-logs.asciidoc +++ b/libbeat/docs/metrics-in-logs.asciidoc @@ -2,11 +2,11 @@ Every 30 seconds (by default), {beatname_uc} collects a _snapshot_ of metrics about itself. From this snapshot, {beatname_uc} computes a _delta snapshot_; this delta snapshot contains any metrics that have _changed_ since the last snapshot. Note that the values of the metrics are the values when the snapshot is taken, _NOT_ the _difference_ in values from the last snapshot. -If this delta snapshot contains _any_ metrics (indicating at least one metric that has changed since the last snapshot), this delta snapshot is serialized as JSON and emitted in {beatname_uc}'s logs at the `INFO` log level. Here is an example of such a log entry: +If this delta snapshot contains _any_ metrics (indicating at least one metric that has changed since the last snapshot), this delta snapshot is serialized as JSON and emitted in {beatname_uc}'s logs at the `INFO` log level. Most snapshot fields report the change in the metric since the last snapshot, however some fields are _gauges_, which always report the current value. Here is an example of such a log entry: [source,json] ---- -{"log.level":"info","@timestamp":"2023-07-14T12:50:36.811Z","log.logger":"monitoring","log.origin":{"file.name":"log/log.go","file.line":187},"message":"Non-zero metrics in the last 30s","service.name":"filebeat","monitoring":{"metrics":{"beat":{"cgroup":{"memory":{"mem":{"usage":{"bytes":0}}}},"cpu":{"system":{"ticks":692690,"time":{"ms":60}},"total":{"ticks":3167250,"time":{"ms":150},"value":3167250},"user":{"ticks":2474560,"time":{"ms":90}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":32},"info":{"ephemeral_id":"2bab8688-34c0-4522-80af-db86948d547d","uptime":{"ms":617670096},"version":"8.6.2"},"memstats":{"gc_next":57189272,"memory_alloc":43589824,"memory_total":275281335792,"rss":183574528},"runtime":{"goroutines":212}},"filebeat":{"events":{"active":5,"added":52,"done":49},"harvester":{"open_files":6,"running":6,"started":1}},"libbeat":{"config":{"module":{"running":15}},"output":{"events":{"acked":48,"active":0,"batches":6,"total":48},"read":{"bytes":210},"write":{"bytes":26923}},"pipeline":{"clients":15,"events":{"active":5,"filtered":1,"published":51,"total":52},"queue":{"acked":48}}},"registrar":{"states":{"current":14,"update":49},"writes":{"success":6,"total":6}},"system":{"load":{"1":0.91,"15":0.37,"5":0.4,"norm":{"1":0.1138,"15":0.0463,"5":0.05}}}},"ecs.version":"1.6.0"}} +{"log.level":"info","@timestamp":"2023-07-14T12:50:36.811Z","log.logger":"monitoring","log.origin":{"file.name":"log/log.go","file.line":187},"message":"Non-zero metrics in the last 30s","service.name":"filebeat","monitoring":{"metrics":{"beat":{"cgroup":{"memory":{"mem":{"usage":{"bytes":0}}}},"cpu":{"system":{"ticks":692690,"time":{"ms":60}},"total":{"ticks":3167250,"time":{"ms":150},"value":3167250},"user":{"ticks":2474560,"time":{"ms":90}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":32},"info":{"ephemeral_id":"2bab8688-34c0-4522-80af-db86948d547d","uptime":{"ms":617670096},"version":"8.6.2"},"memstats":{"gc_next":57189272,"memory_alloc":43589824,"memory_total":275281335792,"rss":183574528},"runtime":{"goroutines":212}},"filebeat":{"events":{"active":5,"added":52,"done":49},"harvester":{"open_files":6,"running":6,"started":1}},"libbeat":{"config":{"module":{"running":15}},"output":{"events":{"acked":48,"active":0,"batches":6,"total":48},"read":{"bytes":210},"write":{"bytes":26923}},"pipeline":{"clients":15,"events":{"active":5,"filtered":1,"published":51,"total":52},"queue":{"max_events":3500,"filled":{"events":5,"bytes":6425,"pct":0.0014},"added":{"events":52,"bytes":65702},"consumed":{"events":52,"bytes":65702},"removed":{"events":48,"bytes":59277},"acked":48}}},"registrar":{"states":{"current":14,"update":49},"writes":{"success":6,"total":6}},"system":{"load":{"1":0.91,"15":0.37,"5":0.4,"norm":{"1":0.1138,"15":0.0463,"5":0.05}}}},"ecs.version":"1.6.0"}} ---- [discrete] @@ -113,6 +113,24 @@ Focussing on the `.monitoring.metrics` field, and formatting the JSON, it's valu "total": 52 }, "queue": { + "max_events": 3500, + "filled": { + "events": 5, + "bytes": 6425, + "pct": 0.0014 + }, + "added": { + "events": 52, + "bytes": 65702 + }, + "consumed": { + "events": 52, + "bytes": 65702 + }, + "removed": { + "events": 48, + "bytes": 59277 + }, "acked": 48 } } @@ -130,12 +148,12 @@ Focussing on the `.monitoring.metrics` field, and formatting the JSON, it's valu "system": { "load": { "1": 0.91, - "5": 0.4, "15": 0.37, + "5": 0.4, "norm": { "1": 0.1138, - "5": 0.05, - "15": 0.0463 + "15": 0.0463, + "5": 0.05 } } } @@ -170,9 +188,30 @@ endif::[] | `.output.events.total` | Integer | Number of events currently being processed by the output. | If this number grows over time, it may indicate that the output destination (e.g. {ls} pipeline or {es} cluster) is not able to accept events at the same or faster rate than what {beatname_uc} is sending to it. | `.output.events.acked` | Integer | Number of events acknowledged by the output destination. | Generally, we want this number to be the same as `.output.events.total` as this indicates that the output destination has reliably received all the events sent to it. | `.output.events.failed` | Integer | Number of events that {beatname_uc} tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it's useful to check {beatname_uc}'s logs right before this log entry's `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later. +| `.output.events.dropped` | Integer | Number of events that {beatname_uc} gave up sending to the output destination because of a permanent (non-retryable) error. +| `.output.events.dead_letter` | Integer | Number of events that {beatname_uc} successfully sent to a configured dead letter index after they failed to ingest in the primary index. | `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs. |=== +[cols="1,1,2,2"] +|=== +| Field path (relative to `.monitoring.metrics.libbeat.pipeline`) | Type | Meaning | Troubleshooting hints + +| `.queue.max_events` | Integer (gauge) | The queue's maximum event count if it has one, otherwise zero. +| `.queue.max_bytes` | Integer (gauge) | The queue's maximum byte count if it has one, otherwise zero. +| `.queue.filled.events` | Integer (gauge) | Number of events currently stored by the queue. | +| `.queue.filled.bytes` | Integer (gauge) | Number of bytes currently stored by the queue. | +| `.queue.filled.pct` | Float (gauge) | How full the queue is relative to its maximum size, as a fraction from 0 to 1. | Low throughput while `queue.filled.pct` is low means congestion in the input. Low throughput while `queue.filled.pct` is high means congestion in the output. +| `.queue.added.events` | Integer | Number of events added to the queue by input workers. | +| `.queue.added.bytes` | Integer | Number of bytes added to the queue by input workers. | +| `.queue.consumed.events` | Integer | Number of events sent to output workers. | +| `.queue.consumed.bytes` | Integer | Number of bytes sent to output workers. | +| `.queue.removed.events` | Integer | Number of events removed from the queue after being processed by output workers. | +| `.queue.removed.bytes` | Integer | Number of bytes removed from the queue after being processed by output workers. | +|=== + +When using the memory queue, byte metrics are only set if the output supports them. Currently only the Elasticsearch output supports byte metrics. + ifeval::["{beatname_lc}"=="filebeat"] [cols="1,1,2,2"] |=== diff --git a/libbeat/monitoring/report/log/log.go b/libbeat/monitoring/report/log/log.go index e11e8228cf7..b40c6d33e42 100644 --- a/libbeat/monitoring/report/log/log.go +++ b/libbeat/monitoring/report/log/log.go @@ -37,36 +37,39 @@ import ( // TODO: Replace this with a proper solution that uses the metric type from // where it is defined. See: https://github.com/elastic/beats/issues/5433 var gauges = map[string]bool{ - "libbeat.output.events.active": true, - "libbeat.pipeline.events.active": true, - "libbeat.pipeline.clients": true, - "libbeat.pipeline.queue.max_events": true, - "libbeat.pipeline.queue.filled.pct.events": true, - "libbeat.config.module.running": true, - "registrar.states.current": true, - "filebeat.events.active": true, - "filebeat.harvester.running": true, - "filebeat.harvester.open_files": true, - "beat.memstats.memory_total": true, - "beat.memstats.memory_alloc": true, - "beat.memstats.rss": true, - "beat.memstats.gc_next": true, - "beat.info.uptime.ms": true, - "beat.cgroup.memory.mem.usage.bytes": true, - "beat.cpu.user.ticks": true, - "beat.cpu.system.ticks": true, - "beat.cpu.total.value": true, - "beat.cpu.total.ticks": true, - "beat.handles.open": true, - "beat.handles.limit.hard": true, - "beat.handles.limit.soft": true, - "beat.runtime.goroutines": true, - "system.load.1": true, - "system.load.5": true, - "system.load.15": true, - "system.load.norm.1": true, - "system.load.norm.5": true, - "system.load.norm.15": true, + "libbeat.output.events.active": true, + "libbeat.pipeline.events.active": true, + "libbeat.pipeline.clients": true, + "libbeat.pipeline.queue.max_events": true, + "libbeat.pipeline.queue.max_bytes": true, + "libbeat.pipeline.queue.filled.events": true, + "libbeat.pipeline.queue.filled.bytes": true, + "libbeat.pipeline.queue.filled.pct": true, + "libbeat.config.module.running": true, + "registrar.states.current": true, + "filebeat.events.active": true, + "filebeat.harvester.running": true, + "filebeat.harvester.open_files": true, + "beat.memstats.memory_total": true, + "beat.memstats.memory_alloc": true, + "beat.memstats.rss": true, + "beat.memstats.gc_next": true, + "beat.info.uptime.ms": true, + "beat.cgroup.memory.mem.usage.bytes": true, + "beat.cpu.user.ticks": true, + "beat.cpu.system.ticks": true, + "beat.cpu.total.value": true, + "beat.cpu.total.ticks": true, + "beat.handles.open": true, + "beat.handles.limit.hard": true, + "beat.handles.limit.soft": true, + "beat.runtime.goroutines": true, + "system.load.1": true, + "system.load.5": true, + "system.load.15": true, + "system.load.norm.1": true, + "system.load.norm.5": true, + "system.load.norm.15": true, } // isGauge returns true when the given metric key name represents a gauge value. diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index 7ecce6fd8c7..af756213a63 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -37,9 +37,8 @@ type client struct { mutex sync.Mutex waiter *clientCloseWaiter - eventFlags publisher.EventFlags - canDrop bool - eventWaitGroup *sync.WaitGroup + eventFlags publisher.EventFlags + canDrop bool // Open state, signaling, and sync primitives for coordinating client Close. isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore. @@ -132,10 +131,8 @@ func (c *client) publish(e beat.Event) { } func (c *client) Close() error { - // first stop ack handling. ACK handler might block on wait (with timeout), waiting - // for pending events to be ACKed. - c.closeOnce.Do(func() { - c.isOpen.Store(false) + if c.isOpen.Swap(false) { + // Only do shutdown handling the first time Close is called c.onClosing() c.logger.Debug("client: closing acker") @@ -158,7 +155,7 @@ func (c *client) Close() error { } c.logger.Debug("client: done closing processors") } - }) + } return nil } @@ -180,9 +177,6 @@ func (c *client) onNewEvent() { } func (c *client) onPublished() { - if c.eventWaitGroup != nil { - c.eventWaitGroup.Add(1) - } c.observer.publishedEvent() if c.clientListener != nil { c.clientListener.Published() diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go index 25080c90615..f729d417ca5 100644 --- a/libbeat/publisher/pipeline/client_test.go +++ b/libbeat/publisher/pipeline/client_test.go @@ -60,59 +60,27 @@ func TestClient(t *testing.T) { // Note: no asserts. If closing fails we have a deadlock, because Publish // would block forever - cases := map[string]struct { - context bool - close func(client beat.Client, cancel func()) - }{ - "close unblocks client without context": { - context: false, - close: func(client beat.Client, _ func()) { - client.Close() - }, - }, - "close unblocks client with context": { - context: true, - close: func(client beat.Client, _ func()) { - client.Close() - }, - }, - "context cancel unblocks client": { - context: true, - close: func(client beat.Client, cancel func()) { - cancel() - }, - }, - } - logp.TestingSetup() + routinesChecker := resources.NewGoroutinesChecker() + defer routinesChecker.Check(t) - for name, test := range cases { - t.Run(name, func(t *testing.T) { - routinesChecker := resources.NewGoroutinesChecker() - defer routinesChecker.Check(t) + pipeline := makePipeline(t, Settings{}, makeTestQueue()) + defer pipeline.Close() - pipeline := makePipeline(t, Settings{}, makeTestQueue()) - defer pipeline.Close() - - client, err := pipeline.ConnectWith(beat.ClientConfig{}) - if err != nil { - t.Fatal(err) - } - defer client.Close() + client, err := pipeline.ConnectWith(beat.ClientConfig{}) + if err != nil { + t.Fatal(err) + } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - client.Publish(beat.Event{}) - }() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + client.Publish(beat.Event{}) + }() - test.close(client, func() { - client.Close() - }) - wg.Wait() - }) - } + client.Close() + wg.Wait() }) t.Run("no infinite loop when processing fails", func(t *testing.T) { @@ -216,9 +184,6 @@ func TestClient(t *testing.T) { } func TestClientWaitClose(t *testing.T) { - routinesChecker := resources.NewGoroutinesChecker() - defer routinesChecker.Check(t) - makePipeline := func(settings Settings, qu queue.Queue) *Pipeline { p, err := New(beat.Info{}, Monitors{}, @@ -241,6 +206,9 @@ func TestClientWaitClose(t *testing.T) { defer pipeline.Close() t.Run("WaitClose blocks", func(t *testing.T) { + routinesChecker := resources.NewGoroutinesChecker() + defer routinesChecker.Check(t) + client, err := pipeline.ConnectWith(beat.ClientConfig{ WaitClose: 500 * time.Millisecond, }) @@ -272,6 +240,8 @@ func TestClientWaitClose(t *testing.T) { }) t.Run("ACKing events unblocks WaitClose", func(t *testing.T) { + routinesChecker := resources.NewGoroutinesChecker() + defer routinesChecker.Check(t) client, err := pipeline.ConnectWith(beat.ClientConfig{ WaitClose: time.Minute, }) @@ -344,9 +314,6 @@ func TestMonitoring(t *testing.T) { require.NoError(t, err) defer pipeline.Close() - metricsSnapshot := monitoring.CollectFlatSnapshot(metrics, monitoring.Full, true) - assert.Equal(t, int64(maxEvents), metricsSnapshot.Ints["pipeline.queue.max_events"]) - telemetrySnapshot := monitoring.CollectFlatSnapshot(telemetry, monitoring.Full, true) assert.Equal(t, "output_name", telemetrySnapshot.Strings["output.name"]) assert.Equal(t, int64(batchSize), telemetrySnapshot.Ints["output.batch_size"]) diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index 1ff8c1bc95d..a7806a3ded2 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -31,8 +31,8 @@ import ( type eventConsumer struct { logger *logp.Logger - // eventConsumer calls the observer methods eventsRetry and eventsDropped. - observer outputObserver + // eventConsumer calls the retryObserver methods eventsRetry and eventsDropped. + retryObserver retryObserver // When the output changes, the new target is sent to the worker routine // on this channel. Clients should call eventConsumer.setTarget(). @@ -73,12 +73,12 @@ type retryRequest struct { func newEventConsumer( log *logp.Logger, - observer outputObserver, + observer retryObserver, ) *eventConsumer { c := &eventConsumer{ - logger: log, - observer: observer, - queueReader: makeQueueReader(), + logger: log, + retryObserver: observer, + queueReader: makeQueueReader(), targetChan: make(chan consumerTarget), retryChan: make(chan retryRequest), @@ -163,7 +163,7 @@ outerLoop: // Successfully sent a batch to the output workers if len(retryBatches) > 0 { // This was a retry, report it to the observer - c.observer.eventsRetry(len(active.Events())) + c.retryObserver.eventsRetry(len(active.Events())) retryBatches = retryBatches[1:] } else { // This was directly from the queue, clear the value so we can @@ -183,7 +183,7 @@ outerLoop: alive := req.batch.reduceTTL() countDropped := countFailed - len(req.batch.Events()) - c.observer.eventsDropped(countDropped) + c.retryObserver.eventsDropped(countDropped) if !alive { log.Info("Drop batch") diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index b34d6a64d2c..d7e07846e0c 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -19,6 +19,7 @@ package pipeline import ( "sync" + "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/reload" @@ -38,11 +39,6 @@ import ( type outputController struct { beat beat.Info monitors Monitors - observer outputObserver - - // If eventWaitGroup is non-nil, it will be decremented as the queue - // reports upstream acknowledgment of published events. - eventWaitGroup *sync.WaitGroup // The queue is not created until the outputController is assigned a // nonempty outputs.Group, in case the output group requests a proxy @@ -58,10 +54,15 @@ type outputController struct { // is called. queueFactory queue.QueueFactory + // consumer is a helper goroutine that reads event batches from the queue + // and sends them to workerChan for an output worker to process. + consumer *eventConsumer + + // Each worker is a goroutine that will read batches from workerChan and + // send them to the output. + workers []outputWorker workerChan chan publisher.Batch - consumer *eventConsumer - workers []outputWorker // The InputQueueSize can be set when the Beat is started, in // libbeat/cmd/instance/Settings we need to preserve that // value and pass it into the queue factory. The queue @@ -85,54 +86,42 @@ type outputWorker interface { func newOutputController( beat beat.Info, monitors Monitors, - observer outputObserver, - eventWaitGroup *sync.WaitGroup, + retryObserver retryObserver, queueFactory queue.QueueFactory, inputQueueSize int, ) (*outputController, error) { controller := &outputController{ beat: beat, monitors: monitors, - observer: observer, - eventWaitGroup: eventWaitGroup, queueFactory: queueFactory, workerChan: make(chan publisher.Batch), - consumer: newEventConsumer(monitors.Logger, observer), + consumer: newEventConsumer(monitors.Logger, retryObserver), inputQueueSize: inputQueueSize, } return controller, nil } -func (c *outputController) Close() error { +func (c *outputController) WaitClose(timeout time.Duration) error { + // First: signal the queue that we're shutting down, and wait up to the + // given duration for it to drain and process ACKs. + c.closeQueue(timeout) + + // We've drained the queue as much as we can, signal eventConsumer to + // close, and wait for it to finish. After consumer.close returns, + // there will be no more writes to c.workerChan, so it is safe to close. c.consumer.close() close(c.workerChan) + // Signal the output workers to close. This step is a hint, and carries + // no guarantees. For example, on close the Elasticsearch output workers + // will close idle connections, but will not change any behavior for + // active connections, giving any remaining events a chance to ingest + // before we terminate. for _, out := range c.workers { out.Close() } - // Closing the queue stops ACKs from propagating, so we close everything - // else first to give it a chance to wait for any outstanding events to be - // acknowledged. - c.queueLock.Lock() - if c.queue != nil { - c.queue.Close() - } - for _, req := range c.pendingRequests { - // We can only end up here if there was an attempt to connect to the - // pipeline but it was shut down before any output was set. - // In this case, return nil and Pipeline.ConnectWith will pass on a - // real error to the caller. - // NOTE: under the current shutdown process, Pipeline.Close (and hence - // outputController.Close) is ~never called. So even if we did have - // blocked callers here, in a real shutdown they will never be woken - // up. But in hopes of a day when the shutdown process is more robust, - // I've decided to do the right thing here anyway. - req.responseChan <- nil - } - c.queueLock.Unlock() - return nil } @@ -203,6 +192,32 @@ func (c *outputController) Reload( return nil } +// Close the queue, waiting up to the specified timeout for pending events +// to complete. +func (c *outputController) closeQueue(timeout time.Duration) { + c.queueLock.Lock() + defer c.queueLock.Unlock() + if c.queue != nil { + c.queue.Close() + select { + case <-c.queue.Done(): + case <-time.After(timeout): + } + } + for _, req := range c.pendingRequests { + // We can only end up here if there was an attempt to connect to the + // pipeline but it was shut down before any output was set. + // In this case, return nil and Pipeline.ConnectWith will pass on a + // real error to the caller. + // NOTE: under the current shutdown process, Pipeline.Close (and hence + // outputController.Close) is ~never called. So even if we did have + // blocked callers here, in a real shutdown they will never be woken + // up. But in hopes of a day when the shutdown process is more robust, + // I've decided to do the right thing here anyway. + req.responseChan <- nil + } +} + // queueProducer creates a queue producer with the given config, blocking // until the queue is created if it does not yet exist. func (c *outputController) queueProducer(config queue.ProducerConfig) queue.Producer { @@ -233,16 +248,6 @@ func (c *outputController) queueProducer(config queue.ProducerConfig) queue.Prod return <-request.responseChan } -// onACK receives event acknowledgment notifications from the queue and -// forwards them to the metrics observer and the pipeline's global event -// wait group if one is set. -func (c *outputController) onACK(eventCount int) { - c.observer.queueACKed(eventCount) - if c.eventWaitGroup != nil { - c.eventWaitGroup.Add(-eventCount) - } -} - func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) { logger := c.monitors.Logger if len(outGrp.Clients) == 0 { @@ -266,12 +271,21 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) { if factory == nil { factory = c.queueFactory } + // Queue metrics are reported under the pipeline namespace + var pipelineMetrics *monitoring.Registry + if c.monitors.Metrics != nil { + pipelineMetrics := c.monitors.Metrics.GetRegistry("pipeline") + if pipelineMetrics == nil { + pipelineMetrics = c.monitors.Metrics.NewRegistry("pipeline") + } + } + queueObserver := queue.NewQueueObserver(pipelineMetrics) - queue, err := factory(logger, c.onACK, c.inputQueueSize, outGrp.EncoderFactory) + queue, err := factory(logger, queueObserver, c.inputQueueSize, outGrp.EncoderFactory) if err != nil { logger.Errorf("queue creation failed, falling back to default memory queue, check your queue configuration") s, _ := memqueue.SettingsForUserConfig(nil) - queue = memqueue.NewQueue(logger, c.onACK, s, c.inputQueueSize, outGrp.EncoderFactory) + queue = memqueue.NewQueue(logger, queueObserver, s, c.inputQueueSize, outGrp.EncoderFactory) } c.queue = queue @@ -279,8 +293,6 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) { queueReg := c.monitors.Telemetry.NewRegistry("queue") monitoring.NewString(queueReg, "name").Set(c.queue.QueueType()) } - maxEvents := c.queue.BufferConfig().MaxEvents - c.observer.queueMaxEvents(maxEvents) // Now that we've created a queue, go through and unblock any callers // that are waiting for a producer. diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 6834af2c7f3..2e4f0df990f 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -150,9 +150,9 @@ func TestQueueCreatedOnlyAfterOutputExists(t *testing.T) { // We aren't testing the values sent to eventConsumer, we // just need a placeholder here so outputController can // send configuration updates without blocking. - targetChan: make(chan consumerTarget, 4), + targetChan: make(chan consumerTarget, 4), + retryObserver: nilObserver, }, - observer: nilObserver, } // Set to an empty output group. This should not create a queue. controller.Set(outputs.Group{}) @@ -173,9 +173,9 @@ func TestOutputQueueFactoryTakesPrecedence(t *testing.T) { memqueue.Settings{Events: 1}, ), consumer: &eventConsumer{ - targetChan: make(chan consumerTarget, 4), + targetChan: make(chan consumerTarget, 4), + retryObserver: nilObserver, }, - observer: nilObserver, } controller.Set(outputs.Group{ Clients: []outputs.Client{newMockClient(nil)}, @@ -189,15 +189,15 @@ func TestOutputQueueFactoryTakesPrecedence(t *testing.T) { func TestFailedQueueFactoryRevertsToDefault(t *testing.T) { defaultSettings, _ := memqueue.SettingsForUserConfig(nil) - failedFactory := func(_ *logp.Logger, _ func(int), _ int, _ queue.EncoderFactory) (queue.Queue, error) { + failedFactory := func(_ *logp.Logger, _ queue.Observer, _ int, _ queue.EncoderFactory) (queue.Queue, error) { return nil, fmt.Errorf("This queue creation intentionally failed") } controller := outputController{ queueFactory: failedFactory, consumer: &eventConsumer{ - targetChan: make(chan consumerTarget, 4), + targetChan: make(chan consumerTarget, 4), + retryObserver: nilObserver, }, - observer: nilObserver, monitors: Monitors{ Logger: logp.NewLogger("tests"), }, @@ -213,9 +213,9 @@ func TestQueueProducerBlocksUntilOutputIsSet(t *testing.T) { controller := outputController{ queueFactory: memqueue.FactoryForSettings(memqueue.Settings{Events: 1}), consumer: &eventConsumer{ - targetChan: make(chan consumerTarget, 4), + targetChan: make(chan consumerTarget, 4), + retryObserver: nilObserver, }, - observer: nilObserver, } // Send producer requests from different goroutines. They should all // block, because there is no queue, but they should become unblocked diff --git a/libbeat/publisher/pipeline/monitoring.go b/libbeat/publisher/pipeline/monitoring.go index 0bc63a739f9..4a1e5ad76a1 100644 --- a/libbeat/publisher/pipeline/monitoring.go +++ b/libbeat/publisher/pipeline/monitoring.go @@ -18,15 +18,13 @@ package pipeline import ( - "math" - "github.com/elastic/elastic-agent-libs/monitoring" ) type observer interface { pipelineObserver clientObserver - outputObserver + retryObserver cleanup() } @@ -47,18 +45,14 @@ type clientObserver interface { publishedEvent() // An event was rejected by the queue failedPublishEvent() + eventsACKed(count int) } -type outputObserver interface { +type retryObserver interface { // Events encountered too many errors and were permanently dropped. eventsDropped(int) // Events were sent back to an output worker after an earlier failure. eventsRetry(int) - // The queue received acknowledgment for events from the output workers. - // (This may include events already reported via eventsDropped.) - queueACKed(n int) - // Report the maximum event count supported by the queue. - queueMaxEvents(n int) } // metricsObserver is used by many component in the publisher pipeline, to report @@ -165,24 +159,12 @@ func (o *metricsObserver) clientClosed() { o.vars.clients.Dec() } func (o *metricsObserver) newEvent() { o.vars.eventsTotal.Inc() o.vars.activeEvents.Inc() - o.setPercentageFull() -} - -// setPercentageFull is used interally to set the `queue.full` metric -func (o *metricsObserver) setPercentageFull() { - maxEvt := o.vars.queueMaxEvents.Get() - if maxEvt != 0 { - pct := float64(o.vars.activeEvents.Get()) / float64(maxEvt) - pctRound := math.Round(pct/0.0005) * 0.0005 - o.vars.percentQueueFull.Set(pctRound) - } } // (client) event is filtered out (on purpose or failed) func (o *metricsObserver) filteredEvent() { o.vars.eventsFiltered.Inc() o.vars.activeEvents.Dec() - o.setPercentageFull() } // (client) managed to push an event into the publisher pipeline @@ -190,28 +172,15 @@ func (o *metricsObserver) publishedEvent() { o.vars.eventsPublished.Inc() } +// (client) number of ACKed events from this client +func (o *metricsObserver) eventsACKed(n int) { + o.vars.activeEvents.Sub(uint64(n)) +} + // (client) client closing down or DropIfFull is set func (o *metricsObserver) failedPublishEvent() { o.vars.eventsFailed.Inc() o.vars.activeEvents.Dec() - o.setPercentageFull() -} - -// -// queue events -// - -// (queue) number of events ACKed by the queue/broker in use -func (o *metricsObserver) queueACKed(n int) { - o.vars.queueACKed.Add(uint64(n)) - o.vars.activeEvents.Sub(uint64(n)) - o.setPercentageFull() -} - -// (queue) maximum queue event capacity -func (o *metricsObserver) queueMaxEvents(n int) { - o.vars.queueMaxEvents.Set(uint64(n)) - o.setPercentageFull() } // @@ -239,7 +208,6 @@ func (*emptyObserver) newEvent() {} func (*emptyObserver) filteredEvent() {} func (*emptyObserver) publishedEvent() {} func (*emptyObserver) failedPublishEvent() {} -func (*emptyObserver) queueACKed(n int) {} -func (*emptyObserver) queueMaxEvents(int) {} +func (*emptyObserver) eventsACKed(n int) {} func (*emptyObserver) eventsDropped(int) {} func (*emptyObserver) eventsRetry(int) {} diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index dbe87681ea6..a5a13a0584e 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -22,7 +22,6 @@ package pipeline import ( "fmt" - "sync" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -64,12 +63,9 @@ type Pipeline struct { observer observer - // wait close support. If eventWaitGroup is non-nil, then publishing - // an event through this pipeline will increment it and acknowledging - // a published event will decrement it, so the pipeline can wait on - // the group on shutdown to allow pending events to be acknowledged. + // If waitCloseTimeout is positive, then the pipeline will wait up to the + // specified time when it is closed for pending events to be acknowledged. waitCloseTimeout time.Duration - eventWaitGroup *sync.WaitGroup processors processing.Supporter } @@ -132,9 +128,7 @@ func New( processors: settings.Processors, } if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 { - // If wait-on-close is enabled, give the pipeline a WaitGroup for - // events that have been Published but not yet ACKed. - p.eventWaitGroup = &sync.WaitGroup{} + p.waitCloseTimeout = settings.WaitClose } if monitors.Metrics != nil { @@ -153,7 +147,7 @@ func New( return nil, err } - output, err := newOutputController(beat, monitors, p.observer, p.eventWaitGroup, queueFactory, settings.InputQueueSize) + output, err := newOutputController(beat, monitors, p.observer, queueFactory, settings.InputQueueSize) if err != nil { return nil, err } @@ -172,24 +166,8 @@ func (p *Pipeline) Close() error { log.Debug("close pipeline") - if p.eventWaitGroup != nil { - ch := make(chan struct{}) - go func() { - p.eventWaitGroup.Wait() - ch <- struct{}{} - }() - - select { - case <-ch: - // all events have been ACKed - - case <-time.After(p.waitCloseTimeout): - // timeout -> close pipeline with pending events - } - } - // Note: active clients are not closed / disconnected. - p.outputController.Close() + p.outputController.WaitClose(p.waitCloseTimeout) p.observer.cleanup() return nil @@ -238,20 +216,14 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { processors: processors, eventFlags: eventFlags, canDrop: canDrop, - eventWaitGroup: p.eventWaitGroup, observer: p.observer, } ackHandler := cfg.EventListener - producerCfg := queue.ProducerConfig{} - var waiter *clientCloseWaiter if waitClose > 0 { waiter = newClientCloseWaiter(waitClose) - } - - if waiter != nil { if ackHandler == nil { ackHandler = waiter } else { @@ -259,9 +231,16 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { } } - if ackHandler != nil { - producerCfg.ACK = ackHandler.ACKEvents - } else { + producerCfg := queue.ProducerConfig{ + ACK: func(count int) { + client.observer.eventsACKed(count) + if ackHandler != nil { + ackHandler.ACKEvents(count) + } + }, + } + + if ackHandler == nil { ackHandler = acker.Nil() } diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go index 78725b043f1..a8cf34b895a 100644 --- a/libbeat/publisher/pipeline/pipeline_test.go +++ b/libbeat/publisher/pipeline/pipeline_test.go @@ -125,10 +125,6 @@ type testProducer struct { cancel func() } -func (q *testQueue) Metrics() (queue.Metrics, error) { - return queue.Metrics{}, nil -} - func (q *testQueue) Close() error { if q.close != nil { return q.close() @@ -136,6 +132,10 @@ func (q *testQueue) Close() error { return nil } +func (q *testQueue) Done() <-chan struct{} { + return nil +} + func (q *testQueue) QueueType() string { return "test" } diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go index 55098b10fa8..20e6648d927 100644 --- a/libbeat/publisher/queue/diskqueue/consumer.go +++ b/libbeat/publisher/queue/diskqueue/consumer.go @@ -54,6 +54,13 @@ eventLoop: } } + // Check the batch size so we can report to the metrics observer + batchByteCount := 0 + for _, frame := range frames { + batchByteCount += int(frame.bytesOnDisk) + } + dq.observer.ConsumeEvents(len(frames), batchByteCount) + // There is a mild race condition here based on queue closure: events // written to readerLoop.output may have been buffered before the // queue was closed, and we may be reading its leftovers afterwards. diff --git a/libbeat/publisher/queue/diskqueue/consumer_test.go b/libbeat/publisher/queue/diskqueue/consumer_test.go new file mode 100644 index 00000000000..80378029be2 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/consumer_test.go @@ -0,0 +1,61 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/publisher/queue" + "github.com/elastic/elastic-agent-libs/monitoring" +) + +func TestQueueGetObserver(t *testing.T) { + reg := monitoring.NewRegistry() + const eventCount = 50 + dq := diskQueue{ + observer: queue.NewQueueObserver(reg), + readerLoop: &readerLoop{ + output: make(chan *readFrame, eventCount), + }, + } + for i := 0; i < eventCount; i++ { + dq.readerLoop.output <- &readFrame{bytesOnDisk: 123} + } + _, err := dq.Get(eventCount) + assert.NoError(t, err, "Queue Get call should succeed") + assertRegistryUint(t, reg, "queue.consumed.events", eventCount, "Get call should report consumed events") + assertRegistryUint(t, reg, "queue.consumed.bytes", eventCount*123, "Get call should report consumed bytes") +} + +func assertRegistryUint(t *testing.T, reg *monitoring.Registry, key string, expected uint64, message string) { + t.Helper() + + entry := reg.Get(key) + if entry == nil { + assert.Failf(t, message, "registry key '%v' doesn't exist", key) + return + } + value, ok := reg.Get(key).(*monitoring.Uint) + if !ok { + assert.Failf(t, message, "registry key '%v' doesn't refer to a uint64", key) + return + } + assert.Equal(t, expected, value.Get(), message) +} diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go index 93051dd4581..4f30a0e58ba 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop.go +++ b/libbeat/publisher/queue/diskqueue/core_loop.go @@ -47,7 +47,7 @@ func (dq *diskQueue) run() { // After receiving new ACKs, a segment might be ready to delete. dq.maybeDeleteACKed() - case <-dq.done: + case <-dq.close: dq.handleShutdown() return @@ -84,21 +84,10 @@ func (dq *diskQueue) run() { // If there were blocked producers waiting for more queue space, // we might be able to unblock them now. dq.maybeUnblockProducers() - - case metricsReq := <-dq.metricsRequestChan: - dq.handleMetricsRequest(metricsReq) } } } -// handleMetricsRequest responds to an event on the metricsRequestChan chan -func (dq *diskQueue) handleMetricsRequest(request metricsRequest) { - resp := metricsRequestResponse{ - sizeOnDisk: dq.segments.sizeOnDisk(), - } - request.response <- resp -} - func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) { // Pathological case checking: make sure the incoming frame isn't bigger // than an entire segment all by itself (as long as it isn't, it is @@ -122,6 +111,7 @@ func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) { // pending list and report success, then dispatch it to the // writer loop if no other requests are outstanding. dq.enqueueWriteFrame(request.frame) + dq.observer.AddEvent(int(request.frame.sizeOnDisk())) request.responseChan <- true } else { // The queue is too full. Either add the request to blockedProducers, @@ -186,6 +176,8 @@ func (dq *diskQueue) handleDeleterLoopResponse(response deleterLoopResponse) { dq.deleting = false newAckedSegments := []*queueSegment{} errors := []error{} + removedEventCount := 0 + removedByteCount := 0 for i, err := range response.results { if err != nil { // This segment had an error, so it stays in the acked list. @@ -193,8 +185,15 @@ func (dq *diskQueue) handleDeleterLoopResponse(response deleterLoopResponse) { errors = append(errors, fmt.Errorf("couldn't delete segment %d: %w", dq.segments.acked[i].id, err)) + } else { + removedEventCount += int(dq.segments.acked[i].frameCount) + // For the metrics observer, we (can) only report the size of the raw + // events, not the segment header, so subtract that here so it doesn't + // look like we're deleting more than was added in the first place. + removedByteCount += int(dq.segments.acked[i].byteCount - dq.segments.acked[i].headerSize()) } } + dq.observer.RemoveEvents(removedEventCount, removedByteCount) if len(dq.segments.acked) > len(response.results) { // Preserve any new acked segments that were added during the deletion // request. @@ -479,9 +478,13 @@ func (dq *diskQueue) canAcceptFrameOfSize(frameSize uint64) bool { return true } - // Compute the current queue size. We accept if there is enough capacity - // left in the queue after accounting for the existing segments and the - // pending writes that were already accepted. + // We accept if there is enough capacity left in the queue after accounting + // for the existing segments and the pending writes that were already + // accepted. + return dq.currentSize()+frameSize <= dq.settings.MaxBufferSize +} + +func (dq *diskQueue) currentSize() uint64 { pendingBytes := uint64(0) for _, sf := range dq.pendingFrames { pendingBytes += sf.frame.sizeOnDisk() @@ -490,7 +493,5 @@ func (dq *diskQueue) canAcceptFrameOfSize(frameSize uint64) bool { if dq.writing { pendingBytes += dq.writeRequestSize } - currentSize := pendingBytes + dq.segments.sizeOnDisk() - - return currentSize+frameSize <= dq.settings.MaxBufferSize + return pendingBytes + dq.segments.sizeOnDisk() } diff --git a/libbeat/publisher/queue/diskqueue/core_loop_test.go b/libbeat/publisher/queue/diskqueue/core_loop_test.go index a48142655f9..5a0a35b367b 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop_test.go +++ b/libbeat/publisher/queue/diskqueue/core_loop_test.go @@ -18,10 +18,13 @@ package diskqueue import ( + "errors" "fmt" "testing" + "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" ) func TestHandleProducerWriteRequest(t *testing.T) { @@ -127,6 +130,7 @@ func TestHandleProducerWriteRequest(t *testing.T) { for description, test := range testCases { dq := &diskQueue{ logger: logp.L(), + observer: queue.NewQueueObserver(nil), settings: settings, segments: test.segments, } @@ -949,6 +953,65 @@ func TestCanAcceptFrameOfSize(t *testing.T) { } } +func TestObserverAddEvent(t *testing.T) { + // Check that write requests accepted by the queue are reported to its + // metrics observer. + reg := monitoring.NewRegistry() + dq := diskQueue{ + settings: Settings{ + MaxBufferSize: 100000, + MaxSegmentSize: 1000, + WriteAheadLimit: 10, + }, + observer: queue.NewQueueObserver(reg), + } + eventFrame := &writeFrame{serialized: make([]byte, 123)} + request := producerWriteRequest{ + frame: eventFrame, + responseChan: make(chan bool, 1), + } + dq.handleProducerWriteRequest(request) + assertRegistryUint(t, reg, "queue.added.events", 1, "handleProducerWriteRequest should report the added event") + assertRegistryUint(t, reg, "queue.added.bytes", eventFrame.sizeOnDisk(), "handleProducerWriteRequest should report the added bytes") +} + +func TestObserverDeleteSegment(t *testing.T) { + // Check that the results of segment deletions are reported to the + // metrics observer. + reg := monitoring.NewRegistry() + dq := diskQueue{ + logger: logp.NewLogger("testing"), + observer: queue.NewQueueObserver(reg), + } + // Note the segment header size is added to the test values, because segment + // metadata isn't included in event metrics. + dq.segments.acked = []*queueSegment{ + { + frameCount: 50, + byteCount: 1234 + segmentHeaderSize, + }, + { + frameCount: 25, + byteCount: 567 + segmentHeaderSize, + }, + } + // Handle a deletion response of length 1, which means the second acked + // segment shouldn't be reported yet. + dq.handleDeleterLoopResponse(deleterLoopResponse{results: []error{nil}}) + assertRegistryUint(t, reg, "queue.removed.events", 50, "Deleted events should be reported") + assertRegistryUint(t, reg, "queue.removed.bytes", 1234, "Deleted bytes should be reported") + + // Report an error, which should not change the metrics values + dq.handleDeleterLoopResponse(deleterLoopResponse{results: []error{errors.New("some error")}}) + assertRegistryUint(t, reg, "queue.removed.events", 50, "Failed deletion shouldn't report any removed events") + assertRegistryUint(t, reg, "queue.removed.bytes", 1234, "Failed deletion shouldn't report any removed bytes") + + // Now send a nil error, which should add the second segment to the metrics. + dq.handleDeleterLoopResponse(deleterLoopResponse{results: []error{nil}}) + assertRegistryUint(t, reg, "queue.removed.events", 50+25, "Deleted events should be reported") + assertRegistryUint(t, reg, "queue.removed.bytes", 1234+567, "Deleted bytes should be reported") +} + func boolRef(b bool) *bool { return &b } diff --git a/libbeat/publisher/queue/diskqueue/producer.go b/libbeat/publisher/queue/diskqueue/producer.go index 7d084adf5ea..c379ac40637 100644 --- a/libbeat/publisher/queue/diskqueue/producer.go +++ b/libbeat/publisher/queue/diskqueue/producer.go @@ -87,7 +87,7 @@ func (producer *diskQueueProducer) publish( // blocking the core loop. response := <-request.responseChan return response - case <-producer.queue.done: + case <-producer.queue.close: return false case <-producer.done: return false diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index 4fedcfa6a6e..e07c0185ade 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -20,13 +20,10 @@ package diskqueue import ( "errors" "fmt" - "io" "os" - "sync" "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/opt" ) // The string used to specify this queue in beats configurations. @@ -36,6 +33,7 @@ const QueueType = "disk" // of queue.Queue. type diskQueue struct { logger *logp.Logger + observer queue.Observer settings Settings // Metadata related to the segment files. @@ -50,10 +48,6 @@ type diskQueue struct { writerLoop *writerLoop deleterLoop *deleterLoop - // Wait group for shutdown of the goroutines associated with this queue: - // reader loop, writer loop, deleter loop, and core loop (diskQueue.run()). - waitGroup sync.WaitGroup - // writing is true if the writer loop is processing a request, false // otherwise. writing bool @@ -74,9 +68,6 @@ type diskQueue struct { // The API channel used by diskQueueProducer to write events. producerWriteRequestChan chan producerWriteRequest - // API channel used by the public Metrics() API to request queue metrics - metricsRequestChan chan metricsRequest - // pendingFrames is a list of all incoming data frames that have been // accepted by the queue and are waiting to be sent to the writer loop. // Segment ids in this list always appear in sorted order, even between @@ -88,18 +79,13 @@ type diskQueue struct { // waiting for free space in the queue. blockedProducers []producerWriteRequest - // The channel to signal our goroutines to shut down. - done chan struct{} -} - -// channel request for metrics from an external client -type metricsRequest struct { - response chan metricsRequestResponse -} + // The channel to signal our goroutines to shut down, used by + // (*diskQueue).Close. + close chan struct{} -// metrics response from the disk queue -type metricsRequestResponse struct { - sizeOnDisk uint64 + // The channel to report that shutdown is finished, used by + // (*diskQueue).Done. + done chan struct{} } // FactoryForSettings is a simple wrapper around NewQueue so a concrete @@ -108,11 +94,11 @@ type metricsRequestResponse struct { func FactoryForSettings(settings Settings) queue.QueueFactory { return func( logger *logp.Logger, - ackCallback func(eventCount int), + observer queue.Observer, inputQueueSize int, encoderFactory queue.EncoderFactory, ) (queue.Queue, error) { - return NewQueue(logger, ackCallback, settings, encoderFactory) + return NewQueue(logger, observer, settings, encoderFactory) } } @@ -120,13 +106,16 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { // and settings, creating it if it doesn't exist. func NewQueue( logger *logp.Logger, - writeToDiskCallback func(eventCount int), + observer queue.Observer, settings Settings, encoderFactory queue.EncoderFactory, ) (*diskQueue, error) { logger = logger.Named("diskqueue") logger.Debugf( "Initializing disk queue at path %v", settings.directoryPath()) + if observer == nil { + observer = queue.NewQueueObserver(nil) + } if settings.MaxBufferSize > 0 && settings.MaxBufferSize < settings.MaxSegmentSize*2 { @@ -135,6 +124,7 @@ func NewQueue( "twice the segment size (%v)", settings.MaxBufferSize, settings.MaxSegmentSize) } + observer.MaxBytes(int(settings.MaxBufferSize)) // Create the given directory path if it doesn't exist. err := os.MkdirAll(settings.directoryPath(), os.ModePerm) @@ -182,6 +172,15 @@ func NewQueue( lastID := initialSegments[len(initialSegments)-1].id nextSegmentID = lastID + 1 } + // Check the initial contents to report to the metrics observer. + initialEventCount := 0 + initialByteCount := 0 + for _, segment := range initialSegments { + initialEventCount += int(segment.frameCount) + // Event metrics for the queue observer don't include segment headser size + initialByteCount += int(segment.byteCount - segment.headerSize()) + } + observer.Restore(initialEventCount, initialByteCount) // If any of the initial segments are older than the current queue // position, move them directly to the acked list where they can be @@ -199,20 +198,13 @@ func NewQueue( nextReadPosition = queuePosition{segmentID: initialSegments[0].id} } - // We can compute the active frames right now but still need a way to report - // them to the global beat metrics. For now, just log the total. - // Note that for consistency with existing queue behavior, this excludes - // events that are still present on disk but were already sent and - // acknowledged on a previous run (we probably want to track these as well - // in the future.) - //nolint:godox // Ignore This - // TODO: pass in a context that queues can use to report these events. + // Count just the active events to report in the log activeFrameCount := 0 for _, segment := range initialSegments { activeFrameCount += int(segment.frameCount) } activeFrameCount -= int(nextReadPosition.frameIndex) - logger.Infof("Found %d existing events on queue start", activeFrameCount) + logger.Infof("Found %v queued events consuming %v bytes, %v events still pending", initialEventCount, initialByteCount, activeFrameCount) var encoder queue.Encoder if encoderFactory != nil { @@ -221,6 +213,7 @@ func NewQueue( queue := &diskQueue{ logger: logger, + observer: observer, settings: settings, segments: diskQueueSegments{ @@ -233,36 +226,20 @@ func NewQueue( acks: newDiskQueueACKs(logger, nextReadPosition, positionFile), readerLoop: newReaderLoop(settings, encoder), - writerLoop: newWriterLoop(logger, writeToDiskCallback, settings), + writerLoop: newWriterLoop(logger, settings), deleterLoop: newDeleterLoop(settings), producerWriteRequestChan: make(chan producerWriteRequest), - metricsRequestChan: make(chan metricsRequest), - done: make(chan struct{}), + close: make(chan struct{}), + done: make(chan struct{}), } - // We wait for four goroutines on shutdown: core loop, reader loop, - // writer loop, deleter loop. - queue.waitGroup.Add(4) - // Start the goroutines and return the queue! - go func() { - queue.readerLoop.run() - queue.waitGroup.Done() - }() - go func() { - queue.writerLoop.run() - queue.waitGroup.Done() - }() - go func() { - queue.deleterLoop.run() - queue.waitGroup.Done() - }() - go func() { - queue.run() - queue.waitGroup.Done() - }() + go queue.readerLoop.run() + go queue.writerLoop.run() + go queue.deleterLoop.run() + go queue.run() return queue, nil } @@ -274,12 +251,15 @@ func NewQueue( func (dq *diskQueue) Close() error { // Closing the done channel signals to the core loop that it should // shut down the other helper goroutines and wrap everything up. - close(dq.done) - dq.waitGroup.Wait() + close(dq.close) return nil } +func (dq *diskQueue) Done() <-chan struct{} { + return dq.done +} + func (dq *diskQueue) QueueType() string { return QueueType } @@ -296,29 +276,3 @@ func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer { done: make(chan struct{}), } } - -// Metrics returns current disk metrics -func (dq *diskQueue) Metrics() (queue.Metrics, error) { - respChan := make(chan metricsRequestResponse, 1) - req := metricsRequest{response: respChan} - - select { - case <-dq.done: - return queue.Metrics{}, io.EOF - case dq.metricsRequestChan <- req: - - } - - resp := metricsRequestResponse{} - select { - case <-dq.done: - return queue.Metrics{}, io.EOF - case resp = <-respChan: - } - - maxSize := dq.settings.MaxBufferSize - return queue.Metrics{ - ByteLimit: opt.UintWith(maxSize), - ByteCount: opt.UintWith(resp.sizeOnDisk), - }, nil -} diff --git a/libbeat/publisher/queue/diskqueue/queue_test.go b/libbeat/publisher/queue/diskqueue/queue_test.go index f6a4c406ed3..30c770e45a4 100644 --- a/libbeat/publisher/queue/diskqueue/queue_test.go +++ b/libbeat/publisher/queue/diskqueue/queue_test.go @@ -28,9 +28,6 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" - - "github.com/stretchr/testify/require" ) var seed int64 @@ -78,44 +75,6 @@ func TestProduceConsumer(t *testing.T) { t.Run("direct", testWith(makeTestQueue())) } -func TestMetrics(t *testing.T) { - dir, err := ioutil.TempDir("", "diskqueue_metrics") - defer func() { - _ = os.RemoveAll(dir) - }() - require.NoError(t, err) - settings := DefaultSettings() - settings.Path = dir - // lower max segment size so we can get multiple segments - settings.MaxSegmentSize = 100 - - testQueue, err := NewQueue(logp.L(), nil, settings, nil) - require.NoError(t, err) - defer testQueue.Close() - - eventsToTest := 100 - - // Send events to queue - producer := testQueue.Producer(queue.ProducerConfig{}) - sendEventsToQueue(eventsToTest, producer) - - // fetch metrics before we read any events - time.Sleep(time.Millisecond * 500) - testMetrics, err := testQueue.Metrics() - require.NoError(t, err) - - require.Equal(t, testMetrics.ByteLimit.ValueOr(0), uint64((1 << 30))) - require.NotZero(t, testMetrics.ByteCount.ValueOr(0)) - t.Logf("got %d bytes written", testMetrics.ByteCount.ValueOr(0)) - -} - -func sendEventsToQueue(count int, prod queue.Producer) { - for i := 0; i < count; i++ { - prod.Publish(queuetest.MakeEvent(mapstr.M{"count": i})) - } -} - func makeTestQueue() queuetest.QueueFactory { return func(t *testing.T) queue.Queue { dir, err := ioutil.TempDir("", "diskqueue_test") diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index 0460fc4431a..7e3661f6e5b 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -94,10 +94,7 @@ type queueSegment struct { // If this segment was loaded from a previous session, schemaVersion // points to the file schema version that was read from its header. // This is only used by queueSegment.headerSize(), which is used in - // maybeReadPending to calculate the position of the first data frame, - // and by queueSegment.shouldUseJSON(), which is used in the reader - // loop to detect old segments that used JSON encoding instead of - // the current CBOR. + // maybeReadPending to calculate the position of the first data frame. schemaVersion *uint32 // The number of bytes occupied by this segment on-disk, as of the most diff --git a/libbeat/publisher/queue/diskqueue/writer_loop.go b/libbeat/publisher/queue/diskqueue/writer_loop.go index c0e7103c41b..72cfb04642e 100644 --- a/libbeat/publisher/queue/diskqueue/writer_loop.go +++ b/libbeat/publisher/queue/diskqueue/writer_loop.go @@ -71,10 +71,6 @@ type writerLoop struct { // The logger for the writer loop, assigned when the queue creates it. logger *logp.Logger - // A callback that, if set, should be invoked with an event count when - // events are successfully written to disk. - writeToDiskCallback func(eventCount int) - // The writer loop listens on requestChan for frames to write, and // writes them to disk immediately (all queue capacity checking etc. is // done by the core loop before sending it to the writer). @@ -102,14 +98,12 @@ type writerLoop struct { func newWriterLoop( logger *logp.Logger, - writeToDiskCallback func(eventCount int), settings Settings, ) *writerLoop { buffer := &bytes.Buffer{} return &writerLoop{ - logger: logger, - writeToDiskCallback: writeToDiskCallback, - settings: settings, + logger: logger, + settings: settings, requestChan: make(chan writerLoopRequest, 1), responseChan: make(chan writerLoopResponse), @@ -243,11 +237,6 @@ outerLoop: // Try to sync the written data to disk. _ = wl.outputFile.Sync() - // If the queue has an ACK listener, notify it the frames were written. - if wl.writeToDiskCallback != nil { - wl.writeToDiskCallback(totalACKCount) - } - // Notify any producers with ACK listeners that their frames were written. for producer, ackCount := range producerACKCounts { producer.config.ACK(ackCount) diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index 1a964d8bb45..9432bd5af19 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -67,10 +67,6 @@ func (l *ackLoop) handleBatchSig() int { } if count > 0 { - if callback := l.broker.ackCallback; callback != nil { - callback(count) - } - // report acks to waiting clients l.processACK(ackedBatches, count) } diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index d9aff10bd3a..b617bae6110 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/opt" ) // The string used to specify this queue in beats configurations. @@ -66,9 +65,8 @@ type broker struct { // Consumers send requests to getChan to read events from the queue. getChan chan getRequest - // Metrics() sends requests to metricChan to expose internal queue - // metrics to external callers. - metricChan chan metricsRequest + // Close triggers a queue close by sending to closeChan. + closeChan chan struct{} /////////////////////////// // internal channels @@ -77,18 +75,16 @@ type broker struct { // through this channel so ackLoop can monitor them for acknowledgments. consumedChan chan batchList - // ackCallback is a configurable callback to invoke when ACKs are processed. - // ackLoop calls this function when it advances the consumer ACK position. - // Right now this forwards the notification to queueACKed() in - // the pipeline observer, which updates the beats registry if needed. - ackCallback func(eventCount int) - // When batches are acknowledged, ackLoop saves any metadata needed // for producer callbacks and such, then notifies runLoop that it's // safe to free these events and advance the queue by sending the // acknowledged event count to this channel. deleteChan chan int + // closingChan is closed when the queue has processed a close request. + // It's used to prevent producers from blocking on a closing queue. + closingChan chan struct{} + /////////////////////////////// // internal goroutine state @@ -112,8 +108,9 @@ type Settings struct { } type queueEntry struct { - event queue.Entry - id queue.EntryID + event queue.Entry + eventSize int + id queue.EntryID producer *ackProducer producerID producerID // The order of this entry within its producer @@ -144,11 +141,11 @@ type batchList struct { func FactoryForSettings(settings Settings) queue.QueueFactory { return func( logger *logp.Logger, - ackCallback func(eventCount int), + observer queue.Observer, inputQueueSize int, encoderFactory queue.EncoderFactory, ) (queue.Queue, error) { - return NewQueue(logger, ackCallback, settings, inputQueueSize, encoderFactory), nil + return NewQueue(logger, observer, settings, inputQueueSize, encoderFactory), nil } } @@ -157,12 +154,12 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { // workers handling incoming messages and ACKs have been shut down. func NewQueue( logger *logp.Logger, - ackCallback func(eventCount int), + observer queue.Observer, settings Settings, inputQueueSize int, encoderFactory queue.EncoderFactory, ) *broker { - b := newQueue(logger, ackCallback, settings, inputQueueSize, encoderFactory) + b := newQueue(logger, observer, settings, inputQueueSize, encoderFactory) // Start the queue workers b.wg.Add(2) @@ -184,11 +181,14 @@ func NewQueue( // when the workers are active. func newQueue( logger *logp.Logger, - ackCallback func(eventCount int), + observer queue.Observer, settings Settings, inputQueueSize int, encoderFactory queue.EncoderFactory, ) *broker { + if observer == nil { + observer = queue.NewQueueObserver(nil) + } chanSize := AdjustInputQueueSize(inputQueueSize, settings.Events) // Backwards compatibility: an old way to select synchronous queue @@ -218,29 +218,34 @@ func newQueue( encoderFactory: encoderFactory, // broker API channels - pushChan: make(chan pushRequest, chanSize), - getChan: make(chan getRequest), - metricChan: make(chan metricsRequest), + pushChan: make(chan pushRequest, chanSize), + getChan: make(chan getRequest), + closeChan: make(chan struct{}), // internal runLoop and ackLoop channels consumedChan: make(chan batchList), deleteChan: make(chan int), - - ackCallback: ackCallback, + closingChan: make(chan struct{}), } b.ctx, b.ctxCancel = context.WithCancel(context.Background()) - b.runLoop = newRunLoop(b) + b.runLoop = newRunLoop(b, observer) b.ackLoop = newACKLoop(b) + observer.MaxEvents(settings.Events) + return b } func (b *broker) Close() error { - b.ctxCancel() + b.closeChan <- struct{}{} return nil } +func (b *broker) Done() <-chan struct{} { + return b.ctx.Done() +} + func (b *broker) QueueType() string { return QueueType } @@ -276,25 +281,6 @@ func (b *broker) Get(count int) (queue.Batch, error) { return resp, nil } -func (b *broker) Metrics() (queue.Metrics, error) { - - responseChan := make(chan memQueueMetrics, 1) - select { - case <-b.ctx.Done(): - return queue.Metrics{}, io.EOF - case b.metricChan <- metricsRequest{ - responseChan: responseChan}: - } - resp := <-responseChan - - return queue.Metrics{ - EventCount: opt.UintWith(uint64(resp.currentQueueSize)), - EventLimit: opt.UintWith(uint64(len(b.buf))), - UnackedConsumedEvents: opt.UintWith(uint64(resp.occupiedRead)), - OldestEntryID: resp.oldestEntryID, - }, nil -} - var batchPool = sync.Pool{ New: func() interface{} { return &batch{ diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go index 6575472edbd..0d983de6520 100644 --- a/libbeat/publisher/queue/memqueue/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -46,19 +46,3 @@ type getRequest struct { } type batchDoneMsg struct{} - -// Metrics API - -type metricsRequest struct { - responseChan chan memQueueMetrics -} - -// memQueueMetrics tracks metrics that are returned by the individual memory queue implementations -type memQueueMetrics struct { - // the size of items in the queue - currentQueueSize int - // the number of items that have been read by a consumer but not yet ack'ed - occupiedRead int - - oldestEntryID queue.EntryID -} diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index a206e357aac..0ecabfe77f0 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -35,11 +35,11 @@ type ackProducer struct { } type openState struct { - log *logp.Logger - done chan struct{} - queueDone <-chan struct{} - events chan pushRequest - encoder queue.Encoder + log *logp.Logger + done chan struct{} + queueClosing <-chan struct{} + events chan pushRequest + encoder queue.Encoder } // producerID stores the order of events within a single producer, so multiple @@ -57,11 +57,11 @@ type ackHandler func(count int) func newProducer(b *broker, cb ackHandler, encoder queue.Encoder) queue.Producer { openState := openState{ - log: b.logger, - done: make(chan struct{}), - queueDone: b.ctx.Done(), - events: b.pushChan, - encoder: encoder, + log: b.logger, + done: make(chan struct{}), + queueClosing: b.closingChan, + events: b.pushChan, + encoder: encoder, } if cb != nil { @@ -141,14 +141,14 @@ func (st *openState) publish(req pushRequest) (queue.EntryID, bool) { select { case resp := <-req.resp: return resp, true - case <-st.queueDone: + case <-st.queueClosing: st.events = nil return 0, false } case <-st.done: st.events = nil return 0, false - case <-st.queueDone: + case <-st.queueClosing: st.events = nil return 0, false } @@ -169,7 +169,7 @@ func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) { select { case resp := <-req.resp: return resp, true - case <-st.queueDone: + case <-st.queueClosing: st.events = nil return 0, false } diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 5ebf6b6f6fb..9cd209bbd51 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -32,7 +32,6 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest" - "github.com/elastic/elastic-agent-libs/mapstr" ) var seed int64 @@ -228,69 +227,6 @@ func TestProducerClosePreservesEventCount(t *testing.T) { assert.False(t, activeEvents.Load() < 0, "active event count should never be negative") } -func TestQueueMetricsDirect(t *testing.T) { - eventsToTest := 5 - maxEvents := 10 - - // Test the directEventLoop - directSettings := Settings{ - Events: maxEvents, - MaxGetRequest: 1, - FlushTimeout: 0, - } - t.Logf("Testing directEventLoop") - queueTestWithSettings(t, directSettings, eventsToTest, "directEventLoop") - -} - -func TestQueueMetricsBuffer(t *testing.T) { - eventsToTest := 5 - maxEvents := 10 - // Test Buffered Event Loop - bufferedSettings := Settings{ - Events: maxEvents, - MaxGetRequest: eventsToTest, // The buffered event loop can only return FlushMinEvents per Get() - FlushTimeout: time.Millisecond, - } - t.Logf("Testing bufferedEventLoop") - queueTestWithSettings(t, bufferedSettings, eventsToTest, "bufferedEventLoop") -} - -func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, testName string) { - testQueue := NewQueue(nil, nil, settings, 0, nil) - defer testQueue.Close() - - // Send events to queue - producer := testQueue.Producer(queue.ProducerConfig{}) - for i := 0; i < eventsToTest; i++ { - producer.Publish(queuetest.MakeEvent(mapstr.M{"count": i})) - } - queueMetricsAreValid(t, testQueue, 5, settings.Events, 0, fmt.Sprintf("%s - First send of metrics to queue", testName)) - - // Read events, don't yet ack them - batch, err := testQueue.Get(eventsToTest) - assert.NoError(t, err, "error in Get") - t.Logf("Got batch of %d events", batch.Count()) - - queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName)) - - // Test metrics after ack - batch.Done() - - queueMetricsAreValid(t, testQueue, 0, settings.Events, 0, fmt.Sprintf("%s - Producer Getting events, no ACK", testName)) - -} - -func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occupied int, test string) { - // wait briefly to avoid races across all the queue channels - time.Sleep(time.Millisecond * 100) - testMetrics, err := q.Metrics() - assert.NoError(t, err, "error calling metrics for test %s", test) - assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test) - assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test) - assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test) -} - func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory { return func(_ *testing.T) queue.Queue { return NewQueue(nil, nil, Settings{ @@ -326,163 +262,3 @@ func TestAdjustInputQueueSize(t *testing.T) { assert.Equal(t, int(float64(mainQueue)*maxInputQueueSizeRatio), AdjustInputQueueSize(mainQueue, mainQueue)) }) } - -func TestEntryIDs(t *testing.T) { - entryCount := 100 - - testForward := func(q queue.Queue) { - waiter := &producerACKWaiter{} - producer := q.Producer(queue.ProducerConfig{ACK: waiter.ack}) - for i := 0; i < entryCount; i++ { - id, success := producer.Publish(nil) - assert.Equal(t, success, true, "Queue publish should succeed") - assert.Equal(t, id, queue.EntryID(i), "Entry ID should match publication order") - } - - for i := 0; i < entryCount; i++ { - batch, err := q.Get(1) - assert.NoError(t, err, "Queue read should succeed") - assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry") - - metrics, err := q.Metrics() - assert.NoError(t, err, "Queue metrics call should succeed") - assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i), - fmt.Sprintf("Oldest entry ID before ACKing event %v should be %v", i, i)) - - batch.Done() - waiter.waitForEvents(1) - metrics, err = q.Metrics() - assert.NoError(t, err, "Queue metrics call should succeed") - assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i+1), - fmt.Sprintf("Oldest entry ID after ACKing event %v should be %v", i, i+1)) - - } - } - - testBackward := func(q queue.Queue) { - waiter := &producerACKWaiter{} - producer := q.Producer(queue.ProducerConfig{ACK: waiter.ack}) - for i := 0; i < entryCount; i++ { - id, success := producer.Publish(nil) - assert.Equal(t, success, true, "Queue publish should succeed") - assert.Equal(t, id, queue.EntryID(i), "Entry ID should match publication order") - } - - batches := []queue.Batch{} - - for i := 0; i < entryCount; i++ { - batch, err := q.Get(1) - assert.NoError(t, err, "Queue read should succeed") - assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry") - batches = append(batches, batch) - } - - for i := entryCount - 1; i > 0; i-- { - batches[i].Done() - - // It's hard to remove this delay since the Done signal is propagated - // asynchronously to the queue, and since this test is ensuring that the - // queue _doesn't_ advance we can't use a callback to gate the comparison - // like we do in testForward. However: - // - While this race condition could sometimes let a buggy implementation - // pass, it will not produce a false failure (so it won't contribute - // to general test flakiness) - // - That notwithstanding, when the ACK _does_ cause an incorrect - // metrics update, this delay is enough to recognize it approximately - // 100% of the time, so this test is still a good signal despite - // the slight nondeterminism. - time.Sleep(1 * time.Millisecond) - metrics, err := q.Metrics() - assert.NoError(t, err, "Queue metrics call should succeed") - assert.Equal(t, metrics.OldestEntryID, queue.EntryID(0), - fmt.Sprintf("Oldest entry ID after ACKing event %v should be 0", i)) - } - // ACK the first batch, which should unblock all the later ones - batches[0].Done() - waiter.waitForEvents(100) - metrics, err := q.Metrics() - assert.NoError(t, err, "Queue metrics call should succeed") - assert.Equal(t, metrics.OldestEntryID, queue.EntryID(100), - fmt.Sprintf("Oldest entry ID after ACKing event 0 should be %v", queue.EntryID(entryCount))) - - } - - t.Run("acking in forward order with directEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0, nil) - testForward(testQueue) - }) - - t.Run("acking in reverse order with directEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0, nil) - testBackward(testQueue) - }) - - t.Run("acking in forward order with bufferedEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0, nil) - testForward(testQueue) - }) - - t.Run("acking in reverse order with bufferedEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0, nil) - testBackward(testQueue) - }) -} - -// producerACKWaiter is a helper that can listen to queue producer callbacks -// and wait on them from the test thread, so we can test the queue's asynchronous -// behavior without relying on time.Sleep. -type producerACKWaiter struct { - sync.Mutex - - // The number of acks received from a producer callback. - acked int - - // The number of acks that callers have waited for in waitForEvents. - waited int - - // When non-nil, this channel is being listened to by a test thread - // blocking on ACKs, and incoming producer callbacks are forwarded - // to it. - ackChan chan int -} - -func (w *producerACKWaiter) ack(count int) { - w.Lock() - defer w.Unlock() - w.acked += count - if w.ackChan != nil { - w.ackChan <- count - } -} - -func (w *producerACKWaiter) waitForEvents(count int) { - w.Lock() - defer w.Unlock() - if w.ackChan != nil { - panic("don't call producerACKWaiter.waitForEvents from multiple goroutines") - } - - avail := w.acked - w.waited - if count <= avail { - w.waited += count - return - } - w.waited = w.acked - count -= avail - // We have advanced as far as we can, we have to wait for - // more incoming ACKs. - // Set a listener and unlock, so ACKs can come in on another - // goroutine. - w.ackChan = make(chan int) - w.Unlock() - - newAcked := 0 - for newAcked < count { - newAcked += <-w.ackChan - } - // When we're done, turn off the listener channel and update - // the number of events waited on. - w.Lock() - w.ackChan = nil - w.waited += count -} diff --git a/libbeat/publisher/queue/memqueue/runloop.go b/libbeat/publisher/queue/memqueue/runloop.go index ed14106f20c..397b41a25e8 100644 --- a/libbeat/publisher/queue/memqueue/runloop.go +++ b/libbeat/publisher/queue/memqueue/runloop.go @@ -29,6 +29,9 @@ import ( type runLoop struct { broker *broker + // observer is a metrics observer used to report internal queue state. + observer queue.Observer + // The index of the beginning of the current ring buffer within its backing // array. If the queue isn't empty, bufPos points to the oldest remaining // event. @@ -57,13 +60,18 @@ type runLoop struct { // It is active if and only if pendingGetRequest is non-nil. getTimer *time.Timer + // closing is set when a close request is received. Once closing is true, + // the queue will not accept any new events, but will continue responding + // to Gets and Acks to allow pending events to complete on shutdown. + closing bool + // TODO (https://github.com/elastic/beats/issues/37893): entry IDs were a // workaround for an external project that no longer exists. At this point // they just complicate the API and should be removed. nextEntryID queue.EntryID } -func newRunLoop(broker *broker) *runLoop { +func newRunLoop(broker *broker, observer queue.Observer) *runLoop { var timer *time.Timer // Create the timer we'll use for get requests, but stop it until a @@ -76,6 +84,7 @@ func newRunLoop(broker *broker) *runLoop { } return &runLoop{ broker: broker, + observer: observer, getTimer: timer, } } @@ -90,8 +99,8 @@ func (l *runLoop) run() { // standalone helper function to allow testing of loop invariants. func (l *runLoop) runIteration() { var pushChan chan pushRequest - // Push requests are enabled if the queue isn't yet full. - if l.eventCount < len(l.broker.buf) { + // Push requests are enabled if the queue isn't full or closing. + if l.eventCount < len(l.broker.buf) && !l.closing { pushChan = l.broker.pushChan } @@ -116,7 +125,14 @@ func (l *runLoop) runIteration() { } select { + case <-l.broker.closeChan: + l.closing = true + close(l.broker.closingChan) + // Get requests are handled immediately during shutdown + l.maybeUnblockGetRequest() + case <-l.broker.ctx.Done(): + // The queue is fully shut down, do nothing return case req := <-pushChan: // producer pushing new event @@ -133,9 +149,6 @@ func (l *runLoop) runIteration() { case count := <-l.broker.deleteChan: l.handleDelete(count) - case req := <-l.broker.metricChan: // asking broker for queue metrics - l.handleMetricsRequest(&req) - case <-timeoutChan: // The get timer has expired, handle the blocked request l.getTimer.Stop() @@ -157,8 +170,8 @@ func (l *runLoop) handleGetRequest(req *getRequest) { } func (l *runLoop) getRequestShouldBlock(req *getRequest) bool { - if l.broker.settings.FlushTimeout <= 0 { - // Never block if the flush timeout isn't positive + if l.broker.settings.FlushTimeout <= 0 || l.closing { + // Never block if the flush timeout isn't positive, or during shutdown return false } eventsAvailable := l.eventCount - l.consumedCount @@ -177,18 +190,34 @@ func (l *runLoop) handleGetReply(req *getRequest) { startIndex := l.bufPos + l.consumedCount batch := newBatch(l.broker, startIndex, batchSize) + batchBytes := 0 + for i := 0; i < batchSize; i++ { + batchBytes += batch.rawEntry(i).eventSize + } + // Send the batch to the caller and update internal state req.responseChan <- batch l.consumedBatches.append(batch) l.consumedCount += batchSize + l.observer.ConsumeEvents(batchSize, batchBytes) } func (l *runLoop) handleDelete(count int) { + byteCount := 0 + for i := 0; i < count; i++ { + entry := l.broker.buf[(l.bufPos+i)%len(l.broker.buf)] + byteCount += entry.eventSize + } // Advance position and counters. Event data was already cleared in // batch.FreeEntries when the events were vended. l.bufPos = (l.bufPos + count) % len(l.broker.buf) l.eventCount -= count l.consumedCount -= count + l.observer.RemoveEvents(count, byteCount) + if l.closing && l.eventCount == 0 { + // Our last events were acknowledged during shutdown, signal final shutdown + l.broker.ctxCancel() + } } func (l *runLoop) handleInsert(req *pushRequest) { @@ -208,8 +237,7 @@ func (l *runLoop) maybeUnblockGetRequest() { // If a get request is blocked waiting for more events, check if // we should unblock it. if getRequest := l.pendingGetRequest; getRequest != nil { - available := l.eventCount - l.consumedCount - if available >= getRequest.entryCount { + if !l.getRequestShouldBlock(getRequest) { l.pendingGetRequest = nil if !l.getTimer.Stop() { <-l.getTimer.C @@ -223,22 +251,10 @@ func (l *runLoop) insert(req *pushRequest, id queue.EntryID) { index := (l.bufPos + l.eventCount) % len(l.broker.buf) l.broker.buf[index] = queueEntry{ event: req.event, + eventSize: req.eventSize, id: id, producer: req.producer, producerID: req.producerID, } -} - -func (l *runLoop) handleMetricsRequest(req *metricsRequest) { - oldestEntryID := l.nextEntryID - if l.eventCount > 0 { - index := l.bufPos % len(l.broker.buf) - oldestEntryID = l.broker.buf[index].id - } - - req.responseChan <- memQueueMetrics{ - currentQueueSize: l.eventCount, - occupiedRead: l.consumedCount, - oldestEntryID: oldestEntryID, - } + l.observer.AddEvent(req.eventSize) } diff --git a/libbeat/publisher/queue/memqueue/runloop_test.go b/libbeat/publisher/queue/memqueue/runloop_test.go index 266704fc1fd..f6c83e8fec0 100644 --- a/libbeat/publisher/queue/memqueue/runloop_test.go +++ b/libbeat/publisher/queue/memqueue/runloop_test.go @@ -18,13 +18,17 @@ package memqueue import ( + "context" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" ) func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) { @@ -112,3 +116,88 @@ func TestFlushSettingsBlockPartialBatches(t *testing.T) { assert.Nil(t, rl.pendingGetRequest, "Queue should have no pending get request since adding an event should unblock the previous one") assert.Equal(t, 101, rl.consumedCount, "Queue should have a consumedCount of 101 after adding an event unblocked the pending get request") } + +func TestObserverAddEvent(t *testing.T) { + // Confirm that an entry inserted into the queue is reported in + // queue.added.events and queue.added.bytes. + reg := monitoring.NewRegistry() + rl := &runLoop{ + observer: queue.NewQueueObserver(reg), + broker: &broker{ + buf: make([]queueEntry, 100), + }, + } + request := &pushRequest{ + event: publisher.Event{}, + eventSize: 123, + } + rl.insert(request, 0) + assertRegistryUint(t, reg, "queue.added.events", 1, "Queue insert should report added event") + assertRegistryUint(t, reg, "queue.added.bytes", 123, "Queue insert should report added bytes") +} + +func TestObserverConsumeEvents(t *testing.T) { + // Confirm that event batches sent to the output are reported in + // queue.consumed.events and queue.consumed.bytes. + reg := monitoring.NewRegistry() + rl := &runLoop{ + observer: queue.NewQueueObserver(reg), + broker: &broker{ + buf: make([]queueEntry, 100), + }, + eventCount: 50, + } + // Initialize the queue entries to a test byte size + for i := range rl.broker.buf { + rl.broker.buf[i].eventSize = 123 + } + request := &getRequest{ + entryCount: len(rl.broker.buf), + responseChan: make(chan *batch, 1), + } + rl.handleGetReply(request) + // We should have gotten back 50 events, everything in the queue, so we expect the size + // to be 50 * 123. + assertRegistryUint(t, reg, "queue.consumed.events", 50, "Sending a batch to a Get caller should report the consumed events") + assertRegistryUint(t, reg, "queue.consumed.bytes", 50*123, "Sending a batch to a Get caller should report the consumed bytes") +} + +func TestObserverRemoveEvents(t *testing.T) { + reg := monitoring.NewRegistry() + rl := &runLoop{ + observer: queue.NewQueueObserver(reg), + broker: &broker{ + ctx: context.Background(), + buf: make([]queueEntry, 100), + deleteChan: make(chan int, 1), + }, + eventCount: 50, + } + // Initialize the queue entries to a test byte size + for i := range rl.broker.buf { + rl.broker.buf[i].eventSize = 123 + } + const deleteCount = 25 + rl.broker.deleteChan <- deleteCount + // Run one iteration of the run loop, so it can handle the delete request + rl.runIteration() + // It should have deleted 25 events, so we expect the size to be 25 * 123. + assertRegistryUint(t, reg, "queue.removed.events", deleteCount, "Deleting from the queue should report the removed events") + assertRegistryUint(t, reg, "queue.removed.bytes", deleteCount*123, "Deleting from the queue should report the removed bytes") +} + +func assertRegistryUint(t *testing.T, reg *monitoring.Registry, key string, expected uint64, message string) { + t.Helper() + + entry := reg.Get(key) + if entry == nil { + assert.Failf(t, message, "registry key '%v' doesn't exist", key) + return + } + value, ok := reg.Get(key).(*monitoring.Uint) + if !ok { + assert.Failf(t, message, "registry key '%v' doesn't refer to a uint64", key) + return + } + assert.Equal(t, expected, value.Get(), message) +} diff --git a/libbeat/publisher/queue/monitoring.go b/libbeat/publisher/queue/monitoring.go new file mode 100644 index 00000000000..5061d3f5600 --- /dev/null +++ b/libbeat/publisher/queue/monitoring.go @@ -0,0 +1,153 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package queue + +import ( + "github.com/elastic/elastic-agent-libs/monitoring" +) + +// Observer is an interface for queues to send state updates to a metrics +// or test listener. +type Observer interface { + MaxEvents(int) + MaxBytes(int) + + // Restore queue state on startup. Used by the disk queue to report events + // that are already in the queue from a previous run. + Restore(eventCount int, byteCount int) + + // All reported byte counts are zero if the output doesn't support + // early encoding. + AddEvent(byteCount int) + ConsumeEvents(eventCount int, byteCount int) + RemoveEvents(eventCount int, byteCount int) +} + +type queueObserver struct { + maxEvents *monitoring.Uint // gauge + maxBytes *monitoring.Uint // gauge + + addedEvents *monitoring.Uint + addedBytes *monitoring.Uint + consumedEvents *monitoring.Uint + consumedBytes *monitoring.Uint + removedEvents *monitoring.Uint + removedBytes *monitoring.Uint + + filledEvents *monitoring.Uint // gauge + filledBytes *monitoring.Uint // gauge + filledPct *monitoring.Float // gauge + + // backwards compatibility: the metric "acked" is the old name for + // "removed.events". Ideally we would like to define an alias in the + // monitoring API, but until that's possible we shadow it with this + // extra variable and make sure to always change removedEvents and + // acked at the same time. + acked *monitoring.Uint +} + +type nilObserver struct{} + +// Creates queue metrics in the given registry under the path "pipeline.queue". +func NewQueueObserver(metrics *monitoring.Registry) Observer { + if metrics == nil { + return nilObserver{} + } + queueMetrics := metrics.GetRegistry("queue") + if queueMetrics != nil { + err := queueMetrics.Clear() + if err != nil { + return nilObserver{} + } + } else { + queueMetrics = metrics.NewRegistry("queue") + } + + ob := &queueObserver{ + maxEvents: monitoring.NewUint(queueMetrics, "max_events"), // gauge + maxBytes: monitoring.NewUint(queueMetrics, "max_bytes"), // gauge + + addedEvents: monitoring.NewUint(queueMetrics, "added.events"), + addedBytes: monitoring.NewUint(queueMetrics, "added.bytes"), + consumedEvents: monitoring.NewUint(queueMetrics, "consumed.events"), + consumedBytes: monitoring.NewUint(queueMetrics, "consumed.bytes"), + removedEvents: monitoring.NewUint(queueMetrics, "removed.events"), + removedBytes: monitoring.NewUint(queueMetrics, "removed.bytes"), + + filledEvents: monitoring.NewUint(queueMetrics, "filled.events"), // gauge + filledBytes: monitoring.NewUint(queueMetrics, "filled.bytes"), // gauge + filledPct: monitoring.NewFloat(queueMetrics, "filled.pct"), // gauge + + // backwards compatibility: "acked" is an alias for "removed.events". + acked: monitoring.NewUint(queueMetrics, "acked"), + } + return ob +} + +func (ob *queueObserver) MaxEvents(value int) { + ob.maxEvents.Set(uint64(value)) +} + +func (ob *queueObserver) MaxBytes(value int) { + ob.maxBytes.Set(uint64(value)) +} + +func (ob *queueObserver) Restore(eventCount int, byteCount int) { + ob.filledEvents.Set(uint64(eventCount)) + ob.filledBytes.Set(uint64(byteCount)) + ob.updateFilledPct() +} + +func (ob *queueObserver) AddEvent(byteCount int) { + ob.addedEvents.Inc() + ob.addedBytes.Add(uint64(byteCount)) + + ob.filledEvents.Inc() + ob.filledBytes.Add(uint64(byteCount)) + ob.updateFilledPct() +} + +func (ob *queueObserver) ConsumeEvents(eventCount int, byteCount int) { + ob.consumedEvents.Add(uint64(eventCount)) + ob.consumedBytes.Add(uint64(byteCount)) +} + +func (ob *queueObserver) RemoveEvents(eventCount int, byteCount int) { + ob.removedEvents.Add(uint64(eventCount)) + ob.acked.Add(uint64(eventCount)) + ob.removedBytes.Add(uint64(byteCount)) + + ob.filledEvents.Sub(uint64(eventCount)) + ob.filledBytes.Sub(uint64(byteCount)) + ob.updateFilledPct() +} + +func (ob *queueObserver) updateFilledPct() { + if maxBytes := ob.maxBytes.Get(); maxBytes > 0 { + ob.filledPct.Set(float64(ob.filledBytes.Get()) / float64(maxBytes)) + } else { + ob.filledPct.Set(float64(ob.filledEvents.Get()) / float64(ob.maxEvents.Get())) + } +} + +func (nilObserver) MaxEvents(_ int) {} +func (nilObserver) MaxBytes(_ int) {} +func (nilObserver) Restore(_ int, _ int) {} +func (nilObserver) AddEvent(_ int) {} +func (nilObserver) ConsumeEvents(_ int, _ int) {} +func (nilObserver) RemoveEvents(_ int, _ int) {} diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 9c186ad30d0..075d7ad66a4 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -18,11 +18,7 @@ package queue import ( - "errors" - - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/opt" ) // Entry is a placeholder type for the objects contained by the queue, which @@ -31,31 +27,6 @@ import ( // and reduces accidental type mismatches. type Entry interface{} -// Metrics is a set of basic-user friendly metrics that report the current state of the queue. These metrics are meant to be relatively generic and high-level, and when reported directly, can be comprehensible to a user. -type Metrics struct { - //EventCount is the total events currently in the queue - EventCount opt.Uint - //ByteCount is the total byte size of the queue - ByteCount opt.Uint - //ByteLimit is the user-configured byte limit of the queue - ByteLimit opt.Uint - //EventLimit is the user-configured event limit of the queue - EventLimit opt.Uint - - //UnackedConsumedEvents is the count of events that an output consumer has read, but not yet ack'ed - UnackedConsumedEvents opt.Uint - - //OldestActiveTimestamp is the timestamp of the oldest item in the queue. - OldestActiveTimestamp common.Time - - // OldestActiveID is ID of the oldest unacknowledged event in the queue, or - // the next ID that will be assigned if the queue is empty. - OldestEntryID EntryID -} - -// ErrMetricsNotImplemented is a hopefully temporary type to mark queue metrics as not yet implemented -var ErrMetricsNotImplemented = errors.New("Queue metrics not implemented") - // Queue is responsible for accepting, forwarding and ACKing events. // A queue will receive and buffer single events from its producers. // Consumers will receive events in batches from the queues buffers. @@ -66,8 +37,14 @@ var ErrMetricsNotImplemented = errors.New("Queue metrics not implemented") // consumer or flush to some other intermediate storage), it will send an ACK signal // with the number of ACKed events to the Producer (ACK happens in batches). type Queue interface { + // Close signals the queue to shut down, but it may keep handling requests + // and acknowledgments for events that are already in progress. Close() error + // Done returns a channel that unblocks when the queue is closed and all + // its events are persisted or acknowledged. + Done() <-chan struct{} + QueueType() string BufferConfig() BufferConfig @@ -76,15 +53,13 @@ type Queue interface { // Get retrieves a batch of up to eventCount events. If eventCount <= 0, // there is no bound on the number of returned events. Get(eventCount int) (Batch, error) - - Metrics() (Metrics, error) } // If encoderFactory is provided, then the resulting queue must use it to // encode queued events before returning them. type QueueFactory func( logger *logp.Logger, - ack func(eventCount int), + observer Observer, inputQueueSize int, encoderFactory EncoderFactory, ) (Queue, error)