Make timestamp key configurable (#8)
It is now possible to configure the timestamp key for the Kafka to
ClickHouse ingestor. For that, a new command-line flag
"kafka.timestamp-key" and a new environment variable
"KAFKA_TIMESTAMP_KEY" were added, which allow customizing the key under
which the ingestor looks for the record timestamp. The default value
for this option is "@timestamp".
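
For example, the new option can be set either way (hypothetical invocations; the binary name is assumed from this repository's cmd/fluent-bit-kafka-clickhouse directory):

```sh
# Via the new command-line flag:
./fluent-bit-kafka-clickhouse -kafka.timestamp-key=ts

# Or via the new environment variable:
KAFKA_TIMESTAMP_KEY=ts ./fluent-bit-kafka-clickhouse
```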
ricoberger authored Oct 3, 2021
1 parent ec28ddf commit 93617cd
Showing 5 changed files with 14 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
 - [#5](https://github.com/kobsio/fluent-bit-clickhouse/pull/5): Add support for Kafka, so that Fluent Bit writes all logs to Kafka and we then write the logs from Kafka to ClickHouse.
 - [#6](https://github.com/kobsio/fluent-bit-clickhouse/pull/6): Use consistent naming.
 - [#7](https://github.com/kobsio/fluent-bit-clickhouse/pull/7): Adjust documentation.
+- [#8](https://github.com/kobsio/fluent-bit-clickhouse/pull/8): Make timestamp key configurable.

 ## [v0.4.0](https://github.com/kobsio/fluent-bit-clickhouse/releases/tag/v0.4.0) (2021-09-08)
2 changes: 1 addition & 1 deletion cmd/fluent-bit-kafka-clickhouse/README.md
@@ -36,7 +36,7 @@ We are using [kind](https://kind.sigs.k8s.io/docs/user/quick-start/) for local d
 Once the cluster is running we can build and push the Docker image for Fluent Bit:

 ```sh
-docker build -f cmd/kafka/Dockerfile -t localhost:5000/fluent-bit-clickhouse:latest-kafka .
+docker build -f cmd/fluent-bit-kafka-clickhouse/Dockerfile -t localhost:5000/fluent-bit-clickhouse:latest-kafka .
 docker push localhost:5000/fluent-bit-clickhouse:latest-kafka

 # To run the Docker image locally, the following command can be used:
9 changes: 8 additions & 1 deletion cmd/fluent-bit-kafka-clickhouse/main.go
@@ -31,6 +31,7 @@ var (
 	kafkaGroup        string
 	kafkaVersion      string
 	kafkaTopics       string
+	kafkaTimestampKey string
 	logFormat         string
 	logLevel          string
 	showVersion       bool
@@ -107,6 +108,11 @@ func init() {
 		defaultKafkaTopics = os.Getenv("KAFKA_TOPICS")
 	}

+	defaultKafkaTimestampKey := "@timestamp"
+	if os.Getenv("KAFKA_TIMESTAMP_KEY") != "" {
+		defaultKafkaTimestampKey = os.Getenv("KAFKA_TIMESTAMP_KEY")
+	}
+
 	defaultLogFormat := "plain"
 	if os.Getenv("LOG_FORMAT") != "" {
 		defaultLogFormat = os.Getenv("LOG_FORMAT")
@@ -130,6 +136,7 @@ func init() {
 	flag.StringVar(&kafkaGroup, "kafka.group", defaultKafkaGroup, "Kafka consumer group definition")
 	flag.StringVar(&kafkaVersion, "kafka.version", defaultKafkaVersion, "Kafka cluster version")
 	flag.StringVar(&kafkaTopics, "kafka.topics", defaultKafkaTopics, "Kafka topics to be consumed, as a comma separated list")
+	flag.StringVar(&kafkaTimestampKey, "kafka.timestamp-key", defaultKafkaTimestampKey, "JSON key where the record timestamp is stored")

 	flag.StringVar(&logFormat, "log.format", defaultLogFormat, "Set the output format of the logs. Must be \"plain\" or \"json\".")
 	flag.StringVar(&logLevel, "log.level", defaultLogLevel, "Set the log level. Must be \"trace\", \"debug\", \"info\", \"warn\", \"error\", \"fatal\" or \"panic\".")
@@ -206,6 +213,6 @@ func main() {
 		log.WithError(err).Fatalf("could not create ClickHouse client")
 	}

-	kafka.Run(kafkaBrokers, kafkaGroup, kafkaVersion, kafkaTopics, clickhouseBatchSize, clickhouseFlushInterval, client)
+	kafka.Run(kafkaBrokers, kafkaGroup, kafkaVersion, kafkaTopics, kafkaTimestampKey, clickhouseBatchSize, clickhouseFlushInterval, client)
 	server.Shutdown(context.Background())
 }
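
One consequence of the init() code above worth noting: the environment variable only changes the flag's default, so a flag passed explicitly on the command line still takes precedence (hypothetical invocations, binary name assumed):

```sh
export KAFKA_TIMESTAMP_KEY=ts

./fluent-bit-kafka-clickhouse -kafka.timestamp-key=time   # uses "time"
./fluent-bit-kafka-clickhouse                             # uses "ts"
```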
3 changes: 2 additions & 1 deletion pkg/kafka/consumer.go
@@ -31,6 +31,7 @@ var (
 // Consumer represents a Sarama consumer group consumer.
 type Consumer struct {
 	ready                   chan bool
+	timestampKey            string
 	lastFlush               time.Time
 	clickhouseBatchSize     int64
 	clickhouseFlushInterval time.Duration
@@ -84,7 +85,7 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
 			}

 			switch k {
-			case "@timestamp":
+			case consumer.timestampKey:
 				parsedTime, err := strconv.ParseFloat(value, 64)
 				if err != nil {
 					log.WithError(err).Warnf("could not parse timestamp")
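For context, the consumer parses the timestamp value with strconv.ParseFloat, which fits Fluent Bit's convention of writing timestamps as Unix epoch seconds with a fractional part (an assumption here, not stated in the diff). A minimal standalone sketch of that lookup and parsing step, not the repository's actual consumer loop:

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	// Hypothetical flattened log record, as the consumer might see it after decoding.
	record := map[string]string{"@timestamp": "1633253400.123456", "log": "hello"}
	timestampKey := "@timestamp" // normally supplied via -kafka.timestamp-key

	for k, value := range record {
		if k != timestampKey {
			continue
		}
		parsedTime, err := strconv.ParseFloat(value, 64)
		if err != nil {
			fmt.Println("could not parse timestamp:", err)
			continue
		}
		// Split the float into seconds and nanoseconds for time.Unix.
		sec := int64(parsedTime)
		nsec := int64((parsedTime - float64(sec)) * 1e9)
		fmt.Println(time.Unix(sec, nsec).UTC())
	}
}
```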
3 changes: 2 additions & 1 deletion pkg/kafka/kafka.go
@@ -21,7 +21,7 @@ var (

 // Run creates a new client for the given Kafka configuration and listens for incoming messages. These messages are
 // then written to ClickHouse when the batch size is reached or the flush interval has passed.
-func Run(kafkaBrokers, kafkaGroup, kafkaVersion, kafkaTopics string, clickhouseBatchSize int64, clickhouseFlushInterval time.Duration, clickhouseClient *clickhouse.Client) {
+func Run(kafkaBrokers, kafkaGroup, kafkaVersion, kafkaTopics, kafkaTimestampKey string, clickhouseBatchSize int64, clickhouseFlushInterval time.Duration, clickhouseClient *clickhouse.Client) {
 	version, err := sarama.ParseKafkaVersion(kafkaVersion)
 	if err != nil {
 		log.WithError(err).Fatalf("error parsing Kafka version")
@@ -35,6 +35,7 @@ func Run(kafkaBrokers, kafkaGroup, kafkaVersion, kafkaTopics string, clickhouseB
 	// Create a new consumer, which handles all incoming messages from Kafka and writes the messages to ClickHouse.
 	consumer := Consumer{
 		ready:                   make(chan bool),
+		timestampKey:            kafkaTimestampKey,
 		lastFlush:               time.Now(),
 		clickhouseBatchSize:     clickhouseBatchSize,
 		clickhouseFlushInterval: clickhouseFlushInterval,
