diff --git a/config/config_test.go b/config/config_test.go index acbb038a7..7dbae6e86 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -77,18 +77,20 @@ func TestComplete(t *testing.T) { }, Ingest: IngestConfiguration{ Kafka: KafkaIngestConfiguration{ - Broker: "127.0.0.1:9092", - SecurityProtocol: "SASL_SSL", - SaslMechanisms: "PLAIN", - SaslUsername: "user", - SaslPassword: "pass", + KafkaConfiguration: KafkaConfiguration{ + Broker: "127.0.0.1:9092", + SecurityProtocol: "SASL_SSL", + SaslMechanisms: "PLAIN", + SaslUsername: "user", + SaslPassword: "pass", + + BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny, + TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute), + StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second), + SocketKeepAliveEnabled: true, + }, Partitions: 1, EventsTopicTemplate: "om_%s_events", - - BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny, - TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute), - StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second), - SocketKeepAliveEnabled: true, }, }, Aggregation: AggregationConfiguration{ diff --git a/config/ingest.go b/config/ingest.go index 6ab998a11..5c7b06306 100644 --- a/config/ingest.go +++ b/config/ingest.go @@ -26,13 +26,30 @@ func (c IngestConfiguration) Validate() error { } type KafkaIngestConfiguration struct { - Broker string - SecurityProtocol string - SaslMechanisms string - SaslUsername string - SaslPassword string + KafkaConfiguration `mapstructure:",squash"` + Partitions int EventsTopicTemplate string +} + +// Validate validates the configuration. +func (c KafkaIngestConfiguration) Validate() error { + if c.EventsTopicTemplate == "" { + return errors.New("events topic template is required") + } + + if err := c.KafkaConfiguration.Validate(); err != nil { + return err + } + return nil +} + +type KafkaConfiguration struct { + Broker string + SecurityProtocol string + SaslMechanisms string + SaslUsername string + SaslPassword string StatsInterval pkgkafka.TimeDurationMilliSeconds @@ -49,8 +66,25 @@ type KafkaIngestConfiguration struct { TopicMetadataRefreshInterval pkgkafka.TimeDurationMilliSeconds } +func (c KafkaConfiguration) Validate() error { + + if c.Broker == "" { + return errors.New("broker is required") + } + + if c.StatsInterval > 0 && c.StatsInterval.Duration() < 5*time.Second { + return errors.New("StatsInterval must be >=5s") + } + + if c.TopicMetadataRefreshInterval > 0 && c.TopicMetadataRefreshInterval.Duration() < 10*time.Second { + return errors.New("topic metadata refresh interval must be >=10s") + } + + return nil +} + // CreateKafkaConfig creates a Kafka config map. -func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap { +func (c KafkaConfiguration) CreateKafkaConfig() kafka.ConfigMap { config := kafka.ConfigMap{ "bootstrap.servers": c.Broker, @@ -103,27 +137,6 @@ func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap { return config } -// Validate validates the configuration. -func (c KafkaIngestConfiguration) Validate() error { - if c.Broker == "" { - return errors.New("broker is required") - } - - if c.EventsTopicTemplate == "" { - return errors.New("events topic template is required") - } - - if c.StatsInterval > 0 && c.StatsInterval.Duration() < 5*time.Second { - return errors.New("StatsInterval must be >=5s") - } - - if c.TopicMetadataRefreshInterval > 0 && c.TopicMetadataRefreshInterval.Duration() < 10*time.Second { - return errors.New("topic metadata refresh interval must be >=10s") - } - - return nil -} - // Configure configures some defaults in the Viper instance. func ConfigureIngest(v *viper.Viper) { v.SetDefault("ingest.kafka.broker", "127.0.0.1:29092") diff --git a/config/ingest_test.go b/config/ingest_test.go index 53fbd9864..21aa8d75d 100644 --- a/config/ingest_test.go +++ b/config/ingest_test.go @@ -15,12 +15,12 @@ func TestKafkaIngestConfiguration(t *testing.T) { tests := []struct { Name string - KafkaConfig KafkaIngestConfiguration + KafkaConfig KafkaConfiguration ExpectedKafkaConfigMap kafka.ConfigMap }{ { Name: "All", - KafkaConfig: KafkaIngestConfiguration{ + KafkaConfig: KafkaConfiguration{ Broker: "127.0.0.1:29092", SecurityProtocol: "SASL_SSL", SaslMechanisms: "PLAIN", @@ -47,7 +47,7 @@ func TestKafkaIngestConfiguration(t *testing.T) { }, { Name: "Basic", - KafkaConfig: KafkaIngestConfiguration{ + KafkaConfig: KafkaConfiguration{ Broker: "127.0.0.1:29092", }, ExpectedKafkaConfigMap: kafka.ConfigMap{