-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
- Loading branch information
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
FROM python:3.11.7-slim-bullseye | ||
|
||
LABEL maintainer="Kevin Donahue <nonnontrivial@gmail.com>" | ||
|
||
WORKDIR /app | ||
COPY ./requirements.txt . | ||
|
||
RUN pip install --no-cache-dir --upgrade -r ./requirements.txt | ||
|
||
COPY . . | ||
|
||
CMD python -m cpp.main |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# cpp | ||
|
||
this container is a producer for rabbitmq prediction queue. | ||
|
||
it finds cells to request predictions for, | ||
requests those predictions, and then sends | ||
the response to the prediction queue (repeatedly) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from typing import Tuple, List | ||
from dataclasses import dataclass, asdict | ||
from datetime import datetime | ||
import logging | ||
|
||
from pika.channel import Channel | ||
import h3 | ||
import httpx | ||
|
||
from .config import api_protocol, api_host, api_port, api_version | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
@dataclass | ||
class PredictionMessage: | ||
time_of: str | ||
lat: float | ||
lon: float | ||
sky_brightness_mpsas: float | ||
|
||
|
||
async def predict_on_cell(client: httpx.AsyncClient, coords: Tuple[float, float], channel: Channel): | ||
"""generate the prediction for the cell coordinates; publishing to rabbitmq once available""" | ||
import json | ||
import asyncio | ||
|
||
api_url = f"{api_protocol}://{api_host}:{api_port}/api/{api_version}/prediction" | ||
|
||
lat, lon = coords | ||
res = await client.get(api_url, params={"lat": lat, "lon": lon}) | ||
# res.raise_for_status() | ||
data = res.json() | ||
message_body = asdict(PredictionMessage( | ||
lat=lat, | ||
lon=lon, | ||
time_of=datetime.utcnow().isoformat(), | ||
sky_brightness_mpsas=data["sky_brightness"], | ||
)) | ||
log.info(f"publishing prediction message {message_body}") | ||
# FIXME routing key | ||
channel.basic_publish(exchange="", routing_key="hello", body=json.dumps(message_body)) | ||
await asyncio.sleep(1) | ||
|
||
|
||
def get_res_zero_cell_coords() -> List[Tuple[float, float]]: | ||
"""get coordinates of all resolution zero cells""" | ||
resolution_zero_cells = h3.get_res0_indexes() | ||
return [h3.h3_to_geo(c) for c in resolution_zero_cells] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
import os | ||
|
||
rabbitmq_host = os.getenv("RABBITMQ_HOST", "localhost") | ||
prediction_queue = os.getenv("PREDICTION_QUEUE", "prediction") | ||
|
||
api_protocol = os.getenv("API_PROTOCOL", "http") | ||
api_port = int(os.getenv("API_PORT", "8000")) | ||
api_host = os.getenv("API_HOST", "localhost") | ||
api_version = os.getenv("API_VERSION", "v1") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import logging | ||
import asyncio | ||
|
||
import httpx | ||
import pika | ||
|
||
from .cells import predict_on_cell, get_res_zero_cell_coords | ||
from .config import rabbitmq_host, prediction_queue | ||
|
||
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") | ||
log = logging.getLogger(__name__) | ||
|
||
|
||
async def main(): | ||
connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host)) | ||
channel = connection.channel() | ||
channel.queue_declare(queue=prediction_queue) | ||
|
||
async with httpx.AsyncClient() as client: | ||
while True: | ||
cells = get_res_zero_cell_coords() | ||
log.info(f"found {len(cells)} resolution zero cells") | ||
tasks = [predict_on_cell(client, coords, channel) for coords in cells[:10]] | ||
await asyncio.gather(*tasks) | ||
await asyncio.sleep(1) | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
h3 | ||
httpx | ||
pika | ||
pydantic |