forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 2
/
telegraf.go
322 lines (275 loc) · 10.7 KB
/
telegraf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
package influxdb
import (
"context"
"encoding/json"
"fmt"
"regexp"
"github.com/BurntSushi/toml"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/telegraf/plugins"
"github.com/influxdata/influxdb/v2/telegraf/plugins/inputs"
"github.com/influxdata/influxdb/v2/telegraf/plugins/outputs"
)
// Error message texts used for telegraf config failures.
// Entries containing %s verbs are format strings to be filled in with fmt.Sprintf.
const (
ErrTelegrafConfigInvalidOrgID = "invalid org ID" // ErrTelegrafConfigInvalidOrgID is the error message for a missing or invalid organization ID.
ErrTelegrafConfigNotFound = "telegraf configuration not found" // ErrTelegrafConfigNotFound is the error message for a missing telegraf config.
// ErrTelegrafPluginNameUnmatch is a format string taking two %s arguments
// (presumably the plugin name and the name found in the config; confirm against callers).
ErrTelegrafPluginNameUnmatch = "the telegraf plugin is name %s doesn't match the config %s"
// ErrNoTelegrafPlugins is the error message for a config that contains no telegraf plugin.
ErrNoTelegrafPlugins = "there is no telegraf plugin in the config"
// ErrUnsupportTelegrafPluginType is a format string taking the rejected plugin type.
ErrUnsupportTelegrafPluginType = "unsupported telegraf plugin type %s"
// ErrUnsupportTelegrafPluginName is a format string taking the rejected plugin name, then its type.
ErrUnsupportTelegrafPluginName = "unsupported telegraf plugin %s, type %s"
)
// ops for telegraf config errors and telegraf config op logs.
// Each value is the name of the matching TelegrafConfigStore method.
var (
OpFindTelegrafConfigByID = "FindTelegrafConfigByID"
OpFindTelegrafConfigs = "FindTelegrafConfigs"
OpCreateTelegrafConfig = "CreateTelegrafConfig"
OpUpdateTelegrafConfig = "UpdateTelegrafConfig"
OpDeleteTelegrafConfig = "DeleteTelegrafConfig"
)
// TelegrafConfigStore represents a service for managing telegraf config data.
type TelegrafConfigStore interface {
// FindTelegrafConfigByID returns a single telegraf config by ID.
FindTelegrafConfigByID(ctx context.Context, id platform.ID) (*TelegrafConfig, error)
// FindTelegrafConfigs returns a list of telegraf configs that match filter and the total count of matching telegraf configs.
// Additional options provide pagination & sorting.
FindTelegrafConfigs(ctx context.Context, filter TelegrafConfigFilter, opt ...FindOptions) ([]*TelegrafConfig, int, error)
// CreateTelegrafConfig creates a new telegraf config and sets tc.ID with the new identifier.
// NOTE(review): userID is presumably the user performing the creation; confirm against implementations.
CreateTelegrafConfig(ctx context.Context, tc *TelegrafConfig, userID platform.ID) error
// UpdateTelegrafConfig updates a single telegraf config.
// Returns the new telegraf config after update.
UpdateTelegrafConfig(ctx context.Context, id platform.ID, tc *TelegrafConfig, userID platform.ID) (*TelegrafConfig, error)
// DeleteTelegrafConfig removes a telegraf config by ID.
DeleteTelegrafConfig(ctx context.Context, id platform.ID) error
}
// TelegrafConfigFilter represents a set of filters that restrict the returned telegraf configs.
// Both fields are pointers, so a nil field can be told apart from a zero value.
// NOTE(review): how each field is applied is decided by the TelegrafConfigStore
// implementation, which is not visible in this file.
type TelegrafConfigFilter struct {
OrgID *platform.ID
Organization *string
}
// TelegrafConfig stores telegraf config for one telegraf instance.
// Decoding from JSON goes through the custom UnmarshalJSON method, which also
// accepts the legacy plugin-list representation.
type TelegrafConfig struct {
ID platform.ID `json:"id,omitempty"` // ID of this config object.
OrgID platform.ID `json:"orgID,omitempty"` // OrgID is the id of the owning organization.
Name string `json:"name,omitempty"` // Name of this config object.
Description string `json:"description,omitempty"` // Description of this config object.
Config string `json:"config,omitempty"` // Config contains the raw toml config.
Metadata map[string]interface{} `json:"metadata,omitempty"` // Metadata for the config; UnmarshalJSON fills in a "buckets" entry when none is stored.
}
// pluginCount matches TOML array-of-tables headers for telegraf plugins, such as
// [[inputs.cpu]], and captures the text between the double brackets
// (inputs.*, outputs.*, aggregators.* or processors.*).
// It is compiled with POSIX (leftmost-longest) matching semantics.
var pluginCount = regexp.MustCompilePOSIX(`\[\[(inputs\..*|outputs\..*|aggregators\..*|processors\..*)\]\]`)
// CountPlugins tallies how often each plugin header (for example "inputs.cpu")
// occurs in tc.Config. The returned map is keyed by the text captured by
// pluginCount and is never nil; it is empty when no header is found.
func (tc *TelegrafConfig) CountPlugins() map[string]float64 {
	counts := make(map[string]float64)
	for _, match := range pluginCount.FindAllStringSubmatch(tc.Config, -1) {
		// match[0] is the whole header, match[1] the captured plugin name.
		if len(match) >= 2 {
			counts[match[1]]++
		}
	}
	return counts
}
// UnmarshalJSON implements the json.Unmarshaler interface.
//
// It is mostly here for legacy reasons, so that old/stored configs read back
// from the kv db keep loading; it is primarily hit when a user views or
// downloads a config and may stay around for a while.
//
// Three shapes of input are handled:
//   - no plugin list: the stored TOML in "config" is used as is, and when no
//     metadata is stored the bucket list is derived from that TOML;
//   - a non-empty legacy plugin list: the plugins are rendered to TOML and
//     appended to the default agent config;
//   - an empty (but present) legacy plugin list: the default agent config plus
//     the default influxdb_v2 output is used, if that plugin is available.
func (tc *TelegrafConfig) UnmarshalJSON(b []byte) error {
	var decoded telegrafConfigDecode
	if err := json.Unmarshal(b, &decoded); err != nil {
		return err
	}

	if decoded.ID != nil {
		tc.ID = *decoded.ID
	}

	// "orgID" wins when it is present and valid; otherwise fall back to the
	// older "organizationID" key.
	owner := decoded.OrgID
	if owner == nil || !owner.Valid() {
		owner = decoded.OrganizationID
	}
	if owner != nil {
		tc.OrgID = *owner
	}

	tc.Name = decoded.Name
	tc.Description = decoded.Description
	// Prefer the new structure: the full toml config and its stored metadata.
	tc.Config = decoded.Config
	tc.Metadata = decoded.Metadata

	switch {
	case decoded.Plugins == nil:
		// New structure. Only derive the buckets from the config when no
		// metadata was stored alongside it.
		if len(decoded.Metadata) == 0 {
			meta, err := parseMetadata(tc.Config)
			if err != nil {
				return err
			}
			tc.Metadata = meta
		}

	case len(decoded.Plugins) > 0:
		// legacy, remove after some moons. or a migration.
		bkts, conf, err := decodePluginRaw(&decoded)
		if err != nil {
			return err
		}
		tc.Config = plugins.AgentConfig + conf
		tc.Metadata = map[string]interface{}{"buckets": bkts}

	default:
		// Handles legacy adding of default plugins (agent and output).
		if out, ok := plugins.GetPlugin("output", "influxdb_v2"); ok {
			tc.Config = plugins.AgentConfig + out.Config
			tc.Metadata = map[string]interface{}{
				"buckets": []string{},
			}
		}
	}

	return nil
}
// buckets collects the bucket names configured on the influxdb_v2 outputs of a
// telegraf TOML config.
type buckets []string

// UnmarshalTOML stores in t the "bucket" value of every outputs.influxdb_v2
// table found in data.
//
// data is expected to be the decoded TOML document as a
// map[string]interface{}, with "outputs" holding a map of plugin names and
// "influxdb_v2" holding a []map[string]interface{} (presumably the shape the
// TOML decoder produces for [[outputs.influxdb_v2]] tables).
//
// On success t is always set to a non-nil slice, which is empty when the
// document has no outputs, no influxdb_v2 output, or no bucket settings.
//
// It returns an *errors.Error with code EEmptyValue when data, the "outputs"
// value or the "influxdb_v2" value does not have the expected shape, and with
// code EInvalid when a "bucket" value is not a string. t is left untouched on
// error.
func (t *buckets) UnmarshalTOML(data interface{}) error {
	root, ok := data.(map[string]interface{})
	if !ok {
		return &errors.Error{
			Code: errors.EEmptyValue,
			Msg:  "no config to get buckets",
		}
	}

	bkts := []string{}

	rawOutputs, found := root["outputs"]
	if !found {
		*t = bkts
		return nil
	}
	outputPlugins, ok := rawOutputs.(map[string]interface{})
	if !ok {
		return &errors.Error{
			Code: errors.EEmptyValue,
			Msg:  "no plugins in config to get buckets",
		}
	}

	rawTables, found := outputPlugins["influxdb_v2"]
	if !found {
		*t = bkts
		return nil
	}
	tables, ok := rawTables.([]map[string]interface{})
	if !ok {
		return &errors.Error{
			Code: errors.EEmptyValue,
			Msg:  "influxdb_v2 output has no config",
		}
	}

	for _, table := range tables {
		raw, found := table["bucket"]
		if !found {
			continue
		}
		// The config is user supplied, so the value may be any TOML type;
		// report a wrong type instead of panicking on the assertion.
		name, ok := raw.(string)
		if !ok {
			return &errors.Error{
				Code: errors.EInvalid,
				Msg:  fmt.Sprintf("influxdb_v2 output bucket must be a string, got %T", raw),
			}
		}
		bkts = append(bkts, name)
	}

	*t = bkts
	return nil
}
// parseMetadata decodes the TOML in cfg and returns the metadata map for it:
// a single "buckets" entry listing the non-empty bucket names collected by the
// buckets decoder. The list is never nil. A TOML decoding failure is returned
// unchanged, with a nil map.
func parseMetadata(cfg string) (map[string]interface{}, error) {
	parsed := &buckets{}
	if _, err := toml.Decode(cfg, parsed); err != nil {
		return nil, err
	}

	names := []string{}
	for _, name := range *parsed {
		if name == "" {
			// A bucket set to the empty string carries no information.
			continue
		}
		names = append(names, name)
	}

	return map[string]interface{}{"buckets": names}, nil
}
// decodePluginRaw converts the legacy per-plugin JSON entries in tcd.Plugins
// into telegraf TOML.
//
// It returns, in order:
//   - the bucket of the influxdb_v2 output (an empty, non-nil slice when no
//     such output sets one),
//   - the concatenated TOML of every decoded plugin, in input order,
//   - an error.
//
// An unknown plugin type or an unknown output name is rejected with an
// EInvalid error, as is plugin JSON that cannot be unmarshalled. An unknown
// input name is skipped and contributes no TOML.
func decodePluginRaw(tcd *telegrafConfigDecode) ([]string, string, error) {
	const op = "unmarshal telegraf config raw plugin"

	rendered := ""
	bkts := []string{}

	for _, pr := range tcd.Plugins {
		var newConfig func() plugins.Config

		switch pr.Type {
		case "input":
			fn, found := availableInputPlugins[pr.Name]
			if !found {
				// This removes the validation (and does not create toml) for
				// new "input" plugins, but keeps the existing behavior for
				// the known "input" plugins.
				continue
			}
			newConfig = fn

		case "output":
			fn, found := availableOutputPlugins[pr.Name]
			if !found {
				return nil, "", &errors.Error{
					Code: errors.EInvalid,
					Op:   op,
					Msg:  fmt.Sprintf(ErrUnsupportTelegrafPluginName, pr.Name, pr.Type),
				}
			}
			newConfig = fn

		default:
			return nil, "", &errors.Error{
				Code: errors.EInvalid,
				Op:   op,
				Msg:  fmt.Sprintf(ErrUnsupportTelegrafPluginType, pr.Type),
			}
		}

		cfg := newConfig()

		// An absent config is replaced by a blank object so the plugin still
		// goes through unmarshalling and gets validated.
		raw := pr.Config
		if len(raw) == 0 {
			raw = json.RawMessage("{}")
		}
		if err := json.Unmarshal(raw, cfg); err != nil {
			return nil, "", &errors.Error{
				Code: errors.EInvalid,
				Err:  err,
				Op:   op,
			}
		}

		// Only the output registry has an "influxdb_v2" entry, so cfg is an
		// *outputs.InfluxDBV2 here. Note that a later influxdb_v2 output
		// replaces the bucket recorded from an earlier one.
		if pr.Name == "influxdb_v2" {
			if b := cfg.(*outputs.InfluxDBV2).Bucket; b != "" {
				bkts = []string{b}
			}
		}

		rendered += cfg.TOML()
	}

	return bkts, rendered, nil
}
// telegrafConfigDecode is the helper struct for json decoding. legacy.
// It accepts both the current layout (config + metadata) and the legacy layout
// (a list of plugins); TelegrafConfig.UnmarshalJSON decodes into it first.
type telegrafConfigDecode struct {
ID *platform.ID `json:"id,omitempty"`
// OrganizationID is the fallback used by UnmarshalJSON when OrgID is missing or invalid.
OrganizationID *platform.ID `json:"organizationID,omitempty"`
OrgID *platform.ID `json:"orgID,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
// Config is the raw toml config of the current layout.
Config string `json:"config,omitempty"`
// Plugins is the legacy plugin list; a non-nil value makes UnmarshalJSON take the legacy path.
Plugins []telegrafPluginDecode `json:"plugins,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// telegrafPluginDecode is the helper struct for json decoding. legacy.
// It describes one entry of the legacy plugin list consumed by decodePluginRaw.
type telegrafPluginDecode struct {
Type string `json:"type,omitempty"` // Type of the plugin; decodePluginRaw accepts "input" and "output".
Name string `json:"name,omitempty"` // Name of the plugin.
Alias string `json:"alias,omitempty"` // Alias of the plugin.
Description string `json:"description,omitempty"` // Description of the plugin.
Config json.RawMessage `json:"config,omitempty"` // Config is the currently stored plugin configuration; decodePluginRaw treats an empty value as "{}".
}
// availableInputPlugins maps a legacy input plugin name to a constructor that
// returns a fresh, empty config value for that plugin.
// decodePluginRaw looks input names up here; names that are absent are skipped.
var availableInputPlugins = map[string](func() plugins.Config){
"cpu": func() plugins.Config { return &inputs.CPUStats{} },
"disk": func() plugins.Config { return &inputs.DiskStats{} },
"diskio": func() plugins.Config { return &inputs.DiskIO{} },
"docker": func() plugins.Config { return &inputs.Docker{} },
"file": func() plugins.Config { return &inputs.File{} },
"kernel": func() plugins.Config { return &inputs.Kernel{} },
"kubernetes": func() plugins.Config { return &inputs.Kubernetes{} },
"logparser": func() plugins.Config { return &inputs.LogParserPlugin{} },
"mem": func() plugins.Config { return &inputs.MemStats{} },
"net_response": func() plugins.Config { return &inputs.NetResponse{} },
"net": func() plugins.Config { return &inputs.NetIOStats{} },
"nginx": func() plugins.Config { return &inputs.Nginx{} },
"processes": func() plugins.Config { return &inputs.Processes{} },
"procstat": func() plugins.Config { return &inputs.Procstat{} },
"prometheus": func() plugins.Config { return &inputs.Prometheus{} },
"redis": func() plugins.Config { return &inputs.Redis{} },
"swap": func() plugins.Config { return &inputs.SwapStats{} },
"syslog": func() plugins.Config { return &inputs.Syslog{} },
"system": func() plugins.Config { return &inputs.SystemStats{} },
"tail": func() plugins.Config { return &inputs.Tail{} },
}
// availableOutputPlugins maps a legacy output plugin name to a constructor that
// returns a fresh, empty config value for that plugin.
// decodePluginRaw rejects output names that are absent from this map.
var availableOutputPlugins = map[string](func() plugins.Config){
"file": func() plugins.Config { return &outputs.File{} },
"influxdb_v2": func() plugins.Config { return &outputs.InfluxDBV2{} },
}