Skip to content

Commit

Permalink
Merge pull request #1138 from openmeterio/chore/split-kafka-and-ingre…
Browse files Browse the repository at this point in the history
…ss-config

refactor: split ingest and kafka specific config
  • Loading branch information
turip authored Jul 2, 2024
2 parents 93a3e4c + 33f4434 commit 56cc3fc
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 40 deletions.
22 changes: 12 additions & 10 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,20 @@ func TestComplete(t *testing.T) {
},
Ingest: IngestConfiguration{
Kafka: KafkaIngestConfiguration{
Broker: "127.0.0.1:9092",
SecurityProtocol: "SASL_SSL",
SaslMechanisms: "PLAIN",
SaslUsername: "user",
SaslPassword: "pass",
KafkaConfiguration: KafkaConfiguration{
Broker: "127.0.0.1:9092",
SecurityProtocol: "SASL_SSL",
SaslMechanisms: "PLAIN",
SaslUsername: "user",
SaslPassword: "pass",

BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny,
TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
SocketKeepAliveEnabled: true,
},
Partitions: 1,
EventsTopicTemplate: "om_%s_events",

BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny,
TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
SocketKeepAliveEnabled: true,
},
},
Aggregation: AggregationConfiguration{
Expand Down
67 changes: 40 additions & 27 deletions config/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,30 @@ func (c IngestConfiguration) Validate() error {
}

type KafkaIngestConfiguration struct {
Broker string
SecurityProtocol string
SaslMechanisms string
SaslUsername string
SaslPassword string
KafkaConfiguration `mapstructure:",squash"`

Partitions int
EventsTopicTemplate string
}

// Validate validates the configuration.
func (c KafkaIngestConfiguration) Validate() error {
if c.EventsTopicTemplate == "" {
return errors.New("events topic template is required")
}

if err := c.KafkaConfiguration.Validate(); err != nil {
return err
}
return nil
}

type KafkaConfiguration struct {
Broker string
SecurityProtocol string
SaslMechanisms string
SaslUsername string
SaslPassword string

StatsInterval pkgkafka.TimeDurationMilliSeconds

Expand All @@ -49,8 +66,25 @@ type KafkaIngestConfiguration struct {
TopicMetadataRefreshInterval pkgkafka.TimeDurationMilliSeconds
}

func (c KafkaConfiguration) Validate() error {

if c.Broker == "" {
return errors.New("broker is required")
}

if c.StatsInterval > 0 && c.StatsInterval.Duration() < 5*time.Second {
return errors.New("StatsInterval must be >=5s")
}

if c.TopicMetadataRefreshInterval > 0 && c.TopicMetadataRefreshInterval.Duration() < 10*time.Second {
return errors.New("topic metadata refresh interval must be >=10s")
}

return nil
}

// CreateKafkaConfig creates a Kafka config map.
func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap {
func (c KafkaConfiguration) CreateKafkaConfig() kafka.ConfigMap {
config := kafka.ConfigMap{
"bootstrap.servers": c.Broker,

Expand Down Expand Up @@ -103,27 +137,6 @@ func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap {
return config
}

// Validate validates the configuration.
func (c KafkaIngestConfiguration) Validate() error {
if c.Broker == "" {
return errors.New("broker is required")
}

if c.EventsTopicTemplate == "" {
return errors.New("events topic template is required")
}

if c.StatsInterval > 0 && c.StatsInterval.Duration() < 5*time.Second {
return errors.New("StatsInterval must be >=5s")
}

if c.TopicMetadataRefreshInterval > 0 && c.TopicMetadataRefreshInterval.Duration() < 10*time.Second {
return errors.New("topic metadata refresh interval must be >=10s")
}

return nil
}

// Configure configures some defaults in the Viper instance.
func ConfigureIngest(v *viper.Viper) {
v.SetDefault("ingest.kafka.broker", "127.0.0.1:29092")
Expand Down
6 changes: 3 additions & 3 deletions config/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ func TestKafkaIngestConfiguration(t *testing.T) {
tests := []struct {
Name string

KafkaConfig KafkaIngestConfiguration
KafkaConfig KafkaConfiguration
ExpectedKafkaConfigMap kafka.ConfigMap
}{
{
Name: "All",
KafkaConfig: KafkaIngestConfiguration{
KafkaConfig: KafkaConfiguration{
Broker: "127.0.0.1:29092",
SecurityProtocol: "SASL_SSL",
SaslMechanisms: "PLAIN",
Expand All @@ -47,7 +47,7 @@ func TestKafkaIngestConfiguration(t *testing.T) {
},
{
Name: "Basic",
KafkaConfig: KafkaIngestConfiguration{
KafkaConfig: KafkaConfiguration{
Broker: "127.0.0.1:29092",
},
ExpectedKafkaConfigMap: kafka.ConfigMap{
Expand Down

0 comments on commit 56cc3fc

Please sign in to comment.