Merge pull request #73 from DPIclimate/issue_70
Reject future-timestamped messages; the /messages REST API can now take both physical and logical device ids
dajtxx authored May 27, 2024
2 parents 85d8907 + 9b4882f commit 665e477
Showing 5 changed files with 261 additions and 123 deletions.
34 changes: 28 additions & 6 deletions src/python/api/client/DAO.py
@@ -971,7 +971,6 @@ def add_raw_json_message(source_name: str, ts: datetime, correlation_uuid: str,
free_conn(conn)


@backoff.on_exception(backoff.expo, DAOException, max_time=30)
def insert_physical_timeseries_message(msg: Dict[str, Any]) -> None:
conn = None
p_uid = msg[BrokerConstants.PHYSICAL_DEVICE_UID_KEY]
@@ -988,8 +987,7 @@ def insert_physical_timeseries_message(msg: Dict[str, Any]) -> None:
free_conn(conn)


@backoff.on_exception(backoff.expo, DAOException, max_time=30)
def get_physical_timeseries_message(p_uid: int, start: datetime, end: datetime, count: int, only_timestamp: bool) -> List[Dict]:
def get_physical_timeseries_message(start: datetime | None = None, end: datetime | None = None, count: int | None = None, only_timestamp: bool = False, p_uid: int = None, l_uid: int = None) -> List[Dict]:
conn = None

if start is None:
@@ -1001,20 +999,44 @@ def get_physical_timeseries_message(p_uid: int, start: datetime, end: datetime,
if count < 1:
count = 1

if p_uid is None and l_uid is None:
raise DAOException('p_uid or l_uid must be supplied.')

if p_uid is not None and l_uid is not None:
raise DAOException('Both p_uid and l_uid were provided, only give one.')

if p_uid is not None:
uid_col_name = 'physical_uid'
uid = p_uid
else:
uid_col_name = 'logical_uid'
uid = l_uid

if not isinstance(uid, int):
raise TypeError

if not isinstance(start, datetime):
raise TypeError

if not isinstance(end, datetime):
raise TypeError

column_name = 'ts' if only_timestamp else 'json_msg'

try:
# Order messages by descending timestamp. If a caller asks for one message, they probably want the latest
# message.
with _get_connection() as conn, conn.cursor() as cursor:
qry = f"""
select {column_name} from physical_timeseries
where physical_uid = %s
where {uid_col_name} = %s
and ts > %s
and ts <= %s
order by ts asc
order by ts desc
limit %s
"""

args = (p_uid, start, end, count)
args = (uid, start, end, count)
cursor.execute(qry, args)
return [row[0] for row in cursor.fetchall()]
except Exception as err:
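With the new signature above, a caller identifies the device with exactly one of p_uid or l_uid; supplying both or neither raises DAOException. A minimal sketch of a call, assuming hypothetical device uids and a one-day window:

# Illustrative sketch only: the uids and time window are made-up values.
import datetime
import api.client.DAO as dao

end = datetime.datetime.now(datetime.timezone.utc)
start = end - datetime.timedelta(days=1)

# Look up messages by physical device uid...
phys_msgs = dao.get_physical_timeseries_message(start=start, end=end, count=50, p_uid=1)

# ...or by logical device uid, but never both in the same call.
logical_msgs = dao.get_physical_timeseries_message(start=start, end=end, count=50, l_uid=1)
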
77 changes: 51 additions & 26 deletions src/python/logical_mapper/LogicalMapper.py
@@ -18,30 +18,35 @@
"""

import asyncio, json, logging, signal
import datetime

import dateutil.parser

import BrokerConstants
from pika.exchange_type import ExchangeType
import api.client.RabbitMQ as mq
import api.client.DAO as dao
import util.LoggingUtil as lu

rx_channel = None
tx_channel = None
mq_client = None
finish = False
_rx_channel = None
_tx_channel = None
_mq_client = None
_finish = False

_max_delta = datetime.timedelta(hours=-1)


def sigterm_handler(sig_no, stack_frame) -> None:
"""
Handle SIGTERM from docker by closing the mq and db connections and setting a
flag to tell the main loop to exit.
"""
global finish, mq_client
global _finish, _mq_client

logging.info(f'{signal.strsignal(sig_no)}, setting finish to True')
finish = True
logging.info(f'{signal.strsignal(sig_no)}, setting _finish to True')
_finish = True
dao.stop()
mq_client.stop()
_mq_client.stop()


async def main():
@@ -53,25 +58,25 @@ async def main():
It would be good to find a better way to do nothing than the current loop.
"""
global mq_client, rx_channel, tx_channel, finish
global _mq_client, _rx_channel, _tx_channel, _finish

logging.info('===============================================================')
logging.info(' STARTING LOGICAL MAPPER')
logging.info('===============================================================')

rx_channel = mq.RxChannel(exchange_name=BrokerConstants.PHYSICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout, queue_name='lm_physical_timeseries', on_message=on_message)
tx_channel = mq.TxChannel(exchange_name=BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout)
mq_client = mq.RabbitMQConnection(channels=[rx_channel, tx_channel])
_rx_channel = mq.RxChannel(exchange_name=BrokerConstants.PHYSICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout, queue_name='lm_physical_timeseries', on_message=on_message)
_tx_channel = mq.TxChannel(exchange_name=BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout)
_mq_client = mq.RabbitMQConnection(channels=[_rx_channel, _tx_channel])

asyncio.create_task(mq_client.connect())
asyncio.create_task(_mq_client.connect())

while not (rx_channel.is_open and tx_channel.is_open):
while not (_rx_channel.is_open and _tx_channel.is_open):
await asyncio.sleep(0)

while not finish:
while not _finish:
await asyncio.sleep(2)

while not mq_client.stopped:
while not _mq_client.stopped:
await asyncio.sleep(1)


@@ -80,14 +85,14 @@ def on_message(channel, method, properties, body):
This function is called when a message arrives from RabbitMQ.
"""

global rx_channel, tx_channel, finish
global _rx_channel, _tx_channel, _finish

delivery_tag = method.delivery_tag

# If the finish flag is set, reject the message so RabbitMQ will re-queue it
# If the _finish flag is set, reject the message so RabbitMQ will re-queue it
# and return early.
if finish:
rx_channel._channel.basic_reject(delivery_tag)
if _finish:
_rx_channel._channel.basic_reject(delivery_tag)
return

try:
Expand All @@ -99,7 +104,7 @@ def on_message(channel, method, properties, body):
lu.cid_logger.error(f'Physical device not found, cannot continue. Dropping message.', extra=msg)
# Ack the message, even though we cannot process it. We don't want it redelivered.
# We can change this to a Nack if that would provide extra context somewhere.
rx_channel._channel.basic_ack(delivery_tag)
_rx_channel._channel.basic_ack(delivery_tag)
return

lu.cid_logger.info(f'Accepted message from {pd.name}', extra=msg)
@@ -116,27 +121,47 @@

# Ack the message, even though we cannot process it. We don't want it redelivered.
# We can change this to a Nack if that would provide extra context somewhere.
rx_channel._channel.basic_ack(delivery_tag)
_rx_channel._channel.basic_ack(delivery_tag)
return

msg[BrokerConstants.LOGICAL_DEVICE_UID_KEY] = mapping.ld.uid

dao.insert_physical_timeseries_message(msg)

ld = mapping.ld
ld.last_seen = msg[BrokerConstants.TIMESTAMP_KEY]

# Determine if the message has a future timestamp.
ts_str: str = msg[BrokerConstants.TIMESTAMP_KEY]
ts = dateutil.parser.isoparse(ts_str)
utc_now = datetime.datetime.now(datetime.timezone.utc)
ts_delta = utc_now - ts

# Drop messages with a timestamp more than 1 hour in the future.
if ts_delta < _max_delta:
lu.cid_logger.warning(f'Message with future timestamp. Dropping message.', extra=msg)
# Ack the message, even though we cannot process it. We don't want it redelivered.
# We can change this to a Nack if that would provide extra context somewhere.
_rx_channel._channel.basic_ack(delivery_tag)
return

if ts > utc_now:
# If the timestamp is a bit in the future then make the last seen time 'now'.
ld.last_seen = utc_now
else:
ld.last_seen = ts

lu.cid_logger.info(f'Timestamp from message for LD last seen update: {ld.last_seen}', extra=msg)
ld.properties[BrokerConstants.LAST_MSG] = msg
dao.update_logical_device(ld)

tx_channel.publish_message('logical_timeseries', msg)
_tx_channel.publish_message('logical_timeseries', msg)

# This tells RabbitMQ the message is handled and can be deleted from the queue.
rx_channel._channel.basic_ack(delivery_tag)
_rx_channel._channel.basic_ack(delivery_tag)

except BaseException as e:
logging.exception('Error while processing message')
rx_channel._channel.basic_ack(delivery_tag)
_rx_channel._channel.basic_ack(delivery_tag)


if __name__ == '__main__':
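The new check in on_message compares the message timestamp with the current UTC time: messages stamped more than an hour in the future are dropped, and slightly-future timestamps are clamped to now for the last-seen update. A standalone sketch of that rule, using a made-up timestamp:

# Standalone sketch of the future-timestamp rule; the sample timestamp is hypothetical.
import datetime
import dateutil.parser

_max_delta = datetime.timedelta(hours=-1)

def check_timestamp(ts_str: str) -> datetime.datetime | None:
    """Return the last-seen time to record, or None if the message should be dropped."""
    ts = dateutil.parser.isoparse(ts_str)
    utc_now = datetime.datetime.now(datetime.timezone.utc)
    if utc_now - ts < _max_delta:
        return None  # more than an hour in the future: drop the message
    return utc_now if ts > utc_now else ts  # clamp slightly-future timestamps to now

print(check_timestamp('2024-05-27T01:02:03Z'))
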
17 changes: 12 additions & 5 deletions src/python/restapi/RestAPI.py
@@ -457,10 +457,11 @@ async def end_mapping_of_logical_uid(uid: int) -> None:
MESSAGE RELATED
--------------------------------------------------------------------------"""

@router.get("/physical/messages/{uid}", tags=['messages'])
@router.get("/messages", tags=['Messages'])
async def get_physical_timeseries(
request: Request,
uid: int,
p_uid: int | None = None,
l_uid: int | None = None,
count: Annotated[int | None, Query(gt=0, le=65536)] = None,
last: str = None,
start: datetime.datetime = None,
@@ -471,7 +472,8 @@ async def get_physical_timeseries(
Args:
request: The HTTP request object.
uid: The unique identifier of the physical device.
p_uid: The unique identifier of a physical device. Mutually exclusive with l_uid.
l_uid: The unique identifier of a logical device. Mutually exclusive with p_uid.
count: The maximum number of entries to return.
last: Return messages from the last nx interval where n is a number and x is 'h'ours, 'd'ays, 'w'eeks, 'm'onths, 'y'ears.
start: The start date and time of the time range.
@@ -485,7 +487,6 @@ async def get_physical_timeseries(
HTTPException: If an error occurs.
"""
try:
#logging.info(f'start: {start.isoformat() if start is not None else start}, end: {end.isoformat() if end is not None else end}, count: {count}, only_timestamp: {only_timestamp}')
if end is not None:
if start is not None and start >= end:
raise HTTPException(status_code=422, detail={"detail": [{"loc": ["query", "start"], "msg": "ensure start value is less than end"}]})
@@ -518,7 +519,12 @@

start = end - diff

msgs = dao.get_physical_timeseries_message(uid, start, end, count, only_timestamp)
msgs = None
if p_uid is not None:
msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, p_uid=p_uid)
elif l_uid is not None:
msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, l_uid=l_uid)

if msgs is None:
raise HTTPException(status_code=404, detail="Failed to retrieve messages")

@@ -528,6 +534,7 @@ async def get_physical_timeseries(
except dao.DAOException as err:
raise HTTPException(status_code=500, detail=err.msg)


"""--------------------------------------------------------------------------
USER AUTHENTICATION
--------------------------------------------------------------------------"""
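A hypothetical client call against the renamed /messages endpoint; only the query parameter names come from the handler above, while the host, port, and path prefix are assumptions:

# Hypothetical request; the base URL and any auth headers are assumptions.
import requests

resp = requests.get(
    'http://localhost:5687/api/messages',  # assumed host and path prefix
    params={'l_uid': 1, 'count': 10},      # or {'p_uid': ...}; never both
)
resp.raise_for_status()
print(resp.json())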