From 195e58ee00a949a53dd779c0f7c7b3a675a5210d Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 18 Jul 2023 11:55:59 +0000 Subject: [PATCH] [7.17](backport #36076) Fix empty file edge case (#36095) * Fix empty new file edge case (#36076) It's possible that file scanning happens when the file was created but was not yet written to. In this case the size is 0. We should not spawn any resources (e.g. harvesters) for such files until they actually have some content. We create events only when we read something from a file, so having a harvester on an empty file is not useful. Not handling this edge case also causes our tests to be flaky: sometimes an expected size does not match an actual size (0). (cherry picked from commit 0cd57755c1bc7cef14a94f71970982a4e19dcb5d) # Conflicts: # filebeat/input/filestream/fswatch_test.go * Resolve conflicts --------- Co-authored-by: Denis --- filebeat/input/filestream/fswatch.go | 5 ++ filebeat/input/filestream/fswatch_test.go | 62 +++++++++++++++++++++-- 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index c6afc72b986..3203b90be1d 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -199,6 +199,11 @@ func (w *fileWatcher) watch(ctx unison.Canceler) { // remaining files in newFiles are newly created files for path, fd := range newFilesByName { + // no need to react on empty new files + if fd.Info.Size() == 0 { + w.log.Warnf("file %q has no content yet, skipping", fd.Filename) + continue + } select { case <-ctx.Done(): return diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index 8cfcff723c4..23e2af4bbd4 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -30,6 +30,7 @@ import ( loginp 
"github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" ) func TestFileWatcher(t *testing.T) { @@ -219,10 +220,10 @@ scanner: paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr) @@ -252,16 +253,69 @@ scanner: require.Equal(t, loginp.OpDone, e.Op) }) + t.Run("does not emit events for empty files", func(t *testing.T) { + dir := t.TempDir() + paths := []string{filepath.Join(dir, "*.log")} + cfgStr := ` +scanner: + check_interval: 10ms +` + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + err := logp.DevelopmentSetup(logp.ToObserverOutput()) + require.NoError(t, err) + + fw := createWatcherWithConfig(t, paths, cfgStr) + go fw.Run(ctx) + + basename := "created.log" + filename := filepath.Join(dir, basename) + err = os.WriteFile(filename, nil, 0777) + require.NoError(t, err) + + t.Run("issues a warning in logs", func(t *testing.T) { + var lastWarning string + expLogMsg := fmt.Sprintf("file %q has no content yet, skipping", filename) + require.Eventually(t, func() bool { + logs := logp.ObserverLogs().FilterLevelExact(logp.WarnLevel.ZapLevel()).TakeAll() + if len(logs) == 0 { + return false + } + lastWarning = logs[len(logs)-1].Message + return strings.Contains(lastWarning, expLogMsg) + }, 100*time.Millisecond, 10*time.Millisecond, "required a warning message %q but got %q", expLogMsg, lastWarning) + }) + + t.Run("emits a create event once something is written to the empty file", func(t *testing.T) { + err = os.WriteFile(filename, []byte("hello"), 0777) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + 
NewPath: filename, + OldPath: filename, + Op: loginp.OpWrite, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Info: testFileInfo{path: basename, size: 5}, // +5 bytes appended + }, + } + requireEqualEvents(t, expEvent, e) + }) + }) + t.Run("does not emit an event for a fingerprint collision", func(t *testing.T) { dir := t.TempDir() paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms fingerprint.enabled: true ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr)