Skip to content

Commit

Permalink
perturbed indicators
Browse files Browse the repository at this point in the history
  • Loading branch information
RondeauG committed Dec 7, 2023
1 parent ac60ff8 commit bea66e9
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 1 deletion.
233 changes: 233 additions & 0 deletions xhydro/cc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
"""Module to compute climate change statistics using xscen functions."""

from typing import Optional, Union

import numpy as np
import xarray as xr

# Special imports from xscen
from xscen import ( # FIXME: To be replaced with climatological_op once available
climatological_mean,
Expand All @@ -12,6 +17,7 @@
"climatological_op",
"compute_deltas",
"ensemble_stats",
"perturbed_indicators",
"produce_horizon",
]

Expand All @@ -27,3 +33,230 @@ def climatological_op(ds, **kwargs):
See :py:func:`xscen.aggregate.climatological_mean` for more details.
"""
return climatological_mean(ds, **kwargs)


def perturbed_indicators(
    ds: xr.Dataset,
    deltas: xr.Dataset,
    delta_type: str,
    *,
    delta_weights: Optional[xr.DataArray] = None,
    stack_weights: Optional[Union[xr.DataArray, str, list[str]]] = None,
    n: int = 50000,
    seed: Optional[int] = None,
    return_dist: bool = False,
) -> Union[xr.Dataset, tuple[xr.Dataset, xr.Dataset, xr.Dataset, xr.Dataset]]:
    """Compute future indicators using a perturbation approach and random sampling.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset containing the historical indicators. The indicators are expected to be represented by a distribution of pre-computed percentiles.
        The percentiles should be stored in either a dimension called "percentile" [0, 100] or "quantile" [0, 1].
    deltas : xr.Dataset
        Dataset containing the future deltas to apply to the historical indicators.
    delta_type : str
        Type of delta provided. Must be one of ['absolute', 'percentage'].
    delta_weights : xr.DataArray, optional
        Weights to use when sampling the deltas, such as along the 'realization' dimension.
        If not provided, the deltas will be sampled uniformly along the dimensions of 'deltas' that are absent from 'ds'.
    stack_weights : xr.DataArray, string or list of strings, optional
        Weights to use along other dimensions, which will be stacked along the 'sample' dimension.
        Use a string or list of strings to specify dimension(s) to stack for sampling purposes, but without applying weights.
        The dimensions in 'stack_weights' must exist in both 'ds' and 'deltas'.
        Use None to skip stacking additional dimensions.
    n : int
        Number of samples to generate.
    seed : int, optional
        Seed to use for the random number generator.
    return_dist : bool
        Whether to return the full distributions (ds, deltas, fut) or only the percentiles.

    Returns
    -------
    fut_pct : xr.Dataset
        Dataset containing the future percentiles.
    ds_dist : xr.Dataset
        The historical distribution, stacked along the 'sample' dimension.
    deltas_dist : xr.Dataset
        The delta distribution, stacked along the 'sample' dimension.
    fut_dist : xr.Dataset
        The future distribution, stacked along the 'sample' dimension.

    Notes
    -----
    The future percentiles are computed as follows:
    1. Sample 'n' values from the historical distribution, weighting the percentiles by their associated coverage.
    2. Sample 'n' values from the delta distribution, using the provided weights.
    3. Create the future distribution by applying the sampled deltas to the sampled historical distribution, element-wise.
    4. Compute the percentiles of the future distribution.
    """
    # Prepare weights
    percentile_weights = _percentile_weights(ds)

    # Default delta weights: sample uniformly along the dimensions of 'deltas'
    # that do not exist in 'ds' (e.g. 'realization'). Previously, passing None
    # crashed inside _weighted_sampling despite being documented as supported.
    if delta_weights is None:
        sampling_dims = [dim for dim in deltas.dims if dim not in ds.dims]
        if not sampling_dims:
            raise ValueError(
                "Could not infer the sampling dimensions of 'deltas'. Please provide 'delta_weights'."
            )
        delta_weights = xr.DataArray(
            np.ones([deltas.sizes[dim] for dim in sampling_dims]),
            coords={dim: deltas[dim] for dim in sampling_dims},
            dims=sampling_dims,
        )

    if stack_weights is not None:
        # If stack_weights is a string or list of strings, convert it to a DataArray of ones
        if isinstance(stack_weights, str):
            stack_weights = [stack_weights]
        if isinstance(stack_weights, list):
            stack_weights = xr.DataArray(
                np.ones([ds.sizes[dim] for dim in stack_weights]),
                coords={dim: ds[dim] for dim in stack_weights},
                dims=stack_weights,
            )

        # Broadcast the stacked dimensions onto both sets of sampling weights
        percentile_weights = (
            percentile_weights.expand_dims(
                {dim: stack_weights[dim] for dim in stack_weights.dims}
            )
            * stack_weights
        )
        delta_weights = (
            delta_weights.expand_dims(
                {dim: stack_weights[dim] for dim in stack_weights.dims}
            )
            * stack_weights
        )

    # Dimensions that were stacked only for sampling purposes. Guarding against
    # stack_weights=None fixes an AttributeError in the original drop_vars calls.
    stacked_dims = list(stack_weights.dims) if stack_weights is not None else []

    # Sample the distributions
    _, pdim, mult = _perc_or_quantile(ds)
    ds_dist = (
        _weighted_sampling(ds, percentile_weights, n=n, seed=seed)
        .drop_vars(["sample", pdim, *stacked_dims])
        .assign_coords({"sample": np.arange(n)})
    )
    deltas_dist = (
        _weighted_sampling(deltas, delta_weights, n=n, seed=seed)
        .drop_vars(["sample", *delta_weights.dims, *stacked_dims])
        .assign_coords({"sample": np.arange(n)})
    )

    # Element-wise combination of the historical and delta distributions
    if delta_type == "percentage":
        fut_dist = ds_dist * (1 + deltas_dist / 100)
    elif delta_type == "absolute":
        fut_dist = ds_dist + deltas_dist
    else:
        raise ValueError(
            f"Unknown operation '{delta_type}', expected one of ['absolute', 'percentage']."
        )

    # Compute future percentiles. Use ds[pdim] (not ds.percentile) so datasets
    # using a 'quantile' dimension are also supported, as advertised above.
    fut_pct = fut_dist.quantile(ds[pdim] / mult, dim="sample")

    if pdim == "percentile":
        # .quantile always names its output dimension 'quantile'; restore the input naming
        fut_pct = fut_pct.rename({"quantile": "percentile"})
        fut_pct["percentile"] = ds.percentile

    if return_dist:
        return fut_pct, ds_dist, deltas_dist, fut_dist
    else:
        return fut_pct


def _percentile_weights(da: Union[xr.DataArray, xr.Dataset]) -> xr.DataArray:
    """Compute the weights associated with percentiles, with support for unevenly spaced percentiles.

    Each percentile's weight is the fraction of the [0, 100] (or [0, 1]) range
    that it covers, i.e. half the gap to each of its neighbours, with the first
    and last percentiles additionally covering the span down to 0 and up to the
    maximum, respectively.

    Parameters
    ----------
    da : xr.DataArray or xr.Dataset
        DataArray or Dataset containing the percentiles to use when sampling.
        The percentiles are expected to be stored in either a dimension called "percentile" [0, 100] or "quantile" [0, 1].

    Returns
    -------
    p : xr.DataArray
        DataArray containing the weights associated with the percentiles.
    """
    pct, pdim, multiplier = _perc_or_quantile(da)

    # Temporarily add a 0 and 100th percentile
    p0 = xr.DataArray([0], coords={pdim: [0]}, dims=[pdim])
    p1 = xr.DataArray([multiplier], coords={pdim: [multiplier]}, dims=[pdim])
    p = xr.concat([p0, pct, p1], dim=pdim)
    # Half the distance between consecutive entries (padded endpoints included)
    p = p.diff(pdim) / 2
    p[0] = (
        p[0] * 2
    )  # The first and last weights need to be doubled to account for the fact that the first and last percentiles are not centered
    p[-1] = p[-1] * 2
    # Sum each pair of adjacent half-gaps to get one weight per original
    # percentile; shift(-1) re-aligns the rolling result with the original
    # entries and [:-1] trims the trailing NaN introduced by the shift.
    p = p.rolling({pdim: 2}, center=True).sum().shift({pdim: -1})[:-1]

    return p


def _weighted_sampling(
    ds: xr.Dataset, weights: xr.DataArray, n: int = 50000, seed: Optional[int] = None
) -> xr.Dataset:
    """Sample from a distribution with weights.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset containing the distribution to sample from.
    weights : xr.DataArray
        Weights to use when sampling. They do not need to sum to 1, but must be
        non-negative, free of NaNs, and not all zero.
    n : int
        Number of samples to generate.
    seed : int, optional
        Seed to use for the random number generator.

    Returns
    -------
    ds_dist : xr.Dataset
        Dataset containing the 'n' samples, stacked along the 'sample' dimension.

    Raises
    ------
    ValueError
        If the weights contain NaNs or sum to zero.
    """
    # Prepare the weights
    if weights.isnull().any():
        raise ValueError("The weights contain NaNs.")
    total = weights.sum()
    if total == 0:
        # Normalizing by zero would produce NaN probabilities and an opaque
        # error inside rng.choice; fail early with a clear message instead.
        raise ValueError("The weights sum to zero.")
    weights = weights / total  # Probabilities must sum to 1

    # For performance reasons, remove the chunking along impacted dimensions
    ds = ds.chunk({dim: -1 for dim in weights.dims})
    weights = weights.chunk({dim: -1 for dim in weights.dims})

    # Stack the dimensions containing weights
    ds = ds.stack({"sample": weights.dims})
    weights = weights.stack({"sample": weights.dims})
    weights = weights.reindex_like(ds)

    # Sample the dimensions with weights
    rng = np.random.default_rng(seed=seed)
    idx = rng.choice(weights.size, size=n, p=weights)

    # Create the distribution dataset
    ds_dist = ds.isel({"sample": idx})

    return ds_dist


def _perc_or_quantile(
    da: Union[xr.DataArray, xr.Dataset]
) -> tuple[xr.DataArray, str, int]:
    """Return 'percentile' or 'quantile' depending on the name of the percentile dimension."""
    valid_names = ["percentile", "quantile"]

    if isinstance(da, xr.DataArray):
        # For a DataArray, the array itself holds the percentile values
        pct = da
        pdim = str(da.name)
        if pdim not in valid_names:
            raise ValueError(
                f"DataArray is not named 'percentile' or 'quantile': received {pdim}"
            )
    elif isinstance(da, xr.Dataset):
        # For a Dataset, exactly one matching dimension must exist
        candidates = [d for d in da.dims if d in valid_names]
        if len(candidates) != 1:
            raise ValueError(
                f"Dataset contains more than one dimension in ['percentile', 'quantile']: {candidates}"
            )
        pdim = candidates[0]
        pct = da[pdim]
    else:
        raise TypeError(f"Expected DataArray or Dataset, got {type(da)}")

    # Percentiles live in [0, 100]; quantiles in [0, 1]
    multiplier = 100 if pdim == "percentile" else 1
    out_of_range = pct.min() < 0 or pct.max() > multiplier
    # A 'percentile' dimension whose maximum is <= 1 most likely holds quantiles
    looks_like_quantiles = pdim == "percentile" and pct.max() <= 1
    if out_of_range or looks_like_quantiles:
        raise ValueError(
            f"The {pdim} values do not seem to be in the [0, {multiplier}] range."
        )

    return pct, pdim, multiplier
2 changes: 1 addition & 1 deletion xhydro/testing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def publish_release_notes(
changes = re.sub(search, replacement, changes)

if not file:
return
return changes
if isinstance(file, (Path, os.PathLike)):
file = Path(file).open("w")
print(changes, file=file)

0 comments on commit bea66e9

Please sign in to comment.