Skip to content

Commit

Permalink
chore: split ingest and kafka specific config
Browse files Browse the repository at this point in the history
This allows us to reuse the Kafka configuration object in other
parts of the system/other configuration files.
  • Loading branch information
turip committed Jul 2, 2024
1 parent 64a7017 commit 33f4434
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 40 deletions.
22 changes: 12 additions & 10 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,20 @@ func TestComplete(t *testing.T) {
},
Ingest: IngestConfiguration{
Kafka: KafkaIngestConfiguration{
Broker: "127.0.0.1:9092",
SecurityProtocol: "SASL_SSL",
SaslMechanisms: "PLAIN",
SaslUsername: "user",
SaslPassword: "pass",
KafkaConfiguration: KafkaConfiguration{
Broker: "127.0.0.1:9092",
SecurityProtocol: "SASL_SSL",
SaslMechanisms: "PLAIN",
SaslUsername: "user",
SaslPassword: "pass",

BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny,
TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
SocketKeepAliveEnabled: true,
},
Partitions: 1,
EventsTopicTemplate: "om_%s_events",

BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny,
TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
SocketKeepAliveEnabled: true,
},
},
Aggregation: AggregationConfiguration{
Expand Down
67 changes: 40 additions & 27 deletions config/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,30 @@ func (c IngestConfiguration) Validate() error {
}

type KafkaIngestConfiguration struct {
Broker string
SecurityProtocol string
SaslMechanisms string
SaslUsername string
SaslPassword string
KafkaConfiguration `mapstructure:",squash"`

Partitions int
EventsTopicTemplate string
}

// Validate validates the configuration.
func (c KafkaIngestConfiguration) Validate() error {
if c.EventsTopicTemplate == "" {
return errors.New("events topic template is required")
}

if err := c.KafkaConfiguration.Validate(); err != nil {
return err
}
return nil
}

type KafkaConfiguration struct {
Broker string
SecurityProtocol string
SaslMechanisms string
SaslUsername string
SaslPassword string

StatsInterval pkgkafka.TimeDurationMilliSeconds

Expand All @@ -49,8 +66,25 @@ type KafkaIngestConfiguration struct {
TopicMetadataRefreshInterval pkgkafka.TimeDurationMilliSeconds
}

func (c KafkaConfiguration) Validate() error {

if c.Broker == "" {
return errors.New("broker is required")
}

if c.StatsInterval > 0 && c.StatsInterval.Duration() < 5*time.Second {
return errors.New("StatsInterval must be >=5s")
}

if c.TopicMetadataRefreshInterval > 0 && c.TopicMetadataRefreshInterval.Duration() < 10*time.Second {
return errors.New("topic metadata refresh interval must be >=10s")
}

return nil
}

// CreateKafkaConfig creates a Kafka config map.
func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap {
func (c KafkaConfiguration) CreateKafkaConfig() kafka.ConfigMap {
config := kafka.ConfigMap{
"bootstrap.servers": c.Broker,

Expand Down Expand Up @@ -103,27 +137,6 @@ func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap {
return config
}

// Validate validates the configuration.
func (c KafkaIngestConfiguration) Validate() error {
if c.Broker == "" {
return errors.New("broker is required")
}

if c.EventsTopicTemplate == "" {
return errors.New("events topic template is required")
}

if c.StatsInterval > 0 && c.StatsInterval.Duration() < 5*time.Second {
return errors.New("StatsInterval must be >=5s")
}

if c.TopicMetadataRefreshInterval > 0 && c.TopicMetadataRefreshInterval.Duration() < 10*time.Second {
return errors.New("topic metadata refresh interval must be >=10s")
}

return nil
}

// Configure configures some defaults in the Viper instance.
func ConfigureIngest(v *viper.Viper) {
v.SetDefault("ingest.kafka.broker", "127.0.0.1:29092")
Expand Down
6 changes: 3 additions & 3 deletions config/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ func TestKafkaIngestConfiguration(t *testing.T) {
tests := []struct {
Name string

KafkaConfig KafkaIngestConfiguration
KafkaConfig KafkaConfiguration
ExpectedKafkaConfigMap kafka.ConfigMap
}{
{
Name: "All",
KafkaConfig: KafkaIngestConfiguration{
KafkaConfig: KafkaConfiguration{
Broker: "127.0.0.1:29092",
SecurityProtocol: "SASL_SSL",
SaslMechanisms: "PLAIN",
Expand All @@ -47,7 +47,7 @@ func TestKafkaIngestConfiguration(t *testing.T) {
},
{
Name: "Basic",
KafkaConfig: KafkaIngestConfiguration{
KafkaConfig: KafkaConfiguration{
Broker: "127.0.0.1:29092",
},
ExpectedKafkaConfigMap: kafka.ConfigMap{
Expand Down

0 comments on commit 33f4434

Please sign in to comment.