Skip to content

Commit

Permalink
store config in own module
Browse files Browse the repository at this point in the history
  • Loading branch information
nonnontrivial committed Jul 27, 2024
1 parent e315010 commit 2104186
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 19 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ services:

consumer:
build: ./pc
ports:
- "8080:8080"
environment:
AMQP_HOST: "rabbitmq"
PG_DATABASE: "postgres"
Expand Down
2 changes: 2 additions & 0 deletions pc/pc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@
AMQP_HOST = os.getenv("AMQP_HOST", "localhost")
AMQP_PREDICTION_QUEUE = os.getenv("AMQP_PREDICTION_QUEUE", "prediction")

WS_HOST = os.getenv("WS_HOST", "consumer")
WS_PORT = int(os.getenv("WS_PORT", 8080))
37 changes: 18 additions & 19 deletions pc/pc/main.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,18 @@
import json
import asyncio
import logging
from dataclasses import dataclass

import psycopg
import aio_pika
import websockets

from pc.config import *
from pc.model import BrightnessMessage
from pc.websockets_handler import WebSocketsHandler

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)


# FIXME should be defined elsewhere
@dataclass
class BrightnessMessage:
uuid: str
lat: float
lon: float
h3_id: str
utc_iso: str
utc_ns: int
mpsas: float
model_version: str
websockets_handler = WebSocketsHandler()


def initialize_db():
Expand Down Expand Up @@ -56,30 +45,40 @@ def insert_brightness_message_in_db(message: BrightnessMessage):
conn.commit()


async def main():
async def consume_from_rabbitmq():
"""create table in pg if needed and begin consuming messages from the queue,
storing them in the predictions table"""
initialize_db()

try:
amqp_connection = await aio_pika.connect_robust(f"amqp://{AMQP_USER}:{AMQP_PASSWORD}@{AMQP_HOST}")
except Exception as e:
import sys

log.error(f"could not form amqp connection because {e}; has rabbitmq started?")
log.warning("exiting")
sys.exit(1)
else:
async with amqp_connection:

channel = await amqp_connection.channel()
queue = await channel.declare_queue(AMQP_PREDICTION_QUEUE)

async for m in queue:
async with m.process():
# serialize the message coming over the queue and add to postgres
json_data = json.loads(m.body.decode())
brightness_message = BrightnessMessage(**json_data)
insert_brightness_message_in_db(brightness_message)
message = BrightnessMessage(**json_data)

insert_brightness_message_in_db(message)
await websockets_handler.broadcast(message)

await asyncio.Future()


async def main():
coroutines = [websockets_handler.setup(), consume_from_rabbitmq()]
await asyncio.gather(*coroutines)


if __name__ == "__main__":
initialize_db()
asyncio.run(main())
13 changes: 13 additions & 0 deletions pc/pc/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from dataclasses import dataclass


@dataclass
class BrightnessMessage:
uuid: str
lat: float
lon: float
h3_id: str
utc_iso: str
utc_ns: int
mpsas: float
model_version: str
33 changes: 33 additions & 0 deletions pc/pc/websockets_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio
import json
import logging
from dataclasses import asdict

from websockets import serve, broadcast

from pc.config import *
from pc.model import BrightnessMessage

log = logging.getLogger(__name__)


class WebSocketsHandler:
clients = set()

async def setup(self):
async def register_client(websocket):
log.info(f"registering {websocket}")
self.clients.add(websocket)
try:
await websocket.wait_closed()
finally:
self.clients.remove(websocket)

async with serve(register_client, WS_HOST, WS_PORT):
await asyncio.Future()

async def broadcast(self, message: BrightnessMessage):
"""send the message to all websockets"""
log.info(f"broadcasting to {len(self.clients)} clients")
message_json = json.dumps(asdict(message))
broadcast(self.clients, message_json)

0 comments on commit 2104186

Please sign in to comment.