Skip to content

Commit

Permalink
use orm in consumer for observation storage
Browse files Browse the repository at this point in the history
  • Loading branch information
nonnontrivial committed Oct 1, 2024
1 parent 3cf5534 commit fd8a926
Show file tree
Hide file tree
Showing 26 changed files with 123 additions and 170 deletions.
2 changes: 1 addition & 1 deletion api/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
astroplan==0.9.1
astropy==6.0.0
pandas==2.1.4
torch~=2.2.2
torch~=2.2.x
requests==2.31.0
fastapi~=0.110.2
httpx==0.26.0
Expand Down
4 changes: 1 addition & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "3"

services:
postgres:
image: "postgres:latest"
Expand Down Expand Up @@ -65,8 +63,8 @@ services:
build: ./pp
environment:
API_VERSION: "v1"
MODEL_VERSION: "0.1.0"
API_HOST: "api"
MODEL_VERSION: "0.1.0"
KEYDB_HOST: "keydb"
RABBITMQ_HOST: "rabbitmq"
TASK_SLEEP_INTERVAL: "0.5"
Expand Down
3 changes: 2 additions & 1 deletion pc/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
FROM python:3.11.7-slim-bullseye
ARG VERSION=3.12.6-slim-bookworm
FROM python:${VERSION}

LABEL maintainer="Kevin Donahue <nonnontrivial@gmail.com>"

Expand Down
7 changes: 3 additions & 4 deletions pc/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# pc

prediction consumer.
> prediction consumer.
pulls predictions messages off of prediction queue and into postgres and websockets.

## connect to timescale instance
Pulls brightness observation messages off of the prediction queue and into postgres.

```shell
# connect to postgres instance
psql -d "postgres://postgres:password@localhost/postgres"
```
6 changes: 1 addition & 5 deletions pc/pc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@
PG_DATABASE = os.getenv("PG_DATABASE", "localhost")
PG_HOST = os.getenv("PG_HOST", "postgres")
PG_PORT = int(os.getenv("PG_PORT", 5432))

pg_dsn = f"dbname={PG_DATABASE} user={PG_USER} password={PG_PASSWORD} host={PG_HOST}"
pg_dsn = f"postgres://{PG_USER}:{PG_PASSWORD}@{PG_HOST}:{PG_PORT}/{PG_DATABASE}"

AMQP_USER = os.getenv("AMQP_USER", "guest")
AMQP_PASSWORD = os.getenv("AMQP_PASSWORD", "guest")
AMQP_HOST = os.getenv("AMQP_HOST", "localhost")
AMQP_PREDICTION_QUEUE = os.getenv("AMQP_PREDICTION_QUEUE", "prediction")

WS_HOST = os.getenv("WS_HOST", "consumer")
WS_PORT = int(os.getenv("WS_PORT", 8090))
74 changes: 4 additions & 70 deletions pc/pc/main.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,18 @@
import json
import asyncio
import logging

import psycopg
import aio_pika

from pc.config import *
from pc.model import BrightnessMessage
from pc.websockets_handler import WebSocketsHandler
from pc.persistence.db import initialize_db
from pc.rabbitmq import consume_brightness_observations

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger(__name__)

websockets_handler = WebSocketsHandler()


def initialize_db():
"""create the predictions table if it does not exist"""
with psycopg.connect(pg_dsn) as conn:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS predictions (
id serial PRIMARY KEY,
h3_id text NOT NULL,
utc_iso text NOT NULL,
utc_ns bigint NOT NULL,
mpsas real NOT NULL,
model_version text NOT NULL
)
""")
conn.commit()


def insert_brightness_message_in_db(message: BrightnessMessage):
"""insert subset of brightness message into the predictions table"""
with psycopg.connect(pg_dsn) as conn:
with conn.cursor() as cur:
log.info(f"inserting brightness message for {message.h3_id}")

cur.execute("""
INSERT INTO predictions (h3_id, utc_iso, utc_ns, mpsas, model_version)
VALUES (%s, %s, %s, %s, %s)
""", (message.h3_id, message.utc_iso, message.utc_ns, message.mpsas, message.model_version))
conn.commit()


async def consume_from_rabbitmq():
"""create table in pg if needed and begin consuming messages from the queue,
storing them in the predictions table"""
try:
amqp_connection = await aio_pika.connect_robust(f"amqp://{AMQP_USER}:{AMQP_PASSWORD}@{AMQP_HOST}")
except Exception as e:
import sys

log.error(f"could not form amqp connection because {e}; has rabbitmq started?")
log.warning("exiting")
sys.exit(1)
else:
async with amqp_connection:

channel = await amqp_connection.channel()
queue = await channel.declare_queue(AMQP_PREDICTION_QUEUE)

async for m in queue:
async with m.process():
# serialize the message coming over the queue and add to postgres
json_data = json.loads(m.body.decode())
message = BrightnessMessage(**json_data)

insert_brightness_message_in_db(message)
await websockets_handler.broadcast(message)

await asyncio.Future()


async def main():
coroutines = [websockets_handler.setup(), consume_from_rabbitmq()]
"""run the primary coroutines together"""
coroutines = [initialize_db(), consume_brightness_observations()]
await asyncio.gather(*coroutines)


if __name__ == "__main__":
initialize_db()
asyncio.run(main())
File renamed without changes.
15 changes: 15 additions & 0 deletions pc/pc/persistence/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import logging

from pc.config import pg_dsn
from tortoise import Tortoise

log = logging.getLogger(__name__)


async def initialize_db():
log.info(f"initializing db at {pg_dsn}")
await Tortoise.init(
db_url=pg_dsn,
modules={"models": ["pc.persistence.models"]}
)
await Tortoise.generate_schemas()
14 changes: 14 additions & 0 deletions pc/pc/persistence/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from tortoise import fields, models


class BrightnessObservation(models.Model):
uuid = fields.CharField(primary_key=True, max_length=36)
lat = fields.FloatField()
lon = fields.FloatField()
h3_id = fields.CharField(max_length=15)
utc_iso = fields.CharField(max_length=30)
mpsas = fields.FloatField()
model_version = fields.CharField(max_length=36)

def __str__(self):
return f"{self.h3_id}:{self.uuid}"
43 changes: 43 additions & 0 deletions pc/pc/rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import json
import logging
import asyncio

import aio_pika

from pc.config import *
from pc.persistence.models import BrightnessObservation

log = logging.getLogger(__name__)


# class RabbitMQConsumer:
# def __init__(self, user: str, password: str, host: str):
# self.url = f"amqp://{user}:{password}@{host}"
#
# async def connect(self):
# pass


async def consume_brightness_observations():
"""begin consuming messages from the queue, storing them in predictions table"""
try:
amqp_connection = await aio_pika.connect_robust(f"amqp://{AMQP_USER}:{AMQP_PASSWORD}@{AMQP_HOST}")
except Exception as e:
import sys

log.error(f"could not form amqp connection because {e}; has rabbitmq started?")
log.warning("exiting")
sys.exit(1)
else:
async with amqp_connection:
channel = await amqp_connection.channel()
queue = await channel.declare_queue(AMQP_PREDICTION_QUEUE)

async for message in queue:
async with message.process():
brightness_observation_json = json.loads(message.body.decode())
brightness_observation = BrightnessObservation(**brightness_observation_json)

log.info(f"saving brightness observation {brightness_observation}")
await brightness_observation.save()
await asyncio.Future()
Empty file added pc/pc/tests/__init__.py
Empty file.
33 changes: 0 additions & 33 deletions pc/pc/websockets_handler.py

This file was deleted.

3 changes: 1 addition & 2 deletions pc/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
aio-pika==9.4.2
websockets==12.0
psycopg~=3.2.1
tortoise-orm[asyncpg]~=0.21.6
2 changes: 1 addition & 1 deletion pp/.idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pp/.idea/pp.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pp/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
FROM python:3.11.7-slim-bullseye
ARG VERSION=3.12.6-slim-bookworm
FROM python:${VERSION}

LABEL maintainer="Kevin Donahue <nonnontrivial@gmail.com>"

Expand Down
4 changes: 2 additions & 2 deletions pp/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# pp

prediction producer.
> prediction producer.
> handles sky brightness prediction across resolution 0 h3 cells and puts on rabbitmq
Handles sky brightness prediction across resolution 0 h3 cells and puts results on rabbitmq

## monitoring

Expand Down
Empty file added pp/pp/cells/__init__.py
Empty file.
File renamed without changes.
6 changes: 3 additions & 3 deletions pp/pp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
keydb_port = int(os.getenv("KEYDB_PORT", 6379))

rabbitmq_host = os.getenv("RABBITMQ_HOST", "localhost")
prediction_queue = os.getenv("PREDICTION_QUEUE", "prediction")

task_sleep_interval = float(os.getenv("TASK_SLEEP_INTERVAL", "0.5"))
queue_name = os.getenv("QUEUE_NAME", "prediction")

api_protocol = os.getenv("API_PROTOCOL", "http")
api_port = int(os.getenv("API_PORT", "8000"))
api_host = os.getenv("API_HOST", "localhost")
api_version = os.getenv("API_VERSION", "v1")

model_version = os.getenv("MODEL_VERSION", "0.1.0")

task_sleep_interval = float(os.getenv("TASK_SLEEP_INTERVAL", "0.5"))
10 changes: 5 additions & 5 deletions pp/pp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import pika
from pika.exceptions import AMQPConnectionError

from .prediction import publish_cell_brightness
from .cells import get_h3_cells
from .config import rabbitmq_host, prediction_queue, task_sleep_interval
from .prediction import publish_observation_to_queue
from .cells.h3 import get_h3_cells
from .config import rabbitmq_host, queue_name, task_sleep_interval

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger(__name__)
Expand All @@ -19,7 +19,7 @@ async def main():
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host))
channel = connection.channel()
channel.queue_declare(queue=prediction_queue)
channel.queue_declare(queue=queue_name)
except AMQPConnectionError as e:
import sys

Expand All @@ -36,7 +36,7 @@ async def main():
async with httpx.AsyncClient() as client:
while True:
for cell_coords in h3_cell_coords:
await asyncio.create_task(publish_cell_brightness(client, cell_coords, channel))
await asyncio.create_task(publish_observation_to_queue(client, cell_coords, channel))
await asyncio.sleep(task_sleep_interval)
except Exception as e:
log.error(f"could not continue publishing because {e}")
Expand Down
Empty file added pp/pp/models/__init__.py
Empty file.
6 changes: 2 additions & 4 deletions pc/pc/model.py → pp/pp/models/models.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from dataclasses import dataclass
from pydantic import BaseModel


@dataclass
class BrightnessMessage:
class BrightnessObservation(BaseModel):
uuid: str
lat: float
lon: float
h3_id: str
utc_iso: str
utc_ns: int
mpsas: float
model_version: str
Loading

0 comments on commit fd8a926

Please sign in to comment.