diff --git a/config.yaml b/config.yaml index bd34fd0..72b261b 100644 --- a/config.yaml +++ b/config.yaml @@ -175,6 +175,14 @@ prometheus: port: 8081 #If the API is run the API runs on the next port number up from this async_subscriber_count: False #If enabled the subscriber count will be updated asynchronously for Prometheus +influxdb: + enabled: False + host: "127.0.0.1" + port: 8086 + username: exampleUser + password: examplePassword + database: example + snmp: port: 1161 listen_address: 127.0.0.1 diff --git a/lib/diameter.py b/lib/diameter.py index baf2980..12c45c4 100755 --- a/lib/diameter.py +++ b/lib/diameter.py @@ -10,6 +10,7 @@ from database import Database, ROAMING_NETWORK, ROAMING_RULE from messaging import RedisMessaging from redis import Redis +import datetime import yaml import json import time @@ -1742,6 +1743,12 @@ def Answer_16777251_318(self, packet_vars, avps): "diameter_cmd_code": 318, "event": "Unknown User", "imsi_prefix": str(imsi[0:6])}, + metricInflux={ + "measurement": "S6a_Authentication_Information_Request", + "fields": {"Result-Code": 5001}, + "tags": {"IMSI": str(imsi)}, + "time": datetime.datetime.now(datetime.timezone.utc).astimezone().isoformat() + }, metricHelp='Diameter Authentication related Counters', metricExpiry=60, usePrefix=True, diff --git a/lib/messaging.py b/lib/messaging.py index c017c6a..0b5fcdb 100755 --- a/lib/messaging.py +++ b/lib/messaging.py @@ -36,7 +36,7 @@ def sendMessage(self, queue: str, message: str, queueExpiry: int=None, usePrefix except Exception as e: return '' - def sendMetric(self, serviceName: str, metricName: str, metricType: str, metricAction: str, metricValue: float, metricHelp: str='', metricLabels: list=[], metricTimestamp: int=time.time_ns(), metricExpiry: int=None, usePrefix: bool=False, prefixHostname: str='unknown', prefixServiceName: str='common') -> str: + def sendMetric(self, serviceName: str, metricName: str, metricType: str, metricAction: str, metricValue: float, metricInflux: dict={}, metricHelp: str='', metricLabels: list=[], metricTimestamp: int=time.time_ns(), metricExpiry: int=None, usePrefix: bool=False, prefixHostname: str='unknown', prefixServiceName: str='common') -> str: """ Stores a prometheus metric in a format readable by the metric service. """ @@ -52,6 +52,7 @@ def sendMetric(self, serviceName: str, metricName: str, metricType: str, metricA 'LABELS': metricLabels, 'ACTION': metricAction, 'VALUE': metricValue, + 'INFLUX': metricInflux, } ]) diff --git a/services/metricService.py b/services/metricService.py index 7aa314a..f144581 100755 --- a/services/metricService.py +++ b/services/metricService.py @@ -5,7 +5,9 @@ from prometheus_client import make_wsgi_app, start_http_server, Counter, Gauge, Summary, Histogram, CollectorRegistry from werkzeug.middleware.dispatcher import DispatcherMiddleware from flask import Flask +from influxdb import InfluxDBClient import threading +import traceback sys.path.append(os.path.realpath('../lib')) from messaging import RedisMessaging from banners import Banners @@ -27,6 +29,40 @@ def __init__(self, redisHost: str='127.0.0.1', redisPort: int=6379): self.registry = CollectorRegistry(auto_describe=True) self.logTool.log(service='Metric', level='info', message=f"{self.banners.metricService()}", redisClient=self.redisMessaging) self.hostname = socket.gethostname() + self.influxEnabled = self.config.get('influxdb', {}).get('enabled', None) + self.influxDatabase = self.config.get('influxdb', {}).get('database', None) + self.influxUser = self.config.get('influxdb', {}).get('username', None) + self.influxPassword = self.config.get('influxdb', {}).get('password', None) + self.influxHost = self.config.get('influxdb', {}).get('host', None) + self.influxPort = self.config.get('influxdb', {}).get('port', None) + + def processInfluxdb(self, influxData: dict) -> bool: + """ + Sends defined InfluxDB Metrics to InfluxDB, if configured. + """ + + if not self.influxEnabled: + return True + if not self.influxDatabase: + return True + if not self.influxUser: + return True + if not self.influxPassword: + return True + if not self.influxHost: + return True + if not self.influxPort: + return True + + influxClient = InfluxDBClient(self.influxHost, self.influxPort, self.influxUser, self.influxPassword, self.influxDatabase) + + if not isinstance(influxData, list): + influxData = [influxData] + + influxClient.write_points(influxData) + + return True + def handleMetrics(self): """ @@ -52,6 +88,13 @@ def handleMetrics(self): counterHelp = prometheusJson.get('HELP', '') counterLabels = prometheusJson.get('LABELS', {}) + try: + metricInflux = prometheusJson.get('INFLUX', {}) + if metricInflux: + self.processInfluxdb(influxData=metricInflux) + except Exception as e: + self.logTool.log(service='Metric', level='warn', message=f"[Metric] [handleMetrics] Error processing metric InfluxDb content: {traceback.format_exc()}", redisClient=self.redisMessaging) + if isinstance(counterLabels, list): counterLabels = dict()