From 6d7ad28665123d1e1624487b0b6c9d929270aab2 Mon Sep 17 00:00:00 2001
From: yamatcha <soju-yamashita@cybozu.co.jp>
Date: Wed, 20 Nov 2024 02:50:21 +0000
Subject: [PATCH] Add log-rotation-size option

---
 cmd/moco-agent/cmd/root.go | 18 ++++++++-
 docs/moco-agent.md         |  1 +
 server/rotate.go           | 48 ++++++++++++++++++++---
 server/rotate_test.go      | 78 +++++++++++++++++++++++++++++++++++---
 4 files changed, 133 insertions(+), 12 deletions(-)

diff --git a/cmd/moco-agent/cmd/root.go b/cmd/moco-agent/cmd/root.go
index 09be0e3..b54a093 100644
--- a/cmd/moco-agent/cmd/root.go
+++ b/cmd/moco-agent/cmd/root.go
@@ -52,6 +52,7 @@ var config struct {
 	connIdleTime            time.Duration
 	connectionTimeout       time.Duration
 	logRotationSchedule     string
+	logRotationSize         int64
 	readTimeout             time.Duration
 	maxDelayThreshold       time.Duration
 	socketPath              string
@@ -127,7 +128,7 @@ var rootCmd = &cobra.Command{
 		metrics.Init(registry, clusterName, index)
 
 		c := cron.New(cron.WithLogger(rLogger.WithName("cron")))
-		if _, err := c.AddFunc(config.logRotationSchedule, agent.RotateLog); err != nil {
+		if _, err := c.AddFunc(config.logRotationSchedule, func() { agent.RotateErrorLog(); agent.RotateSlowLog() }); err != nil {
 			rLogger.Error(err, "failed to parse the cron spec", "spec", config.logRotationSchedule)
 			return err
 		}
@@ -142,6 +143,25 @@ var rootCmd = &cobra.Command{
 			}
 		}()
 
+		// Rotate if the file size exceeds logRotationSize
+		if config.logRotationSize > 0 {
+			// Create the ticker only when size-based rotation is enabled so
+			// that no ticker is left running when the feature is off.
+			ticker := time.NewTicker(time.Second)
+			defer ticker.Stop()
+			go func() {
+				// Loop so the size check runs on every tick, not just once.
+				for {
+					select {
+					case <-ctx.Done():
+						return
+					case <-ticker.C:
+						agent.RotateLogIfSizeExceeded(config.logRotationSize)
+					}
+				}
+			}()
+		}
+
 		reloader, err := cert.NewReloader(config.grpcCertDir, rLogger.WithName("cert-reloader"))
 		if err != nil {
 			return err
@@ -239,6 +259,7 @@ func init() {
 	fs.DurationVar(&config.connIdleTime, "max-idle-time", 30*time.Second, "The maximum amount of time a connection may be idle")
 	fs.DurationVar(&config.connectionTimeout, "connection-timeout", 5*time.Second, "Dial timeout")
 	fs.StringVar(&config.logRotationSchedule, "log-rotation-schedule", logRotationScheduleDefault, "Cron format schedule for MySQL log rotation")
+	fs.Int64Var(&config.logRotationSize, "log-rotation-size", 0, "Rotate MySQL log files when it exceeds the specified size in bytes.")
 	fs.DurationVar(&config.readTimeout, "read-timeout", 30*time.Second, "I/O read timeout")
 	fs.DurationVar(&config.maxDelayThreshold, "max-delay", time.Minute, "Acceptable max commit delay considering as ready; the zero value accepts any delay")
 	fs.StringVar(&config.socketPath, "socket-path", socketPathDefault, "Path of mysqld socket file.")
diff --git a/docs/moco-agent.md b/docs/moco-agent.md
index d0c17d5..017b5d8 100644
--- a/docs/moco-agent.md
+++ b/docs/moco-agent.md
@@ -11,6 +11,7 @@ Flags:
       --grpc-cert-dir string                 gRPC certificate directory (default "/grpc-cert")
   -h, --help                                 help for moco-agent
       --log-rotation-schedule string         Cron format schedule for MySQL log rotation (default "*/5 * * * *")
+      --log-rotation-size int                Rotate MySQL log files when it exceeds the specified size in bytes.
       --logfile string                       Log filename
       --logformat string                     Log format [plain,logfmt,json]
       --loglevel string                      Log level [critical,error,warning,info,debug]
diff --git a/server/rotate.go b/server/rotate.go
index cd1ced0..7b830b8 100644
--- a/server/rotate.go
+++ b/server/rotate.go
@@ -10,8 +10,8 @@ import (
 	"github.com/cybozu-go/moco-agent/metrics"
 )
 
-// RotateLog rotates log files
-func (a *Agent) RotateLog() {
+// RotateErrorLog rotates error log files
+func (a *Agent) RotateErrorLog() {
 	ctx := context.Background()
 
 	metrics.LogRotationCount.Inc()
@@ -25,16 +25,33 @@ func (a *Agent) RotateLog() {
 		return
 	}
 
+	if _, err := a.db.ExecContext(ctx, "FLUSH LOCAL ERROR LOGS"); err != nil {
+		a.logger.Error(err, "failed to exec FLUSH LOCAL ERROR LOGS")
+		metrics.LogRotationFailureCount.Inc()
+		return
+	}
+
+	durationSeconds := time.Since(startTime).Seconds()
+	metrics.LogRotationDurationSeconds.Observe(durationSeconds)
+}
+
+// RotateSlowLog rotates slow log files
+func (a *Agent) RotateSlowLog() {
+	ctx := context.Background()
+
+	metrics.LogRotationCount.Inc()
+	startTime := time.Now()
+
 	slowFile := filepath.Join(a.logDir, mocoagent.MySQLSlowLogName)
-	err = os.Rename(slowFile, slowFile+".0")
+	err := os.Rename(slowFile, slowFile+".0")
 	if err != nil && !os.IsNotExist(err) {
 		a.logger.Error(err, "failed to rotate slow query log file")
 		metrics.LogRotationFailureCount.Inc()
 		return
 	}
 
-	if _, err := a.db.ExecContext(ctx, "FLUSH LOCAL ERROR LOGS, SLOW LOGS"); err != nil {
-		a.logger.Error(err, "failed to exec FLUSH LOCAL LOGS")
+	if _, err := a.db.ExecContext(ctx, "FLUSH LOCAL SLOW LOGS"); err != nil {
+		a.logger.Error(err, "failed to exec FLUSH LOCAL SLOW LOGS")
 		metrics.LogRotationFailureCount.Inc()
 		return
 	}
@@ -42,3 +59,24 @@ func (a *Agent) RotateLog() {
 	durationSeconds := time.Since(startTime).Seconds()
 	metrics.LogRotationDurationSeconds.Observe(durationSeconds)
 }
+
+// RotateLogIfSizeExceeded rotates the error log and the slow query log,
+// each independently, when its size exceeds rotationSize bytes. A file
+// whose stat fails is logged and skipped, since no size is available.
+func (a *Agent) RotateLogIfSizeExceeded(rotationSize int64) {
+	errFile := filepath.Join(a.logDir, mocoagent.MySQLErrorLogName)
+	errFileStat, err := os.Stat(errFile)
+	if err != nil {
+		a.logger.Error(err, "failed to get stat of error log file")
+	} else if errFileStat.Size() > rotationSize {
+		a.RotateErrorLog()
+	}
+
+	slowFile := filepath.Join(a.logDir, mocoagent.MySQLSlowLogName)
+	slowFileStat, err := os.Stat(slowFile)
+	if err != nil {
+		a.logger.Error(err, "failed to get stat of slow query log file")
+	} else if slowFileStat.Size() > rotationSize {
+		a.RotateSlowLog()
+	}
+}
diff --git a/server/rotate_test.go b/server/rotate_test.go
index a1a9695..8dca622 100644
--- a/server/rotate_test.go
+++ b/server/rotate_test.go
@@ -1,6 +1,7 @@
 package server
 
 import (
+	"bytes"
 	"os"
 	"path/filepath"
 	"time"
@@ -12,7 +13,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/testutil"
 )
 
-var _ = Describe("log rotation", func() {
+var _ = Describe("log rotation", Ordered, func() {
 	It("should rotate logs", func() {
 		By("starting MySQLd")
 		StartMySQLD(replicaHost, replicaPort, replicaServerID)
@@ -45,13 +46,14 @@ var _ = Describe("log rotation", func() {
 			Expect(err).ShouldNot(HaveOccurred())
 		}
 
-		agent.RotateLog()
+		agent.RotateErrorLog()
+		agent.RotateSlowLog()
 
 		for _, file := range logFiles {
 			_, err := os.Stat(file + ".0")
 			Expect(err).ShouldNot(HaveOccurred())
 		}
-		Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 1))
+		Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 2))
 		Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 0))
 
 		By("creating the same name directory")
@@ -62,9 +64,73 @@ var _ = Describe("log rotation", func() {
 			Expect(err).ShouldNot(HaveOccurred())
 		}
 
-		agent.RotateLog()
+		agent.RotateErrorLog()
+		agent.RotateSlowLog()
 
-		Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 2))
-		Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 1))
+		Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 4))
+		Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 2))
+	})
+
+	It("should rotate logs by RotateLogIfSizeExceeded if size exceeds", func() {
+		By("starting MySQLd")
+		StartMySQLD(replicaHost, replicaPort, replicaServerID)
+		defer StopAndRemoveMySQLD(replicaHost)
+
+		sockFile := filepath.Join(socketDir(replicaHost), "mysqld.sock")
+		tmpDir, err := os.MkdirTemp("", "moco-test-agent-")
+		Expect(err).NotTo(HaveOccurred())
+		defer os.RemoveAll(tmpDir)
+
+		conf := MySQLAccessorConfig{
+			Host:              "localhost",
+			Port:              replicaPort,
+			Password:          agentUserPassword,
+			ConnMaxIdleTime:   30 * time.Minute,
+			ConnectionTimeout: 3 * time.Second,
+			ReadTimeout:       30 * time.Second,
+		}
+		agent, err := New(conf, testClusterName, sockFile, tmpDir, maxDelayThreshold, time.Second, testLogger)
+		Expect(err).ShouldNot(HaveOccurred())
+		defer agent.CloseDB()
+
+		By("preparing log files for testing")
+		slowFile := filepath.Join(tmpDir, mocoagent.MySQLSlowLogName)
+		errFile := filepath.Join(tmpDir, mocoagent.MySQLErrorLogName)
+		logFiles := []string{slowFile, errFile}
+
+		logDataSize := 512
+		data := bytes.Repeat([]byte("a"), logDataSize)
+		for _, file := range logFiles {
+			// os.WriteFile closes the file and reports any write error.
+			err := os.WriteFile(file, data, 0666)
+			Expect(err).ShouldNot(HaveOccurred())
+		}
+
+		agent.RotateLogIfSizeExceeded(int64(logDataSize) + 1)
+
+		Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 4))
+		Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 2))
+
+		agent.RotateLogIfSizeExceeded(int64(logDataSize) - 1)
+
+		for _, file := range logFiles {
+			_, err := os.Stat(file + ".0")
+			Expect(err).ShouldNot(HaveOccurred())
+		}
+		Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 6))
+		Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 2))
+
+		By("creating the same name directory")
+		for _, file := range logFiles {
+			err := os.Rename(file+".0", file)
+			Expect(err).ShouldNot(HaveOccurred())
+			err = os.Mkdir(file+".0", 0777)
+			Expect(err).ShouldNot(HaveOccurred())
+		}
+
+		agent.RotateLogIfSizeExceeded(int64(logDataSize) - 1)
+
+		Expect(testutil.ToFloat64(metrics.LogRotationCount)).To(BeNumerically("==", 8))
+		Expect(testutil.ToFloat64(metrics.LogRotationFailureCount)).To(BeNumerically("==", 4))
 	})
 })