Skip to content

Commit

Permalink
Add username, password and tls config options for Kafka.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed Jul 31, 2020
1 parent d3ad3bc commit 5ce0873
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 2 deletions.
11 changes: 11 additions & 0 deletions cmd/chirpstack-application-server/cmd/configfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,11 @@ id="{{ .ApplicationServer.ID }}"
# Brokers, e.g.: localhost:9092.
brokers=[{{ range $index, $broker := .ApplicationServer.Integration.Kafka.Brokers }}{{ if $index }}, {{ end }}"{{ $broker }}"{{ end }}]
# TLS.
#
# Set this to true when the Kafka client must connect using TLS to the Broker.
tls={{ .ApplicationServer.Integration.TLS }}
# Topic for events.
topic="{{ .ApplicationServer.Integration.Kafka.Topic }}"
Expand All @@ -370,6 +375,12 @@ id="{{ .ApplicationServer.ID }}"
# message. There is no need to parse it from the key.
event_key_template="{{ .ApplicationServer.Integration.Kafka.EventKeyTemplate }}"
# Username (optional).
username="{{ .ApplicationServer.Integration.Kafka.Username }}"
# Password (optional).
password="{{ .ApplicationServer.Integration.Kafka.Password }}"
# PostgreSQL database integration.
[application_server.integration.postgresql]
Expand Down
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,11 @@ type IntegrationAMQPConfig struct {
// IntegrationKafkaConfig holds the Kafka integration configuration.
type IntegrationKafkaConfig struct {
Brokers []string `mapstructure:"brokers"`
TLS bool `mapstructure:"tls"`
Topic string `mapstructure:"topic"`
EventKeyTemplate string `mapstructure:"event_key_template"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
}

// AzurePublishMode defines the publish-mode type.
Expand Down
37 changes: 35 additions & 2 deletions internal/integration/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package kafka
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"text/template"
"time"

"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"

pb "github.com/brocaar/chirpstack-api/go/v3/as/integration"
"github.com/brocaar/chirpstack-application-server/internal/config"
Expand All @@ -30,11 +33,41 @@ type Integration struct {

// New creates a new Kafka integration.
func New(m marshaler.Type, conf config.IntegrationKafkaConfig) (*Integration, error) {
w := kafka.NewWriter(kafka.WriterConfig{
wc := kafka.WriterConfig{
Brokers: conf.Brokers,
Topic: conf.Topic,
Balancer: &kafka.LeastBytes{},
})

// Equal to kafka.DefaultDialer.
// We do not want to use kafka.DefaultDialer itself, as we might modify
// it below to setup SASLMechanism.
Dialer: &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
},
}

if conf.TLS {
wc.Dialer.TLS = &tls.Config{}
}

if conf.Username != "" || conf.Password != "" {
fmt.Println("username", conf.Username)
fmt.Println("password", conf.Password)
wc.Dialer.SASLMechanism = plain.Mechanism{
Username: conf.Username,
Password: conf.Password,
}
}

log.WithFields(log.Fields{
"brokers": conf.Brokers,
"topic": conf.Topic,
}).Info("integration/kafka: connecting to kafka broker(s)")

w := kafka.NewWriter(wc)

log.Info("integration/kafka: connected to kafka broker(s)")

kt, err := template.New("key").Parse(conf.EventKeyTemplate)
if err != nil {
Expand Down

0 comments on commit 5ce0873

Please sign in to comment.