Skip to content

Commit

Permalink
Use concurrent requests for WMS clients
Browse files Browse the repository at this point in the history
  • Loading branch information
jdejaegh committed Jun 30, 2024
1 parent 3b2e882 commit 5b899da
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 106 deletions.
4 changes: 2 additions & 2 deletions src/open_irceline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .api import IrcelineApiError
from .rio import IrcelineRioClient
from .data import RioFeature, ForecastFeature, FeatureValue, RioIfdmFeature
from .forecast import IrcelineForecastClient
from .data import RioFeature, ForecastFeature, FeatureValue
from .rio import IrcelineRioClient, IrcelineRioIfdmClient

__version__ = '2.0.0'
4 changes: 1 addition & 3 deletions src/open_irceline/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from aiohttp import ClientResponse

from .data import IrcelineFeature
from .utils import SizedDict

_rio_wfs_base_url = 'https://geo.irceline.be/wfs'
_forecast_wms_base_url = 'https://geo.irceline.be/forecast/wms'
Expand All @@ -22,9 +21,8 @@ class IrcelineApiError(Exception):


class IrcelineBaseClient(ABC):
def __init__(self, session: aiohttp.ClientSession, cache_size: int = 20) -> None:
def __init__(self, session: aiohttp.ClientSession) -> None:
self._session = session
self._cache = SizedDict(cache_size)

@abstractmethod
async def get_data(self,
Expand Down
34 changes: 23 additions & 11 deletions src/open_irceline/forecast.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from datetime import date, timedelta, datetime
from itertools import product
from typing import List, Tuple, Dict
Expand Down Expand Up @@ -28,16 +29,27 @@ async def get_data(self,
base_querystring = (self._default_querystring |
{"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}"})

for feature, d in product(features, range(4)):
querystring = base_querystring | {"layers": f"{feature}_d{d}",
"query_layers": f"{feature}_d{d}"}
try:
r: ClientResponse = await self._api_wrapper(self._base_url, querystring)
r: dict = await r.json()
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(
value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'),
timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None)
except (IrcelineApiError, ClientResponseError, IndexError):
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(value=None, timestamp=None)
tasks = [asyncio.create_task(self._get_single_feature(base_querystring, d, feature, timestamp))
for feature, d in product(features, range(4))]
results = await asyncio.gather(*tasks)

for r in results:
result |= r

return result

async def _get_single_feature(self, base_querystring: dict, d: int, feature: ForecastFeature,
timestamp: date) -> dict:
result = dict()

querystring = base_querystring | {"layers": f"{feature}_d{d}",
"query_layers": f"{feature}_d{d}"}
try:
r: ClientResponse = await self._api_wrapper(self._base_url, querystring)
r: dict = await r.json()
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(
value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'),
timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None)
except (IrcelineApiError, ClientResponseError, IndexError):
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(value=None, timestamp=None)
return result
30 changes: 19 additions & 11 deletions src/open_irceline/rio.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from datetime import datetime, date, UTC, timedelta
from typing import List, Tuple, Dict, Set
from xml.etree import ElementTree
Expand Down Expand Up @@ -152,17 +153,24 @@ async def get_data(self,
lat, lon = position
base_querystring = (self._default_querystring |
{"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}"})
print({"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}"})

for feature in features:
querystring = base_querystring | {"layers": f"{feature}", "query_layers": f"{feature}"}
try:
r: ClientResponse = await self._api_wrapper(self._base_url, querystring)
r: dict = await r.json()
result[feature] = FeatureValue(
value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'),
timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None)
except (IrcelineApiError, ClientResponseError, IndexError):
result[feature] = FeatureValue(value=None, timestamp=None)
tasks = [asyncio.create_task(self._get_single_feature(base_querystring, feature)) for feature in features]
results = await asyncio.gather(*tasks)

for r in results:
result |= r

return result

async def _get_single_feature(self, base_querystring: dict, feature: RioIfdmFeature) -> dict:
result = dict()
querystring = base_querystring | {"layers": f"{feature}", "query_layers": f"{feature}"}
try:
r: ClientResponse = await self._api_wrapper(self._base_url, querystring)
r: dict = await r.json()
result[feature] = FeatureValue(
value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'),
timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None)
except (IrcelineApiError, ClientResponseError, IndexError):
result[feature] = FeatureValue(value=None, timestamp=None)
return result
38 changes: 0 additions & 38 deletions src/open_irceline/utils.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,10 @@
from collections import OrderedDict
from typing import Tuple

from pyproj import Transformer

_project_transform = Transformer.from_crs('EPSG:4326', 'EPSG:31370', always_xy=False)


class SizedDict(OrderedDict):
"""Dictionary with a maximum size. When more items are added, the least recently accessed element is evicted"""

def __init__(self, size: int):
super().__init__()
self._size = size

def __setitem__(self, key, value):
super().__setitem__(key, value)
self.move_to_end(key)
if len(self) > self._size:
self.popitem(False)

def __getitem__(self, key):
self.move_to_end(key)
return super().__getitem__(key)

def get(self, __key, __default=None):
self.move_to_end(__key)
return super().get(__key, __default)

def update(self, __m, **kwargs):
raise NotImplementedError()


def epsg_transform(position: Tuple[float, float]) -> Tuple[int, int]:
"""
Convert 'EPSG:4326' coordinates to 'EPSG:31370' coordinates
Expand All @@ -39,15 +13,3 @@ def epsg_transform(position: Tuple[float, float]) -> Tuple[int, int]:
"""
result = _project_transform.transform(position[0], position[1])
return round(result[0]), round(result[1])


def round_coordinates(x: float, y: float, step=.05) -> Tuple[float, float]:
"""
Round the coordinate to the precision given by step
:param x: latitude
:param y: longitude
:param step: precision of the rounding
:return: x and y round to the closest step increment
"""
n = 1 / step
return round(x * n) / n, round(y * n) / n
42 changes: 1 addition & 41 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,4 @@
import pytest

from src.open_irceline.utils import SizedDict, round_coordinates, epsg_transform


def test_sized_dict():
s_dict = SizedDict(5)
assert len(s_dict) == 0

s_dict['a'] = 1
s_dict['b'] = 2
s_dict['c'] = 3
s_dict['d'] = 4
s_dict['e'] = 5
assert len(s_dict) == 5

s_dict['f'] = 6
assert 'a' not in s_dict
assert s_dict['f'] == 6
assert len(s_dict) == 5

s_dict['b'] = 42
s_dict['g'] = 7
assert s_dict.get('f') == 6
assert s_dict['g'] == 7
assert s_dict['b'] == 42
assert 'c' not in s_dict
assert len(s_dict) == 5

del s_dict['b']
assert len(s_dict) == 4
assert 'b' not in s_dict

with pytest.raises(NotImplementedError):
s_dict.update({'a': 1})


def test_round_coord():
x, y = round_coordinates(50.4657, 4.8647)
assert x == 50.45
assert y == 4.85
from src.open_irceline.utils import epsg_transform


def test_epsg_transform():
Expand Down

0 comments on commit 5b899da

Please sign in to comment.