From 28636f4dbe7b0b8e1ec1d60103e2794c47b55eba Mon Sep 17 00:00:00 2001 From: Kevin Donahue Date: Sun, 12 May 2024 13:41:30 -0400 Subject: [PATCH] add cell prediction producer container --- api/api/main.py | 2 +- cpp/.idea/.gitignore | 3 ++ cpp/.idea/cpp.iml | 14 ++++++ .../inspectionProfiles/profiles_settings.xml | 6 +++ cpp/.idea/misc.xml | 7 +++ cpp/.idea/modules.xml | 8 +++ cpp/.idea/vcs.xml | 6 +++ cpp/Dockerfile | 12 +++++ cpp/README.md | 7 +++ cpp/cpp/__init__.py | 0 cpp/cpp/cells.py | 49 +++++++++++++++++++ cpp/cpp/config.py | 9 ++++ cpp/cpp/main.py | 29 +++++++++++ cpp/requirements.txt | 4 ++ docker-compose.yml | 18 +++++++ 15 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 cpp/.idea/.gitignore create mode 100644 cpp/.idea/cpp.iml create mode 100644 cpp/.idea/inspectionProfiles/profiles_settings.xml create mode 100644 cpp/.idea/misc.xml create mode 100644 cpp/.idea/modules.xml create mode 100644 cpp/.idea/vcs.xml create mode 100644 cpp/Dockerfile create mode 100644 cpp/README.md create mode 100644 cpp/cpp/__init__.py create mode 100644 cpp/cpp/cells.py create mode 100644 cpp/cpp/config.py create mode 100644 cpp/cpp/main.py create mode 100644 cpp/requirements.txt diff --git a/api/api/main.py b/api/api/main.py index 1237a3b..1d96a35 100644 --- a/api/api/main.py +++ b/api/api/main.py @@ -17,7 +17,7 @@ @dataclass class PredictionResponse: - """carries sky brightness in mpsas""" + """response with sky brightness in magnitudes per square arcsecond""" sky_brightness: float diff --git a/cpp/.idea/.gitignore b/cpp/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/cpp/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/cpp/.idea/cpp.iml b/cpp/.idea/cpp.iml new file mode 100644 index 0000000..7a6134d --- /dev/null +++ b/cpp/.idea/cpp.iml @@ -0,0 +1,14 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/cpp/.idea/inspectionProfiles/profiles_settings.xml b/cpp/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/cpp/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/cpp/.idea/misc.xml b/cpp/.idea/misc.xml new file mode 100644 index 0000000..565c141 --- /dev/null +++ b/cpp/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/cpp/.idea/modules.xml b/cpp/.idea/modules.xml new file mode 100644 index 0000000..e009e75 --- /dev/null +++ b/cpp/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/cpp/.idea/vcs.xml b/cpp/.idea/vcs.xml new file mode 100644 index 0000000..6c0b863 --- /dev/null +++ b/cpp/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/cpp/Dockerfile b/cpp/Dockerfile new file mode 100644 index 0000000..f4b7dbe --- /dev/null +++ b/cpp/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.11.7-slim-bullseye + +LABEL maintainer="Kevin Donahue " + +WORKDIR /app +COPY ./requirements.txt . + +RUN pip install --no-cache-dir --upgrade -r ./requirements.txt + +COPY . . + +CMD python -m cpp.main diff --git a/cpp/README.md b/cpp/README.md new file mode 100644 index 0000000..fac4325 --- /dev/null +++ b/cpp/README.md @@ -0,0 +1,7 @@ +# cpp + +this container is a producer for rabbitmq prediction queue. + +it finds cells to request predictions for, +requests those predictions, and then sends +the response to the prediction queue (repeatedly) diff --git a/cpp/cpp/__init__.py b/cpp/cpp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cpp/cpp/cells.py b/cpp/cpp/cells.py new file mode 100644 index 0000000..30f3bc6 --- /dev/null +++ b/cpp/cpp/cells.py @@ -0,0 +1,49 @@ +from typing import Tuple, List +from dataclasses import dataclass, asdict +from datetime import datetime +import logging + +from pika.channel import Channel +import h3 +import httpx + +from .config import api_protocol, api_host, api_port, api_version + +log = logging.getLogger(__name__) + + +@dataclass +class PredictionMessage: + time_of: str + lat: float + lon: float + sky_brightness_mpsas: float + + +async def predict_on_cell(client: httpx.AsyncClient, coords: Tuple[float, float], channel: Channel): + """generate the prediction for the cell coordinates; publishing to rabbitmq once available""" + import json + import asyncio + + api_url = f"{api_protocol}://{api_host}:{api_port}/api/{api_version}/prediction" + + lat, lon = coords + res = await client.get(api_url, params={"lat": lat, "lon": lon}) + # res.raise_for_status() + data = res.json() + message_body = asdict(PredictionMessage( + lat=lat, + lon=lon, + time_of=datetime.utcnow().isoformat(), + sky_brightness_mpsas=data["sky_brightness"], + )) + log.info(f"publishing prediction message {message_body}") + # FIXME routing key + channel.basic_publish(exchange="", routing_key="hello", body=json.dumps(message_body)) + await asyncio.sleep(1) + + +def get_res_zero_cell_coords() -> List[Tuple[float, float]]: + """get coordinates of all resolution zero cells""" + resolution_zero_cells = h3.get_res0_indexes() + return [h3.h3_to_geo(c) for c in resolution_zero_cells] diff --git a/cpp/cpp/config.py b/cpp/cpp/config.py new file mode 100644 index 0000000..888a4c0 --- /dev/null +++ b/cpp/cpp/config.py @@ -0,0 +1,9 @@ +import os + +rabbitmq_host = os.getenv("RABBITMQ_HOST", "localhost") +prediction_queue = os.getenv("PREDICTION_QUEUE", "prediction") + +api_protocol = os.getenv("API_PROTOCOL", "http") +api_port = int(os.getenv("API_PORT", "8000")) +api_host = os.getenv("API_HOST", "localhost") +api_version = os.getenv("API_VERSION", "v1") diff --git a/cpp/cpp/main.py b/cpp/cpp/main.py new file mode 100644 index 0000000..5462bfd --- /dev/null +++ b/cpp/cpp/main.py @@ -0,0 +1,29 @@ +import logging +import asyncio + +import httpx +import pika + +from .cells import predict_on_cell, get_res_zero_cell_coords +from .config import rabbitmq_host, prediction_queue + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +log = logging.getLogger(__name__) + + +async def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host)) + channel = connection.channel() + channel.queue_declare(queue=prediction_queue) + + async with httpx.AsyncClient() as client: + while True: + cells = get_res_zero_cell_coords() + log.info(f"found {len(cells)} resolution zero cells") + tasks = [predict_on_cell(client, coords, channel) for coords in cells[:10]] + await asyncio.gather(*tasks) + await asyncio.sleep(1) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/cpp/requirements.txt b/cpp/requirements.txt new file mode 100644 index 0000000..29bfb81 --- /dev/null +++ b/cpp/requirements.txt @@ -0,0 +1,4 @@ +h3 +httpx +pika +pydantic \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 196d65e..07c5584 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,3 +1,4 @@ +version: "3" services: api: build: ./api @@ -10,7 +11,24 @@ services: ports: - "5672:5672" - "15672:15672" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:15672"] + interval: 30s + timeout: 10s + retries: 5 environment: RABBITMQ_DEFAULT_USER: "guest" RABBITMQ_DEFAULT_PASS: "guest" + cpp: + build: ./cpp + environment: + API_VERSION: "v1" + API_HOST: "api" + RABBITMQ_HOST: "rabbitmq" + restart: on-failure + depends_on: + - rabbitmq + - api + links: + - rabbitmq