Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More flexible sampled_indicators #220

Merged
merged 7 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ Contributors to this version: Gabriel Rondeau-Genesse (:user:`RondeauG`), Trevor
New features and enhancements
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* A `periods` parameter was added to ``frequency_analysis.local.fit`` to compute multiple separate periods with a single call. (:pull:`219`).
* In ``xhydro.cc.sampled_indicators``, the `delta_type` argument can now be a dictionary or None, in which case the attribute `delta_kind` is used. (:pull:`220`).
* In ``xhydro.cc.sampled_indicators``, weights along a `time` or `horizon` dimension will no longer reduce that dimension. (:pull:`220`).

Breaking changes
^^^^^^^^^^^^^^^^
* The `xhydro` testing utilities have been rewritten to use `pooch` for downloading and caching testing datasets from `hydrologie/xhydro-testdata`. (:pull:`212`).
* The `xhydro` testing utilities now require `pytest-xdist` as a development dependency. (:pull:`212`).
* Many core dependencies have been updated to more modern versions. (:pull:`218`).
* The `delta_type` argument in ``xhydro.cc.sampled_indicators`` has been renamed to `delta_kind` and is no longer positional. (:pull:`220`).

Internal changes
^^^^^^^^^^^^^^^^
Expand Down
15 changes: 5 additions & 10 deletions docs/notebooks/climate_change.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
"outputs": [],
"source": [
"# Imports\n",
"# Hide INFO-level logs\n",
"import logging\n",
"from pathlib import Path\n",
"\n",
"import hvplot.xarray # noqa\n",
Expand All @@ -29,20 +27,17 @@
"import xhydro as xh\n",
"from xhydro.testing.helpers import deveraux\n",
"\n",
"logger = logging.getLogger()\n",
"logger.setLevel(logging.CRITICAL)\n",
"\n",
"D = deveraux()\n",
"\n",
"# Streamflow file (1 file - Hydrotel driven by BCC-CSM-1.1(m))\n",
"# Future streamflow file (1 file - Hydrotel driven by BCC-CSM-1.1(m))\n",
"streamflow_file = D.fetch(\n",
" \"cc_indicators/streamflow_BCC-CSM1.1-m_rcp45.nc\",\n",
")\n",
"\n",
"# Reference QMOYAN (6 platforms)\n",
"# Reference mean annual streamflow (QMOYAN) for 6 calibrations of Hydrotel\n",
"reference_files = D.fetch(\"cc_indicators/reference.zip\", pooch.Unzip())\n",
"\n",
"# QMOYAN deltas (63 simulations x 6 platforms)\n",
"# Future deltas of QMOYAN (63 simulations x 6 calibrations of Hydrotel)\n",
"deltas_files = D.fetch(\"cc_indicators/deltas.zip\", pooch.Unzip())"
]
},
Expand Down Expand Up @@ -492,7 +487,7 @@
"\n",
"- *ds*: Dataset containing the historical indicators. The indicators are expected to be represented by a distribution of pre-computed percentiles.\n",
"- *deltas*: Dataset containing the future deltas to apply to the historical indicators.\n",
"- *delta_type*: Type of delta provided. Must be one of ['absolute', 'percentage'].\n",
"- *delta_kind*: Kind of delta provided. Must be one of ['absolute', 'percentage'].\n",
"- *ds_weights* (optional): Weights to use when sampling the historical indicators, for dimensions other than 'percentile'/'quantile'. Dimensions not present in this Dataset, or if None, will be sampled uniformly unless they are shared with 'deltas'.\n",
"- *delta_weights* (optional): Weights to use when sampling the deltas, such as along the 'realization' dimension. Dimensions not present in this Dataset, or if None, will be sampled uniformly unless they are shared with 'ds'.\n",
"- *n*: Number of samples to generate.\n",
Expand All @@ -516,7 +511,7 @@
"fut_pct, hist_dist, delta_dist, fut_dist = xh.cc.sampled_indicators(\n",
" ref,\n",
" deltas=deltas,\n",
" delta_type=\"percentage\",\n",
" delta_kind=\"percentage\",\n",
" delta_weights=weights,\n",
" n=n,\n",
" seed=0,\n",
Expand Down
156 changes: 133 additions & 23 deletions src/xhydro/cc.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Module to compute climate change statistics using xscen functions."""

from typing import Optional, Union
from datetime import datetime

import numpy as np
import xarray as xr
import xscen as xs

# Special imports from xscen
from xscen import climatological_op, compute_deltas, ensemble_stats, produce_horizon
Expand All @@ -17,11 +18,11 @@
]


def sampled_indicators(
def sampled_indicators( # noqa: C901
ds: xr.Dataset,
deltas: xr.Dataset,
delta_type: str,
*,
delta_kind: str | dict | None = None,
ds_weights: xr.DataArray | None = None,
delta_weights: xr.DataArray | None = None,
n: int = 50000,
Expand All @@ -37,8 +38,10 @@ def sampled_indicators(
The percentiles should be stored in either a dimension called "percentile" [0, 100] or "quantile" [0, 1].
deltas : xr.Dataset
Dataset containing the future deltas to apply to the historical indicators.
delta_type : str
Type of delta provided. Must be one of ['absolute', 'percentage'].
delta_kind : str or dict, optional
Type of delta provided. Recognized values are: ['absolute', 'abs.', '+'], ['percentage', 'pct.', '%'].
If a dict is provided, it should map the variable names to their respective delta type.
If None, the variables should have a 'delta_kind' attribute.
ds_weights : xr.DataArray, optional
Weights to use when sampling the historical indicators, for dimensions other than 'percentile'/'quantile'.
Dimensions not present in this Dataset, or if None, will be sampled uniformly unless they are shared with 'deltas'.
Expand All @@ -65,6 +68,9 @@ def sampled_indicators(

Notes
-----
Weights along the 'time' or 'horizon' dimensions are supported, but behave differently than other dimensions. They will not
be stacked alongside other dimensions in the new 'sample' dimension. Rather, a separate sampling will be done for each time/horizon,

The future percentiles are computed as follows:
1. Sample 'n' values from the historical distribution, weighting the percentiles by their associated coverage.
2. Sample 'n' values from the delta distribution, using the provided weights.
Expand All @@ -90,7 +96,9 @@ def sampled_indicators(
)
}
)
deltas_weighted = True
if delta_weights is None:
deltas_weighted = False
dims = set(deltas.dims).difference(list(shared_dims) + exclude_dims)
delta_weights = xr.DataArray(
np.ones([deltas.sizes[dim] for dim in dims]),
Expand All @@ -115,31 +123,104 @@ def sampled_indicators(
f"Dimension(s) {problem_dims} is shared between 'ds' and 'deltas', but not between 'ds_weights' and 'delta_weights'."
)

# Prepare the operation to apply on the variables
if delta_kind is None:
try:
delta_kind = {
var: deltas[var].attrs["delta_kind"] for var in deltas.data_vars
}
except KeyError:
raise KeyError(
"The 'delta_kind' argument is None, but the variables within the 'deltas' Dataset are missing a 'delta_kind' attribute."
)
elif isinstance(delta_kind, str):
delta_kind = {var: delta_kind for var in deltas.data_vars}
elif isinstance(delta_kind, dict):
if not all([var in delta_kind for var in deltas.data_vars]):
raise ValueError(
f"If 'delta_kind' is a dict, it should contain all the variables in 'deltas'. Missing variables: "
f"{list(set(deltas.data_vars) - set(delta_kind))}."
)

def _add_sampling_attrs(d, prefix, history, ds_for_attrs=None):
for var in d.data_vars:
if ds_for_attrs is not None:
d[var].attrs = ds_for_attrs[var].attrs
d[var].attrs["sampling_kind"] = delta_kind[var]
d[var].attrs["sampling_n"] = n
d[var].attrs["sampling_seed"] = seed
d[var].attrs[
"description"
] = f"{prefix} of {d[var].attrs.get('long_name', var)} constructed from a perturbation approach and random sampling."
d[var].attrs[
"long_name"
] = f"{prefix} of {d[var].attrs.get('long_name', var)}"
old_history = d[var].attrs.get("history", "")
d[var].attrs[
"history"
] = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {history}\n {old_history}"

# Sample the distributions
_, pdim, mult = _perc_or_quantile(ds)
ds_dist = (
_weighted_sampling(ds, percentile_weights, n=n, seed=seed)
.drop_vars(["sample", *percentile_weights.dims])
.drop_vars("sample", errors="ignore")
.assign_coords({"sample": np.arange(n)})
)
_add_sampling_attrs(
ds_dist,
"Historical distribution",
f"{'Weighted' if ds_weights is not None else 'Uniform'} random sampling "
f"applied to the historical distribution using 'xhydro.sampled_indicators' "
f"and {n} samples.",
ds_for_attrs=ds,
)

deltas_dist = (
_weighted_sampling(deltas, delta_weights, n=n, seed=seed)
.drop_vars(["sample", *delta_weights.dims])
.drop_vars("sample", errors="ignore")
.assign_coords({"sample": np.arange(n)})
)
_add_sampling_attrs(
deltas_dist,
"Future delta distribution",
f"{'Weighted' if deltas_weighted else 'Uniform'} random sampling "
f"applied to the delta distribution using 'xhydro.sampled_indicators' "
f"and {n} samples.",
ds_for_attrs=deltas,
)

# Element-wise multiplication of the ref_dist and ens_dist
if delta_type == "percentage":
fut_dist = ds_dist * (1 + deltas_dist / 100)
elif delta_type == "absolute":
fut_dist = ds_dist + deltas_dist
else:
raise ValueError(
f"Unknown operation '{delta_type}', expected one of ['absolute', 'percentage']."
)
# Apply the deltas element-wise to the historical distribution
fut_dist = xr.Dataset()
for var in deltas.data_vars:
with xr.set_options(keep_attrs=True):
if delta_kind[var] in ["absolute", "abs.", "+"]:
fut_dist[var] = ds_dist[var] + deltas_dist[var]
elif delta_kind[var] in ["percentage", "pct.", "%"]:
fut_dist[var] = ds_dist[var] * (1 + deltas_dist[var] / 100)
else:
raise ValueError(
f"Unknown operation '{delta_kind[var]}', expected one of ['absolute', 'abs.', '+'], ['percentage', 'pct.', '%']."
)
_add_sampling_attrs(
fut_dist,
"Future distribution",
f"Future distribution constructed from a perturbation approach and random sampling with {n} samples.",
ds_for_attrs=ds,
)

# Build the attributes based on common attributes between the historical and delta distributions
fut_dist = xs.clean_up(fut_dist, common_attrs_only=[ds, deltas])

# Compute future percentiles
fut_pct = fut_dist.quantile(ds.percentile / mult, dim="sample")
with xr.set_options(keep_attrs=True):
fut_pct = fut_dist.quantile(ds.percentile / mult, dim="sample")
_add_sampling_attrs(
fut_pct,
"Future percentiles",
f"Future percentiles computed from the future distribution using 'xhydro.sampled_indicators' and {n} samples.",
ds_for_attrs=ds,
)

if pdim == "percentile":
fut_pct = fut_pct.rename({"quantile": "percentile"})
Expand Down Expand Up @@ -185,6 +266,7 @@ def _weighted_sampling(
ds: xr.Dataset, weights: xr.DataArray, n: int = 50000, seed: int | None = None
) -> xr.Dataset:
"""Sample from a distribution with weights.
In the case of weights on a 'time' or 'horizon' dimension, the sampling is done separately for each time/horizon, but with the same seed.

Parameters
----------
Expand All @@ -205,23 +287,51 @@ def _weighted_sampling(
# Prepare the weights
if weights.isnull().any():
raise ValueError("The weights contain NaNs.")
weights = weights / weights.sum() # Must equal 1

# Weights on the time dimension need to be handled differently
time_dim = [dim for dim in weights.dims if dim in ["time", "horizon"]]
if len(time_dim) > 1:
raise NotImplementedError(
"Weights on multiple time dimensions are not supported."
)
time_dim = time_dim[0] if time_dim else ""
other_dims = list(set(weights.dims).difference([time_dim]))

# Must equal 1
weights = weights / weights.sum(other_dims)

# For performance reasons, remove the chunking along impacted dimensions
ds = ds.chunk({dim: -1 for dim in weights.dims})
weights = weights.chunk({dim: -1 for dim in weights.dims})

# Stack the dimensions containing weights
ds = ds.stack({"sample": sorted(list(weights.dims))})
weights = weights.stack({"sample": sorted(list(weights.dims))})
ds = ds.stack({"sample": sorted(other_dims)})
weights = weights.stack({"sample": sorted(other_dims)})
weights = weights.reindex_like(ds)

# Sample the dimensions with weights
rng = np.random.default_rng(seed=seed)
idx = rng.choice(weights.size, size=n, p=weights)

# Create the distribution dataset
ds_dist = ds.isel({"sample": idx}).chunk({"sample": -1})
if not time_dim:
idx = rng.choice(len(weights.sample), size=n, p=weights)
# Create the distribution dataset
ds_dist = ds.isel({"sample": idx}).chunk({"sample": -1})
else:
# 2D weights are not supported by np.random.choice, so we need to loop over the time dimension
ds_tmp = [
ds.sel({time_dim: time}).isel(
{
"sample": rng.choice(
len(weights.sample), size=n, p=weights.sel({time_dim: time})
)
}
)
for time in ds[time_dim].values
]
# Remove the coordinates from the sample dimension to allow concatenation
for i, ds_ in enumerate(ds_tmp):
ds_tmp[i] = ds_.reset_index("sample", drop=True)
ds_dist = xr.concat(ds_tmp, dim=time_dim).chunk({time_dim: -1, "sample": -1})

return ds_dist

Expand Down
Loading