Skip to content

Commit

Permalink
Add a backoff wait when restarting journalctl
Browse files Browse the repository at this point in the history
When restarting journalclt, a exponential backoff will be used if the
last restart was less than 5s ago. In the unlikely case jouranlctl
will crash right after being installed, the exponential backoff will
make Filebeat restart journalctl at most once every two seconds.
  • Loading branch information
belimawr committed Sep 10, 2024
1 parent 9b9bc8f commit 3a5aaec
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 1 deletion.
16 changes: 15 additions & 1 deletion filebeat/input/journald/pkg/journalctl/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/elastic-agent-libs/logp"
)

Expand Down Expand Up @@ -81,6 +82,8 @@ type Reader struct {

jctl Jctl
jctlFactory JctlFactory

backoff backoff.Backoff
}

// handleSeekAndCursor returns the correct arguments for seek and cursor.
Expand Down Expand Up @@ -177,6 +180,7 @@ func New(
logger: logger,
canceler: canceler,
jctlFactory: newJctl,
backoff: backoff.NewExpBackoff(canceler.Done(), 100*time.Millisecond, 2*time.Second),
}

return &r, nil
Expand Down Expand Up @@ -220,7 +224,17 @@ func (r *Reader) Next(cancel input.Canceler) (JournalEntry, error) {
args = append(args, "--after-cursor", r.cursor)
}

// TODO (belimawr): Restart with exponential backoff
// If the last restart (if any) was more than 5s ago,
// recreate the backoff and do not wait.
// We recreate the backoff so r.backoff.Last().IsZero()
// will return true next time it's called making us to
// wait in case jouranlctl crashes in less than 5s.
if !r.backoff.Last().IsZero() && time.Now().Sub(r.backoff.Last()) > 5*time.Second {
r.backoff = backoff.NewExpBackoff(cancel.Done(), 100*time.Millisecond, 2*time.Second)
} else {
r.backoff.Wait()
}

jctl, err := r.jctlFactory(r.canceler, r.logger.Named("journalctl-runner"), "journalctl", args...)
if err != nil {
// If we cannot restart journalct, there is nothing we can do.
Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/mqtt/client_mocked.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,26 @@ type mockedBackoff struct {

waits []bool
waitIndex int
last time.Time
}

var _ backoff.Backoff = new(mockedBackoff)

func (m *mockedBackoff) Wait() bool {
wait := m.waits[m.waitIndex]
m.waitIndex++
m.last = time.Now()
return wait
}

func (m *mockedBackoff) Reset() {
m.resetCount++
}

func (m *mockedBackoff) Last() time.Time {
return m.last
}

type mockedToken struct {
timeout bool
}
Expand Down
99 changes: 99 additions & 0 deletions filebeat/tests/mockjournalctl/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package main

import (
"encoding/json"
"fmt"
"math/rand/v2"
"os"
"time"
)

// This simple mock for journalclt that can be used to test error conditions.
// If a file called 'exit' exists in the same folder as the Filebeat binary
// then this mock will exit immediately, otherwise it will generate errors
// randomly and eventually exit.
//
// The easiest way to use this mock is to compile it as 'journalctl' and
// manipulate the $PATH environment variable from the Filebeat process you're
// testing.
func main() {
if _, err := os.Stat("exit"); err == nil {
os.Exit(42)
}

fatalChance := 10
stdoutTicker := time.NewTicker(time.Second)
stderrTicker := time.NewTicker(time.Second)
fatalTicker := time.NewTicker(time.Second)

jsonEncoder := json.NewEncoder(os.Stdout)
count := uint64(0)
for {
count++
select {
case t := <-stdoutTicker.C:
mockData["MESSAGE"] = fmt.Sprintf("Count: %010d", count)
mockData["__CURSOR"] = fmt.Sprintf("cursor-%010d-now-%s", count, time.Now().Format(time.RFC3339Nano))
mockData["__REALTIME_TIMESTAMP"] = fmt.Sprintf("%d", t.UnixMicro())
jsonEncoder.Encode(mockData)
case t := <-stderrTicker.C:
fmt.Fprintf(os.Stderr, "a random error at %s, count: %010d\n", t.Format(time.RFC3339), count)
case t := <-fatalTicker.C:
chance := rand.IntN(100)
if chance < fatalChance {
fmt.Fprintf(os.Stderr, "fatal error, exiting at %s\n", t.Format(time.RFC3339))
os.Exit(rand.IntN(125))
}
}
}
}

var mockData = map[string]string{
"MESSAGE": "Count: 0000000001",
"PRIORITY": "6",
"SYSLOG_IDENTIFIER": "TestRestartsJournalctlOnError",
"_AUDIT_LOGINUID": "1000",
"_AUDIT_SESSION": "2",
"_BOOT_ID": "567980bb85ae41da8518f409570b0cb9",
"_CAP_EFFECTIVE": "0",
"_CMDLINE": "/bin/cat",
"_COMM": "cat",
"_EXE": "/usr/bin/cat",
"_GID": "1000",
"_HOSTNAME": "millennium-falcon",
"_MACHINE_ID": "851f339d77174301b29e417ecb2ec6a8",
"_PID": "235728",
"_RUNTIME_SCOPE": "system",
"_STREAM_ID": "92765bf7ba214e23a2ee986d76578bef",
"_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/session-2.scope",
"_SYSTEMD_INVOCATION_ID": "89e7dffc4a0140f086a3171235fae8d9",
"_SYSTEMD_OWNER_UID": "1000",
"_SYSTEMD_SESSION": "2",
"_SYSTEMD_SLICE": "user-1000.slice",
"_SYSTEMD_UNIT": "session-2.scope",
"_SYSTEMD_USER_SLICE": "-.slice",
"_TRANSPORT": "stdout",
"_UID": "1000",
"__CURSOR": "s=e82795fad4ce42b79fb3da0866d91f7e;i=4eb1b1;b=567980bb85ae41da8518f409570b0cb9;m=2bd4e2166;t=6200adaf0a66a;x=d9b1ac66921eaac9",
"__MONOTONIC_TIMESTAMP": "11765948774",
"__REALTIME_TIMESTAMP": "1724080855230058",
"__SEQNUM": "5157297",
"__SEQNUM_ID": "e82795fad4ce42b79fb3da0866d91f7e",
}
5 changes: 5 additions & 0 deletions libbeat/common/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@

package backoff

import "time"

// Backoff defines the interface for backoff strategies.
type Backoff interface {
// Wait blocks for a duration of time governed by the backoff strategy.
Wait() bool

// Reset resets the backoff duration to an initial value governed by the backoff strategy.
Reset()

Last() time.Time
// Last returns the time when the last call to Wait returned
}

// WaitOnError is a convenience method, if an error is received it will block, if not errors is
Expand Down
5 changes: 5 additions & 0 deletions libbeat/common/backoff/equal_jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,8 @@ func (b *EqualJitterBackoff) Wait() bool {
return true
}
}

// Last returns the time when the last call to Wait returned
func (b *EqualJitterBackoff) Last() time.Time {
return b.last
}
5 changes: 5 additions & 0 deletions libbeat/common/backoff/exponential.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,8 @@ func (b *ExpBackoff) Wait() bool {
return true
}
}

// Last returns the time when the last call to Wait returned
func (b *ExpBackoff) Last() time.Time {
return b.last
}

0 comments on commit 3a5aaec

Please sign in to comment.