From 0cd57755c1bc7cef14a94f71970982a4e19dcb5d Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 18 Jul 2023 10:06:57 +0200 Subject: [PATCH] Fix empty new file edge case (#36076) It's possible that file scanning happens when the file was created but was not yet written to. In this case the size is 0. We should not spawn any resources (e.g. harvesters) for such files until they actually have some content. We create events only when read something from a file, so having a harvester on an empty file is not useful. Not handling this edge case also causes our tests to be flaky, sometimes an expected size does not match an actual size (0). --- filebeat/input/filestream/fswatch.go | 5 ++ filebeat/input/filestream/fswatch_test.go | 62 +++++++++++++++++++++-- 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index 0c14bcee8ce..b5b02ab11c7 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -199,6 +199,11 @@ func (w *fileWatcher) watch(ctx unison.Canceler) { // remaining files in newFiles are newly created files for path, fd := range newFilesByName { + // no need to react on empty new files + if fd.Info.Size() == 0 { + w.log.Warnf("file %q has no content yet, skipping", fd.Filename) + continue + } select { case <-ctx.Done(): return diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index 2a30dd8b3cd..d2dbc893b8e 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -30,6 +30,7 @@ import ( loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) func TestFileWatcher(t *testing.T) { @@ -219,10 +220,10 @@ scanner: paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr) @@ -252,16 +253,69 @@ scanner: require.Equal(t, loginp.OpDone, e.Op) }) + t.Run("does not emit events for empty files", func(t *testing.T) { + dir := t.TempDir() + paths := []string{filepath.Join(dir, "*.log")} + cfgStr := ` +scanner: + check_interval: 10ms +` + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + err := logp.DevelopmentSetup(logp.ToObserverOutput()) + require.NoError(t, err) + + fw := createWatcherWithConfig(t, paths, cfgStr) + go fw.Run(ctx) + + basename := "created.log" + filename := filepath.Join(dir, basename) + err = os.WriteFile(filename, nil, 0777) + require.NoError(t, err) + + t.Run("issues a warning in logs", func(t *testing.T) { + var lastWarning string + expLogMsg := fmt.Sprintf("file %q has no content yet, skipping", filename) + require.Eventually(t, func() bool { + logs := logp.ObserverLogs().FilterLevelExact(logp.WarnLevel.ZapLevel()).TakeAll() + if len(logs) == 0 { + return false + } + lastWarning = logs[len(logs)-1].Message + return strings.Contains(lastWarning, expLogMsg) + }, 100*time.Millisecond, 10*time.Millisecond, "required a warning message %q but got %q", expLogMsg, lastWarning) + }) + + t.Run("emits a create event once something is written to the empty file", func(t *testing.T) { + err = os.WriteFile(filename, []byte("hello"), 0777) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: filename, + OldPath: filename, + Op: loginp.OpWrite, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Info: testFileInfo{name: basename, size: 5}, // +5 bytes appended + }, + } + requireEqualEvents(t, expEvent, e) + }) + }) + t.Run("does not emit an event for a fingerprint collision", func(t *testing.T) { dir := t.TempDir() paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms fingerprint.enabled: true ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr)