diff --git a/README.md b/README.md index fd01355..c76af3c 100644 --- a/README.md +++ b/README.md @@ -8,5 +8,9 @@ this will spin up the process of the prediction producer container repeatedly as measurements across all [resolution 0 h3 cells](https://h3geo.org/docs/core-library/restable/) and publishing to rabbitmq. ```shell +# create the volume for weather data +docker volume create --name open-meteo-data + +# run the containers docker-compose up --build ``` diff --git a/docker-compose.yml b/docker-compose.yml index c59f628..08e5d95 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,17 @@ version: "3" - services: + postgres: + image: "postgres:latest" + environment: + POSTGRES_DB: "postgres" + POSTGRES_USER: "postgres" + POSTGRES_PASSWORD: "password" + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + + keydb: image: "eqalpha/keydb:latest" ports: @@ -12,7 +23,7 @@ services: - "5672:5672" - "15672:15672" healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:15672"] + test: [ "CMD", "curl", "-f", "http://localhost:15672" ] interval: 30s timeout: 10s retries: 5 @@ -58,6 +69,22 @@ services: links: - rabbitmq + pc: + build: ./pc + environment: + AMQP_HOST: "rabbitmq" + PG_DATABASE: "postgres" + restart: on-failure + depends_on: + - postgres + - rabbitmq + links: + - rabbitmq + + + volumes: + postgres_data: open-meteo-data: + external: true diff --git a/pc/.idea/.gitignore b/pc/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/pc/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/pc/.idea/inspectionProfiles/profiles_settings.xml b/pc/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/pc/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/pc/.idea/misc.xml b/pc/.idea/misc.xml new file mode 100644 index 0000000..5c79c71 --- /dev/null +++ b/pc/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/pc/.idea/modules.xml b/pc/.idea/modules.xml new file mode 100644 index 0000000..07a7f77 --- /dev/null +++ b/pc/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/pc/.idea/pc.iml b/pc/.idea/pc.iml new file mode 100644 index 0000000..2c80e12 --- /dev/null +++ b/pc/.idea/pc.iml @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/pc/.idea/vcs.xml b/pc/.idea/vcs.xml new file mode 100644 index 0000000..6c0b863 --- /dev/null +++ b/pc/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/pc/Dockerfile b/pc/Dockerfile new file mode 100644 index 0000000..8fb00ab --- /dev/null +++ b/pc/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.11.7-slim-bullseye + +LABEL maintainer="Kevin Donahue " + +# system dependencies needed for psycopg3 +RUN apt-get update \ + && apt-get install -y libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app +COPY ./requirements.txt . + +RUN pip install --no-cache-dir --upgrade -r ./requirements.txt + +COPY . . + +CMD python -m pc.consume diff --git a/pc/README.md b/pc/README.md new file mode 100644 index 0000000..166540e --- /dev/null +++ b/pc/README.md @@ -0,0 +1,11 @@ +# pc + +prediction consumer. + +pulls predictions messages off of prediction queue and into postgres and websockets. + +## connect to timescale instance + +```shell +psql -d "postgres://postgres:password@localhost/postgres" +``` diff --git a/pc/pc/__init__.py b/pc/pc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pc/pc/consume.py b/pc/pc/consume.py new file mode 100644 index 0000000..df4f63d --- /dev/null +++ b/pc/pc/consume.py @@ -0,0 +1,97 @@ +import os +import json +import asyncio +import logging +from dataclasses import dataclass + +import psycopg +import aio_pika + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +log = logging.getLogger(__name__) + +PG_USER = os.getenv("PG_USER", "postgres") +PG_PASSWORD = os.getenv("PG_PASSWORD", "password") +PG_DATABASE = os.getenv("PG_DATABASE", "localhost") +PG_HOST = os.getenv("PG_HOST", "postgres") +PG_PORT = int(os.getenv("PG_PORT", 5432)) + +pg_dsn = f"dbname={PG_DATABASE} user={PG_USER} password={PG_PASSWORD} host={PG_HOST}" + +AMQP_USER = os.getenv("AMQP_USER", "guest") +AMQP_PASSWORD = os.getenv("AMQP_PASSWORD", "guest") +AMQP_HOST = os.getenv("AMQP_HOST", "localhost") +AMQP_PREDICTION_QUEUE = os.getenv("AMQP_PREDICTION_QUEUE", "prediction") + + +# FIXME should be defined elsewhere +@dataclass +class BrightnessMessage: + uuid: str + lat: float + lon: float + h3_id: str + utc_iso: str + utc_ns: int + mpsas: float + model_version: str + + +def initialize_db(): + """create the predictions table if it does not exist""" + with psycopg.connect(pg_dsn) as conn: + with conn.cursor() as cur: + cur.execute(""" + CREATE TABLE IF NOT EXISTS predictions ( + id serial PRIMARY KEY, + h3_id text NOT NULL, + utc_iso text NOT NULL, + utc_ns bigint NOT NULL, + mpsas real NOT NULL, + model_version text NOT NULL + ) + """) + conn.commit() + + +# +def insert_brightness_message_in_db(message: BrightnessMessage): + """insert subset of brightness message into the predictions table""" + with psycopg.connect(pg_dsn) as conn: + with conn.cursor() as cur: + log.info(f"inserting brightness message for {message.h3_id}") + + cur.execute(""" + INSERT INTO predictions (h3_id, utc_iso, utc_ns, mpsas, model_version) + VALUES (%s, %s, %s, %s, %s) + """, (message.h3_id, message.utc_iso, message.utc_ns, message.mpsas, message.model_version)) + conn.commit() + + +async def main(): + """create table in pg if needed and begin consuming messages from the queue, + storing them in the predictions table""" + initialize_db() + + try: + amqp_connection = await aio_pika.connect_robust(f"amqp://{AMQP_USER}:{AMQP_PASSWORD}@{AMQP_HOST}") + except Exception as e: + import sys + log.error(f"could not form amqp connection; has rabbitmq started?") + log.warning("exiting") + sys.exit(1) + else: + async with amqp_connection: + channel = await amqp_connection.channel() + queue = await channel.declare_queue(AMQP_PREDICTION_QUEUE) + async for m in queue: + async with m.process(): + json_data = json.loads(m.body.decode()) + brightness_message = BrightnessMessage(**json_data) + insert_brightness_message_in_db(brightness_message) + + await asyncio.Future() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pc/requirements.txt b/pc/requirements.txt new file mode 100644 index 0000000..80e7be3 --- /dev/null +++ b/pc/requirements.txt @@ -0,0 +1,3 @@ +aio-pika==9.4.2 +websockets==12.0 +psycopg~=3.2.1 \ No newline at end of file