Skip to content

Commit

Permalink
move channel into try block
Browse files Browse the repository at this point in the history
  • Loading branch information
nonnontrivial committed Jun 25, 2024
1 parent 1dbff2c commit adf35c0
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pp/.idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pp/.idea/pp.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pp/pp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
rabbitmq_host = os.getenv("RABBITMQ_HOST", "localhost")
prediction_queue = os.getenv("PREDICTION_QUEUE", "prediction")

sleep_interval = float(os.getenv("SLEEP_INTERVAL", "0.5"))
task_sleep_interval = float(os.getenv("SLEEP_INTERVAL", "0.5"))

api_protocol = os.getenv("API_PROTOCOL", "http")
api_port = int(os.getenv("API_PORT", "8000"))
Expand Down
41 changes: 20 additions & 21 deletions pp/pp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import pika
from pika.exceptions import AMQPConnectionError

from .prediction import predict_on_cell
from .prediction import predict_on_cell_coords
from .cells import get_res_zero_cell_coords
from .config import rabbitmq_host, prediction_queue, sleep_interval
from .config import rabbitmq_host, prediction_queue, task_sleep_interval

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger(__name__)
Expand All @@ -17,35 +17,34 @@ async def main():
"""initializes process of getting sky brightness predictions for h3 cells;
publishing them to prediction queue as available.
n.b. with 122 res 0 cells on current machine, this setup will publish at a rate of 1.4m/s
n.b. with 122 res 0 cells on 2016 macbook, this will publish at a rate of 1.4m/s
"""

try:
connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host))
channel = connection.channel()
channel.queue_declare(queue=prediction_queue)
except AMQPConnectionError as e:
import sys
log.error("could not form amqp connection!")

log.error(f"could not form amqp connection {e}")
log.warning("exiting")
sys.exit(1)
except Exception as e:
log.error(f"could not start process because {e}")
log.error(f"could not start publisher because {e}")
else:
channel = connection.channel()
channel.queue_declare(queue=prediction_queue)

all_h3_cell_coords = get_res_zero_cell_coords()

log.info(f"using {len(all_h3_cell_coords)} resolution zero cells")

async with httpx.AsyncClient() as client:
while True:
try:
for cell_coordinates in all_h3_cell_coords:
await asyncio.create_task(predict_on_cell(client, cell_coordinates, channel))
await asyncio.sleep(sleep_interval)
except Exception as e:
log.error(f"could not continue publishing because {e}")
channel.close()
resolution_zero_cell_coords = get_res_zero_cell_coords()
log.debug(f"using {len(resolution_zero_cell_coords)} resolution zero cells")

try:
async with httpx.AsyncClient() as client:
while True:
for cell_coords in resolution_zero_cell_coords:
await asyncio.create_task(predict_on_cell_coords(client, cell_coords, channel))
await asyncio.sleep(task_sleep_interval)
except Exception as e:
log.error(f"could not continue publishing because {e}")
channel.close()


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions pp/pp/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ class PredictionMessage:
lon: float
# magnitudes per square arc second
mpsas: float
# id of the h3 cell
h3_id: str
3 changes: 1 addition & 2 deletions pp/pp/prediction.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
async def get_prediction_message_for_lat_lon(client: httpx.AsyncClient, lat: float, lon: float) -> PredictionMessage:
res = await client.get(prediction_endpoint_url, params={"lat": lat, "lon": lon})
res.raise_for_status()

data = res.json()
if (mpsas := data.get("sky_brightness", None)) is None:
raise ValueError("no sky brightness reading in api response")
Expand All @@ -32,7 +31,7 @@ async def get_prediction_message_for_lat_lon(client: httpx.AsyncClient, lat: flo
)


async def predict_on_cell(client: httpx.AsyncClient, coords: Tuple[float, float], channel: Channel):
async def predict_on_cell_coords(client: httpx.AsyncClient, coords: Tuple[float, float], channel: Channel):
"""retrieve and publish a sky brightness prediction at coords for the h3 cell"""
import json

Expand Down
Empty file added pp/tests/__init__.py
Empty file.
Empty file.

0 comments on commit adf35c0

Please sign in to comment.