-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathconfig.go
349 lines (304 loc) · 13.1 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
/*
Copyright © 2021, 2022, 2023 Red Hat, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
// This source file contains the definition of the data type named
// ConfigStruct that represents the configuration of the Notification Writer
// service. It also contains the function LoadConfiguration, which loads the
// configuration from the provided configuration file and/or from environment
// variables. Additionally, several accessor functions (GetStorageConfiguration,
// GetLoggingConfiguration, GetBrokerConfiguration, and others) return
// specific parts of the configuration.
// Generated documentation is available at:
// https://pkg.go.dev/github.com/RedHatInsights/ccx-notification-writer/
//
// Documentation in literate-programming-style is available at:
// https://redhatinsights.github.io/ccx-notification-writer/packages/config.html
// Default name of configuration file is config.toml
// It can be changed via environment variable NOTIFICATION_WRITER_CONFIG_FILE
// An example of configuration file that can be used in devel environment:
//
// [broker]
// addresses = "kafka:29092"
// security_protocol = "PLAINTEXT"
// sasl_mechanism = "not-used"
// sasl_username = "not-used"
// sasl_password = "not-used"
// topic = "ccx.ocp.results"
// group = "aggregator"
// enabled = true
//
// [storage]
// db_driver = "postgres"
// pg_username = "user"
// pg_password = "password"
// pg_host = "localhost"
// pg_port = 5432
// pg_db_name = "notification"
// pg_params = "sslmode=disable"
// log_sql_queries = true
//
// [logging]
// debug = true
// log_level = ""
//
// [metrics]
// namespace = "notification_writer"
// address = ":8080"
//
// Environment variables that can be used to override configuration file settings:
// CCX_NOTIFICATION_WRITER__BROKER__ADDRESSES
// CCX_NOTIFICATION_WRITER__BROKER__SECURITY_PROTOCOL
// CCX_NOTIFICATION_WRITER__BROKER__SASL_MECHANISM
// CCX_NOTIFICATION_WRITER__BROKER__SASL_USERNAME
// CCX_NOTIFICATION_WRITER__BROKER__SASL_PASSWORD
// CCX_NOTIFICATION_WRITER__BROKER__TOPIC
// CCX_NOTIFICATION_WRITER__BROKER__GROUP
// CCX_NOTIFICATION_WRITER__BROKER__ENABLED
// CCX_NOTIFICATION_WRITER__STORAGE__DB_DRIVER
// CCX_NOTIFICATION_WRITER__STORAGE__PG_USERNAME
// CCX_NOTIFICATION_WRITER__STORAGE__PG_PASSWORD
// CCX_NOTIFICATION_WRITER__STORAGE__PG_HOST
// CCX_NOTIFICATION_WRITER__STORAGE__PG_PORT
// CCX_NOTIFICATION_WRITER__STORAGE__PG_DB_NAME
// CCX_NOTIFICATION_WRITER__STORAGE__PG_PARAMS
// CCX_NOTIFICATION_WRITER__STORAGE__LOG_SQL_QUERIES
// CCX_NOTIFICATION_WRITER__LOGGING__DEBUG
// CCX_NOTIFICATION_WRITER__LOGGING__LOG_LEVEL
// CCX_NOTIFICATION_WRITER__METRICS__NAMESPACE
// CCX_NOTIFICATION_WRITER__METRICS__ADDRESS
import (
"bytes"
"fmt"
"os"
"strings"
"github.com/BurntSushi/toml"
"github.com/RedHatInsights/insights-operator-utils/logger"
"path/filepath"
clowder "github.com/redhatinsights/app-common-go/pkg/api/v1"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
)
// Common constants used for logging and error reporting
const (
// filenameAttribute is the structured-log key under which the name of
// the configuration file is recorded
filenameAttribute = "filename"
// parsingConfigurationFileMessage is the log message emitted before the
// configuration file is read
parsingConfigurationFileMessage = "parsing configuration file"
// noKafkaConfig is printed when Clowder config has no Kafka section
noKafkaConfig = "no Kafka configuration available in Clowder, using default one"
// noBrokerConfig is printed when the Kafka section lists no brokers
noBrokerConfig = "warning: no broker configurations found in clowder config"
// noSaslConfig is printed when the broker declares an authentication
// type but provides no SASL section
noSaslConfig = "warning: SASL configuration is missing"
// noTopicMapping is a format string; %s is replaced by the name of the
// topic for which Clowder provides no mapping
noTopicMapping = "warning: no kafka mapping found for topic %s"
// noStorage is printed when Clowder config has no database section
noStorage = "warning: no storage section in Clowder config"
)
// ConfigStruct is a structure holding the whole notification service
// configuration. Each field corresponds to one top-level section of the
// configuration file; the section name is given by the mapstructure/toml tag.
type ConfigStruct struct {
// Broker is the [broker] section
Broker BrokerConfiguration `mapstructure:"broker" toml:"broker"`
// Storage is the [storage] section
Storage StorageConfiguration `mapstructure:"storage" toml:"storage"`
// Logging is the [logging] section
Logging logger.LoggingConfiguration `mapstructure:"logging" toml:"logging"`
// CloudWatchConf is the [cloudwatch] section
CloudWatchConf logger.CloudWatchConfiguration `mapstructure:"cloudwatch" toml:"cloudwatch"`
// Metrics is the [metrics] section
Metrics MetricsConfiguration `mapstructure:"metrics" toml:"metrics"`
// Tracker is the [tracker] section
Tracker TrackerConfiguration `mapstructure:"tracker" toml:"tracker"`
// Sentry is the [sentry] section
Sentry logger.SentryLoggingConfiguration `mapstructure:"sentry" toml:"sentry"`
}
// MetricsConfiguration holds metrics related configuration (the [metrics]
// section of the configuration file)
type MetricsConfiguration struct {
// Namespace is the value of the "namespace" key
// NOTE(review): presumably used as a prefix for exported metric names -
// confirm against the metrics code
Namespace string `mapstructure:"namespace" toml:"namespace"`
// Address is the value of the "address" key
// NOTE(review): presumably the listen address of the metrics endpoint
// (e.g. ":8080") - confirm against the metrics code
Address string `mapstructure:"address" toml:"address"`
}
// TrackerConfiguration holds configuration for payload tracker (the [tracker]
// section of the configuration file)
type TrackerConfiguration struct {
// Topic is the value of the "topic" key
// NOTE(review): presumably the topic payload tracker messages are sent
// to - confirm against the producer code
Topic string `mapstructure:"topic" toml:"topic"`
// ServiceName is the value of the "service_name" key
ServiceName string `mapstructure:"service_name" toml:"service_name"`
}
// BrokerConfiguration represents configuration for the broker (the [broker]
// section of the configuration file). When Clowder is enabled, some of these
// values are overridden by the values provided by Clowder.
type BrokerConfiguration struct {
// Addresses represents Kafka broker addresses. When filled in from
// Clowder it is a comma-separated list of "hostname:port" (or just
// "hostname") items.
Addresses string `mapstructure:"addresses" toml:"addresses"`
// SecurityProtocol represents the security protocol used by the broker
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
// CertPath is the path to a file containing the certificate to be used with the broker
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
// SaslMechanism is the SASL mechanism used for authentication
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
// SaslUsername is the username used in case of PLAIN mechanism
SaslUsername string `mapstructure:"sasl_username" toml:"sasl_username"`
// SaslPassword is the password used in case of PLAIN mechanism
SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"`
// Topic is name of Kafka topic. When Clowder provides a mapping for
// this name, it is replaced by the mapped topic name.
Topic string `mapstructure:"topic" toml:"topic"`
// Group is name of Kafka group
Group string `mapstructure:"group" toml:"group"`
// Enabled is set to true if Kafka consumer is to be enabled
Enabled bool `mapstructure:"enabled" toml:"enabled"`
}
// StorageConfiguration represents configuration of data storage (the
// [storage] section of the configuration file). When Clowder is enabled and
// provides a database section, the database name, host, port, user name, and
// password are overridden by the values provided by Clowder.
type StorageConfiguration struct {
// Driver is the value of the "db_driver" key, e.g. "postgres"
Driver string `mapstructure:"db_driver" toml:"db_driver"`
// PGUsername is the database user name
PGUsername string `mapstructure:"pg_username" toml:"pg_username"`
// PGPassword is the database password
PGPassword string `mapstructure:"pg_password" toml:"pg_password"`
// PGHost is the database host name
PGHost string `mapstructure:"pg_host" toml:"pg_host"`
// PGPort is the database port
PGPort int `mapstructure:"pg_port" toml:"pg_port"`
// PGDBName is the database name
PGDBName string `mapstructure:"pg_db_name" toml:"pg_db_name"`
// PGParams is the value of the "pg_params" key, e.g. "sslmode=disable"
PGParams string `mapstructure:"pg_params" toml:"pg_params"`
// LogSQLQueries is the value of the "log_sql_queries" key
// NOTE(review): presumably enables logging of executed SQL statements -
// confirm against the storage code
LogSQLQueries bool `mapstructure:"log_sql_queries" toml:"log_sql_queries"`
}
// LoadConfiguration loads the service configuration from a configuration
// file and/or from environment variables.
//
// The configuration file is selected as follows:
//   - if the environment variable named by configFileEnvVariableName is set,
//     its value is taken as the path to the configuration file (the file
//     extension is stripped, the directory is used as the search path),
//   - otherwise a file named defaultConfigFile is searched for in the
//     current directory.
//
// When no file was explicitly specified and the default one is not found, an
// empty configuration is used as the base, so that all values can be
// provided via environment variables. Any other error reported while reading
// the file is returned to the caller (wrapped, so it can be inspected with
// errors.Is / errors.As).
//
// Values from the file are then overridden by environment variables with
// the prefix CCX_NOTIFICATION_WRITER_ (with "." in the key replaced by "__"
// and "-" by "_"), and finally by values provided by Clowder.
//
// Note that the function configures the package-level Viper instance.
func LoadConfiguration(configFileEnvVariableName, defaultConfigFile string) (ConfigStruct, error) {
	var configuration ConfigStruct

	// env. variable holding name of configuration file
	configFile, specified := os.LookupEnv(configFileEnvVariableName)
	if specified {
		log.Info().Str(filenameAttribute, configFile).Msg(parsingConfigurationFileMessage)

		// we need to separate the directory name and filename without
		// extension
		directory, basename := filepath.Split(configFile)
		file := strings.TrimSuffix(basename, filepath.Ext(basename))

		// parse the configuration
		viper.SetConfigName(file)
		viper.AddConfigPath(directory)
	} else {
		log.Info().Str(filenameAttribute, defaultConfigFile).Msg(parsingConfigurationFileMessage)

		// parse the configuration
		viper.SetConfigName(defaultConfigFile)
		viper.AddConfigPath(".")
	}

	// try to read the whole configuration
	err := viper.ReadInConfig()
	if _, isNotFoundError := err.(viper.ConfigFileNotFoundError); !specified && isNotFoundError {
		// If config file is not present (which might be correct in
		// some environment) we need to read configuration from
		// environment variables. The problem is that Viper is not
		// smart enough to understand the structure of config by
		// itself, so we need to read fake config file
		fakeTomlConfig := new(bytes.Buffer)
		if err := toml.NewEncoder(fakeTomlConfig).Encode(configuration); err != nil {
			return configuration, err
		}

		viper.SetConfigType("toml")
		if err := viper.ReadConfig(fakeTomlConfig); err != nil {
			return configuration, err
		}
	} else if err != nil {
		// error is processed on caller side; %w keeps the original
		// error available to errors.Is / errors.As
		return configuration, fmt.Errorf("fatal error config file: %w", err)
	}

	// override configuration from env if there's variable in env
	const envPrefix = "CCX_NOTIFICATION_WRITER_"

	viper.AutomaticEnv()
	viper.SetEnvPrefix(envPrefix)
	viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "__"))

	if err := viper.Unmarshal(&configuration); err != nil {
		return configuration, err
	}

	// values provided by Clowder take precedence over file and env
	updateConfigFromClowder(&configuration)

	// everything should be ok
	return configuration, nil
}
// GetStorageConfiguration returns the storage section of the given
// configuration. The section is returned by value.
func GetStorageConfiguration(configuration *ConfigStruct) StorageConfiguration {
	storage := configuration.Storage
	return storage
}
// GetLoggingConfiguration returns the logging section of the given
// configuration. The section is returned by value.
func GetLoggingConfiguration(configuration *ConfigStruct) logger.LoggingConfiguration {
	logging := configuration.Logging
	return logging
}
// GetCloudWatchConfiguration returns the cloudwatch section of the given
// configuration. The section is returned by value.
func GetCloudWatchConfiguration(configuration *ConfigStruct) logger.CloudWatchConfiguration {
	cloudWatch := configuration.CloudWatchConf
	return cloudWatch
}
// GetSentryConfiguration returns the sentry section of the given
// configuration. The section is returned by value.
func GetSentryConfiguration(configuration *ConfigStruct) logger.SentryLoggingConfiguration {
	sentry := configuration.Sentry
	return sentry
}
// GetBrokerConfiguration returns the broker section of the given
// configuration. The section is returned by value.
func GetBrokerConfiguration(configuration *ConfigStruct) BrokerConfiguration {
	broker := configuration.Broker
	return broker
}
// GetMetricsConfiguration returns the metrics section of the given
// configuration. The section is returned by value.
func GetMetricsConfiguration(configuration *ConfigStruct) MetricsConfiguration {
	metrics := configuration.Metrics
	return metrics
}
// GetTrackerConfiguration returns the payload tracker section of the given
// configuration. The section is returned by value.
func GetTrackerConfiguration(configuration *ConfigStruct) TrackerConfiguration {
	tracker := configuration.Tracker
	return tracker
}
// updateBrokerCfgFromClowder overrides the broker section of the given
// configuration with the Kafka settings found in clowder.LoadedConfig:
//
//   - Addresses becomes a comma-separated list of all brokers
//     ("hostname:port", or just "hostname" when no port is given),
//   - when the first broker declares an authentication type and has a SASL
//     section, the SASL credentials, SASL mechanism, security protocol, and
//     CA certificate path are taken from that broker,
//   - finally the topic name is translated via the Clowder topic mapping.
//
// Values that Clowder does not provide (nil pointers) leave the current
// configuration values untouched.
//
// The caller must make sure that clowder.LoadedConfig and its Kafka section
// are not nil.
func updateBrokerCfgFromClowder(configuration *ConfigStruct) {
	brokers := clowder.LoadedConfig.Kafka.Brokers

	// make sure broker(s) are configured in Clowder
	if len(brokers) > 0 {
		addresses := make([]string, 0, len(brokers))
		for _, broker := range brokers {
			if broker.Port != nil {
				addresses = append(addresses, fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port))
			} else {
				addresses = append(addresses, broker.Hostname)
			}
		}
		configuration.Broker.Addresses = strings.Join(addresses, ",")

		// copy a value only when Clowder actually provides it;
		// dereferencing a nil pointer would panic
		setIfPresent := func(dst, src *string) {
			if src != nil {
				*dst = *src
			}
		}

		// SSL config - authentication settings are taken from the
		// first broker only
		clowderBrokerCfg := brokers[0]
		if clowderBrokerCfg.Authtype != nil {
			fmt.Println("kafka is configured to use authentication")
			if sasl := clowderBrokerCfg.Sasl; sasl != nil {
				setIfPresent(&configuration.Broker.SaslUsername, sasl.Username)
				setIfPresent(&configuration.Broker.SaslPassword, sasl.Password)
				setIfPresent(&configuration.Broker.SaslMechanism, sasl.SaslMechanism)
				setIfPresent(&configuration.Broker.SecurityProtocol, clowderBrokerCfg.SecurityProtocol)

				// NOTE(review): the error returned by KafkaCa is
				// dropped and CertPath is left untouched in that
				// case; presumably a missing CA certificate is
				// acceptable - confirm
				if caPath, err := clowder.LoadedConfig.KafkaCa(clowderBrokerCfg); err == nil {
					configuration.Broker.CertPath = caPath
				}
			} else {
				fmt.Println(noSaslConfig)
			}
		}
	} else {
		fmt.Println(noBrokerConfig)
	}

	updateTopicsMapping(&configuration.Broker)
}
// updateConfigFromClowder overrides values in the given configuration with
// the ones provided by Clowder. Nothing is changed when Clowder is not
// enabled or no Clowder configuration has been loaded. Otherwise the broker
// section is updated when Clowder has a Kafka section, and the database name,
// host, port, user name, and password are updated when Clowder has a
// database section. A message is printed for every section that is missing.
func updateConfigFromClowder(configuration *ConfigStruct) {
	// Clowder is usable only if it is enabled and its config is loaded
	clowderUsable := clowder.IsClowderEnabled() && clowder.LoadedConfig != nil
	if !clowderUsable {
		fmt.Println("Clowder is disabled")
		return
	}

	fmt.Println("Clowder is enabled")

	// broker configuration
	if clowder.LoadedConfig.Kafka != nil {
		updateBrokerCfgFromClowder(configuration)
	} else {
		fmt.Println(noKafkaConfig)
	}

	// storage configuration
	database := clowder.LoadedConfig.Database
	if database == nil {
		fmt.Println(noStorage)
		return
	}

	storage := &configuration.Storage
	storage.PGDBName = database.Name
	storage.PGHost = database.Hostname
	storage.PGPort = database.Port
	storage.PGUsername = database.Username
	storage.PGPassword = database.Password
}
// updateTopicsMapping replaces the configured topic name by the name found
// under that key in clowder.KafkaTopics. When no such entry exists, the topic
// name is left unchanged and a warning is printed.
func updateTopicsMapping(configuration *BrokerConfiguration) {
	// Get the correct topic name from clowder mapping if available
	clowderTopic, ok := clowder.KafkaTopics[configuration.Topic]
	if !ok {
		// the message itself has no line terminator, so add one to keep
		// the following output on its own line
		fmt.Printf(noTopicMapping+"\n", configuration.Topic)
		return
	}

	configuration.Topic = clowderTopic.Name
}