diff --git a/filebeat/docs/inputs/input-journald.asciidoc b/filebeat/docs/inputs/input-journald.asciidoc index bbc4211b0c5..1eb161deef7 100644 --- a/filebeat/docs/inputs/input-journald.asciidoc +++ b/filebeat/docs/inputs/input-journald.asciidoc @@ -122,10 +122,8 @@ The position to start reading the journal from. Valid settings are: * `head`: Starts reading at the beginning of the journal. After a restart, {beatname_uc} resends all log messages in the journal. -* `tail`: Starts reading at the end of the journal. After a restart, -{beatname_uc} resends the last message, which might result in duplicates. If -multiple log messages are written to a journal while {beatname_uc} is down, -only the last log message is sent on restart. +* `tail`: Starts reading at the end of the journal. This means that no events +will be sent until a new message is written. * `cursor`: On first read, starts reading at the beginning of the journal. After a reload or restart, continues reading at the last known position. @@ -133,6 +131,13 @@ If you have old log files and want to skip lines, start {beatname_uc} with `seek: tail` specified. Then stop {beatname_uc}, set `seek: cursor`, and restart {beatname_uc}. +[float] +[id="{beatname_lc}-input-{type}-cursor_seek_fallback"] +==== `cursor_seek_fallback` + +The position to start reading the journal from if no cursor information is +available. Valid options are `head` and `tail`. + [float] [id="{beatname_lc}-input-{type}-units"] ==== `units` diff --git a/filebeat/input/journald/input_filtering_test.go b/filebeat/input/journald/input_filtering_test.go index 625104a491c..7724c6285fa 100644 --- a/filebeat/input/journald/input_filtering_test.go +++ b/filebeat/input/journald/input_filtering_test.go @@ -215,3 +215,79 @@ func TestInputIncludeMatches(t *testing.T) { }) } } + +// TestInputSeek test the output of various seek modes while reading +// from input-multiline-parser.journal. +func TestInputSeek(t *testing.T) { + tests := map[string]struct { + config mapstr.M + expectedMessages []string + }{ + "seek head": { + config: map[string]any{ + "seek": "head", + }, + expectedMessages: []string{ + "pam_unix(sudo:session): session closed for user root", + "Started Outputs some log lines.", + "1st line", + "2nd line", + "3rd line", + "4th line", + "5th line", + "6th line", + }, + }, + "seek tail": { + config: map[string]any{ + "seek": "tail", + }, + expectedMessages: nil, // No messages are expected for seek=tail. + }, + "seek cursor": { + config: map[string]any{ + "seek": "cursor", + }, + expectedMessages: []string{ + "pam_unix(sudo:session): session closed for user root", + "Started Outputs some log lines.", + "1st line", + "2nd line", + "3rd line", + "4th line", + "5th line", + "6th line", + }, + }, + "seek cursor fallback": { + config: map[string]any{ + "seek": "cursor", + "cursor_seek_fallback": "tail", + }, + expectedMessages: nil, // No messages are expected because it will fall back to seek=tail. + }, + } + + for name, testCase := range tests { + t.Run(name, func(t *testing.T) { + env := newInputTestingEnvironment(t) + conf := mapstr.M{ + "paths": []string{path.Join("testdata", "input-multiline-parser.journal")}, + } + conf.DeepUpdate(testCase.config) + inp := env.mustCreateInput(conf) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + defer cancelInput() + + env.waitUntilEventCount(len(testCase.expectedMessages)) + + for idx, event := range env.pipeline.GetAllEvents() { + if got, expected := event.Fields["message"], testCase.expectedMessages[idx]; got != expected { + t.Fatalf("expecting event message %q, got %q", expected, got) + } + } + }) + } +}