Skip to content

Commit

Permalink
add docstring
Browse files Browse the repository at this point in the history
  • Loading branch information
nonnontrivial committed Jun 15, 2024
1 parent 4bf0d5b commit 379a625
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
7 changes: 4 additions & 3 deletions pp/pp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@ async def main():
"""initializes process of getting sky brightness predictions for h3 cells;
publishing them to prediction queue as available.
with 122 res 0 cells on current machine, this setup will publish at a rate of 1.4m/s
n.b. with 122 res 0 cells on current machine, this setup will publish at a rate of 1.4m/s
"""

connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host))
channel = connection.channel()
channel.queue_declare(queue=prediction_queue)

all_h3_cell_coords = get_res_zero_cell_coords()

log.info(f"using {len(all_h3_cell_coords)} resolution zero cells")

async with httpx.AsyncClient() as client:
while True:
try:
for coords in all_h3_cell_coords:
await asyncio.create_task(predict_on_cell(client, coords, channel))
for cell_coordinates in all_h3_cell_coords:
await asyncio.create_task(predict_on_cell(client, cell_coordinates, channel))
await asyncio.sleep(sleep_interval)
except Exception as e:
log.error(f"could not continue publishing because {e}")
Expand Down
4 changes: 2 additions & 2 deletions pp/pp/prediction.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
async def get_prediction_message_for_lat_lon(client: httpx.AsyncClient, lat: float, lon: float) -> PredictionMessage:
res = await client.get(prediction_endpoint_url, params={"lat": lat, "lon": lon})
res.raise_for_status()

data = res.json()
return PredictionMessage(
lat=lat,
Expand All @@ -28,14 +27,15 @@ async def get_prediction_message_for_lat_lon(client: httpx.AsyncClient, lat: flo


async def predict_on_cell(client: httpx.AsyncClient, coords: Tuple[float, float], channel: Channel):
"""retrieve and publish a sky brightness prediction at coords"""
import json

try:
lat, lon = coords
prediction_message = await get_prediction_message_for_lat_lon(client, lat, lon)
message_body = asdict(prediction_message)

log.info(f"publishing prediction message {message_body}")
log.info(f"publishing prediction message {message_body} with routing key {prediction_queue}")
channel.basic_publish(exchange="", routing_key=prediction_queue, body=json.dumps(message_body))
except httpx.HTTPStatusError as e:
log.error(f"got bad status from api server {e}")
Expand Down

0 comments on commit 379a625

Please sign in to comment.