-
Notifications
You must be signed in to change notification settings - Fork 236
/
Copy pathmain.go
564 lines (512 loc) · 19.2 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bufio"
"fmt"
"log/slog"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strconv"
"syscall"
"github.com/alecthomas/kingpin/v2"
"github.com/prometheus/client_golang/prometheus"
versioncollector "github.com/prometheus/client_golang/prometheus/collectors/version"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/promslog"
"github.com/prometheus/common/promslog/flag"
"github.com/prometheus/common/version"
"github.com/prometheus/exporter-toolkit/web"
"github.com/prometheus/statsd_exporter/pkg/address"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line"
"github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/mappercache/lru"
"github.com/prometheus/statsd_exporter/pkg/mappercache/randomreplacement"
"github.com/prometheus/statsd_exporter/pkg/relay"
)
var (
eventStats = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_total",
Help: "The total number of StatsD events seen.",
},
[]string{"type"},
)
eventsFlushed = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_event_queue_flushed_total",
Help: "Number of times events were flushed to exporter",
},
)
eventsUnmapped = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_events_unmapped_total",
Help: "The total number of StatsD events no mapping was found for.",
})
udpPackets = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_udp_packets_total",
Help: "The total number of StatsD packets received over UDP.",
},
)
udpPacketDrops = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_udp_packet_drops_total",
Help: "The total number of dropped StatsD packets which received over UDP.",
},
)
tcpConnections = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connections_total",
Help: "The total number of TCP connections handled.",
},
)
tcpErrors = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connection_errors_total",
Help: "The number of errors encountered reading from TCP.",
},
)
tcpLineTooLong = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_too_long_lines_total",
Help: "The number of lines discarded due to being too long.",
},
)
unixgramPackets = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_unixgram_packets_total",
Help: "The total number of StatsD packets received over Unixgram.",
},
)
linesReceived = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_lines_total",
Help: "The total number of StatsD lines received.",
},
)
samplesReceived = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_samples_total",
Help: "The total number of StatsD samples received.",
},
)
sampleErrors = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_sample_errors_total",
Help: "The total number of errors parsing StatsD samples.",
},
[]string{"reason"},
)
tagsReceived = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tags_total",
Help: "The total number of DogStatsD tags processed.",
},
)
tagErrors = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tag_errors_total",
Help: "The number of errors parsing DogStatsD tags.",
},
)
configLoads = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_config_reloads_total",
Help: "The number of configuration reloads.",
},
[]string{"outcome"},
)
mappingsCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "statsd_exporter_loaded_mappings",
Help: "The current number of configured metric mappings.",
})
conflictingEventStats = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_conflict_total",
Help: "The total number of StatsD events with conflicting names.",
},
[]string{"type"},
)
errorEventStats = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_error_total",
Help: "The total number of StatsD events discarded due to errors.",
},
[]string{"reason"},
)
eventsActions = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_actions_total",
Help: "The total number of StatsD events by action.",
},
[]string{"action"},
)
metricsCount = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "statsd_exporter_metrics_total",
Help: "The total number of metrics.",
},
[]string{"type"},
)
)
func serveHTTP(mux http.Handler, listenAddress string, logger *slog.Logger) {
logger.Error(http.ListenAndServe(listenAddress, mux).Error())
os.Exit(1)
}
func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, logger *slog.Logger) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGHUP)
for s := range signals {
if fileName == "" {
logger.Warn("Received signal but no mapping config to reload", "signal", s)
continue
}
logger.Info("Received signal, attempting reload", "signal", s)
reloadConfig(fileName, mapper, logger)
}
}
func reloadConfig(fileName string, mapper *mapper.MetricMapper, logger *slog.Logger) {
err := mapper.InitFromFile(fileName)
if err != nil {
logger.Info("Error reloading config", "error", err)
configLoads.WithLabelValues("failure").Inc()
} else {
logger.Info("Config reloaded successfully")
configLoads.WithLabelValues("success").Inc()
}
}
func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string, logger *slog.Logger) error {
f, err := os.Create(dumpFilename)
if err != nil {
return err
}
logger.Info("Start dumping FSM", "file_name", dumpFilename)
w := bufio.NewWriter(f)
mapper.FSM.DumpFSM(w)
w.Flush()
f.Close()
logger.Info("Finish dumping FSM")
return nil
}
func getCache(cacheSize int, cacheType string, registerer prometheus.Registerer) (mapper.MetricMapperCache, error) {
var cache mapper.MetricMapperCache
var err error
if cacheSize == 0 {
return nil, nil
} else {
switch cacheType {
case "lru":
cache, err = lru.NewMetricMapperLRUCache(registerer, cacheSize)
case "random":
cache, err = randomreplacement.NewMetricMapperRRCache(registerer, cacheSize)
default:
err = fmt.Errorf("unsupported cache type %q", cacheType)
}
if err != nil {
return nil, err
}
}
return cache, nil
}
func main() {
var (
listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String()
enableLifecycle = kingpin.Flag("web.enable-lifecycle", "Enable shutdown and reload via HTTP request.").Default("false").Bool()
metricsEndpoint = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String()
statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
statsdListenUnixgram = kingpin.Flag("statsd.listen-unixgram", "The Unixgram socket path to receive statsd metric lines in datagram. \"\" disables it.").Default("").String()
// not using Int here because flag displays default in decimal, 0755 will show as 493
statsdUnixSocketMode = kingpin.Flag("statsd.unixsocket-mode", "The permission mode of the unix socket.").Default("755").String()
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int()
cacheType = kingpin.Flag("statsd.cache-type", "Metric mapping cache type. Valid options are \"lru\" and \"random\"").Default("lru").Enum("lru", "random")
eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events.").Default("10000").Uint()
eventFlushThreshold = kingpin.Flag("statsd.event-flush-threshold", "Number of events to hold in queue before flushing.").Default("1000").Int()
eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Maximum time between event queue flushes.").Default("200ms").Duration()
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
checkConfig = kingpin.Flag("check-config", "Check configuration and exit.").Default("false").Bool()
dogstatsdTagsEnabled = kingpin.Flag("statsd.parse-dogstatsd-tags", "Parse DogStatsd style tags. Enabled by default.").Default("true").Bool()
influxdbTagsEnabled = kingpin.Flag("statsd.parse-influxdb-tags", "Parse InfluxDB style tags. Enabled by default.").Default("true").Bool()
libratoTagsEnabled = kingpin.Flag("statsd.parse-librato-tags", "Parse Librato style tags. Enabled by default.").Default("true").Bool()
signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool()
relayAddr = kingpin.Flag("statsd.relay.address", "The UDP relay target address (host:port)").String()
relayPacketLen = kingpin.Flag("statsd.relay.packet-length", "Maximum relay output packet length to avoid fragmentation").Default("1400").Uint()
udpPacketQueueSize = kingpin.Flag("statsd.udp-packet-queue-size", "Size of internal queue for processing UDP packets.").Default("10000").Int()
)
promslogConfig := &promslog.Config{}
flag.AddFlags(kingpin.CommandLine, promslogConfig)
kingpin.Version(version.Print("statsd_exporter"))
kingpin.CommandLine.UsageWriter(os.Stdout)
kingpin.HelpFlag.Short('h')
kingpin.Parse()
logger := promslog.New(promslogConfig)
prometheus.MustRegister(versioncollector.NewCollector("statsd_exporter"))
parser := line.NewParser()
if *dogstatsdTagsEnabled {
parser.EnableDogstatsdParsing()
}
if *influxdbTagsEnabled {
parser.EnableInfluxdbParsing()
}
if *libratoTagsEnabled {
parser.EnableLibratoParsing()
}
if *signalFXTagsEnabled {
parser.EnableSignalFXParsing()
}
logger.Info("Starting StatsD -> Prometheus Exporter", "version", version.Info())
logger.Info("Build context", "context", version.BuildContext())
events := make(chan event.Events, *eventQueueSize)
defer close(events)
eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed)
thisMapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount, Logger: logger}
cache, err := getCache(*cacheSize, *cacheType, thisMapper.Registerer)
if err != nil {
logger.Error("Unable to setup metric mapper cache", "error", err)
os.Exit(1)
}
thisMapper.UseCache(cache)
if *mappingConfig != "" {
err := thisMapper.InitFromFile(*mappingConfig)
if err != nil {
logger.Error("error loading config", "error", err)
os.Exit(1)
}
if *dumpFSMPath != "" {
err := dumpFSM(thisMapper, *dumpFSMPath, logger)
if err != nil {
logger.Error("error dumping FSM", "error", err)
// Failure to dump the FSM is an error (the user asked for it and it
// didn't happen) but not fatal (the exporter is fully functional
// afterwards).
}
}
}
exporter := exporter.NewExporter(prometheus.DefaultRegisterer, thisMapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
if *checkConfig {
logger.Info("Configuration check successful, exiting")
return
}
var relayTarget *relay.Relay
if *relayAddr != "" {
var err error
relayTarget, err = relay.NewRelay(logger, *relayAddr, *relayPacketLen)
if err != nil {
logger.Error("Unable to create relay", "err", err)
os.Exit(1)
}
}
logger.Info("Accepting StatsD Traffic", "udp", *statsdListenUDP, "tcp", *statsdListenTCP, "unixgram", *statsdListenUnixgram)
logger.Info("Accepting Prometheus Requests", "addr", *listenAddress)
if *statsdListenUDP == "" && *statsdListenTCP == "" && *statsdListenUnixgram == "" {
logger.Error("At least one of UDP/TCP/Unixgram listeners must be specified.")
os.Exit(1)
}
if *statsdListenUDP != "" {
udpListenAddr, err := address.UDPAddrFromString(*statsdListenUDP)
if err != nil {
logger.Error("invalid UDP listen address", "address", *statsdListenUDP, "error", err)
os.Exit(1)
}
uconn, err := net.ListenUDP("udp", udpListenAddr)
if err != nil {
logger.Error("failed to start UDP listener", "error", err)
os.Exit(1)
}
if *readBuffer != 0 {
err = uconn.SetReadBuffer(*readBuffer)
if err != nil {
logger.Error("error setting UDP read buffer", "error", err)
os.Exit(1)
}
}
udpPacketQueue := make(chan []byte, *udpPacketQueueSize)
ul := &listener.StatsDUDPListener{
Conn: uconn,
EventHandler: eventQueue,
Logger: logger,
LineParser: parser,
UDPPackets: udpPackets,
UDPPacketDrops: udpPacketDrops,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Relay: relayTarget,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
UdpPacketQueue: udpPacketQueue,
}
go ul.Listen()
}
if *statsdListenTCP != "" {
tcpListenAddr, err := address.TCPAddrFromString(*statsdListenTCP)
if err != nil {
logger.Error("invalid TCP listen address", "address", *statsdListenUDP, "error", err)
os.Exit(1)
}
tconn, err := net.ListenTCP("tcp", tcpListenAddr)
if err != nil {
logger.Error("failed to start TCP listener", "err", err)
os.Exit(1)
}
defer tconn.Close()
tl := &listener.StatsDTCPListener{
Conn: tconn,
EventHandler: eventQueue,
Logger: logger,
LineParser: parser,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Relay: relayTarget,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
}
go tl.Listen()
}
if *statsdListenUnixgram != "" {
var err error
if _, err = os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) {
logger.Error("Unixgram socket already exists", "socket_name", *statsdListenUnixgram)
os.Exit(1)
}
uxgconn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
Net: "unixgram",
Name: *statsdListenUnixgram,
})
if err != nil {
logger.Error("failed to listen on Unixgram socket", "error", err)
os.Exit(1)
}
defer uxgconn.Close()
if *readBuffer != 0 {
err = uxgconn.SetReadBuffer(*readBuffer)
if err != nil {
logger.Error("error setting Unixgram read buffer", "error", err)
os.Exit(1)
}
}
ul := &listener.StatsDUnixgramListener{
Conn: uxgconn,
EventHandler: eventQueue,
Logger: logger,
LineParser: parser,
UnixgramPackets: unixgramPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Relay: relayTarget,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
}
go ul.Listen()
// if it's an abstract unix domain socket, it won't exist on fs
// so we can't chmod it either
if _, err := os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) {
defer os.Remove(*statsdListenUnixgram)
// convert the string to octet
perm, err := strconv.ParseInt("0"+string(*statsdUnixSocketMode), 8, 32)
if err != nil {
logger.Warn("Bad permission %s: %v, ignoring\n", *statsdUnixSocketMode, err)
} else {
err = os.Chmod(*statsdListenUnixgram, os.FileMode(perm))
if err != nil {
logger.Warn("Failed to change unixgram socket permission", "error", err)
}
}
}
}
mux := http.DefaultServeMux
mux.Handle(*metricsEndpoint, promhttp.Handler())
if *metricsEndpoint != "/" && *metricsEndpoint != "" {
landingConfig := web.LandingConfig{
Name: "StatsD Exporter",
Description: "Prometheus Exporter for converting StatsD to Prometheus metrics",
Version: version.Info(),
Links: []web.LandingLinks{
{
Address: *metricsEndpoint,
Text: "Metrics",
},
},
}
landingPage, err := web.NewLandingPage(landingConfig)
if err != nil {
logger.Error("error creating landing page", "err", err)
os.Exit(1)
}
mux.Handle("/", landingPage)
}
quitChan := make(chan struct{}, 1)
if *enableLifecycle {
mux.HandleFunc("/-/reload", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPut || r.Method == http.MethodPost {
fmt.Fprintf(w, "Requesting reload")
if *mappingConfig == "" {
logger.Warn("Received lifecycle api reload but no mapping config to reload")
return
}
logger.Info("Received lifecycle api reload, attempting reload")
reloadConfig(*mappingConfig, thisMapper, logger)
}
})
mux.HandleFunc("/-/quit", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPut || r.Method == http.MethodPost {
fmt.Fprintf(w, "Requesting termination... Goodbye!")
quitChan <- struct{}{}
}
})
}
mux.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
logger.Debug("Received health check")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Statsd Exporter is Healthy.\n")
}
})
mux.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
logger.Debug("Received ready check")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Statsd Exporter is Ready.\n")
}
})
go serveHTTP(mux, *listenAddress, logger)
go sighupConfigReloader(*mappingConfig, thisMapper, logger)
go exporter.Listen(events)
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
// quit if we get a message on either channel
select {
case sig := <-signals:
logger.Info("Received os signal, exiting", "signal", sig.String())
case <-quitChan:
logger.Info("Received lifecycle api quit, exiting")
}
}