From 5ce0873de7caf31ae58ecc140ec99810b4825ade Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Fri, 31 Jul 2020 10:18:00 +0200 Subject: [PATCH] Add username, password and tls config options for Kafka. --- .../cmd/configfile.go | 11 ++++++ internal/config/config.go | 3 ++ internal/integration/kafka/kafka.go | 37 ++++++++++++++++++- 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/cmd/chirpstack-application-server/cmd/configfile.go b/cmd/chirpstack-application-server/cmd/configfile.go index 08aa822a8..2689aa14f 100644 --- a/cmd/chirpstack-application-server/cmd/configfile.go +++ b/cmd/chirpstack-application-server/cmd/configfile.go @@ -359,6 +359,11 @@ id="{{ .ApplicationServer.ID }}" # Brokers, e.g.: localhost:9092. brokers=[{{ range $index, $broker := .ApplicationServer.Integration.Kafka.Brokers }}{{ if $index }}, {{ end }}"{{ $broker }}"{{ end }}] + # TLS. + # + # Set this to true when the Kafka client must connect using TLS to the Broker. + tls={{ .ApplicationServer.Integration.TLS }} + # Topic for events. topic="{{ .ApplicationServer.Integration.Kafka.Topic }}" @@ -370,6 +375,12 @@ id="{{ .ApplicationServer.ID }}" # message. There is no need to parse it from the key. event_key_template="{{ .ApplicationServer.Integration.Kafka.EventKeyTemplate }}" + # Username (optional). + username="{{ .ApplicationServer.Integration.Kafka.Username }}" + + # Password (optional). + password="{{ .ApplicationServer.Integration.Kafka.Password }}" + # PostgreSQL database integration. [application_server.integration.postgresql] diff --git a/internal/config/config.go b/internal/config/config.go index 85f792a10..404541203 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -221,8 +221,11 @@ type IntegrationAMQPConfig struct { // IntegrationKafkaConfig holds the Kafka integration configuration. type IntegrationKafkaConfig struct { Brokers []string `mapstructure:"brokers"` + TLS bool `mapstructure:"tls"` Topic string `mapstructure:"topic"` EventKeyTemplate string `mapstructure:"event_key_template"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` } // AzurePublishMode defines the publish-mode type. diff --git a/internal/integration/kafka/kafka.go b/internal/integration/kafka/kafka.go index b906a9cdc..76a6aeb46 100644 --- a/internal/integration/kafka/kafka.go +++ b/internal/integration/kafka/kafka.go @@ -3,14 +3,17 @@ package kafka import ( "bytes" "context" + "crypto/tls" "fmt" "text/template" + "time" "github.com/golang/protobuf/proto" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" pb "github.com/brocaar/chirpstack-api/go/v3/as/integration" "github.com/brocaar/chirpstack-application-server/internal/config" @@ -30,11 +33,41 @@ type Integration struct { // New creates a new Kafka integration. func New(m marshaler.Type, conf config.IntegrationKafkaConfig) (*Integration, error) { - w := kafka.NewWriter(kafka.WriterConfig{ + wc := kafka.WriterConfig{ Brokers: conf.Brokers, Topic: conf.Topic, Balancer: &kafka.LeastBytes{}, - }) + + // Equal to kafka.DefaultDialer. + // We do not want to use kafka.DefaultDialer itself, as we might modify + // it below to setup SASLMechanism. + Dialer: &kafka.Dialer{ + Timeout: 10 * time.Second, + DualStack: true, + }, + } + + if conf.TLS { + wc.Dialer.TLS = &tls.Config{} + } + + if conf.Username != "" || conf.Password != "" { + fmt.Println("username", conf.Username) + fmt.Println("password", conf.Password) + wc.Dialer.SASLMechanism = plain.Mechanism{ + Username: conf.Username, + Password: conf.Password, + } + } + + log.WithFields(log.Fields{ + "brokers": conf.Brokers, + "topic": conf.Topic, + }).Info("integration/kafka: connecting to kafka broker(s)") + + w := kafka.NewWriter(wc) + + log.Info("integration/kafka: connected to kafka broker(s)") kt, err := template.New("key").Parse(conf.EventKeyTemplate) if err != nil {