Skip to content

Commit

Permalink
Merge pull request #24 from CSU-ITC303-ITC309-2023-Team8/feature/moni…
Browse files Browse the repository at this point in the history
…toring_mqtt_refactor

Feature/monitoring mqtt refactor
  • Loading branch information
SamPixelYork authored Sep 5, 2023
2 parents 2b78e6a + eb9bf33 commit 0949af9
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
11 changes: 11 additions & 0 deletions src/python/mqtt_processors/MQTTProcessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@

import mqtt_processors.plugins as plugins

from prometheus_client import start_http_server, Counter

# Prometheus Metrics
messages_received = Counter('mqtt_messages_received_total', 'Total number of MQTT messages received')
messages_processed_successfully = Counter('mqtt_messages_processed_successfully_total', 'Total number of MQTT messages processed successfully')
messages_processed_failed = Counter('mqtt_messages_processed_failed_total', 'Total number of MQTT messages processed with failures')
start_http_server(8000) # Starting Prometheus server on port 8000

std_logger = logging.getLogger(__name__)

tx_channel: mq.TxChannel = None
Expand Down Expand Up @@ -113,6 +121,7 @@ def on_message(channel, method, properties, body, plugin_name):
# process message
std_logger.info(f"{channel}")
std_logger.info(f"Message Received for {plugin_name}")
messages_received.inc()
processed_message = plugin_modules[plugin_name].on_message(body, { 'channel': channel, 'method': method, 'properties': properties, 'body': body })

# Publish Messages to Physical Timeseries
Expand All @@ -121,10 +130,12 @@ def on_message(channel, method, properties, body, plugin_name):

# Log Errors
for error in processed_message['errors']:
messages_processed_successfully.inc()
std_logger.error(error)

except Exception as e:
# Log the exception
messages_processed_failed.inc()
std_logger.exception('Error while processing message.')

finally:
Expand Down
8 changes: 8 additions & 0 deletions src/python/mqtt_processors/plugins/Wombat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,23 @@
import util.LoggingUtil as lu
import api.client.DAO as dao

from prometheus_client import Counter
wombat_messages_received = Counter('wombat_messages_received_total', 'Total number of Wombat messages received')
wombat_messages_error = Counter('wombat_messages_error_total', 'Total number of Wombat messages with errors')
wombat_devices_created = Counter('wombat_devices_created_total', 'Total number of new Wombat devices created')

TOPICS = ['wombat']

def on_message(message, properties):
correlation_id = str(uuid.uuid4())
lu.cid_logger.info(f'Message as received: {message}', extra={BrokerConstants.CORRELATION_ID_KEY: correlation_id})
wombat_messages_received.inc()

msg = {}
try:
msg = json.loads(message)
except Exception as e:
wombat_messages_received.inc()
raise Exception(f'JSON parsing failed')

# This code could put the cid into msg (and does so later) and pass msg into the lu_cid
Expand All @@ -36,6 +43,7 @@ def on_message(message, properties):
pds = dao.get_pyhsical_devices_using_source_ids(BrokerConstants.WOMBAT, find_source_id)
if len(pds) < 1:
lu.cid_logger.info('Device not found, creating physical device.', extra=msg_with_cid)
wombat_devices_created.inc()

props = {
BrokerConstants.CREATION_CORRELATION_ID_KEY: correlation_id,
Expand Down
6 changes: 6 additions & 0 deletions src/python/mqtt_processors/plugins/YDOC.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
from pdmodels.Models import PhysicalDevice
from typing import Dict, Optional

from prometheus_client import Counter
ydoc_messages_processed = Counter('ydoc_messages_processed_total', 'Total number of YDOC messages processed')
ydoc_messages_error = Counter('ydoc_messages_error_total', 'Total number of YDOC messages with errors')

# The default MQTT topic of YDOC devices is YDOC/<serial#> which RabbitMQ converts into a routing key of YDOC.<serial#>.
# It seems we can use the MQTT topic wildcard of # to get all YDOC messages. 'YDOC.#'
TOPICS = ['YDOC.#']
Expand All @@ -27,6 +31,7 @@ def parse_ydoc_ts(ydoc_ts) -> Optional[datetime.datetime]:
tzinfo=datetime.timezone(datetime.timedelta(hours=10)))
return ts
except Exception as err:
ydoc_messages_error.inc()
logging.exception('parse_ydoc_ts error.')

return None
Expand All @@ -43,6 +48,7 @@ def parse_ydoc_ts(ydoc_ts) -> Optional[datetime.datetime]:
_non_alpha_numeric = re.compile(r'\W')

def process_message(msg_with_cid: Dict) -> Dict[str, Dict]:
ydoc_messages_processed.inc()
# Create a map of the channel objects keyed by channel code to make it simple
# to find the variable name, uom, etc while processing values.
channels = {}
Expand Down
2 changes: 1 addition & 1 deletion src/python/restapi/RestAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request, Response, status, Query
from fastapi.security import HTTPBearer, HTTPBasic

#from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse
from typing import Annotated, List, Dict

from pdmodels.Models import DeviceNote, PhysicalDevice, LogicalDevice, PhysicalToLogicalMapping
Expand Down

0 comments on commit 0949af9

Please sign in to comment.