-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathnetbox_prometheus.py
executable file
·152 lines (137 loc) · 6.13 KB
/
netbox_prometheus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/python3
import os
import pynetbox
import re
import sys
import yaml
CLASS_MAP = {
"Devices": "device",
"VirtualMachines": "vm",
}
class ConfigBuilder:
def __init__(self, nb, filter={}):
self.nb = nb
self.filter = filter
self.metrics = {} # {(instance, kind) => {label=>value}} # static metadata
self.targets = {} # {filename => (labels) => [target]} # targets to be scraped
def add_target(self, item, filename, labels={}):
if not item.name:
print("Unnamed item %r" % item, file=sys.stderr)
return
kind = CLASS_MAP.get(item.__class__.__name__, item.__class__.__name__)
# add to prometheus scraping target
target_key = tuple([("netbox_type",kind)] + sorted(labels.items()))
self.targets.setdefault(filename, {})
tf = self.targets[filename]
tf.setdefault(target_key, [])
tt = tf[target_key]
if item.primary_ip:
addr = re.sub(r'/\d+$', '', item.primary_ip.address)
if ":" in addr:
addr = "[" + addr + "]"
tt.append(item.name + " " + addr)
else:
tt.append(item.name)
# add netbox_meta metric (label with role, site etc)
metric_key = (item.name, kind)
self.metrics.setdefault(metric_key, {})
tenant = getattr(item, "tenant", None)
if tenant:
self.metrics[metric_key]["tenant"] = tenant.slug
role = getattr(item, "device_role", getattr(item, "role", None))
if role:
self.metrics[metric_key]["role"] = role.slug
site = getattr(item, "site", None)
if site:
self.metrics[metric_key]["site"] = site.slug
rack = getattr(item, "rack", None)
if rack:
self.metrics[metric_key]["rack"] = rack.name # rack has no slug
cluster = getattr(item, "cluster", None)
if cluster:
self.metrics[metric_key]["cluster"] = cluster.name # cluster has no slug
for tag in item.tags:
self.metrics[metric_key]["tags_"+str(tag)] = "1"
def add_targets(self, items, filename, labels={}):
"""Add a target once"""
for item in items:
self.add_target(item, filename, labels)
def add_targets_cf(self, items, filename, cf_name, param_name):
"""Add a target for each value in a given custom field"""
for item in items:
cf = getattr(item, 'custom_fields')
if not cf:
print("Item %r: missing or empty custom_fields" % item)
continue
cv = cf.get(cf_name)
if not cv:
print("Item %r: missing or empty %s" % (item, cf_name))
continue
if not isinstance(cv, list):
cv = [cv]
for mod in cv:
self.add_target(item, filename, {param_name: mod})
def build(self):
"""
Here you assemble the netbox things you wish to query and which files to add them to.
Add queries for the different types of object to be polled.
"""
self.add_targets(self.nb.dcim.devices.filter(tag="prom_node", **self.filter), "node_targets.yml")
self.add_targets(self.nb.virtualization.virtual_machines.filter(tag="prom_node", **self.filter), "node_targets.yml")
self.add_targets_cf(self.nb.dcim.devices.filter(tag="prom_snmp", **self.filter), "snmp_targets.yml", "snmp_module", "module")
self.add_targets_cf(self.nb.virtualization.virtual_machines.filter(tag="prom_snmp", **self.filter), "snmp_targets.yml", "snmp_module", "module")
self.add_targets(self.nb.dcim.devices.filter(tag="prom_windows", **self.filter), "windows_targets.yml")
self.add_targets(self.nb.virtualization.virtual_machines.filter(tag="prom_windows", **self.filter), "windows_targets.yml")
# TODO: blackbox_targets: should this be on Device/VM or on IPAddress object? And/or Service?
def replace_file(self, filename, content):
try:
with open(filename) as f:
oldconf = f.read()
if oldconf == content:
return
except FileNotFoundError:
pass
with open(filename+".new", "w") as f:
f.write(content)
os.rename(filename+".new", filename)
def gen_target_file(self, data):
""" data is a dict of (labels) => [target]
Sort it so that it's repeatable
"""
content = []
for labels, targets in sorted(data.items()):
content.append({"labels": dict(labels), "targets": sorted(targets)})
return "# Auto-generated from Netbox, do not edit as your changes will be overwritten!\n" + yaml.dump(content, default_flow_style=False)
def write_targets(self, dir):
for filename, data in self.targets.items():
self.replace_file(dir+"/"+filename, self.gen_target_file(data))
def write_metrics(self, filename):
content = ""
for (instance, kind), labels in sorted(self.metrics.items()):
content += "netbox_meta{instance=\"%s\",netbox_type=\"%s\"" % (instance, kind)
for k, v in labels.items():
content += ",%s=\"%s\"" % (re.sub(r'[^a-zA-Z0-9_]', '_', k), re.sub(r'"', r'\\"', v))
content += "} 1\n"
self.replace_file(filename, content)
if __name__ == "__main__":
API_URL = "https://netbox.example.net"
API_TOKEN = "XXXXXXXX"
SITE_TAG = "prometheus" # we will poll devices in all sites with this tag (and VMs in clusters where the cluster's site has this tag)
DIR = "/etc/prometheus/targets.d"
METRICS = "/var/www/html/metrics/netbox"
# Uncomment when testing:
#DIR = "/tmp"
#METRICS = "/tmp/netbox.prom"
nb = pynetbox.api(API_URL, token=API_TOKEN)
builder = ConfigBuilder(
nb=nb,
filter={
"exclude": "config_context",
"site_id": [s.id for s in nb.dcim.sites.filter(tag=SITE_TAG)],
# This changed in 2.7: https://github.com/netbox-community/netbox/issues/3569
"status": "active", # "status": 1,
},
)
builder.build()
builder.write_targets(DIR)
builder.write_metrics(METRICS)