diff --git a/filebeat/input/journald/pkg/journalctl/reader.go b/filebeat/input/journald/pkg/journalctl/reader.go index efc01a0b2f13..25b90d9a490f 100644 --- a/filebeat/input/journald/pkg/journalctl/reader.go +++ b/filebeat/input/journald/pkg/journalctl/reader.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield" input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/common/backoff" "github.com/elastic/elastic-agent-libs/logp" ) @@ -81,6 +82,8 @@ type Reader struct { jctl Jctl jctlFactory JctlFactory + + backoff backoff.Backoff } // handleSeekAndCursor returns the correct arguments for seek and cursor. @@ -177,6 +180,7 @@ func New( logger: logger, canceler: canceler, jctlFactory: newJctl, + backoff: backoff.NewExpBackoff(canceler.Done(), 100*time.Millisecond, 2*time.Second), } return &r, nil @@ -220,7 +224,17 @@ func (r *Reader) Next(cancel input.Canceler) (JournalEntry, error) { args = append(args, "--after-cursor", r.cursor) } - // TODO (belimawr): Restart with exponential backoff + // If the last restart (if any) was more than 5s ago, + // recreate the backoff and do not wait. + // We recreate the backoff so r.backoff.Last().IsZero() + // will return true next time it's called making us to + // wait in case jouranlctl crashes in less than 5s. + if !r.backoff.Last().IsZero() && time.Now().Sub(r.backoff.Last()) > 5*time.Second { + r.backoff = backoff.NewExpBackoff(cancel.Done(), 100*time.Millisecond, 2*time.Second) + } else { + r.backoff.Wait() + } + jctl, err := r.jctlFactory(r.canceler, r.logger.Named("journalctl-runner"), "journalctl", args...) if err != nil { // If we cannot restart journalct, there is nothing we can do. diff --git a/filebeat/input/mqtt/client_mocked.go b/filebeat/input/mqtt/client_mocked.go index 79fd0be50cf9..fcab2b800535 100644 --- a/filebeat/input/mqtt/client_mocked.go +++ b/filebeat/input/mqtt/client_mocked.go @@ -72,6 +72,7 @@ type mockedBackoff struct { waits []bool waitIndex int + last time.Time } var _ backoff.Backoff = new(mockedBackoff) @@ -79,6 +80,7 @@ var _ backoff.Backoff = new(mockedBackoff) func (m *mockedBackoff) Wait() bool { wait := m.waits[m.waitIndex] m.waitIndex++ + m.last = time.Now() return wait } @@ -86,6 +88,10 @@ func (m *mockedBackoff) Reset() { m.resetCount++ } +func (m *mockedBackoff) Last() time.Time { + return m.last +} + type mockedToken struct { timeout bool } diff --git a/filebeat/tests/mockjournalctl/main.go b/filebeat/tests/mockjournalctl/main.go new file mode 100644 index 000000000000..ae1b04a8f9d2 --- /dev/null +++ b/filebeat/tests/mockjournalctl/main.go @@ -0,0 +1,99 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "encoding/json" + "fmt" + "math/rand/v2" + "os" + "time" +) + +// This simple mock for journalclt that can be used to test error conditions. +// If a file called 'exit' exists in the same folder as the Filebeat binary +// then this mock will exit immediately, otherwise it will generate errors +// randomly and eventually exit. +// +// The easiest way to use this mock is to compile it as 'journalctl' and +// manipulate the $PATH environment variable from the Filebeat process you're +// testing. +func main() { + if _, err := os.Stat("exit"); err == nil { + os.Exit(42) + } + + fatalChance := 10 + stdoutTicker := time.NewTicker(time.Second) + stderrTicker := time.NewTicker(time.Second) + fatalTicker := time.NewTicker(time.Second) + + jsonEncoder := json.NewEncoder(os.Stdout) + count := uint64(0) + for { + count++ + select { + case t := <-stdoutTicker.C: + mockData["MESSAGE"] = fmt.Sprintf("Count: %010d", count) + mockData["__CURSOR"] = fmt.Sprintf("cursor-%010d-now-%s", count, time.Now().Format(time.RFC3339Nano)) + mockData["__REALTIME_TIMESTAMP"] = fmt.Sprintf("%d", t.UnixMicro()) + jsonEncoder.Encode(mockData) + case t := <-stderrTicker.C: + fmt.Fprintf(os.Stderr, "a random error at %s, count: %010d\n", t.Format(time.RFC3339), count) + case t := <-fatalTicker.C: + chance := rand.IntN(100) + if chance < fatalChance { + fmt.Fprintf(os.Stderr, "fatal error, exiting at %s\n", t.Format(time.RFC3339)) + os.Exit(rand.IntN(125)) + } + } + } +} + +var mockData = map[string]string{ + "MESSAGE": "Count: 0000000001", + "PRIORITY": "6", + "SYSLOG_IDENTIFIER": "TestRestartsJournalctlOnError", + "_AUDIT_LOGINUID": "1000", + "_AUDIT_SESSION": "2", + "_BOOT_ID": "567980bb85ae41da8518f409570b0cb9", + "_CAP_EFFECTIVE": "0", + "_CMDLINE": "/bin/cat", + "_COMM": "cat", + "_EXE": "/usr/bin/cat", + "_GID": "1000", + "_HOSTNAME": "millennium-falcon", + "_MACHINE_ID": "851f339d77174301b29e417ecb2ec6a8", + "_PID": "235728", + "_RUNTIME_SCOPE": "system", + "_STREAM_ID": "92765bf7ba214e23a2ee986d76578bef", + "_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/session-2.scope", + "_SYSTEMD_INVOCATION_ID": "89e7dffc4a0140f086a3171235fae8d9", + "_SYSTEMD_OWNER_UID": "1000", + "_SYSTEMD_SESSION": "2", + "_SYSTEMD_SLICE": "user-1000.slice", + "_SYSTEMD_UNIT": "session-2.scope", + "_SYSTEMD_USER_SLICE": "-.slice", + "_TRANSPORT": "stdout", + "_UID": "1000", + "__CURSOR": "s=e82795fad4ce42b79fb3da0866d91f7e;i=4eb1b1;b=567980bb85ae41da8518f409570b0cb9;m=2bd4e2166;t=6200adaf0a66a;x=d9b1ac66921eaac9", + "__MONOTONIC_TIMESTAMP": "11765948774", + "__REALTIME_TIMESTAMP": "1724080855230058", + "__SEQNUM": "5157297", + "__SEQNUM_ID": "e82795fad4ce42b79fb3da0866d91f7e", +} diff --git a/libbeat/common/backoff/backoff.go b/libbeat/common/backoff/backoff.go index 5439e38cfbb2..6128d1d713f4 100644 --- a/libbeat/common/backoff/backoff.go +++ b/libbeat/common/backoff/backoff.go @@ -17,6 +17,8 @@ package backoff +import "time" + // Backoff defines the interface for backoff strategies. type Backoff interface { // Wait blocks for a duration of time governed by the backoff strategy. @@ -24,6 +26,9 @@ type Backoff interface { // Reset resets the backoff duration to an initial value governed by the backoff strategy. Reset() + + Last() time.Time + // Last returns the time when the last call to Wait returned } // WaitOnError is a convenience method, if an error is received it will block, if not errors is diff --git a/libbeat/common/backoff/equal_jitter.go b/libbeat/common/backoff/equal_jitter.go index d5b5c7d250c8..77a0f0642c62 100644 --- a/libbeat/common/backoff/equal_jitter.go +++ b/libbeat/common/backoff/equal_jitter.go @@ -71,3 +71,8 @@ func (b *EqualJitterBackoff) Wait() bool { return true } } + +// Last returns the time when the last call to Wait returned +func (b *EqualJitterBackoff) Last() time.Time { + return b.last +} diff --git a/libbeat/common/backoff/exponential.go b/libbeat/common/backoff/exponential.go index c7211480cd14..ab4236b8185d 100644 --- a/libbeat/common/backoff/exponential.go +++ b/libbeat/common/backoff/exponential.go @@ -65,3 +65,8 @@ func (b *ExpBackoff) Wait() bool { return true } } + +// Last returns the time when the last call to Wait returned +func (b *ExpBackoff) Last() time.Time { + return b.last +}