Refactor prepare_kwargs_loops to also use it in drinking water quality
tgrandje committed Oct 1, 2024
1 parent c0dc4f9 commit 056e829
Showing 6 changed files with 145 additions and 76 deletions.
81 changes: 74 additions & 7 deletions README.md
@@ -9,7 +9,8 @@ due time.
At this stage, the following APIs are covered by cl-hubeau:
* [piezometry/piézométrie](https://hubeau.eaufrance.fr/page/api-piezometrie)
* [hydrometry/hydrométrie](https://hubeau.eaufrance.fr/page/api-hydrometrie)
* [drinking water quality/qualité de l'eau potable](https://hubeau.eaufrance.fr/page/api-qualite-eau-potable#/qualite_eau_potable/communes)
* [drinking water quality/qualité de l'eau potable](https://hubeau.eaufrance.fr/page/api-qualite-eau-potable)
* [superficial waterbodies quality/qualité physico-chimique des cours d'eau'](https://hubeau.eaufrance.fr/page/api-qualite-cours-deau)

For any help on available kwargs for each endpoint, please refer
directly to the documentation on hubeau (this will not be covered
@@ -159,11 +160,6 @@ Note that this query is heavy, even if this was already restricted to nitrates.
In theory, you could also query the API without specifying the substance you're tracking,
but you may hit the 20k threshold and trigger an exception.
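
If you do query without restricting the substance, be prepared to catch the failure. A minimal sketch (assuming the overflow surfaces as the `ValueError` mentioned in `cl_hubeau/utils/prepare_loops.py`):

```python
from cl_hubeau import drinking_water_quality

try:
    # No code_parametre: results cover every substance, so the 20k rows
    # cap may be exceeded on dense territories.
    df = drinking_water_quality.get_control_results(codes_communes="59350")
except ValueError:
    # Narrow the query (substance, dates, territory) and retry.
    raise
```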

As it is, the `get_control_results` function already implements a double loop:

* on networks' codes (20 codes maximum) ;
* on periods, requesting only yearly datasets (which should be scalable over time **and** should work nicely with the cache algorithm).

You can also call the same function, using official city codes directly:
```python
df = drinking_water_quality.get_control_results(
@@ -185,4 +181,75 @@ with drinking_water_quality.DrinkingWaterQualitySession() as session:
df = session.get_cities_networks(nom_commune="LILLE")
df = session.get_control_results(code_departement='02', code_parametre="1340")

```
```

### Superficial waterbodies quality

Four high-level functions are available (and one class for low-level operations).


Get all stations (uses a 30-day cache):

```python
from cl_hubeau import superficial_waterbodies_quality
df = superficial_waterbodies_quality.get_all_stations()
```

Get all operations (uses a 30-day cache):

```python
from cl_hubeau import superficial_waterbodies_quality
df = superficial_waterbodies_quality.get_all_operations()
```

Note that this query is heavy; users should restrict it to a given territory.
For instance, you could use:
```python
df = superficial_waterbodies_quality.get_all_operations(code_region="11")
```

Get all environmental conditions:

```python
from cl_hubeau import superficial_waterbodies_quality
df = superficial_waterbodies_quality.get_all_environmental_conditions()
```

Note that this query is heavy; users should restrict it to a given territory.
For instance, you could use:
```python
df = superficial_waterbodies_quality.get_all_environmental_conditions(code_region="11")
```

Get all physicochemical analyses:
```python
from cl_hubeau import superficial_waterbodies_quality
df = superficial_waterbodies_quality.get_all_analysis()
```

Note that this query is heavy; users should restrict it to a given territory
and given parameters. For instance, you could use:
```python
df = superficial_waterbodies_quality.get_all_analysis(
code_departement="59",
code_parametre="1313"
)
```


Low-level class to perform the same tasks:


Note that:

* the API forbids results of more than 20k rows, so you may need inner loops (see the sketch after the code block below)
* cache handling will be your responsibility

```python
with superficial_waterbodies_quality.SuperficialWaterbodiesQualitySession() as session:
df = session.get_stations(code_commune="59183")
df = session.get_operations(code_commune="59183")
df = session.get_environmental_conditions(code_commune="59183")
df = session.get_analysis(code_commune='59183', code_parametre="1340")

```
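
As a sketch of such an inner loop, splitting a heavy query into yearly chunks so that each request stays below the 20k rows cap (the codes and dates are only illustrative):

```python
import pandas as pd

from cl_hubeau import superficial_waterbodies_quality

# One query per year: each chunk should stay below the API's 20k rows
# limit. Cache handling remains your responsibility at this level.
results = []
with superficial_waterbodies_quality.SuperficialWaterbodiesQualitySession() as session:
    for year in range(2020, 2024):
        results.append(
            session.get_analysis(
                code_departement="59",
                code_parametre="1340",
                date_debut_prelevement=f"{year}-01-01",
                date_fin_prelevement=f"{year}-12-31",
            )
        )
df = pd.concat([x for x in results if not x.empty], ignore_index=True)
```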
59 changes: 17 additions & 42 deletions cl_hubeau/drinking_water_quality/utils.py
@@ -4,7 +4,7 @@
Convenience functions for drinking water quality consumption
"""

from datetime import date, datetime
from datetime import date
from itertools import product

import pandas as pd
@@ -13,7 +13,7 @@

from cl_hubeau.drinking_water_quality import DrinkingWaterQualitySession
from cl_hubeau import _config
from cl_hubeau.utils import get_cities
from cl_hubeau.utils import get_cities, prepare_kwargs_loops


def get_all_water_networks(**kwargs) -> pd.DataFrame:
@@ -66,8 +66,8 @@ def get_control_results(
Retrieve sanitary controls' results.
Uses a loop to avoid reaching 20k results threshold.
As queries may induce big datasets, loops are based on networks and years,
even if date_min_prelevement/date_max_prelevement are not set.
As queries may induce big datasets, loops are based on networks and 6-month
time ranges, even if date_min_prelevement/date_max_prelevement are not set.
Note that `codes_reseaux` and `codes_communes` are mutually exclusive!
@@ -112,41 +112,26 @@ def get_control_results(
if "date_max_prelevement" not in kwargs:
kwargs["date_max_prelevement"] = date.today().strftime("%Y-%m-%d")

ranges = pd.date_range(
start=datetime.strptime(
kwargs.pop("date_min_prelevement"), "%Y-%m-%d"
).date(),
end=datetime.strptime(
kwargs.pop("date_max_prelevement"), "%Y-%m-%d"
).date(),
kwargs_loop = prepare_kwargs_loops(
"date_min_prelevement",
"date_max_prelevement",
kwargs,
start_auto_determination,
)
dates = pd.Series(ranges).to_frame("date")
dates["year"] = dates["date"].dt.year
dates = dates.groupby("year")["date"].agg(["min", "max"])
for d in "min", "max":
dates[d] = dates[d].dt.strftime("%Y-%m-%d")
if start_auto_determination:
dates = pd.concat(
[
dates,
pd.DataFrame([{"min": "1900-01-01", "max": "2015-12-31"}]),
],
ignore_index=False,
).sort_index()

args = list(product(codes, dates.values.tolist()))

kwargs_loop = list(product(codes, kwargs_loop))
[kwargs.update({codes_names: chunk}) for chunk, kwargs in kwargs_loop]
kwargs_loop = [x[1] for x in kwargs_loop]

with DrinkingWaterQualitySession() as session:

results = [
session.get_control_results(
date_min_prelevement=date_min,
date_max_prelevement=date_max,
**{codes_names: chunk},
**kwargs
**kwargs,
**kw_loop,
)
for chunk, (date_min, date_max) in tqdm(
args,
for kw_loop in tqdm(
kwargs_loop,
desc="querying network/network and period/period",
leave=_config["TQDM_LEAVE"],
position=tqdm._get_free_pos(),
@@ -155,13 +140,3 @@ def get_control_results(
results = [x.dropna(axis=1, how="all") for x in results if not x.empty]
results = pd.concat(results, ignore_index=True)
return results


# if __name__ == "__main__":
# df = get_control_results(
# codes_communes="59350",
# code_parametre="1340",
# date_min_prelevement="2023-01-01",
# date_max_prelevement="2023-12-31",
# )
# print(df)
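
One pitfall worth flagging in the pairing above: `itertools.product` pairs every chunk of codes with the *same* dict instances returned by `prepare_kwargs_loops`, so updating those dicts in place leaves every record holding the last chunk whenever more than one chunk is in play. A copy-based pairing avoids the aliasing; a sketch reusing the names from the diff:

```python
from itertools import product

# Build one independent kwargs dict per (code chunk, date window) pair:
# the dict unpacking copies each window record instead of mutating the
# shared instance.
kwargs_loop = [
    {**kw_loop, codes_names: chunk}
    for chunk, kw_loop in product(codes, kwargs_loop)
]
```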
27 changes: 19 additions & 8 deletions cl_hubeau/superficial_waterbodies_quality/utils.py
@@ -4,7 +4,7 @@
Convenience functions for superficial waterbodies quality inspections
"""

from datetime import date, datetime
from datetime import date
import warnings

import geopandas as gpd
@@ -120,7 +120,12 @@ def get_all_operations(**kwargs) -> gpd.GeoDataFrame:
" & dep/dep" if "code_departement" in kwargs else ""
)

kwargs_loop = prepare_kwargs_loops(kwargs, start_auto_determination)
kwargs_loop = prepare_kwargs_loops(
"date_debut_prelevement",
"date_fin_prelevement",
kwargs,
start_auto_determination,
)

with SuperficialWaterbodiesQualitySession() as session:

@@ -196,7 +201,12 @@ def get_all_environmental_conditions(**kwargs) -> gpd.GeoDataFrame:
" & dep/dep" if "code_departement" in kwargs else ""
)

kwargs_loop = prepare_kwargs_loops(kwargs, start_auto_determination)
kwargs_loop = prepare_kwargs_loops(
"date_debut_prelevement",
"date_fin_prelevement",
kwargs,
start_auto_determination,
)

with SuperficialWaterbodiesQualitySession() as session:

@@ -269,7 +279,12 @@ def get_all_analysis(**kwargs) -> gpd.GeoDataFrame:
" & dep/dep" if "code_departement" in kwargs else ""
)

kwargs_loop = prepare_kwargs_loops(kwargs, start_auto_determination)
kwargs_loop = prepare_kwargs_loops(
"date_debut_prelevement",
"date_fin_prelevement",
kwargs,
start_auto_determination,
)

with SuperficialWaterbodiesQualitySession() as session:

@@ -285,7 +300,3 @@ def get_all_analysis(**kwargs) -> gpd.GeoDataFrame:
results = [x.dropna(axis=1, how="all") for x in results if not x.empty]
results = pd.concat(results, ignore_index=True)
return results


if __name__ == "__main__":
gdf = get_all_analysis(code_departement="32", code_parametre="1313")
48 changes: 32 additions & 16 deletions cl_hubeau/utils/prepare_loops.py
@@ -4,17 +4,25 @@
Utility functions to prepare temporal and territorial loops
"""
import pandas as pd
from datetime import datetime
from datetime import datetime, timedelta


def prepare_kwargs_loops(kwargs: dict, start_auto_determination: bool) -> dict:
def prepare_kwargs_loops(
key_start: str, key_end: str, kwargs: dict, start_auto_determination: bool
) -> dict:
"""
Prepare a list of kwargs dicts to drive a temporal loop.
WARNING: kwargs is changed by side effect!
Parameters
----------
key_start : str
Field representing the start of a timestep in the API (for instance,
"date_debut_prelevement")
key_end : str
Field representing the end of a timestep in the API (for instance,
"date_fin_prelevement")
kwargs : dict
kwargs passed to a higher level function.
start_auto_determination : bool
@@ -24,20 +32,16 @@ def prepare_kwargs_loops(kwargs: dict, start_auto_determination: bool) -> dict:
-------
args : dict
List of dicts (kwargs) for inner loops. Each should have 2 or 3 keys:
* date_debut_prelevement
* date_fin_prelevement
* key_start
* key_end
* code_departement (optional)
"""
start = datetime.strptime(
kwargs.pop("date_debut_prelevement"), "%Y-%m-%d"
).date()
end = datetime.strptime(
kwargs.pop("date_fin_prelevement"), "%Y-%m-%d"
).date()
start = datetime.strptime(kwargs.pop(key_start), "%Y-%m-%d").date()
end = datetime.strptime(kwargs.pop(key_end), "%Y-%m-%d").date()
ranges = pd.date_range(start, end=end, freq="6ME")
dates = pd.Series(ranges).to_frame("max")
dates["min"] = dates["max"].shift(1)
dates["min"] = dates["max"].shift(1) + timedelta(days=1)
dates = dates.dropna().loc[:, ["min", "max"]]
for d in "min", "max":
dates[d] = dates[d].dt.strftime("%Y-%m-%d")
@@ -47,15 +51,27 @@ def prepare_kwargs_loops(kwargs: dict, start_auto_determination: bool) -> dict:
dates = pd.concat(
[
dates,
pd.DataFrame([{"min": "1900-01-01", "max": "1960-01-01"}]),
pd.DataFrame(
[
{
"min": "1900-01-01",
"max": (
datetime.strptime(
dates["min"].min(), "%Y-%m-%d"
)
- timedelta(days=1)
).strftime("%Y-%m-%d"),
}
]
),
],
ignore_index=False,
).sort_index()
ignore_index=True,
).sort_values("min")
else:
dates.at[0, "min"] = start.strftime("%Y-%m-%d")
dates.at[dates.index.max(), "max"] = end.strftime("%Y-%m-%d")
args = dates.rename(
{"min": "date_debut_prelevement", "max": "date_fin_prelevement"},
{"min": key_start, "max": key_end},
axis=1,
)

@@ -69,7 +85,7 @@

# Return the most recent results first, to trigger the
# ValueError for >20k results faster
args = args.sort_values("date_fin_prelevement", ascending=False)
args = args.sort_values(key_end, ascending=False)

args = args.to_dict(orient="records")

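For illustration, the refactored helper can be driven as follows (a hypothetical sketch; the date values are arbitrary):

```python
from cl_hubeau.utils import prepare_kwargs_loops

# Split a two-year span into roughly 6-month windows keyed by the API's
# own field names. Note the documented side effect: both date keys are
# popped from `kwargs`.
kwargs = {
    "date_min_prelevement": "2022-01-01",
    "date_max_prelevement": "2023-12-31",
    "code_parametre": "1340",
}
windows = prepare_kwargs_loops(
    "date_min_prelevement",
    "date_max_prelevement",
    kwargs,
    start_auto_determination=False,
)
# `windows` is a list of kwargs dicts, each carrying one
# date_min_prelevement/date_max_prelevement pair, sorted most recent
# first so a >20k overflow fails fast; `kwargs` retains only the
# residual keys (here, code_parametre).
```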
4 changes: 2 additions & 2 deletions docs/drinking_water_quality.md
@@ -68,8 +68,8 @@ the probability of reaching the maximum threshold of 20,000 results is high.

As it stands, this function already implements a double loop:
* over UDI codes
* over periods, querying by calendar year (which keeps the algorithm scalable over time
and optimizes cache usage).
* over periods, querying by 6-month calendar ranges (which keeps
the algorithm scalable over time and optimizes cache usage).

It is also possible to pass arguments to the function, among those supported
by the API's "chroniques" endpoint, with the exception of `code_reseau`.
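
For instance (the parameter values mirror the self-test removed from `cl_hubeau/drinking_water_quality/utils.py` in this commit):

```python
from cl_hubeau import drinking_water_quality

# Extra kwargs are forwarded to the "chroniques" endpoint; the double
# loop on UDI codes and 6-month windows happens internally.
df = drinking_water_quality.get_control_results(
    codes_communes="59350",
    code_parametre="1340",
    date_min_prelevement="2023-01-01",
    date_max_prelevement="2023-12-31",
)
```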
2 changes: 1 addition & 1 deletion docs/superficial_waterbodies_quality.md
@@ -56,7 +56,7 @@ in metropolitan France and the overseas departments (DROM).

```python
from cl_hubeau import superficial_waterbodies_quality
df = superficial_waterbodies_quality.get_all_stations()
df = superficial_waterbodies_quality.get_all_operations()
```

{: .warning }
