Minor bug fixes and some renaming.
dajtxx committed Jul 31, 2024
1 parent 68fb8b7 commit 3829cee
Showing 6 changed files with 107 additions and 266 deletions.
13 changes: 7 additions & 6 deletions src/python/broker-cli.py
@@ -19,6 +19,7 @@ def str_to_logical_device(val) -> LogicalDevice:


def str_to_dict(val) -> Dict:
print(val, type(val))
return json.loads(val)


@@ -207,7 +208,7 @@ def plain_pd_list(devs: List[PhysicalDevice]):


def dict_from_file_or_string() -> dict:
if (hasattr(args, 'pd') or hasattr(args, 'ld')) and (hasattr(args, 'in_filename') and args.in_filename is not None):
if ((hasattr(args, 'pd') and args.pd is not None) or (hasattr(args, 'ld') and args.ld is not None)) and (hasattr(args, 'in_filename') and args.in_filename is not None):
raise RuntimeError('error: --json and --file are mutually exclusive.')

json_obj = None
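
The tightened check above fixes an argparse subtlety: an optional argument that is defined for the active subcommand is present on the namespace with the value None even when it is not given on the command line, so hasattr() alone passed whenever the option was merely defined, and the mutual-exclusion error fired even when only --file was supplied. A minimal sketch of the behaviour (the --json/pd wiring here is illustrative, not the actual broker-cli.py parser setup):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--json', dest='pd')   # hypothetical option, for illustration
    args = parser.parse_args([])               # parse with --json omitted

    print(hasattr(args, 'pd'))     # True: argparse creates the attribute anyway
    print(args.pd is not None)     # False: the extra check the fix relies on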
@@ -283,7 +284,7 @@ def main() -> None:

dev = PhysicalDevice.parse_obj(dev)
print(pretty_print_json(dao.update_physical_device(dev)))

elif args.cmd2 == 'rm':
# Delete all physical_logical mappings to avoid foreign key violation
mappings = dao.get_physical_device_mappings(pd=args.p_uid)
@@ -373,9 +374,9 @@ def main() -> None:
current_mapping = dao.get_current_device_mapping(pd=args.p_uid, ld=args.l_uid)
if current_mapping is None:
raise RuntimeError("No current mapping for the uid given")

dao.toggle_device_mapping(args.enable, args.p_uid, args.l_uid)

elif args.cmd1 == 'users':
if args.cmd2 == 'add':
dao.user_add(uname=args.uname, passwd=args.passwd, disabled=args.disabled)
@@ -391,13 +392,13 @@ def main() -> None:

elif args.enable == True:
dao.token_enable(uname=args.uname)

if args.refresh == True:
dao.token_refresh(uname=args.uname)

elif args.cmd2 == 'chng':
dao.user_change_password(args.uname, args.passwd)

elif args.cmd2 == 'ls':
print(dao.user_ls())

31 changes: 30 additions & 1 deletion src/python/delivery/BaseWriter.py
@@ -34,6 +34,11 @@ def __init__(self, name) -> None:
signal.signal(signal.SIGTERM, self.sigterm_handler)

def run(self) -> None:
"""
This method runs the blocking MQTT loop, waiting for messages from upstream
and writing them to the backing table. A separate thread reads them from
the backing table and attempts to deliver them.
"""
logging.info('===============================================================')
logging.info(f' STARTING {self.name.upper()} WRITER')
logging.info('===============================================================')
@@ -92,7 +97,7 @@ def run(self) -> None:
logging.exception(err)
break

except pika.exceptions.AMQPConnectionError:
except pika.exceptions.AMQPConnectionError as err:
logging.exception(err)
logging.warning('Connection was closed, retrying after a pause.')
time.sleep(10)
@@ -108,6 +113,12 @@ def run(self) -> None:
delivery_thread.join()

def delivery_thread_proc(self) -> None:
"""
This method runs in a separate thread and reads messages from the backing table,
calling the on_message handler for each one. It also removes messages from the
backing table when on_message returns MSG_OK or MSG_FAIL, and increments a
message's retry_count each time on_message returns MSG_RETRY.
"""
logging.info('Delivery thread started')
while self.keep_running:
count = dao.get_delivery_msg_count(self.name)
@@ -136,6 +147,8 @@ def delivery_thread_proc(self) -> None:
lu.cid_logger.error(f'Could not find logical device, dropping message: {msg}', extra=msg)
dao.remove_delivery_msg(self.name, msg_uid)

lu.cid_logger.info(f'{pd.name} / {ld.name}', extra=msg)

rc = self.on_message(pd, ld, msg, retry_count)
if rc == BaseWriter.MSG_OK:
lu.cid_logger.info('Message processed ok.', extra=msg)
@@ -157,6 +170,22 @@ def delivery_thread_proc(self) -> None:
logging.info('Delivery thread stopped.')

def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int:
"""
Subclasses must override this method and perform all transformation and delivery during
its execution.
Implementations must decide what constitutes a temporary failure that can be retried, how
many times it is reasonable to retry, and when a message is undeliverable.
pd: The PhysicalDevice that sent the message.
ld: The LogicalDevice to deliver the message to.
msg: The message content, in IoTa format.
retry_count: How many times this method has returned MSG_RETRY so far.
return BaseWriter.MSG_OK to signal the message has been delivered and can be removed from the backing table.
BaseWriter.MSG_RETRY to signal a temporary delivery failure which is retryable.
BaseWriter.MSG_FAIL to signal the message cannot be delivered, or has failed too many times.
"""
lu.cid_logger.info(f'{pd.name} / {ld.name} / {retry_count}: {msg}', extra=msg)
return BaseWriter.MSG_OK

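As a rough illustration of the contract described in on_message (a sketch, not part of this commit; the HTTP endpoint, the requests dependency and the retry limit are all assumptions), a delivery service built on BaseWriter might look like:

    from typing import Any

    import requests  # assumed third-party dependency, for illustration only

    from delivery.BaseWriter import BaseWriter
    from pdmodels.Models import LogicalDevice, PhysicalDevice

    class ExampleWriter(BaseWriter):
        def __init__(self) -> None:
            super().__init__('example')

        def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int:
            try:
                # Hypothetical downstream endpoint, not part of IoTa.
                rsp = requests.post('https://example.invalid/ingest', json=msg, timeout=10)
                if rsp.status_code == 200:
                    return BaseWriter.MSG_OK

                # Treat server-side errors as temporary, but give up after 5 attempts.
                if rsp.status_code >= 500 and retry_count < 5:
                    return BaseWriter.MSG_RETRY

                return BaseWriter.MSG_FAIL
            except requests.ConnectionError:
                return BaseWriter.MSG_RETRY if retry_count < 5 else BaseWriter.MSG_FAIL

    if __name__ == '__main__':
        ExampleWriter().run()
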
192 changes: 37 additions & 155 deletions src/python/delivery/FRRED.py
@@ -14,185 +14,67 @@
batch has been successfully processed.
"""

import dateutil.parser
from typing import Any

import json, logging, math, os, signal, sys, time, uuid
import json, logging, os, sys, time

import BrokerConstants

import pika, pika.channel, pika.spec
from pika.exchange_type import ExchangeType

import api.client.Ubidots as ubidots

import api.client.DAO as dao
import util.LoggingUtil as lu

_user = os.environ['RABBITMQ_DEFAULT_USER']
_passwd = os.environ['RABBITMQ_DEFAULT_PASS']
_host = os.environ['RABBITMQ_HOST']
_port = os.environ['RABBITMQ_PORT']

_amqp_url_str = f'amqp://{_user}:{_passwd}@{_host}:{_port}/%2F'

_channel = None
from delivery.BaseWriter import BaseWriter
from pdmodels.Models import LogicalDevice, PhysicalDevice

_finish = False

_q_name = 'databolt_delivery'

_raw_data_name = '/raw_data'


def sigterm_handler(sig_no, stack_frame) -> None:
"""
Handle SIGTERM from docker by closing the mq and db connections and setting a
flag to tell the main loop to exit.
"""
global _finish, _channel

logging.info(f'{signal.strsignal(sig_no)}, setting finish to True')
_finish = True
dao.stop()

# This breaks the endless loop in main.
_channel.cancel()
class DataboltWriter(BaseWriter):
def __init__(self) -> None:
super().__init__('databolt')

def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int:
try:
# Only messages from Wombat or Axistech nodes should be processed for SCMN.
if pd.source_name not in [BrokerConstants.WOMBAT, BrokerConstants.AXISTECH]:
lu.cid_logger.info(f'Rejecting message from source {pd.source_name}', extra=msg)
return DataboltWriter.MSG_OK

def main():
"""
Initiate the connection to RabbitMQ and then idle until asked to stop.
Because the messages from RabbitMQ arrive via async processing this function
has nothing to do after starting connection.
It would be good to find a better way to do nothing than the current loop.
"""
global _channel, _finish, _q_name
# May as well use the message context id for the DataBolt directory and file name.
if not os.path.isdir(_raw_data_name):
lu.cid_logger.error(f'DataBolt {_raw_data_name} directory not found. This should be a mounted volume shared with the DataBolt container.', extra=msg)
sys.exit(1)

logging.info('===============================================================')
logging.info(' STARTING FRRED DATABOLT WRITER')
logging.info('===============================================================')
msg_uuid = msg[BrokerConstants.CORRELATION_ID_KEY]

logging.info('Opening connection')
connection = None
conn_attempts = 0
backoff = 10
while connection is None:
try:
connection = pika.BlockingConnection(pika.URLParameters(_amqp_url_str))
except:
conn_attempts += 1
logging.warning(f'Connection to RabbitMQ attempt {conn_attempts} failed.')

if conn_attempts % 5 == 0 and backoff < 60:
backoff += 10

time.sleep(backoff)

logging.info('Opening channel')
_channel = connection.channel()
_channel.basic_qos(prefetch_count=1)
logging.info('Declaring exchange')
_channel.exchange_declare(
exchange=BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME,
exchange_type=ExchangeType.fanout,
durable=True)
logging.info('Declaring queue')
_channel.queue_declare(queue=_q_name, durable=True)
logging.info('Binding queue to exchange')
_channel.queue_bind(_q_name, BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, 'logical_timeseries')

# This loops until _channel.cancel is called in the signal handler.
for method, properties, body in _channel.consume(_q_name):
on_message(_channel, method, properties, body)

logging.info('Closing connection')
connection.close()


def on_message(channel, method, properties, body):
"""
This function is called when a message arrives from RabbitMQ.
"""

global _channel, _finish

delivery_tag = method.delivery_tag

# If the finish flag is set, reject the message so RabbitMQ will re-queue it
# and return early.
if _finish:
lu.cid_logger.info(f'NACK delivery tag {delivery_tag}, _finish is True', extra=msg)
_channel.basic_reject(delivery_tag)
return

try:
# Parse the message just to confirm it is valid JSON.
msg = json.loads(body)
p_uid = msg[BrokerConstants.PHYSICAL_DEVICE_UID_KEY]
l_uid = msg[BrokerConstants.LOGICAL_DEVICE_UID_KEY]

# TODO: Look into routing keys for deciding which messages to pass on to Intersect.

# Only messages from Wombat nodes should be processed. This is a bit coarse-grained
# because Wombats could conceivably be used for other projects, but that is very
unlikely now. To determine if the message is from a Wombat, look up the physical
# device and check the source_name.
pd = dao.get_physical_device(p_uid)
if pd is None:
lu.cid_logger.error(f'Physical device not found, cannot continue. Dropping message.', extra=msg)
# Ack the message, even though we cannot process it. We don't want it redelivered.
# We can change this to a Nack if that would provide extra context somewhere.
_channel.basic_ack(delivery_tag)
return

if pd.source_name not in [BrokerConstants.WOMBAT, BrokerConstants.AXISTECH]:
_channel.basic_ack(delivery_tag)
return

lu.cid_logger.info(f'Physical device: {pd.name}', extra=msg)

# May as well use the message context id for the DataBolt directory and file name.
if not os.path.isdir(_raw_data_name):
logging.error(f'DataBolt {_raw_data_name} directory not found. This should be a mounted volume shared with the DataBolt container.')
sys.exit(1)

msg_uuid = msg[BrokerConstants.CORRELATION_ID_KEY]

old_umask = os.umask(0)
try:
lu.cid_logger.info(json.dumps(msg), extra=msg)
os.mkdir(f'{_raw_data_name}/{msg_uuid}')
with open(f'{_raw_data_name}/{msg_uuid}/{msg_uuid}.json', 'w') as f:
# The body argument is bytes, not a string. Using json.dump is a
# simple way to get a string written to the file.
json.dump(msg, f)
# Set a permissive umask to try and avoid problems with user-based file permissions between
# different containers and the host system.
old_umask = os.umask(0)
try:
os.mkdir(f'{_raw_data_name}/{msg_uuid}')
with open(f'{_raw_data_name}/{msg_uuid}/{msg_uuid}.json', 'w') as f:
# The body argument is bytes, not a string. Using json.dump is a
# simple way to get a string written to the file.
json.dump(msg, f)

except:
logging.exception('Failed to write message to DataBolt directory.')
_channel.basic_ack(delivery_tag)
sys.exit(1)
except:
lu.cid_logger.exception('Failed to write message to DataBolt directory.', extra=msg)
return DataboltWriter.MSG_FAIL

os.umask(old_umask)
# Put the old umask back in case some other file operations are done in the base class.
os.umask(old_umask)

# This tells RabbitMQ the message is handled and can be deleted from the queue.
_channel.basic_ack(delivery_tag)
return DataboltWriter.MSG_OK

except BaseException:
logging.exception('Error while processing message.')
_channel.basic_reject(delivery_tag, requeue=False)
except BaseException:
lu.cid_logger.exception('Error while processing message.', extra=msg)
return DataboltWriter.MSG_FAIL


if __name__ == '__main__':
if not os.path.isdir(_raw_data_name):
logging.error(f'DataBolt {_raw_data_name} directory not found. This should be a mounted volume shared with the DataBolt container.')
sys.exit(1)

# Docker sends SIGTERM to tell the process the container is stopping so set
# a handler to catch the signal and initiate an orderly shutdown.
signal.signal(signal.SIGTERM, sigterm_handler)

# Does not return until SIGTERM is received.
main()
DataboltWriter().run()
logging.info('Exiting.')
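
After the rewrite, DataboltWriter only decides where a message lands: each delivered message is written to /raw_data/<correlation-id>/<correlation-id>.json, and connection handling, retries and shutdown are inherited from BaseWriter. A quick way to inspect a delivered message (a sketch, assuming the /raw_data volume is mounted at that path and using a made-up correlation id):

    import json
    import pathlib

    cid = '00000000-0000-0000-0000-000000000000'  # hypothetical correlation id
    msg_path = pathlib.Path('/raw_data') / cid / f'{cid}.json'

    # The file holds the message exactly as the writer received it, in IoTa format.
    msg = json.loads(msg_path.read_text())
    print(sorted(msg.keys()))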
30 changes: 30 additions & 0 deletions src/python/delivery/LTSLogger.py
@@ -0,0 +1,30 @@
"""
A test delivery service. Simply logs the messages it receives.
"""

from typing import Any

import logging

import util.LoggingUtil as lu

from delivery.BaseWriter import BaseWriter
from pdmodels.Models import LogicalDevice, PhysicalDevice

class LTSLogger(BaseWriter):
def __init__(self) -> None:
super().__init__('ltslogger')

def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int:
try:
lu.cid_logger.info(f'{msg}, retry_count = {retry_count}', extra=msg)
return LTSLogger.MSG_OK

except BaseException:
logging.exception('Error while processing message.')
return LTSLogger.MSG_FAIL


if __name__ == '__main__':
LTSLogger().run()
logging.info('Exiting.')