From 45f2b4d146e7e1b71135208f14fd2c10b15988c0 Mon Sep 17 00:00:00 2001 From: Rico Berger Date: Wed, 23 Nov 2022 09:48:01 +0100 Subject: [PATCH] Add Metrics (#41) klogs exports the following metrics now: - "klogs_input_records_total": The number of received records from Fluent Bit / Kafka. - "klogs_errors_total": The number of errors when writing logs to ClickHouse or receiving messages from Kafka. - "klogs_batch_size": The number of logs written to ClickHouse in one write. - "klogs_flush_time_seconds": The time in seconds needed to write logs to ClickHouse. The metrics server is listening on ":2021" by default. This can be changed via the "Metrics_Server_Address" setting or the "--metrics-server.address" flag in the ingester. --- cluster/fluent-bit/ingester/ingester.yaml | 2 +- cmd/ingester/README.md | 1 + cmd/ingester/main.go | 32 ++- cmd/plugin/README.md | 1 + cmd/plugin/main.go | 45 +++- pkg/kafka/consumer.go | 253 +++++++++++----------- pkg/metrics/metrics.go | 93 ++++++++ 7 files changed, 267 insertions(+), 160 deletions(-) create mode 100644 pkg/metrics/metrics.go diff --git a/cluster/fluent-bit/ingester/ingester.yaml b/cluster/fluent-bit/ingester/ingester.yaml index 2115bc7..bb73cc3 100644 --- a/cluster/fluent-bit/ingester/ingester.yaml +++ b/cluster/fluent-bit/ingester/ingester.yaml @@ -30,7 +30,7 @@ spec: - --log.format=json - --log.level=trace ports: - - containerPort: 8080 + - containerPort: 2021 name: http livenessProbe: httpGet: diff --git a/cmd/ingester/README.md b/cmd/ingester/README.md index c5976eb..bcf8976 100644 --- a/cmd/ingester/README.md +++ b/cmd/ingester/README.md @@ -10,6 +10,7 @@ An example Deployment for the ClickHouse ingester can be found in the [ingester. | Command-Line Flag | Environment Variable | Description | Default | | ----------------- | -------------------- | ----------- | ------- | +| `--metrics-server.address` | `METRICS_SERVER_ADDRESS` | The address, where the metrics server should listen on. 
| `:2021` | | `--clickhouse.address` | `CLICKHOUSE_ADDRESS` | ClickHouse address to connect to. | | | `--clickhouse.database` | `CLICKHOUSE_DATABASE` | ClickHouse database name. | `logs` | | `--clickhouse.username` | `CLICKHOUSE_USERNAME` | ClickHouse username for the connection. | | diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index c95a4f4..16dbe7c 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -1,9 +1,7 @@ package main import ( - "context" "fmt" - "net/http" "os" "strconv" "strings" @@ -12,15 +10,16 @@ import ( "github.com/kobsio/klogs/pkg/clickhouse" "github.com/kobsio/klogs/pkg/kafka" "github.com/kobsio/klogs/pkg/log" + "github.com/kobsio/klogs/pkg/metrics" "github.com/kobsio/klogs/pkg/version" - "github.com/prometheus/client_golang/prometheus/promhttp" flag "github.com/spf13/pflag" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) var ( + metricsServerAddress string clickhouseAddress string clickhouseUsername string clickhousePassword string @@ -48,6 +47,11 @@ var ( // init is used to set the defaults for all configuration parameters and to set all flags and environment variables, for // the ClickHouse, Kafka and logging configuration. 
func init() { + defaultMetricsServerAddress := ":2021" + if os.Getenv("METRICS_SERVER_ADDRESS") != "" { + defaultMetricsServerAddress = os.Getenv("METRICS_SERVER_ADDRESS") + } + defaultCickhouseAddress := "" if os.Getenv("CLICKHOUSE_ADDRESS") != "" { defaultCickhouseAddress = os.Getenv("CLICKHOUSE_ADDRESS") @@ -182,6 +186,8 @@ func init() { defaultLogLevel = os.Getenv("LOG_LEVEL") } + flag.StringVar(&metricsServerAddress, "metrics-server.address", defaultMetricsServerAddress, "The address, where the metrics server should listen on.") + flag.StringVar(&clickhouseAddress, "clickhouse.address", defaultCickhouseAddress, "ClickHouse address to connect to.") flag.StringVar(&clickhouseUsername, "clickhouse.username", defaultClickHouseUsername, "ClickHouse username for the connection.") flag.StringVar(&clickhousePassword, "clickhouse.password", defaultClickHousePassword, "ClickHouse password for the connection.") @@ -261,20 +267,10 @@ func main() { log.Info(nil, "Clickhouse configuration", zap.String("clickhouseAddress", clickhouseAddress), zap.String("clickhouseUsername", clickhouseUsername), zap.String("clickhousePassword", "*****"), zap.String("clickhouseDatabase", clickhouseDatabase), zap.String("clickhouseDialTimeout", clickhouseDialTimeout), zap.String("clickhouseConnMaxLifetime", clickhouseConnMaxLifetime), zap.Int("clickhouseMaxIdleConns", clickhouseMaxIdleConns), zap.Int("clickhouseMaxOpenConns", clickhouseMaxOpenConns), zap.Int64("clickhouseBatchSize", clickhouseBatchSize), zap.Duration("clickhouseFlushInterval", clickhouseFlushInterval)) log.Info(nil, "Kafka configuration", zap.String("kafkaBrokers", kafkaBrokers), zap.String("kafkaGroup", kafkaGroup), zap.String("kafkaVersion", kafkaVersion), zap.String("kafkaTopics", kafkaTopics)) - // Create a http server, which can be used for the liveness and readiness probe in Kubernetes. The server also - // serves our Prometheus metrics. 
- router := http.NewServeMux() - router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "OK") - }) - router.Handle("/metrics", promhttp.Handler()) - - server := http.Server{ - Addr: ":8080", - Handler: router, - } - - go server.ListenAndServe() + // Create the metrics server, which serves the Prometheus metrics for the ingester. The server can also be used to + // define a readiness and liveness probe. + metricsServer := metrics.New(metricsServerAddress) + go metricsServer.Start() // Create a new client for the configured ClickHouse instance. Then pass the ClickHouse client to the Run function // of the Kafka package, which listens for message in the configured Kafka instance. These messages are then written @@ -285,5 +281,5 @@ func main() { } kafka.Run(kafkaBrokers, kafkaGroup, kafkaVersion, kafkaTopics, kafkaTimestampKey, clickhouseBatchSize, clickhouseFlushInterval, clickhouseForceNumberFields, clickhouseForceUnderscores, client) - server.Shutdown(context.Background()) + metricsServer.Stop() } diff --git a/cmd/plugin/README.md b/cmd/plugin/README.md index 6e88aae..62f68ea 100644 --- a/cmd/plugin/README.md +++ b/cmd/plugin/README.md @@ -10,6 +10,7 @@ An example configuration file can be found in the [fluent-bit-cm.yaml](../../clu | Option | Description | Default | | ------ | ----------- | ------- | +| `Metrics_Server_Address` | The address, where the metrics server should listen on. | `:2021` | | `Address` | The address, where ClickHouse is listining on, e.g. `clickhouse-clickhouse.clickhouse.svc.cluster.local:9000`. | | | `Database` | The name of the database for the logs. | `logs` | | `Username` | The username, to authenticate to ClickHouse. 
| | diff --git a/cmd/plugin/main.go b/cmd/plugin/main.go index ad58ce7..0f1b04f 100644 --- a/cmd/plugin/main.go +++ b/cmd/plugin/main.go @@ -22,16 +22,18 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" ) +import "github.com/kobsio/klogs/pkg/metrics" const ( - defaultDatabase string = "logs" - defaultDialTimeout string = "10s" - defaultConnMaxLifetime string = "1h" - defaultMaxIdleConns int = 1 - defaultMaxOpenConns int = 1 - defaultBatchSize int64 = 10000 - defaultFlushInterval time.Duration = 60 * time.Second - defaultForceUnderscores bool = false + defaultMetricsServerAddress string = ":2021" + defaultDatabase string = "logs" + defaultDialTimeout string = "10s" + defaultConnMaxLifetime string = "1h" + defaultMaxIdleConns int = 1 + defaultMaxOpenConns int = 1 + defaultBatchSize int64 = 10000 + defaultFlushInterval time.Duration = 60 * time.Second + defaultForceUnderscores bool = false ) var ( @@ -42,6 +44,7 @@ var ( forceUnderscores bool lastFlush = time.Now() client *clickhouse.Client + metricsServer metrics.Server ) //export FLBPluginRegister @@ -93,6 +96,21 @@ func FLBPluginInit(plugin unsafe.Pointer) int { log.Info(nil, "Version information", version.Info()...) log.Info(nil, "Build context", version.BuildContext()...) + // Read the configuration for the address where the metrics server should listen on. Then create a new metrics + // server and start the server in a new Go routine via the `Start` method. + // + // When the plugin exits the metrics server should be stopped via the `Stop` method. + metricsServerAddress := output.FLBPluginConfigKey(plugin, "metrics_server_address") + if metricsServerAddress == "" { + metricsServerAddress = defaultMetricsServerAddress + } + + metricsServer = metrics.New(metricsServerAddress) + go metricsServer.Start() + + // Read all configuration values required for the ClickHouse client. 
Once we have all configuration values we + // create a new ClickHouse client, which can then be used to write the logs from Fluent Bit into ClickHouse when the + // FLBPluginFlushCtx function is called. address := output.FLBPluginConfigKey(plugin, "address") database = output.FLBPluginConfigKey(plugin, "database") @@ -193,6 +211,8 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int break } + metrics.InputRecordsTotalMetric.Inc() + var timestamp time.Time switch t := ts.(type) { case output.FLBTime: @@ -312,18 +332,22 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int } startFlushTime := time.Now() - if client.BufferLen() < int(batchSize) && lastFlush.Add(flushInterval).After(startFlushTime) { + currentBatchSize := client.BufferLen() + if currentBatchSize < int(batchSize) && lastFlush.Add(flushInterval).After(startFlushTime) { return output.FLB_OK } - log.Info(nil, "Start flushing", zap.Int("batchSize", client.BufferLen()), zap.Duration("flushInterval", startFlushTime.Sub(lastFlush))) + log.Info(nil, "Start flushing", zap.Int("batchSize", currentBatchSize), zap.Duration("flushInterval", startFlushTime.Sub(lastFlush))) err := client.BufferWrite() if err != nil { + metrics.ErrorsTotalMetric.Inc() log.Error(nil, "Error while writing buffer", zap.Error(err)) return output.FLB_ERROR } lastFlush = time.Now() + metrics.BatchSizeMetric.Observe(float64(currentBatchSize)) + metrics.FlushTimeSecondsMetric.Observe(lastFlush.Sub(startFlushTime).Seconds()) log.Info(nil, "End flushing", zap.Duration("flushTime", lastFlush.Sub(startFlushTime))) return output.FLB_OK @@ -337,6 +361,7 @@ func FLBPluginExit() int { //export FLBPluginExitCtx func FLBPluginExitCtx(ctx unsafe.Pointer) int { + metricsServer.Stop() return output.FLB_OK } diff --git a/pkg/kafka/consumer.go b/pkg/kafka/consumer.go index 5f2670c..3b52fe3 100644 --- a/pkg/kafka/consumer.go +++ b/pkg/kafka/consumer.go @@ -11,26 +11,12 @@ import ( 
"github.com/kobsio/klogs/pkg/clickhouse" flatten "github.com/kobsio/klogs/pkg/flatten/string" "github.com/kobsio/klogs/pkg/log" + "github.com/kobsio/klogs/pkg/metrics" "github.com/Shopify/sarama" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/zap" ) -var ( - flushIntervalMetric = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "kafka_clickhouse", - Name: "flush_interval_seconds", - Help: "The flush interval describes after which time the messages from Kafka are written to ClickHouse", - }) - batchSizeMetric = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "kafka_clickhouse", - Name: "batch_size_count", - Help: "The batch size describes how many messages from Kafka are written to ClickHouse", - }) -) - // Consumer represents a Sarama consumer group consumer. type Consumer struct { ready chan bool @@ -60,138 +46,143 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai // NOTE: Do not move the code below to a goroutine. 
The `ConsumeClaim` itself is called within a goroutine, see: // https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29 for message := range claim.Messages() { + metrics.InputRecordsTotalMetric.Inc() + var record map[string]interface{} record = make(map[string]interface{}) err := json.Unmarshal(message.Value, &record) if err != nil { + metrics.ErrorsTotalMetric.Inc() log.Error(nil, "Could not unmarshal log line", zap.Error(err)) - } - - data, err := flatten.Flatten(record) - if err != nil { - log.Error(nil, "Could not flat data", zap.Error(err)) - break - } - - row := clickhouse.Row{ - FieldsString: make(map[string]string), - FieldsNumber: make(map[string]float64), - } - - for k, v := range data { - var stringValue string - var numberValue float64 - var isNumber bool - var isNil bool - - switch t := v.(type) { - case nil: - isNil = true - case string: - stringValue = t - case []byte: - stringValue = string(t) - case int: - isNumber = true - numberValue = float64(t) - case int8: - isNumber = true - numberValue = float64(t) - case int16: - isNumber = true - numberValue = float64(t) - case int32: - isNumber = true - numberValue = float64(t) - case int64: - isNumber = true - numberValue = float64(t) - case float32: - isNumber = true - numberValue = float64(t) - case float64: - isNumber = true - numberValue = float64(t) - case uint8: - isNumber = true - numberValue = float64(t) - case uint16: - isNumber = true - numberValue = float64(t) - case uint32: - isNumber = true - numberValue = float64(t) - case uint64: - isNumber = true - numberValue = float64(t) - default: - stringValue = fmt.Sprintf("%v", v) - } + } else { + data, err := flatten.Flatten(record) + if err != nil { + metrics.ErrorsTotalMetric.Inc() + log.Error(nil, "Could not flat data", zap.Error(err)) + } else { + row := clickhouse.Row{ + FieldsString: make(map[string]string), + FieldsNumber: make(map[string]float64), + } - if !isNil { - switch k { - case consumer.timestampKey: - parsedTime, err 
:= strconv.ParseFloat(stringValue, 64) - if err != nil { - log.Warn(nil, "Could not parse timestamp, defaulting to now", zap.Error(err)) - row.Timestamp = time.Now() - } else { - sec, dec := math.Modf(parsedTime) - row.Timestamp = time.Unix(int64(sec), int64(dec*(1e9))) - } - case "cluster": - row.Cluster = stringValue - case "kubernetes_namespace_name": - row.Namespace = stringValue - case "kubernetes_labels_k8s-app": - row.App = stringValue - case "kubernetes_labels_app": - row.App = stringValue - case "kubernetes_pod_name": - row.Pod = stringValue - case "kubernetes_container_name": - row.Container = stringValue - case "kubernetes_host": - row.Host = stringValue - case "log": - row.Log = stringValue - default: - formattedKey := k - if consumer.clickhouseForceUnderscores { - formattedKey = strings.ReplaceAll(k, ".", "_") + for k, v := range data { + var stringValue string + var numberValue float64 + var isNumber bool + var isNil bool + + switch t := v.(type) { + case nil: + isNil = true + case string: + stringValue = t + case []byte: + stringValue = string(t) + case int: + isNumber = true + numberValue = float64(t) + case int8: + isNumber = true + numberValue = float64(t) + case int16: + isNumber = true + numberValue = float64(t) + case int32: + isNumber = true + numberValue = float64(t) + case int64: + isNumber = true + numberValue = float64(t) + case float32: + isNumber = true + numberValue = float64(t) + case float64: + isNumber = true + numberValue = float64(t) + case uint8: + isNumber = true + numberValue = float64(t) + case uint16: + isNumber = true + numberValue = float64(t) + case uint32: + isNumber = true + numberValue = float64(t) + case uint64: + isNumber = true + numberValue = float64(t) + default: + stringValue = fmt.Sprintf("%v", v) } - if isNumber { - row.FieldsNumber[formattedKey] = numberValue - } else { - if contains(k, consumer.clickhouseForceNumberFields) { - parsedNumber, err := strconv.ParseFloat(stringValue, 64) - if err == nil { - 
row.FieldsNumber[formattedKey] = parsedNumber + if !isNil { + switch k { + case consumer.timestampKey: + parsedTime, err := strconv.ParseFloat(stringValue, 64) + if err != nil { + log.Warn(nil, "Could not parse timestamp, defaulting to now", zap.Error(err)) + row.Timestamp = time.Now() + } else { + sec, dec := math.Modf(parsedTime) + row.Timestamp = time.Unix(int64(sec), int64(dec*(1e9))) + } + case "cluster": + row.Cluster = stringValue + case "kubernetes_namespace_name": + row.Namespace = stringValue + case "kubernetes_labels_k8s-app": + row.App = stringValue + case "kubernetes_labels_app": + row.App = stringValue + case "kubernetes_pod_name": + row.Pod = stringValue + case "kubernetes_container_name": + row.Container = stringValue + case "kubernetes_host": + row.Host = stringValue + case "log": + row.Log = stringValue + default: + formattedKey := k + if consumer.clickhouseForceUnderscores { + formattedKey = strings.ReplaceAll(k, ".", "_") + } + + if isNumber { + row.FieldsNumber[formattedKey] = numberValue } else { - row.FieldsString[formattedKey] = stringValue + if contains(k, consumer.clickhouseForceNumberFields) { + parsedNumber, err := strconv.ParseFloat(stringValue, 64) + if err == nil { + row.FieldsNumber[formattedKey] = parsedNumber + } else { + row.FieldsString[formattedKey] = stringValue + } + } else { + row.FieldsString[formattedKey] = stringValue + } } - } else { - row.FieldsString[formattedKey] = stringValue } } } - } - } - consumer.clickhouseClient.BufferAdd(row) - session.MarkMessage(message, "") + consumer.clickhouseClient.BufferAdd(row) + session.MarkMessage(message, "") + startFlushTime := time.Now() + currentBatchSize := consumer.clickhouseClient.BufferLen() - if consumer.clickhouseClient.BufferLen() >= int(consumer.clickhouseBatchSize) || consumer.lastFlush.Add(consumer.clickhouseFlushInterval).Before(time.Now()) { - flushIntervalMetric.Set(time.Now().Sub(consumer.lastFlush).Seconds()) - 
batchSizeMetric.Set(float64(consumer.clickhouseClient.BufferLen())) - - err := consumer.clickhouseClient.BufferWrite() - if err != nil { - log.Error(nil, "Could nor write buffer", zap.Error(err)) - } else { - consumer.lastFlush = time.Now() + if currentBatchSize >= int(consumer.clickhouseBatchSize) || consumer.lastFlush.Add(consumer.clickhouseFlushInterval).Before(time.Now()) { + err := consumer.clickhouseClient.BufferWrite() + if err != nil { + metrics.ErrorsTotalMetric.Inc() + log.Error(nil, "Could nor write buffer", zap.Error(err)) + } else { + consumer.lastFlush = time.Now() + metrics.BatchSizeMetric.Observe(float64(currentBatchSize)) + metrics.FlushTimeSecondsMetric.Observe(consumer.lastFlush.Sub(startFlushTime).Seconds()) + } + } } } } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 0000000..1109eb2 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,93 @@ +package metrics + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/kobsio/klogs/pkg/log" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" +) + +var ( + InputRecordsTotalMetric = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "klogs", + Name: "input_records_total", + Help: "Number of received records.", + }) + ErrorsTotalMetric = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "klogs", + Name: "errors_total", + Help: "Number of errors when writing records to ClickHouse", + }) + BatchSizeMetric = promauto.NewSummary(prometheus.SummaryOpts{ + Namespace: "klogs", + Name: "batch_size", + Help: "The number of records which are written to ClickHouse.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }) + FlushTimeSecondsMetric = promauto.NewSummary(prometheus.SummaryOpts{ + Namespace: "klogs", + Name: "flush_time_seconds", + Help: "The time needed to write 
the records in seconds.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }) +) + +// Server is the interface of a metrics service, which provides the options to start and stop the underlying http +// server. +type Server interface { + Start() + Stop() +} + +// server implements the Server interface. +type server struct { + *http.Server +} + +// Start starts serving the metrics server. +func (s *server) Start() { + log.Info(nil, "Metrics server started", zap.String("address", s.Addr)) + + if err := s.ListenAndServe(); err != nil { + if err != http.ErrServerClosed { + log.Error(nil, "Metrics server died unexpected", zap.Error(err)) + } + } +} + +// Stop terminates the metrics server gracefully. +func (s *server) Stop() { + log.Debug(nil, "Start shutdown of the metrics server") + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := s.Shutdown(ctx) + if err != nil { + log.Error(nil, "Graceful shutdown of the metrics server failed", zap.Error(err)) + } +} + +// New return a new metrics server, which is used to serve Prometheus metrics on the specified address under the +// /metrics path. +func New(address string) Server { + router := http.DefaultServeMux + router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "OK") + }) + router.Handle("/metrics", promhttp.Handler()) + + return &server{ + &http.Server{ + Addr: address, + Handler: router, + }, + } +}