-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
74f949b
commit 33a103d
Showing
19 changed files
with
123 additions
and
98 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
File renamed without changes.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
File renamed without changes.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
# pp | ||
|
||
prediction producer. | ||
|
||
> handles sky brightness prediction across resolution 0 h3 cells and puts on rabbitmq | ||
## monitoring | ||
|
||
see the rabbitmq [dashboard](http://localhost:15672/#/) |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from typing import List, Tuple | ||
import h3 | ||
|
||
|
||
def get_res_zero_cell_coords() -> List[Tuple[float, float]]: | ||
"""gets list of coordinates of all resolution zero cells""" | ||
resolution_zero_cells = h3.get_res0_indexes() | ||
return [h3.h3_to_geo(c) for c in resolution_zero_cells] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import logging | ||
import asyncio | ||
|
||
import httpx | ||
import pika | ||
|
||
from .prediction import predict_on_cell | ||
from .cells import get_res_zero_cell_coords | ||
from .config import rabbitmq_host, prediction_queue, sleep_interval | ||
|
||
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") | ||
log = logging.getLogger(__name__) | ||
|
||
|
||
async def main(): | ||
"""initializes process of getting sky brightness predictions for h3 cells; | ||
publishing them to prediction queue as available. | ||
over 122 res 0 cells on current machine, this setup will publish at a rate of 1.4m/s | ||
""" | ||
|
||
connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host)) | ||
channel = connection.channel() | ||
channel.queue_declare(queue=prediction_queue) | ||
|
||
all_h3_cell_coords = get_res_zero_cell_coords() | ||
log.info(f"using {len(all_h3_cell_coords)} resolution zero cells") | ||
|
||
async with httpx.AsyncClient() as client: | ||
while True: | ||
for coords in all_h3_cell_coords: | ||
await asyncio.create_task(predict_on_cell(client, coords, channel)) | ||
await asyncio.sleep(sleep_interval) | ||
# tasks = [predict_on_cell(client, coords, channel) for coords in all_h3_cell_coords] | ||
# await asyncio.gather(*tasks) | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
from dataclasses import dataclass | ||
|
||
|
||
@dataclass | ||
class PredictionMessage: | ||
time_of: str | ||
lat: float | ||
lon: float | ||
sky_brightness_mpsas: float |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from typing import Tuple | ||
from dataclasses import asdict | ||
from datetime import datetime | ||
import asyncio | ||
import logging | ||
|
||
from pika.channel import Channel | ||
import httpx | ||
|
||
from .config import api_protocol, api_host, api_port, api_version, sleep_interval | ||
from .message import PredictionMessage | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
prediction_endpoint_url = f"{api_protocol}://{api_host}:{api_port}/api/{api_version}/predict" | ||
|
||
|
||
async def get_prediction_message_for_lat_lon(client: httpx.AsyncClient, lat: float, lon: float) -> PredictionMessage: | ||
res = await client.get(prediction_endpoint_url, params={"lat": lat, "lon": lon}) | ||
res.raise_for_status() | ||
|
||
data = res.json() | ||
return PredictionMessage( | ||
lat=lat, | ||
lon=lon, | ||
time_of=datetime.utcnow().isoformat(), | ||
sky_brightness_mpsas=data["sky_brightness"], | ||
) | ||
|
||
|
||
async def predict_on_cell(client: httpx.AsyncClient, coords: Tuple[float, float], channel: Channel): | ||
import json | ||
|
||
try: | ||
lat, lon = coords | ||
prediction_message = await get_prediction_message_for_lat_lon(client, lat, lon) | ||
message_body = asdict(prediction_message) | ||
|
||
log.info(f"publishing prediction message {message_body}") | ||
|
||
# FIXME exchange, routing key | ||
channel.basic_publish(exchange="", routing_key="test", body=json.dumps(message_body)) | ||
except httpx.HTTPStatusError as e: | ||
log.error(f"got bad status from api server {e}") | ||
except Exception as e: | ||
log.error(f"could not publish prediction at {coords} because {e}") | ||
finally: | ||
# await asyncio.sleep(sleep_interval) | ||
pass |
File renamed without changes.