From bf433628f479d93c36fe0d2517986cee3b1f535c Mon Sep 17 00:00:00 2001 From: Sam Pixel York Date: Tue, 5 Sep 2023 13:58:53 +1000 Subject: [PATCH 1/2] Refactor Prometheus metrics into MQTT plugins --- src/python/mqtt_processors/MQTTProcessors.py | 11 +++++++++++ src/python/mqtt_processors/plugins/Wombat.py | 8 ++++++++ src/python/mqtt_processors/plugins/YDOC.py | 6 ++++++ 3 files changed, 25 insertions(+) diff --git a/src/python/mqtt_processors/MQTTProcessors.py b/src/python/mqtt_processors/MQTTProcessors.py index bfb01589..bc87fbad 100644 --- a/src/python/mqtt_processors/MQTTProcessors.py +++ b/src/python/mqtt_processors/MQTTProcessors.py @@ -17,6 +17,14 @@ import mqtt_processors.plugins as plugins +from prometheus_client import start_http_server, Counter + +# Prometheus Metrics +messages_received = Counter('mqtt_messages_received_total', 'Total number of MQTT messages received') +messages_processed_successfully = Counter('mqtt_messages_processed_successfully_total', 'Total number of MQTT messages processed successfully') +messages_processed_failed = Counter('mqtt_messages_processed_failed_total', 'Total number of MQTT messages processed with failures') +start_http_server(8000) # Starting Prometheus server on port 8000 + std_logger = logging.getLogger(__name__) tx_channel: mq.TxChannel = None @@ -113,6 +121,7 @@ def on_message(channel, method, properties, body, plugin_name): # process message std_logger.info(f"{channel}") std_logger.info(f"Message Received for {plugin_name}") + messages_received.inc() processed_message = plugin_modules[plugin_name].on_message(body, { 'channel': channel, 'method': method, 'properties': properties, 'body': body }) # Publish Messages to Physical Timeseries @@ -121,10 +130,12 @@ def on_message(channel, method, properties, body, plugin_name): # Log Errors for error in processed_message['errors']: + messages_processed_successfully.inc() std_logger.error(error) except Exception as e: # Log the exception + messages_processed_failed.inc() std_logger.exception('Error while processing message.') finally: diff --git a/src/python/mqtt_processors/plugins/Wombat.py b/src/python/mqtt_processors/plugins/Wombat.py index e0e29f6f..9a060ea2 100644 --- a/src/python/mqtt_processors/plugins/Wombat.py +++ b/src/python/mqtt_processors/plugins/Wombat.py @@ -4,16 +4,23 @@ import util.LoggingUtil as lu import api.client.DAO as dao +from prometheus_client import Counter +wombat_messages_received = Counter('wombat_messages_received_total', 'Total number of Wombat messages received') +wombat_messages_error = Counter('wombat_messages_error_total', 'Total number of Wombat messages with errors') +wombat_devices_created = Counter('wombat_devices_created_total', 'Total number of new Wombat devices created') + TOPICS = ['wombat'] def on_message(message, properties): correlation_id = str(uuid.uuid4()) lu.cid_logger.info(f'Message as received: {message}', extra={BrokerConstants.CORRELATION_ID_KEY: correlation_id}) + wombat_messages_received.inc() msg = {} try: msg = json.loads(message) except Exception as e: + wombat_messages_received.inc() raise Exception(f'JSON parsing failed') # This code could put the cid into msg (and does so later) and pass msg into the lu_cid @@ -36,6 +43,7 @@ def on_message(message, properties): pds = dao.get_pyhsical_devices_using_source_ids(BrokerConstants.WOMBAT, find_source_id) if len(pds) < 1: lu.cid_logger.info('Device not found, creating physical device.', extra=msg_with_cid) + wombat_devices_created.inc() props = { BrokerConstants.CREATION_CORRELATION_ID_KEY: correlation_id, diff --git a/src/python/mqtt_processors/plugins/YDOC.py b/src/python/mqtt_processors/plugins/YDOC.py index a7aab625..96c821d8 100644 --- a/src/python/mqtt_processors/plugins/YDOC.py +++ b/src/python/mqtt_processors/plugins/YDOC.py @@ -7,6 +7,10 @@ from pdmodels.Models import PhysicalDevice from typing import Dict, Optional +from prometheus_client import Counter +ydoc_messages_processed = Counter('ydoc_messages_processed_total', 'Total number of YDOC messages processed') +ydoc_messages_error = Counter('ydoc_messages_error_total', 'Total number of YDOC messages with errors') + # The default MQTT topic of YDOC devices is YDOC/ which RabbitMQ converts into a routing key of YDOC.. # It seems we can use the MQTT topic wildcard of # to get all YDOC messages. 'YDOC.#' TOPICS = ['YDOC.#'] @@ -27,6 +31,7 @@ def parse_ydoc_ts(ydoc_ts) -> Optional[datetime.datetime]: tzinfo=datetime.timezone(datetime.timedelta(hours=10))) return ts except Exception as err: + ydoc_messages_error.inc() logging.exception('parse_ydoc_ts error.') return None @@ -43,6 +48,7 @@ def parse_ydoc_ts(ydoc_ts) -> Optional[datetime.datetime]: _non_alpha_numeric = re.compile(r'\W') def process_message(msg_with_cid: Dict) -> Dict[str, Dict]: + ydoc_messages_processed.inc() # Create a map of the channel objects keyed by channel code to make it simple # to find the variable name, uom, etc while processing values. channels = {} From eb9bf339565ac1385458e1697dd492b80c2ab705 Mon Sep 17 00:00:00 2001 From: Sam Pixel York Date: Tue, 5 Sep 2023 14:11:50 +1000 Subject: [PATCH 2/2] import JSONResponse --- src/python/restapi/RestAPI.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/restapi/RestAPI.py b/src/python/restapi/RestAPI.py index 2496317d..8fd891b5 100644 --- a/src/python/restapi/RestAPI.py +++ b/src/python/restapi/RestAPI.py @@ -12,7 +12,7 @@ from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request, Response, status, Query from fastapi.security import HTTPBearer, HTTPBasic -#from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse from typing import Annotated, List, Dict from pdmodels.Models import DeviceNote, PhysicalDevice, LogicalDevice, PhysicalToLogicalMapping