diff --git a/pp/.idea/misc.xml b/pp/.idea/misc.xml index 5299cec..320019d 100644 --- a/pp/.idea/misc.xml +++ b/pp/.idea/misc.xml @@ -3,5 +3,5 @@ - + \ No newline at end of file diff --git a/pp/.idea/pp.iml b/pp/.idea/pp.iml index f32290b..3e9d178 100644 --- a/pp/.idea/pp.iml +++ b/pp/.idea/pp.iml @@ -5,7 +5,7 @@ - + diff --git a/pp/pp/config.py b/pp/pp/config.py index d8f5cf4..6e715a7 100644 --- a/pp/pp/config.py +++ b/pp/pp/config.py @@ -3,7 +3,7 @@ rabbitmq_host = os.getenv("RABBITMQ_HOST", "localhost") prediction_queue = os.getenv("PREDICTION_QUEUE", "prediction") -sleep_interval = float(os.getenv("SLEEP_INTERVAL", "0.5")) +task_sleep_interval = float(os.getenv("SLEEP_INTERVAL", "0.5")) api_protocol = os.getenv("API_PROTOCOL", "http") api_port = int(os.getenv("API_PORT", "8000")) diff --git a/pp/pp/main.py b/pp/pp/main.py index 6a95489..384d4aa 100644 --- a/pp/pp/main.py +++ b/pp/pp/main.py @@ -5,9 +5,9 @@ import pika from pika.exceptions import AMQPConnectionError -from .prediction import predict_on_cell +from .prediction import predict_on_cell_coords from .cells import get_res_zero_cell_coords -from .config import rabbitmq_host, prediction_queue, sleep_interval +from .config import rabbitmq_host, prediction_queue, task_sleep_interval logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") log = logging.getLogger(__name__) @@ -17,35 +17,34 @@ async def main(): """initializes process of getting sky brightness predictions for h3 cells; publishing them to prediction queue as available. - n.b. with 122 res 0 cells on current machine, this setup will publish at a rate of 1.4m/s + n.b. with 122 res 0 cells on 2016 macbook, this will publish at a rate of 1.4m/s """ try: connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host)) + channel = connection.channel() + channel.queue_declare(queue=prediction_queue) except AMQPConnectionError as e: import sys - log.error("could not form amqp connection!") + + log.error(f"could not form amqp connection {e}") log.warning("exiting") sys.exit(1) except Exception as e: - log.error(f"could not start process because {e}") + log.error(f"could not start publisher because {e}") else: - channel = connection.channel() - channel.queue_declare(queue=prediction_queue) - - all_h3_cell_coords = get_res_zero_cell_coords() - - log.info(f"using {len(all_h3_cell_coords)} resolution zero cells") - - async with httpx.AsyncClient() as client: - while True: - try: - for cell_coordinates in all_h3_cell_coords: - await asyncio.create_task(predict_on_cell(client, cell_coordinates, channel)) - await asyncio.sleep(sleep_interval) - except Exception as e: - log.error(f"could not continue publishing because {e}") - channel.close() + resolution_zero_cell_coords = get_res_zero_cell_coords() + log.debug(f"using {len(resolution_zero_cell_coords)} resolution zero cells") + + try: + async with httpx.AsyncClient() as client: + while True: + for cell_coords in resolution_zero_cell_coords: + await asyncio.create_task(predict_on_cell_coords(client, cell_coords, channel)) + await asyncio.sleep(task_sleep_interval) + except Exception as e: + log.error(f"could not continue publishing because {e}") + channel.close() if __name__ == "__main__": diff --git a/pp/pp/message.py b/pp/pp/message.py index 652627b..0551657 100644 --- a/pp/pp/message.py +++ b/pp/pp/message.py @@ -8,4 +8,5 @@ class PredictionMessage: lon: float # magnitudes per square arc second mpsas: float + # id of the h3 cell h3_id: str diff --git a/pp/pp/prediction.py b/pp/pp/prediction.py index 8a4fbcb..921c184 100644 --- a/pp/pp/prediction.py +++ b/pp/pp/prediction.py @@ -18,7 +18,6 @@ async def get_prediction_message_for_lat_lon(client: httpx.AsyncClient, lat: float, lon: float) -> PredictionMessage: res = await client.get(prediction_endpoint_url, params={"lat": lat, "lon": lon}) res.raise_for_status() - data = res.json() if (mpsas := data.get("sky_brightness", None)) is None: raise ValueError("no sky brightness reading in api response") @@ -32,7 +31,7 @@ async def get_prediction_message_for_lat_lon(client: httpx.AsyncClient, lat: flo ) -async def predict_on_cell(client: httpx.AsyncClient, coords: Tuple[float, float], channel: Channel): +async def predict_on_cell_coords(client: httpx.AsyncClient, coords: Tuple[float, float], channel: Channel): """retrieve and publish a sky brightness prediction at coords for the h3 cell""" import json diff --git a/pp/tests/__init__.py b/pp/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pp/tests/test_prediction_publisher.py b/pp/tests/test_prediction_publisher.py new file mode 100644 index 0000000..e69de29