Skip to content

Commit

Permalink
add better error handling
Browse files Browse the repository at this point in the history
add script for updating open meteo volume
  • Loading branch information
nonnontrivial committed Jun 16, 2024
1 parent 2a0162f commit b5df73d
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 50 deletions.
4 changes: 2 additions & 2 deletions api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ curl "localhost:8000/api/v1/pollution?lat=40.7277478&lon=-74.0000374"
}
```

#### `/api/v1/prediction`
#### `/api/v1/predict`

Gets the predicted sky brightness at `lat` and `lon` for the current time.

```sh
curl "http://localhost:8000/api/v1/prediction?lat=-30.2466&lon=-70.7494"
curl "http://localhost:8000/api/v1/predict?lat=-30.2466&lon=-70.7494"

```

Expand Down
1 change: 1 addition & 0 deletions api/api/config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os

api_version = os.getenv("API_VERSION", "v1")
log_level = int(os.getenv("LOG_LEVEL", 20))
4 changes: 3 additions & 1 deletion api/api/main.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from dataclasses import asdict
import logging

from fastapi import FastAPI, HTTPException, APIRouter

from .config import api_version
from .config import api_version, log_level
from .models import PredictionResponse
from .pollution.pollution import ArtificialNightSkyBrightnessMapImage, Coords
from .prediction.prediction import (
Prediction,
predict_sky_brightness,
)

logging.basicConfig(level=log_level)
app = FastAPI()
main_router = APIRouter(prefix=f"/api/{api_version}")

Expand Down
5 changes: 3 additions & 2 deletions api/api/prediction/meteo/open_meteo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ def __init__(self, site: ObserverSite) -> None:
self.url_base = f"{PROTOCOL}://{open_meteo_host}:{open_meteo_port}"

async def get_values_at_site(self) -> t.Tuple[int, float]:
"""get cloudcover and elevation values for the observer site"""
"""ask the instance of open meteo for cloud cover and elevation values for the observer site"""
import httpx

lat, lon = self.site.latitude.value, self.site.longitude.value

async with httpx.AsyncClient() as client:
model = "ecmwf_ifs04"
params = {
"latitude": lat,
"longitude": lon,
"models": "ecmwf_ifs04",
"models": model,
"hourly": "temperature_2m,cloud_cover"
}
r = await client.get(f"{self.url_base}/v1/forecast", params=params)
Expand Down
37 changes: 15 additions & 22 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,26 @@ services:
- "8000:8000"
environment:
API_VERSION: "v1"
LOG_LEVEL: 30
OPEN_METEO_HOST: "openmeteo"
OPEN_METEO_PORT: 8080
restart: on-failure
depends_on:
- openmeteo

# sr:
# build: ./sr
# environment:
# OPEN_METEO_HOST: "openmeteo"
# restart: on-failure
# depends_on:
# - openmeteo


# pp:
# build: ./pp
# environment:
# API_VERSION: "v1"
# API_HOST: "api"
# RABBITMQ_HOST: "rabbitmq"
# SLEEP_INTERVAL: "0.5"
# restart: on-failure
# depends_on:
# - rabbitmq
# - api
# links:
# - rabbitmq
pp:
build: ./pp
environment:
API_VERSION: "v1"
API_HOST: "api"
RABBITMQ_HOST: "rabbitmq"
SLEEP_INTERVAL: "0.5"
restart: on-failure
depends_on:
- rabbitmq
- api
links:
- rabbitmq

volumes:
open-meteo-data:
Expand Down
2 changes: 1 addition & 1 deletion pp/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ RUN pip install --no-cache-dir --upgrade -r ./requirements.txt

COPY . .

CMD python -m cpp.main
CMD python -m pp.main
44 changes: 27 additions & 17 deletions pp/pp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import httpx
import pika
from pika.exceptions import AMQPConnectionError

from .prediction import predict_on_cell
from .cells import get_res_zero_cell_coords
Expand All @@ -19,23 +20,32 @@ async def main():
n.b. with 122 res 0 cells on current machine, this setup will publish at a rate of 1.4m/s
"""

connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host))
channel = connection.channel()
channel.queue_declare(queue=prediction_queue)

all_h3_cell_coords = get_res_zero_cell_coords()

log.info(f"using {len(all_h3_cell_coords)} resolution zero cells")

async with httpx.AsyncClient() as client:
while True:
try:
for cell_coordinates in all_h3_cell_coords:
await asyncio.create_task(predict_on_cell(client, cell_coordinates, channel))
await asyncio.sleep(sleep_interval)
except Exception as e:
log.error(f"could not continue publishing because {e}")
channel.close()
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host))
except AMQPConnectionError as e:
import sys
log.error("could not form amqp connection!")
log.warning("exiting")
sys.exit(1)
except Exception as e:
log.error(f"could not start process because {e}")
else:
channel = connection.channel()
channel.queue_declare(queue=prediction_queue)

all_h3_cell_coords = get_res_zero_cell_coords()

log.info(f"using {len(all_h3_cell_coords)} resolution zero cells")

async with httpx.AsyncClient() as client:
while True:
try:
for cell_coordinates in all_h3_cell_coords:
await asyncio.create_task(predict_on_cell(client, cell_coordinates, channel))
await asyncio.sleep(sleep_interval)
except Exception as e:
log.error(f"could not continue publishing because {e}")
channel.close()


if __name__ == "__main__":
Expand Down
6 changes: 4 additions & 2 deletions pp/pp/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

@dataclass
class PredictionMessage:
time_of: str
utc: str
lat: float
lon: float
sky_brightness_mpsas: float
# magnitudes per square arc second
mpsas: float
h3_id: str
13 changes: 10 additions & 3 deletions pp/pp/prediction.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from pika.channel import Channel
import httpx
import h3

from .config import api_protocol, api_host, api_port, api_version, prediction_queue
from .message import PredictionMessage
Expand All @@ -17,21 +18,27 @@
async def get_prediction_message_for_lat_lon(client: httpx.AsyncClient, lat: float, lon: float) -> PredictionMessage:
res = await client.get(prediction_endpoint_url, params={"lat": lat, "lon": lon})
res.raise_for_status()

data = res.json()
if mpsas := data.get("sky_brightness", None) is None:
raise ValueError("no sky brightness reading in response")

return PredictionMessage(
lat=lat,
lon=lon,
time_of=datetime.utcnow().isoformat(),
sky_brightness_mpsas=data["sky_brightness"],
h3_id=h3.geo_to_h3(lat, lon, 0),
utc=datetime.utcnow().isoformat(),
mpsas=mpsas,
)


async def predict_on_cell(client: httpx.AsyncClient, coords: Tuple[float, float], channel: Channel):
"""retrieve and publish a sky brightness prediction at coords"""
"""retrieve and publish a sky brightness prediction at coords for the h3 cell"""
import json

try:
lat, lon = coords

prediction_message = await get_prediction_message_for_lat_lon(client, lat, lon)
message_body = asdict(prediction_message)

Expand Down
13 changes: 13 additions & 0 deletions update-open-meteo.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

volume_name="open-meteo-data"

if docker volume ls -q | grep -q "^${volume_name}$"; then
echo "$volume_name exists; updating volume"
docker run -it --rm -v open-meteo-data:/app/data ghcr.io/open-meteo/open-meteo sync ecmwf_ifs04 cloud_cover,temperature_2m
else
echo "$volume_name does not exist"
exit 1
fi


0 comments on commit b5df73d

Please sign in to comment.