From f94910a7ff222aefdab3516d3f0b7284ae0e04d2 Mon Sep 17 00:00:00 2001 From: Kevin Donahue Date: Sat, 19 Oct 2024 16:07:53 -0400 Subject: [PATCH] use time that is closer to actual request fixes issue where time passed into model was at property access rather than right when request comes in --- README.md | 42 +++++++++++-------- api/README.md | 38 ++++++++++++++--- api/api/service/brightness_servicer.py | 10 +++-- api/api/service/observer_site.py | 24 +++++------ .../service/open_meteo/open_meteo_client.py | 4 +- api/tests/test_api.py | 25 +++++++++-- pc/pc/consumer/consumer.py | 27 +++++++----- pc/pc/main.py | 1 + pc/requirements-test.txt | 2 +- pc/tests/test_consumer.py | 16 +++++++ pp/pp/publisher/cell_prediction_publisher.py | 2 +- 11 files changed, 132 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index 0f098d1..98e692f 100644 --- a/README.md +++ b/README.md @@ -1,26 +1,29 @@ -# ctts +# CTTS -This project was inspired by wanting **a way of seeing what the -[sky brightness](https://en.wikipedia.org/wiki/Sky_brightness) -is over the entire earth surface, and how it changes over time**.. +> *c*loser *t*o *t*he *s*tars -Given that it would be infeasible to have [sensors](http://unihedron.com/projects/darksky/TSL237-E32.pdf) -everywhere that we want a brightness measurement, it would make -sense to have a way of performing this measurement indirectly. +CTTS is **a way of predicting the [sky brightness](https://en.wikipedia.org/wiki/Sky_brightness) across the earth, and how it changes over time**. ---- +## features -The approach this project takes is to model the relationship -between a set of independent variables and the dependent variable -(sky brightness) available in a [public dataset](https://globeatnight.org/maps-data/) using -pytorch. 
+* gRPC api for sky brightness "readings" (at the current time across H3 cells at resolution 6 in north america) -Another component uses [H3](https://uber.github.io/h3-py/intro.html) -to discretize the surface of the earth into cells, which form the basis -of the requests made to the api server serving up the sky brightness. -This component then pushes brightness values from the api server onto -rabbitmq, while another component consumes them from that queue - -inserting records into postgres to enable historical lookup. +* gRPC api for light pollution values (in RGBA, from a 2022 map) + +## about + +This project is motivated by the desire for synoptic knowledge of "where are the stars good". + +It would be infeasible to have [sensors](http://unihedron.com/projects/darksky/TSL237-E32.pdf) +everywhere that a brightness measurement is desired, so it would make sense to have a way of +doing inference of this value. + + +The approach this project takes is to use pytorch to capture the relationships in the [Globe At Night +dataset](https://globeatnight.org/maps-data/) and use that to predict sky brightness for H3 +cells at resolution 6. + +> note: currently limited to north america ## running with docker @@ -69,6 +72,9 @@ This output indicates that the producer service is successfully getting sky brightness predictions for H3 cells and that the consumer service is storing them in the postgres table `brightnessobservation`. +## documentation + +See [`.md` files](./api/README.md) in component directories. ## licensing diff --git a/api/README.md b/api/README.md index 30088ab..a396af8 100644 --- a/api/README.md +++ b/api/README.md @@ -1,14 +1,42 @@ # api -gRPC api for [sky brightness](https://en.wikipedia.org/wiki/Sky_brightness) at valid coordinates. +gRPC apis for [sky brightness](https://en.wikipedia.org/wiki/Sky_brightness). 
-## services +## gRPC client example -## building and training the sky brightness model +- install dependencies `pip install grpcio-tools grpcio protobuf` +- generate the stubs using `python3 -m grpc_tools.protoc` in terms of the `.proto` file in this repo -The api depends on a model being trained from csv data (`globe_at_night.csv`). +```py +"""client which will get predicted sky brightness for the current time at +given coordinates""" +import grpc -These commands will generate a new `model.pth`: +from stubs import brightness_service_pb2 +from stubs import brightness_service_pb2_grpc + +host = "localhost" +port = 50051 + +with grpc.insecure_channel(f"{host}:{port}") as channel: + lat,lon=(0.,0.) + + stub = brightness_service_pb2_grpc.BrightnessServiceStub(channel) + + request = brightness_service_pb2.Coordinates(lat=lat, lon=lon) + response = stub.GetBrightnessObservation(request) + + print(response) + +``` + +## model + +### building and training + +The api depends on a model being trained from csv data. + +The following commands will generate a new `model.pth` (i.e. the learned parameters): - `python -m api.model.build` to write the csv that the model trains on - `python -m api.model.train` to train on the data in the csv diff --git a/api/api/service/brightness_servicer.py b/api/api/service/brightness_servicer.py index 23f8cd3..66b47ed 100644 --- a/api/api/service/brightness_servicer.py +++ b/api/api/service/brightness_servicer.py @@ -6,11 +6,12 @@ import astropy.units as u import torch from astropy.coordinates import EarthLocation +from astropy.time import Time from ..stubs import brightness_service_pb2, brightness_service_pb2_grpc from .open_meteo.open_meteo_client import OpenMeteoClient from .neural_net.nn import NN -from .observer_site import ObserverSite +from .observer_site import ObservationSite from .pollution.pollution_image import PollutionImage from . 
import config @@ -29,8 +30,10 @@ def GetPollution(self, request, context): def GetBrightnessObservation(self, request, context): lat, lon = request.lat, request.lon + now = Time.now() + location = EarthLocation.from_geodetic(lon * u.degree, lat * u.degree) - site = ObserverSite(location=location) + site = ObservationSite(utc_time=now, location=location) try: meteo_client = OpenMeteoClient(site=site) @@ -39,7 +42,7 @@ def GetBrightnessObservation(self, request, context): import traceback logging.error(traceback.format_exc()) - raise ValueError(f"could not get meteo data {e}") + raise ValueError(f"could not get open meteo data {e}") else: model = NN() model.load_state_dict(torch.load(path_to_state_dict)) @@ -59,7 +62,6 @@ def GetBrightnessObservation(self, request, context): dtype=torch.float32, ).unsqueeze(0) - with torch.no_grad(): predicted_y = model(x) logging.debug(f"predicted {x} to be {predicted_y}") diff --git a/api/api/service/observer_site.py b/api/api/service/observer_site.py index 0780d82..5b6f879 100644 --- a/api/api/service/observer_site.py +++ b/api/api/service/observer_site.py @@ -1,35 +1,33 @@ +import typing + from astroplan import Observer +from astropy.table import astropy from astropy.time import Time +import numpy as np from .utils import get_astro_time_hour -class ObserverSite(Observer): - """A location on the earth, loaded with data around moon altitude and azimuth at the utc time""" - - def __str__(self): - return f"" +class ObservationSite(Observer): + def __init__(self, utc_time: Time, **kwargs): + super().__init__(**kwargs) - @property - def utc_time(self): - return Time.now() + self.utc_time = utc_time @property def time_hour(self): """mapping of hourly time into sine value""" - import numpy as np - return np.sin(2 * np.pi * get_astro_time_hour(self.utc_time) / 24) @property def moon_alt(self): - altaz = self.get_moon_altaz() + altaz = self._get_moon_altaz() return altaz.alt.value @property def moon_az(self): - altaz = 
self.get_moon_altaz() + altaz = self._get_moon_altaz() return altaz.az.value - def get_moon_altaz(self): + def _get_moon_altaz(self) -> typing.Any: return self.moon_altaz(self.utc_time) diff --git a/api/api/service/open_meteo/open_meteo_client.py b/api/api/service/open_meteo/open_meteo_client.py index 79b4f75..26369cd 100644 --- a/api/api/service/open_meteo/open_meteo_client.py +++ b/api/api/service/open_meteo/open_meteo_client.py @@ -4,13 +4,13 @@ from .. import config -from ..observer_site import ObserverSite +from ..observer_site import ObservationSite from ..utils import get_astro_time_hour model = config["meteo"]["model"] class OpenMeteoClient: - def __init__(self, site: ObserverSite) -> None: + def __init__(self, site: ObservationSite) -> None: self.site = site protocol=config["meteo"]["protocol"] diff --git a/api/tests/test_api.py b/api/tests/test_api.py index 1bad630..66d0e0a 100644 --- a/api/tests/test_api.py +++ b/api/tests/test_api.py @@ -1,5 +1,6 @@ from concurrent import futures from unittest.mock import patch +from datetime import datetime, UTC import pytest import grpc @@ -30,7 +31,7 @@ def brightness_service_stub(grpc_server): (-23.550520, -46.633308, 255), (28.613939, 77.209021, 255), (31.230416, 121.473701, 255), - (0., 0., 0), + (95., -95., 0), ]) def test_pollution(brightness_service_stub, lat, lon, channel_value): request = brightness_service_pb2.Coordinates(lat=lat,lon=lon) @@ -49,17 +50,33 @@ def test_pollution(brightness_service_stub, lat, lon, channel_value): ]) @patch('api.service.open_meteo.open_meteo_client.requests.get') def test_brightness_observation(mock_get, lat, lon, brightness_service_stub): - from datetime import datetime, UTC - mock_response = mock_get.return_value mock_response.status_code = 200 mock_response.json.return_value={"elevation":0.,"hourly":{"cloud_cover":[0]*24}} request = brightness_service_pb2.Coordinates(lat=lat,lon=lon) - response = brightness_service_stub.GetBrightnessObservation(request) + assert response.lat 
== lat assert response.lon == lon + parsed_utc_iso = datetime.fromisoformat(response.utc_iso) assert parsed_utc_iso.date() == datetime.now(UTC).date() + + tolerance_ms = 1 + assert abs(parsed_utc_iso.microsecond - datetime.now().microsecond) < tolerance_ms * 1000 + assert response.mpsas > 0 and response.mpsas < 22 + +@pytest.mark.parametrize('lat, lon', [ + (95., -95.), +]) +@patch('api.service.open_meteo.open_meteo_client.requests.get') +def test_brightness_observation_invalid_coords(mock_get, lat, lon, brightness_service_stub): + mock_response = mock_get.return_value + mock_response.status_code = 200 + mock_response.json.return_value={"elevation":0.,"hourly":{"cloud_cover":[0]*24}} + + with pytest.raises(Exception): + request = brightness_service_pb2.Coordinates(lat=lat,lon=lon) + response = brightness_service_stub.GetBrightnessObservation(request) diff --git a/pc/pc/consumer/consumer.py b/pc/pc/consumer/consumer.py index 83ae5dc..38f8d3f 100644 --- a/pc/pc/consumer/consumer.py +++ b/pc/pc/consumer/consumer.py @@ -1,9 +1,10 @@ import json import logging import asyncio +import typing import aio_pika -from aio_pika.abc import AbstractIncomingMessage +from aio_pika.abc import AbstractIncomingMessage, AbstractRobustChannel from pc.persistence.models import BrightnessObservation from pc.consumer.websocket_handler import WebsocketHandler @@ -12,14 +13,12 @@ class Consumer: - def __init__(self, url: str, queue_name: str, websocket_handler: WebsocketHandler): + def __init__(self, url: str, queue_name: str, websocket_handler: typing.Optional[WebsocketHandler] = None): self._amqp_url = url self._queue_name = queue_name self._websocket_handler = websocket_handler async def start(self): - """consume messages off of the prediction queue; storing in pg - and broadcasting to websocket clients""" try: connection = await aio_pika.connect_robust(self._amqp_url) except Exception as e: @@ -31,20 +30,26 @@ async def start(self): else: async with connection: channel = await 
connection.channel() - queue = await channel.declare_queue(self._queue_name) - await queue.consume(self._ingest_message, no_ack=True) - await asyncio.Future() + await self.consume(channel) # type: ignore - async def _ingest_message(self, message: AbstractIncomingMessage): - """store and disseminate brightness message""" - log.info(f"received message {message.body}") + async def consume(self, channel: AbstractRobustChannel): + """begin consuming from queue on a given channel""" + queue = await channel.declare_queue(self._queue_name) + await queue.consume(self._on_message, no_ack=True) + await asyncio.Future() + async def _on_message(self, message: AbstractIncomingMessage): + """handle incoming message by storing in postgres""" try: + log.info(f"received message {message.body}") + brightness_observation_json = json.loads(message.body.decode()) brightness_observation = BrightnessObservation(**brightness_observation_json) await brightness_observation.save() - await self._websocket_handler.broadcast(brightness_observation_json) + + if self._websocket_handler is not None: + await self._websocket_handler.broadcast(brightness_observation_json) except Exception as e: log.error(f"could not save brightness observation {e}") else: diff --git a/pc/pc/main.py b/pc/pc/main.py index fd04258..7567c34 100644 --- a/pc/pc/main.py +++ b/pc/pc/main.py @@ -6,6 +6,7 @@ from pc.consumer.websocket_handler import websocket_handler from pc.config import amqp_url, prediction_queue + logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") log = logging.getLogger(__name__) diff --git a/pc/requirements-test.txt b/pc/requirements-test.txt index 5a7b384..ee4ba01 100644 --- a/pc/requirements-test.txt +++ b/pc/requirements-test.txt @@ -1,2 +1,2 @@ pytest -pytest-async +pytest-asyncio diff --git a/pc/tests/test_consumer.py b/pc/tests/test_consumer.py index 5871ed8..3ef3516 100644 --- a/pc/tests/test_consumer.py +++ b/pc/tests/test_consumer.py @@ -1 +1,17 @@ +from 
unittest import mock + import pytest +from aio_pika import Message + +from pc.consumer.consumer import Consumer + +@pytest.fixture +def consumer(): + amqp_url="amqp://localhost" + prediction_queue="prediction" + return Consumer(url=amqp_url, queue_name=prediction_queue) + +@pytest.mark.skip +@pytest.mark.asyncio +async def test_can_consume_message(consumer): + pass diff --git a/pp/pp/publisher/cell_prediction_publisher.py b/pp/pp/publisher/cell_prediction_publisher.py index c9af1c4..99fa0dd 100644 --- a/pp/pp/publisher/cell_prediction_publisher.py +++ b/pp/pp/publisher/cell_prediction_publisher.py @@ -46,7 +46,6 @@ def publish_prediction_at_cell(self, cell): ) self._channel.basic_publish(exchange="", routing_key=self._queue_name, body=json.dumps(brightness_observation.model_dump())) - log.info(f"{len(CellPredictionPublisher.cell_counts)} distinct cells have had observations published") def publish(self): """get brightness observations for a set of cells, forwarding responses to rabbitmq""" @@ -55,3 +54,4 @@ def publish(self): for cell in cells: CellPredictionPublisher.cell_counts[cell] += 1 self.publish_prediction_at_cell(cell) + log.debug(f"{len(CellPredictionPublisher.cell_counts)} distinct cells have had observations published")