Skip to content

Commit

Permalink
Merge pull request #1010 from openmeterio/feat-kafka-config
Browse files Browse the repository at this point in the history
feat: add Kafka client configs
  • Loading branch information
chrisgacsal authored Jun 14, 2024
2 parents c038fbe + 11cff8d commit 7637314
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 5 deletions.
7 changes: 7 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ telemetry:
# # To enable stats reporting set this value to >=5s.
# # Setting this value to 0 makes reporting explicitly disabled.
# statsInterval: 5s
# # Set IP address family used for communicating with Kafka cluster
# brokerAddressFamily: v4
# # Use this configuration parameter to define how frequently the local metadata cache needs to be updated.
# # It cannot be lower than 10 seconds.
# topicMetadataRefreshInterval: 1m
# # Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker from closing idle network connections.
# socketKeepAliveEnabled: true

# dedupe:
# enabled: true
Expand Down
6 changes: 6 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"

pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
"github.com/openmeterio/openmeter/pkg/models"
)

Expand Down Expand Up @@ -83,6 +84,11 @@ func TestComplete(t *testing.T) {
SaslPassword: "pass",
Partitions: 1,
EventsTopicTemplate: "om_%s_events",

BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny,
TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
SocketKeepAliveEnabled: true,
},
},
Aggregation: AggregationConfiguration{
Expand Down
43 changes: 38 additions & 5 deletions config/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/spf13/viper"

pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
)

type IngestConfiguration struct {
Expand All @@ -32,7 +34,19 @@ type KafkaIngestConfiguration struct {
Partitions int
EventsTopicTemplate string

StatsInterval time.Duration
StatsInterval pkgkafka.TimeDurationMilliSeconds

// BrokerAddressFamily defines the IP address family to be used for network communication with Kafka cluster
BrokerAddressFamily pkgkafka.BrokerAddressFamily
// SocketKeepAliveEnabled defines whether TCP socket keep-alive is enabled to prevent idle connections
// from being closed by Kafka brokers.
SocketKeepAliveEnabled bool
// TopicMetadataRefreshInterval defines how frequently the Kafka client needs to fetch metadata information
// (brokers, topic, partitions, etc) from the Kafka cluster.
// The 5 minutes default value is appropriate for mostly static Kafka clusters, but needs to be lowered
// in case of large clusters where changes are more frequent.
// This value must not be set to a value lower than 10s.
TopicMetadataRefreshInterval pkgkafka.TimeDurationMilliSeconds
}

// CreateKafkaConfig creates a Kafka config map.
Expand All @@ -47,8 +61,10 @@ func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap {
// This is needed when using localhost brokers on OSX,
// since the OSX resolver will return the IPv6 addresses first.
// See: https://github.com/openmeterio/openmeter/issues/321
if strings.Contains(c.Broker, "localhost") || strings.Contains(c.Broker, "127.0.0.1") {
config["broker.address.family"] = "v4"
if c.BrokerAddressFamily != "" {
config["broker.address.family"] = c.BrokerAddressFamily
} else if strings.Contains(c.Broker, "localhost") || strings.Contains(c.Broker, "127.0.0.1") {
config["broker.address.family"] = pkgkafka.BrokerAddressFamilyIPv4
}

if c.SecurityProtocol != "" {
Expand All @@ -68,7 +84,20 @@ func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap {
}

if c.StatsInterval > 0 {
config["statistics.interval.ms"] = int(c.StatsInterval.Milliseconds())
config["statistics.interval.ms"] = c.StatsInterval
}

if c.SocketKeepAliveEnabled {
config["socket.keepalive.enable"] = c.SocketKeepAliveEnabled
}

// `topic.metadata.refresh.interval.ms` defines how frequently the Kafka client retrieves metadata
// from the Kafka cluster, while `metadata.max.age.ms` defines the interval after which the metadata cache
// maintained on the client side becomes invalid. Setting the former automatically adjusts the latter (to three
// times its value) to avoid a misconfiguration where cache entries are evicted prior to a metadata refresh.
if c.TopicMetadataRefreshInterval > 0 {
config["topic.metadata.refresh.interval.ms"] = c.TopicMetadataRefreshInterval
config["metadata.max.age.ms"] = 3 * c.TopicMetadataRefreshInterval
}

return config
Expand All @@ -84,10 +113,14 @@ func (c KafkaIngestConfiguration) Validate() error {
return errors.New("events topic template is required")
}

if c.StatsInterval > 0 && c.StatsInterval < 5*time.Second {
if c.StatsInterval > 0 && c.StatsInterval.Duration() < 5*time.Second {
return errors.New("StatsInterval must be >=5s")
}

if c.TopicMetadataRefreshInterval > 0 && c.TopicMetadataRefreshInterval.Duration() < 10*time.Second {
return errors.New("topic metadata refresh interval must be >=10s")
}

return nil
}

Expand Down
67 changes: 67 additions & 0 deletions config/ingest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package config

import (
"testing"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stretchr/testify/assert"

pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
)

// TestKafkaIngestConfiguration checks that CreateKafkaConfig translates a
// KafkaIngestConfiguration into the expected Kafka client config map.
func TestKafkaIngestConfiguration(t *testing.T) {
	type scenario struct {
		label string
		given KafkaIngestConfiguration
		want  kafka.ConfigMap
	}

	const broker = "127.0.0.1:29092"

	scenarios := []scenario{
		{
			// Every optional setting is populated; the explicit address
			// family is expected in the resulting map even though the broker
			// is a loopback address.
			label: "All",
			given: KafkaIngestConfiguration{
				Broker:                       broker,
				SecurityProtocol:             "SASL_SSL",
				SaslMechanisms:               "PLAIN",
				SaslUsername:                 "user",
				SaslPassword:                 "pass",
				StatsInterval:                pkgkafka.TimeDurationMilliSeconds(10 * time.Second),
				BrokerAddressFamily:          "v6",
				SocketKeepAliveEnabled:       true,
				TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
			},
			want: kafka.ConfigMap{
				"bootstrap.servers":                  broker,
				"broker.address.family":              pkgkafka.BrokerAddressFamilyIPv6,
				"go.logs.channel.enable":             true,
				"metadata.max.age.ms":                pkgkafka.TimeDurationMilliSeconds(3 * time.Minute),
				"sasl.mechanism":                     "PLAIN",
				"sasl.password":                      "pass",
				"sasl.username":                      "user",
				"security.protocol":                  "SASL_SSL",
				"socket.keepalive.enable":            true,
				"statistics.interval.ms":             pkgkafka.TimeDurationMilliSeconds(10 * time.Second),
				"topic.metadata.refresh.interval.ms": pkgkafka.TimeDurationMilliSeconds(time.Minute),
			},
		},
		{
			// Only the broker is set; a loopback broker is expected to yield
			// the IPv4 address family.
			label: "Basic",
			given: KafkaIngestConfiguration{
				Broker: broker,
			},
			want: kafka.ConfigMap{
				"bootstrap.servers":      broker,
				"broker.address.family":  pkgkafka.BrokerAddressFamilyIPv4,
				"go.logs.channel.enable": true,
			},
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.label, func(t *testing.T) {
			got := sc.given.CreateKafkaConfig()
			assert.Equal(t, sc.want, got)
		})
	}
}
4 changes: 4 additions & 0 deletions config/testdata/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ ingest:
saslUsername: user
saslPassword: pass
partitions: 1
statsInterval: 5s
brokerAddressFamily: any
socketKeepAliveEnabled: true
topicMetadataRefreshInterval: 1m

aggregation:
clickhouse:
Expand Down
76 changes: 76 additions & 0 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package kafka

import (
"encoding"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
)

// configValue is the contract implemented by the custom Kafka configuration
// value types in this package: they render themselves as a string and can be
// decoded from both plain text and JSON.
type configValue interface {
	fmt.Stringer
	encoding.TextUnmarshaler
	json.Unmarshaler
}

// Values accepted for BrokerAddressFamily.
const (
	// BrokerAddressFamilyAny is the "any" address family setting.
	BrokerAddressFamilyAny BrokerAddressFamily = "any"
	// BrokerAddressFamilyIPv4 is the "v4" address family setting.
	BrokerAddressFamilyIPv4 BrokerAddressFamily = "v4"
	// BrokerAddressFamilyIPv6 is the "v6" address family setting.
	BrokerAddressFamilyIPv6 BrokerAddressFamily = "v6"
)

// Compile-time check that *BrokerAddressFamily satisfies configValue.
var _ configValue = (*BrokerAddressFamily)(nil)

// BrokerAddressFamily is the IP address family setting used for the Kafka
// client. Its string form is one of "any", "v4" or "v6".
type BrokerAddressFamily string

// UnmarshalText parses text into s. Matching is case-insensitive and ignores
// surrounding whitespace. An error is returned, and s is left untouched, when
// text is not one of the supported values.
func (s *BrokerAddressFamily) UnmarshalText(text []byte) error {
	switch strings.ToLower(strings.TrimSpace(string(text))) {
	case "v4":
		*s = BrokerAddressFamilyIPv4
	case "v6":
		*s = BrokerAddressFamilyIPv6
	case "any":
		*s = BrokerAddressFamilyAny
	default:
		return fmt.Errorf("invalid value broker family address: %s", text)
	}

	return nil
}

// UnmarshalJSON decodes a JSON value into s.
//
// A JSON string (for example "v4", including the quotes) is unquoted before
// being handed to UnmarshalText; without this step encoding/json could never
// decode the type, because it always passes the quoted token. JSON null is a
// no-op, following the encoding/json convention. Any other input is passed to
// UnmarshalText unchanged so callers that supply the bare value keep working.
func (s *BrokerAddressFamily) UnmarshalJSON(data []byte) error {
	raw := strings.TrimSpace(string(data))
	if raw == "null" {
		return nil
	}

	if strings.HasPrefix(raw, `"`) {
		var text string
		if err := json.Unmarshal([]byte(raw), &text); err != nil {
			return fmt.Errorf("invalid value broker family address: %w", err)
		}

		return s.UnmarshalText([]byte(text))
	}

	return s.UnmarshalText(data)
}

// String returns the raw string value of s.
func (s BrokerAddressFamily) String() string {
	return string(s)
}

// Compile-time check that *TimeDurationMilliSeconds satisfies configValue.
var _ configValue = (*TimeDurationMilliSeconds)(nil)

// TimeDurationMilliSeconds is a time.Duration that is parsed from Go duration
// notation (for example "5s" or "1m") and rendered by String as a whole number
// of milliseconds.
type TimeDurationMilliSeconds time.Duration

// UnmarshalText parses text, after trimming surrounding whitespace, with
// time.ParseDuration and stores the result in d. On failure d is left
// untouched and the parse error is returned wrapped.
func (d *TimeDurationMilliSeconds) UnmarshalText(text []byte) error {
	v, err := time.ParseDuration(strings.TrimSpace(string(text)))
	if err != nil {
		return fmt.Errorf("failed to parse time duration: %w", err)
	}

	*d = TimeDurationMilliSeconds(v)

	return nil
}

// UnmarshalJSON decodes a JSON value into d.
//
// A JSON string (for example "5s", including the quotes) is unquoted before
// being handed to UnmarshalText; without this step encoding/json could never
// decode the type, because it always passes the quoted token. JSON null is a
// no-op, following the encoding/json convention. Any other input is passed to
// UnmarshalText unchanged so callers that supply the bare value keep working.
func (d *TimeDurationMilliSeconds) UnmarshalJSON(data []byte) error {
	raw := strings.TrimSpace(string(data))
	if raw == "null" {
		return nil
	}

	if strings.HasPrefix(raw, `"`) {
		var text string
		if err := json.Unmarshal([]byte(raw), &text); err != nil {
			return fmt.Errorf("failed to parse time duration: %w", err)
		}

		return d.UnmarshalText([]byte(text))
	}

	return d.UnmarshalText(data)
}

// Duration returns d as a standard time.Duration.
func (d TimeDurationMilliSeconds) Duration() time.Duration {
	return time.Duration(d)
}

// String returns d as a base-10 number of whole milliseconds.
func (d TimeDurationMilliSeconds) String() string {
	// FormatInt works on the int64 directly, avoiding truncation through int
	// on platforms where int is 32 bits wide.
	return strconv.FormatInt(time.Duration(d).Milliseconds(), 10)
}
117 changes: 117 additions & 0 deletions pkg/kafka/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package kafka

import (
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestBrokerAddressFamily verifies that BrokerAddressFamily decodes every
// supported value through both UnmarshalText and UnmarshalJSON and rejects
// unknown values.
func TestBrokerAddressFamily(t *testing.T) {
	tests := []struct {
		Name string

		Value         string
		ExpectedError error
		ExpectedValue BrokerAddressFamily
	}{
		{
			Name:          "Any",
			Value:         "any",
			ExpectedError: nil,
			ExpectedValue: BrokerAddressFamilyAny,
		},
		{
			Name:          "IPv4",
			Value:         "v4",
			ExpectedError: nil,
			ExpectedValue: BrokerAddressFamilyIPv4,
		},
		{
			Name:          "IPv6",
			Value:         "v6",
			ExpectedError: nil,
			ExpectedValue: BrokerAddressFamilyIPv6,
		},
		{
			Name:          "Invalid",
			Value:         "invalid",
			ExpectedError: errors.New("invalid value broker family address: invalid"),
		},
	}

	for _, test := range tests {
		t.Run(test.Name, func(t *testing.T) {
			var fromText BrokerAddressFamily

			err := fromText.UnmarshalText([]byte(test.Value))
			assert.Equal(t, test.ExpectedError, err)
			if err == nil {
				assert.Equal(t, test.ExpectedValue, fromText)
			}

			// Decode into a fresh value so the assertion cannot be satisfied
			// by state left behind by the UnmarshalText call above.
			var fromJSON BrokerAddressFamily

			err = fromJSON.UnmarshalJSON([]byte(test.Value))
			assert.Equal(t, test.ExpectedError, err)
			if err == nil {
				assert.Equal(t, test.ExpectedValue, fromJSON)
			}
		})
	}
}

// TestTimeDurationMilliSeconds verifies that TimeDurationMilliSeconds decodes
// Go duration strings through both UnmarshalText and UnmarshalJSON, renders
// itself in milliseconds, and rejects malformed input.
func TestTimeDurationMilliSeconds(t *testing.T) {
	tests := []struct {
		Name string

		Value            string
		ExpectedError    error
		ExpectedValue    TimeDurationMilliSeconds
		ExpectedString   string
		ExpectedDuration time.Duration
	}{
		{
			Name:             "Duration",
			Value:            "6s",
			ExpectedError:    nil,
			ExpectedValue:    TimeDurationMilliSeconds(6 * time.Second),
			ExpectedString:   "6000",
			ExpectedDuration: 6 * time.Second,
		},
		{
			// A bare number has no unit and must be rejected.
			Name:          "MissingUnit",
			Value:         "10000",
			ExpectedError: fmt.Errorf("failed to parse time duration: %w", errors.New("time: missing unit in duration \"10000\"")),
		},
		{
			Name:          "Invalid",
			Value:         "invalid",
			ExpectedError: fmt.Errorf("failed to parse time duration: %w", errors.New("time: invalid duration \"invalid\"")),
		},
	}

	for _, test := range tests {
		t.Run(test.Name, func(t *testing.T) {
			var fromText TimeDurationMilliSeconds

			err := fromText.UnmarshalText([]byte(test.Value))
			assert.Equal(t, test.ExpectedError, err)
			if err == nil {
				assert.Equal(t, test.ExpectedValue, fromText)
				assert.Equal(t, test.ExpectedString, fromText.String())
				assert.Equal(t, test.ExpectedDuration, fromText.Duration())
			}

			// Decode into a fresh value so the assertions cannot be satisfied
			// by state left behind by the UnmarshalText call above.
			var fromJSON TimeDurationMilliSeconds

			err = fromJSON.UnmarshalJSON([]byte(test.Value))
			assert.Equal(t, test.ExpectedError, err)
			if err == nil {
				assert.Equal(t, test.ExpectedValue, fromJSON)
				assert.Equal(t, test.ExpectedString, fromJSON.String())
				assert.Equal(t, test.ExpectedDuration, fromJSON.Duration())
			}
		})
	}
}

0 comments on commit 7637314

Please sign in to comment.