From b5df73dc91d21f6cbc59dcfb31810fb8b531531f Mon Sep 17 00:00:00 2001 From: Kevin Donahue Date: Sat, 15 Jun 2024 20:36:40 -0400 Subject: [PATCH] add better error handling add script for updating open meteo volume --- api/README.md | 4 +- api/api/config.py | 1 + api/api/main.py | 4 +- api/api/prediction/meteo/open_meteo_client.py | 5 ++- docker-compose.yml | 37 +++++++--------- pp/Dockerfile | 2 +- pp/pp/main.py | 44 ++++++++++++------- pp/pp/message.py | 6 ++- pp/pp/prediction.py | 13 ++++-- update-open-meteo.sh | 13 ++++++ 10 files changed, 79 insertions(+), 50 deletions(-) create mode 100755 update-open-meteo.sh diff --git a/api/README.md b/api/README.md index c8baaac..d06f955 100644 --- a/api/README.md +++ b/api/README.md @@ -39,12 +39,12 @@ curl "localhost:8000/api/v1/pollution?lat=40.7277478&lon=-74.0000374" } ``` -#### `/api/v1/prediction` +#### `/api/v1/predict` Gets the predicted sky brightness at `lat` and `lon` for the current time. ```sh -curl "http://localhost:8000/api/v1/prediction?lat=-30.2466&lon=-70.7494" +curl "http://localhost:8000/api/v1/predict?lat=-30.2466&lon=-70.7494" ``` diff --git a/api/api/config.py b/api/api/config.py index 828ce57..15b4ed2 100644 --- a/api/api/config.py +++ b/api/api/config.py @@ -1,3 +1,4 @@ import os api_version = os.getenv("API_VERSION", "v1") +log_level = int(os.getenv("LOG_LEVEL", 20)) diff --git a/api/api/main.py b/api/api/main.py index 5569f7b..f91b458 100644 --- a/api/api/main.py +++ b/api/api/main.py @@ -1,8 +1,9 @@ from dataclasses import asdict +import logging from fastapi import FastAPI, HTTPException, APIRouter -from .config import api_version +from .config import api_version, log_level from .models import PredictionResponse from .pollution.pollution import ArtificialNightSkyBrightnessMapImage, Coords from .prediction.prediction import ( @@ -10,6 +11,7 @@ predict_sky_brightness, ) +logging.basicConfig(level=log_level) app = FastAPI() main_router = APIRouter(prefix=f"/api/{api_version}") diff --git a/api/api/prediction/meteo/open_meteo_client.py b/api/api/prediction/meteo/open_meteo_client.py index c61babe..d0a7963 100644 --- a/api/api/prediction/meteo/open_meteo_client.py +++ b/api/api/prediction/meteo/open_meteo_client.py @@ -12,16 +12,17 @@ def __init__(self, site: ObserverSite) -> None: self.url_base = f"{PROTOCOL}://{open_meteo_host}:{open_meteo_port}" async def get_values_at_site(self) -> t.Tuple[int, float]: - """get cloudcover and elevation values for the observer site""" + """ask the instance of open meteo for cloud cover and elevation values for the observer site""" import httpx lat, lon = self.site.latitude.value, self.site.longitude.value async with httpx.AsyncClient() as client: + model = "ecmwf_ifs04" params = { "latitude": lat, "longitude": lon, - "models": "ecmwf_ifs04", + "models": model, "hourly": "temperature_2m,cloud_cover" } r = await client.get(f"{self.url_base}/v1/forecast", params=params) diff --git a/docker-compose.yml b/docker-compose.yml index 46768ad..ab5c0ce 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -29,33 +29,26 @@ services: - "8000:8000" environment: API_VERSION: "v1" + LOG_LEVEL: 30 OPEN_METEO_HOST: "openmeteo" + OPEN_METEO_PORT: 8080 restart: on-failure depends_on: - openmeteo - # sr: - # build: ./sr - # environment: - # OPEN_METEO_HOST: "openmeteo" - # restart: on-failure - # depends_on: - # - openmeteo - - - # pp: - # build: ./pp - # environment: - # API_VERSION: "v1" - # API_HOST: "api" - # RABBITMQ_HOST: "rabbitmq" - # SLEEP_INTERVAL: "0.5" - # restart: on-failure - # depends_on: - # - rabbitmq - # - api - # links: - # - rabbitmq + pp: + build: ./pp + environment: + API_VERSION: "v1" + API_HOST: "api" + RABBITMQ_HOST: "rabbitmq" + SLEEP_INTERVAL: "0.5" + restart: on-failure + depends_on: + - rabbitmq + - api + links: + - rabbitmq volumes: open-meteo-data: diff --git a/pp/Dockerfile b/pp/Dockerfile index f4b7dbe..e9ee5dc 100644 --- a/pp/Dockerfile +++ b/pp/Dockerfile @@ -9,4 +9,4 @@ RUN pip install --no-cache-dir --upgrade -r ./requirements.txt COPY . . -CMD python -m cpp.main +CMD python -m pp.main diff --git a/pp/pp/main.py b/pp/pp/main.py index fa33ba1..6a95489 100644 --- a/pp/pp/main.py +++ b/pp/pp/main.py @@ -3,6 +3,7 @@ import httpx import pika +from pika.exceptions import AMQPConnectionError from .prediction import predict_on_cell from .cells import get_res_zero_cell_coords @@ -19,23 +20,32 @@ async def main(): n.b. with 122 res 0 cells on current machine, this setup will publish at a rate of 1.4m/s """ - connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host)) - channel = connection.channel() - channel.queue_declare(queue=prediction_queue) - - all_h3_cell_coords = get_res_zero_cell_coords() - - log.info(f"using {len(all_h3_cell_coords)} resolution zero cells") - - async with httpx.AsyncClient() as client: - while True: - try: - for cell_coordinates in all_h3_cell_coords: - await asyncio.create_task(predict_on_cell(client, cell_coordinates, channel)) - await asyncio.sleep(sleep_interval) - except Exception as e: - log.error(f"could not continue publishing because {e}") - channel.close() + try: + connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host)) + except AMQPConnectionError as e: + import sys + log.error("could not form amqp connection!") + log.warning("exiting") + sys.exit(1) + except Exception as e: + log.error(f"could not start process because {e}") + else: + channel = connection.channel() + channel.queue_declare(queue=prediction_queue) + + all_h3_cell_coords = get_res_zero_cell_coords() + + log.info(f"using {len(all_h3_cell_coords)} resolution zero cells") + + async with httpx.AsyncClient() as client: + while True: + try: + for cell_coordinates in all_h3_cell_coords: + await asyncio.create_task(predict_on_cell(client, cell_coordinates, channel)) + await asyncio.sleep(sleep_interval) + except Exception as e: + log.error(f"could not continue publishing because {e}") + channel.close() if __name__ == "__main__": diff --git a/pp/pp/message.py b/pp/pp/message.py index 4ccf820..652627b 100644 --- a/pp/pp/message.py +++ b/pp/pp/message.py @@ -3,7 +3,9 @@ @dataclass class PredictionMessage: - time_of: str + utc: str lat: float lon: float - sky_brightness_mpsas: float + # magnitudes per square arc second + mpsas: float + h3_id: str diff --git a/pp/pp/prediction.py b/pp/pp/prediction.py index caca8b7..881bcbd 100644 --- a/pp/pp/prediction.py +++ b/pp/pp/prediction.py @@ -5,6 +5,7 @@ from pika.channel import Channel import httpx +import h3 from .config import api_protocol, api_host, api_port, api_version, prediction_queue from .message import PredictionMessage @@ -17,21 +18,27 @@ async def get_prediction_message_for_lat_lon(client: httpx.AsyncClient, lat: float, lon: float) -> PredictionMessage: res = await client.get(prediction_endpoint_url, params={"lat": lat, "lon": lon}) res.raise_for_status() + data = res.json() + if mpsas := data.get("sky_brightness", None) is None: + raise ValueError("no sky brightness reading in response") + return PredictionMessage( lat=lat, lon=lon, - time_of=datetime.utcnow().isoformat(), - sky_brightness_mpsas=data["sky_brightness"], + h3_id=h3.geo_to_h3(lat, lon, 0), + utc=datetime.utcnow().isoformat(), + mpsas=mpsas, ) async def predict_on_cell(client: httpx.AsyncClient, coords: Tuple[float, float], channel: Channel): - """retrieve and publish a sky brightness prediction at coords""" + """retrieve and publish a sky brightness prediction at coords for the h3 cell""" import json try: lat, lon = coords + prediction_message = await get_prediction_message_for_lat_lon(client, lat, lon) message_body = asdict(prediction_message) diff --git a/update-open-meteo.sh b/update-open-meteo.sh new file mode 100755 index 0000000..b996ea7 --- /dev/null +++ b/update-open-meteo.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +volume_name="open-meteo-data" + +if docker volume ls -q | grep -q "^${volume_name}$"; then + echo "$volume_name exists; updating volume" + docker run -it --rm -v open-meteo-data:/app/data ghcr.io/open-meteo/open-meteo sync ecmwf_ifs04 cloud_cover,temperature_2m +else + echo "$volume_name does not exist" + exit 1 +fi + +