Skip to content

Commit

Permalink
logger file: compression is no more blocking (#854)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmachard authored Oct 27, 2024
1 parent b09de44 commit 9e8f442
Showing 1 changed file with 129 additions and 82 deletions.
211 changes: 129 additions & 82 deletions workers/logfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package workers
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
Expand All @@ -16,6 +15,8 @@ import (
"strings"
"time"

"github.com/klauspost/compress/gzip"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
"github.com/dmachard/go-dnscollector/transformers"
Expand Down Expand Up @@ -53,20 +54,28 @@ type LogFile struct {
fileFd *os.File
fileSize int64
fileDir, fileName, fileExt, filePrefix string
commpressTimer *time.Timer
textFormat []string
compressQueue chan string
}

func NewLogFile(config *pkgconfig.Config, logger *logger.Logger, name string) *LogFile {
bufSize := config.Global.Worker.ChannelBufferSize
if config.Loggers.LogFile.ChannelBufferSize > 0 {
bufSize = config.Loggers.LogFile.ChannelBufferSize
}
w := &LogFile{GenericWorker: NewGenericWorker(config, logger, name, "file", bufSize, pkgconfig.DefaultMonitor)}
w := &LogFile{
GenericWorker: NewGenericWorker(config, logger, name, "file", bufSize, pkgconfig.DefaultMonitor),
compressQueue: make(chan string, 1),
}
w.ReadConfig()
if err := w.OpenFile(); err != nil {
if err := w.OpenCurrentFile(); err != nil {
w.LogFatal(pkgconfig.PrefixLogWorker+"["+name+"] file - unable to open output file:", err)
}

// start compressor
go w.startCompressor()
w.initializeCompressionQueue()

return w
}

Expand All @@ -88,7 +97,7 @@ func (w *LogFile) ReadConfig() {
w.LogInfo("running in mode: %s", w.GetConfig().Loggers.LogFile.Mode)
}

func (w *LogFile) Cleanup() error {
func (w *LogFile) RemoveOldFiles() error {
if w.GetConfig().Loggers.LogFile.MaxFiles == 0 {
return nil
}
Expand Down Expand Up @@ -124,7 +133,7 @@ func (w *LogFile) Cleanup() error {
sort.Ints(logFiles)

// too much log files ?
diffNB := len(logFiles) - w.GetConfig().Loggers.LogFile.MaxFiles
diffNB := len(logFiles) - (w.GetConfig().Loggers.LogFile.MaxFiles - 1)
if diffNB > 0 {
for i := 0; i < diffNB; i++ {
filename := fmt.Sprintf("%s-%d%s", w.filePrefix, logFiles[i], w.fileExt)
Expand All @@ -141,7 +150,8 @@ func (w *LogFile) Cleanup() error {
return nil
}

func (w *LogFile) OpenFile() error {
func (w *LogFile) OpenCurrentFile() error {
w.LogInfo("create new log file: %s", w.GetConfig().Loggers.LogFile.FilePath)

fd, err := os.OpenFile(w.GetConfig().Loggers.LogFile.FilePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
Expand Down Expand Up @@ -177,87 +187,79 @@ func (w *LogFile) OpenFile() error {

}

w.LogInfo("file opened with success: %s", w.GetConfig().Loggers.LogFile.FilePath)
w.LogInfo("new log file created")
return nil
}

func (w *LogFile) GetMaxSize() int64 {
return int64(1024*1024) * int64(w.GetConfig().Loggers.LogFile.MaxSize)
}

func (w *LogFile) CompressFile() {
entries, err := os.ReadDir(w.fileDir)
func (w *LogFile) compressFile(filename string) {
w.LogInfo("start to compress in background: %s", filename)

// prepare dest filename
baseName := filepath.Base(filename)
baseName = strings.TrimPrefix(baseName, "tocompress-")
tmpFile := filename + compressSuffix
dstFile := filepath.Join(filepath.Dir(filename), baseName+compressSuffix)

fd, err := os.Open(filename)
if err != nil {
w.LogError("unable to list all files: %s", err)
w.LogError("compress - failed to open file: %s", err)
return
}
defer fd.Close()

for _, entry := range entries {
// ignore folder
if entry.IsDir() {
continue
}

matched, _ := regexp.MatchString(`^`+w.filePrefix+`-\d+`+w.fileExt+`$`, entry.Name())
if matched {
src := filepath.Join(w.fileDir, entry.Name())
dst := filepath.Join(w.fileDir, entry.Name()+compressSuffix)

fd, err := os.Open(src)
if err != nil {
w.LogError("compress - failed to open file: ", err)
continue
}
defer fd.Close()

fi, err := os.Stat(src)
if err != nil {
w.LogError("compress - failed to stat file: ", err)
continue
}
fi, err := os.Stat(filename)
if err != nil {
w.LogError("compress - failed to stat file: %s", err)
return
}

gzf, err := os.OpenFile(dst, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, fi.Mode())
if err != nil {
w.LogError("compress - failed to open compressed file: ", err)
continue
}
defer gzf.Close()
gzf, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, fi.Mode())
if err != nil {
w.LogError("compress - failed to open compressed file: %s", err)
return
}
defer gzf.Close()

gz := gzip.NewWriter(gzf)
gz := gzip.NewWriter(gzf)

if _, err := io.Copy(gz, fd); err != nil {
w.LogError("compress - failed to compress file: ", err)
os.Remove(dst)
continue
}
if err := gz.Close(); err != nil {
w.LogError("compress - failed to close gz writer: ", err)
os.Remove(dst)
continue
}
if err := gzf.Close(); err != nil {
w.LogError("compress - failed to close gz file: ", err)
os.Remove(dst)
continue
}

if err := fd.Close(); err != nil {
w.LogError("compress - failed to close log file: ", err)
os.Remove(dst)
continue
}
if err := os.Remove(src); err != nil {
w.LogError("compress - failed to remove log file: ", err)
os.Remove(dst)
continue
}
if _, err := io.Copy(gz, fd); err != nil {
w.LogError("compress - failed to compress file: %s", err)
os.Remove(tmpFile)
return
}
if err := gz.Close(); err != nil {
w.LogError("compress - failed to close gz writer: %s", err)
os.Remove(tmpFile)
return
}
if err := gzf.Close(); err != nil {
w.LogError("compress - failed to close gz file: %s", err)
os.Remove(tmpFile)
return
}

// post rotate command?
w.CompressPostRotateCommand(dst)
}
if err := fd.Close(); err != nil {
w.LogError("compress - failed to close log file: %s", err)
os.Remove(tmpFile)
return
}
if err := os.Remove(filename); err != nil {
w.LogError("compress - failed to remove log file: %s", err)
os.Remove(tmpFile)
return
}

w.commpressTimer.Reset(time.Duration(w.GetConfig().Loggers.LogFile.CompressInterval) * time.Second)
// finally rename the gzip file
if err := os.Rename(tmpFile, dstFile); err != nil {
w.LogError("compress - unable to rename file: %s", err)
os.Remove(tmpFile)
return
}
w.LogInfo("compression terminated - %s", dstFile)
}

func (w *LogFile) PostRotateCommand(filename string) {
Expand Down Expand Up @@ -305,24 +307,34 @@ func (w *LogFile) RotateFile() error {
}

// Rename current log file
bfpath := filepath.Join(w.fileDir, fmt.Sprintf("%s-%d%s", w.filePrefix, time.Now().UnixNano(), w.fileExt))
newFilename := fmt.Sprintf("%s-%d%s", w.filePrefix, time.Now().UnixNano(), w.fileExt)
if w.config.Loggers.LogFile.Compress {
newFilename = fmt.Sprintf("tocompress-%s", newFilename)
}
bfpath := filepath.Join(w.fileDir, newFilename)
err := os.Rename(w.GetConfig().Loggers.LogFile.FilePath, bfpath)
if err != nil {
return err
}

// post rotate command?
w.PostRotateCommand(bfpath)
if w.config.Loggers.LogFile.Compress {
go func() {
w.compressQueue <- bfpath // Envoi asynchrone dans le canal pour compression
}()
} else {
w.PostRotateCommand(bfpath)
}

// keep only max files
err = w.Cleanup()
err = w.RemoveOldFiles()
if err != nil {
w.LogError("unable to cleanup log files: %s", err)
return err
}

// re-create new one
if err := w.OpenFile(); err != nil {
if err := w.OpenCurrentFile(); err != nil {
w.LogError("unable to re-create file: %s", err)
return err
}
Expand Down Expand Up @@ -399,6 +411,44 @@ func (w *LogFile) WriteToDnstap(data []byte) {
w.fileSize += int64(n)
}

func (w *LogFile) initializeCompressionQueue() {
// Get all files in the log directory
files, err := os.ReadDir(w.fileDir)
if err != nil {
w.LogError("error reading log directory: %v", err)
return
}

// Find files that start with "tocompress-"
for _, file := range files {
fileName := file.Name()

// Check if the file is both marked for compression and has a `.gz` suffix
if strings.HasPrefix(fileName, "tocompress-") && strings.HasSuffix(fileName, ".gz") {
// Build the full path of the file
fullPath := filepath.Join(w.fileDir, fileName)

// Attempt to remove incomplete .gz file
if err := os.Remove(fullPath); err != nil {
w.LogError("error deleting incomplete compressed file %s: %v", fileName, err)
}
continue
}

// If it's a pending compression file, add it to the compression queue
if strings.HasPrefix(fileName, "tocompress-") && !strings.HasSuffix(fileName, ".gz") {
fullPath := filepath.Join(w.fileDir, fileName)
w.compressQueue <- fullPath
}
}
}

func (w *LogFile) startCompressor() {
for filename := range w.compressQueue {
w.compressFile(filename)
}
}

func (w *LogFile) StartCollect() {
w.LogInfo("starting data collection")
defer w.CollectDone()
Expand Down Expand Up @@ -432,6 +482,7 @@ func (w *LogFile) StartCollect() {
w.LogInfo("input channel closed!")
return
}

// count global messages
w.CountIngressTraffic()

Expand All @@ -447,6 +498,7 @@ func (w *LogFile) StartCollect() {

// send to output channel
w.CountEgressTraffic()

w.GetOutputChannel() <- dm

// send to next ?
Expand All @@ -462,7 +514,6 @@ func (w *LogFile) StartLogging() {
// prepare some timers
flushInterval := time.Duration(w.GetConfig().Loggers.LogFile.FlushInterval) * time.Second
flushTimer := time.NewTimer(flushInterval)
w.commpressTimer = time.NewTimer(time.Duration(w.GetConfig().Loggers.LogFile.CompressInterval) * time.Second)

buffer := new(bytes.Buffer)
var data []byte
Expand All @@ -476,9 +527,10 @@ func (w *LogFile) StartLogging() {
for {
select {
case <-w.OnLoggerStopped():
close(w.compressQueue)

// stop timer
flushTimer.Stop()
w.commpressTimer.Stop()

// Force write remaining batch data
if batchSize > 0 {
Expand Down Expand Up @@ -585,11 +637,6 @@ func (w *LogFile) StartLogging() {
buffer.Reset()
flushTimer.Reset(flushInterval)

case <-w.commpressTimer.C:
if w.GetConfig().Loggers.LogFile.Compress {
w.CompressFile()
}

}
}
}

0 comments on commit 9e8f442

Please sign in to comment.