From 2104186a245d1e1817c34fb0f7ae53a88f3d56c5 Mon Sep 17 00:00:00 2001 From: Kevin Donahue Date: Fri, 26 Jul 2024 21:41:56 -0400 Subject: [PATCH] store config in own module --- docker-compose.yml | 2 ++ pc/pc/config.py | 2 ++ pc/pc/main.py | 37 ++++++++++++++++++------------------- pc/pc/model.py | 13 +++++++++++++ pc/pc/websockets_handler.py | 33 +++++++++++++++++++++++++++++++++ 5 files changed, 68 insertions(+), 19 deletions(-) create mode 100644 pc/pc/model.py create mode 100644 pc/pc/websockets_handler.py diff --git a/docker-compose.yml b/docker-compose.yml index b2e0b64..624ac8b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -72,6 +72,8 @@ services: consumer: build: ./pc + ports: + - "8080:8080" environment: AMQP_HOST: "rabbitmq" PG_DATABASE: "postgres" diff --git a/pc/pc/config.py b/pc/pc/config.py index e2f0b75..0ab1fbf 100644 --- a/pc/pc/config.py +++ b/pc/pc/config.py @@ -13,3 +13,5 @@ AMQP_HOST = os.getenv("AMQP_HOST", "localhost") AMQP_PREDICTION_QUEUE = os.getenv("AMQP_PREDICTION_QUEUE", "prediction") +WS_HOST = os.getenv("WS_HOST", "consumer") +WS_PORT = int(os.getenv("WS_PORT", 8080)) diff --git a/pc/pc/main.py b/pc/pc/main.py index 27b9324..eebf8fb 100644 --- a/pc/pc/main.py +++ b/pc/pc/main.py @@ -1,29 +1,18 @@ import json import asyncio import logging -from dataclasses import dataclass import psycopg import aio_pika -import websockets from pc.config import * +from pc.model import BrightnessMessage +from pc.websockets_handler import WebSocketsHandler logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") log = logging.getLogger(__name__) - -# FIXME should be defined elsewhere -@dataclass -class BrightnessMessage: - uuid: str - lat: float - lon: float - h3_id: str - utc_iso: str - utc_ns: int - mpsas: float - model_version: str +websockets_handler = WebSocketsHandler() def initialize_db(): @@ -56,30 +45,40 @@ def insert_brightness_message_in_db(message: BrightnessMessage): conn.commit() -async def main(): +async def consume_from_rabbitmq(): """create table in pg if needed and begin consuming messages from the queue, storing them in the predictions table""" - initialize_db() - try: amqp_connection = await aio_pika.connect_robust(f"amqp://{AMQP_USER}:{AMQP_PASSWORD}@{AMQP_HOST}") except Exception as e: import sys + log.error(f"could not form amqp connection because {e}; has rabbitmq started?") log.warning("exiting") sys.exit(1) else: async with amqp_connection: + channel = await amqp_connection.channel() queue = await channel.declare_queue(AMQP_PREDICTION_QUEUE) + async for m in queue: async with m.process(): + # serialize the message coming over the queue and add to postgres json_data = json.loads(m.body.decode()) - brightness_message = BrightnessMessage(**json_data) - insert_brightness_message_in_db(brightness_message) + message = BrightnessMessage(**json_data) + + insert_brightness_message_in_db(message) + await websockets_handler.broadcast(message) await asyncio.Future() +async def main(): + coroutines = [websockets_handler.setup(), consume_from_rabbitmq()] + await asyncio.gather(*coroutines) + + if __name__ == "__main__": + initialize_db() asyncio.run(main()) diff --git a/pc/pc/model.py b/pc/pc/model.py new file mode 100644 index 0000000..1c128ee --- /dev/null +++ b/pc/pc/model.py @@ -0,0 +1,13 @@ +from dataclasses import dataclass + + +@dataclass +class BrightnessMessage: + uuid: str + lat: float + lon: float + h3_id: str + utc_iso: str + utc_ns: int + mpsas: float + model_version: str diff --git a/pc/pc/websockets_handler.py b/pc/pc/websockets_handler.py new file mode 100644 index 0000000..9202be0 --- /dev/null +++ b/pc/pc/websockets_handler.py @@ -0,0 +1,33 @@ +import asyncio +import json +import logging +from dataclasses import asdict + +from websockets import serve, broadcast + +from pc.config import * +from pc.model import BrightnessMessage + +log = logging.getLogger(__name__) + + +class WebSocketsHandler: + clients = set() + + async def setup(self): + async def register_client(websocket): + log.info(f"registering {websocket}") + self.clients.add(websocket) + try: + await websocket.wait_closed() + finally: + self.clients.remove(websocket) + + async with serve(register_client, WS_HOST, WS_PORT): + await asyncio.Future() + + async def broadcast(self, message: BrightnessMessage): + """send the message to all websockets""" + log.info(f"broadcasting to {len(self.clients)} clients") + message_json = json.dumps(asdict(message)) + broadcast(self.clients, message_json)