Skip to content

Commit

Permalink
Merge pull request #82 from UW-Hydro/develop
Browse files Browse the repository at this point in the history
Release merge
  • Loading branch information
arbennett authored Jul 22, 2021
2 parents 6506cde + b520de1 commit 7fbdaa9
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 80 deletions.
23 changes: 16 additions & 7 deletions bmorph/core/bmorph.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def marginalize_cdf(y_raw, z_raw, vals):

def cqm(raw_x: pd.Series, train_x: pd.Series, ref_x: pd.Series,
raw_y: pd.Series, train_y: pd.Series, ref_y: pd.Series=None,
method='hist', xbins=200, ybins=10, bw=3, rtol=1e-7, atol=0, nsmooth=5) -> pd.Series:
method='hist', xbins=200, ybins=10, bw=3, rtol=1e-7, atol=0,
nsmooth=5, train_cdf_min=1e-4) -> pd.Series:
"""Conditional Quantile Mapping
Multidimensional conditional equidistant CDF matching function:
Expand Down Expand Up @@ -89,15 +90,15 @@ def cqm(raw_x: pd.Series, train_x: pd.Series, ref_x: pd.Series,
# Lookup the associated flow values in the CDFs for the reference and training periods
mapped_ref = x_ref[np.argmin(np.abs(u_t - ref_cdf[nx_raw, :]), axis=1)]
mapped_train = x_train[np.argmin(np.abs(u_t - train_cdf[nx_raw, :]), axis=1)]
mapped_train[mapped_train < 1e-6] = 1e-6
mapped_train[mapped_train < train_cdf_min] = train_cdf_min
multipliers = pd.Series(mapped_ref / mapped_train, index=raw_x.index, name='multiplier')
if method == 'hist':
# Do some smoothing just to reduce effects of histogram bin edges
multipliers = multipliers.rolling(nsmooth, win_type='triang', min_periods=1).mean()
return multipliers


def edcdfm(raw_x, raw_cdf, train_cdf, ref_cdf):
def edcdfm(raw_x, raw_cdf, train_cdf, ref_cdf, train_cdf_min=1e-4):
'''Calculate multipliers using an adapted version of the EDCDFm technique
This routine implements part of the PresRat bias correction method from
Expand Down Expand Up @@ -156,7 +157,7 @@ def edcdfm(raw_x, raw_cdf, train_cdf, ref_cdf):

# Given u_t and train_cdf determine train_x
train_x = np.percentile(train_cdf, u_t)
train_x[train_x < 1e-6] = 1e-6
train_x[train_x < train_cdf_min] = train_cdf_min

# Given u_t and ref_cdf determine ref_x
ref_x = np.percentile(ref_cdf, u_t)
Expand All @@ -169,7 +170,8 @@ def edcdfm(raw_x, raw_cdf, train_cdf, ref_cdf):
def bmorph(raw_ts, train_ts, ref_ts,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
raw_y=None, ref_y=None, train_y=None,
nsmooth=12, bw=3, xbins=200, ybins=10, rtol=1e-7, atol=0, method='hist'):
nsmooth=12, bw=3, xbins=200, ybins=10, rtol=1e-7, atol=0, method='hist',
smooth_multipliers=True, train_cdf_min=1e-4):
'''Morph `raw_ts` based on differences between `ref_ts` and `train_ts`
bmorph is an adaptation of the PresRat bias correction procedure from
Expand Down Expand Up @@ -220,6 +222,10 @@ def bmorph(raw_ts, train_ts, ref_ts,
Bins for the flow time series
ybins : int
Bins for the second time series
train_cdf_min : float (optional)
Minimum value allowed for the training CDF. Defaults to 1e-4, which helps
suppress data spikes in the corrections that occur when multipliers become
too large due to a large ratio between reference and training flows.
Returns
-------
Expand Down Expand Up @@ -252,7 +258,7 @@ def bmorph(raw_ts, train_ts, ref_ts,
# Calculate the bmorph multipliers based on the smoothed time series and
# PDFs
bmorph_multipliers = edcdfm(raw_smoothed_ts[raw_apply_window],
raw_cdf, train_cdf, ref_cdf)
raw_cdf, train_cdf, ref_cdf, train_cdf_min)

# Apply the bmorph multipliers to the raw time series
bmorph_ts = bmorph_multipliers * raw_ts[raw_apply_window]
Expand All @@ -276,7 +282,10 @@ def bmorph(raw_ts, train_ts, ref_ts,
raw_smoothed_y[raw_cdf_window],
train_smoothed_y, ref_smoothed_y,
bw=bw, xbins=xbins, ybins=ybins,
rtol=rtol, atol=atol, method=method)
rtol=rtol, atol=atol, method=method,
train_cdf_min=train_cdf_min)
if smooth_multipliers:
bmorph_multipliers = bmorph_multipliers.rolling(window=nsmooth, min_periods=1, center=True).mean()
bmorph_ts = bmorph_multipliers[raw_apply_window] * raw_ts[raw_apply_window]

return bmorph_ts, bmorph_multipliers
Expand Down
64 changes: 45 additions & 19 deletions bmorph/core/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from functools import partial
from tqdm.autonotebook import tqdm
from bmorph.util import mizuroute_utils as mizutil
import os

def apply_bmorph(raw_ts, train_ts, ref_ts,
apply_window, raw_train_window, ref_train_window,
Expand All @@ -13,7 +14,7 @@ def apply_bmorph(raw_ts, train_ts, ref_ts,
interval=pd.DateOffset(years=1),
overlap=60, n_smooth_long=None, n_smooth_short=5,
bw=3, xbins=200, ybins=10,
rtol=1e-6, atol=1e-8, method='hist', **kwargs):
rtol=1e-6, atol=1e-8, method='hist', train_cdf_min=1e-4, **kwargs):
"""Bias correction is performed by bmorph on user-defined intervals.
Parameters
Expand Down Expand Up @@ -122,7 +123,7 @@ def apply_bmorph(raw_ts, train_ts, ref_ts,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
raw_y, ref_y, train_y,
n_smooth_short, bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol,
method=method)
method=method, train_cdf_min=train_cdf_min)
bmorph_ts = bmorph_ts.append(bc_total)
bmorph_multipliers = bmorph_multipliers.append(bc_mult)

Expand Down Expand Up @@ -150,7 +151,8 @@ def apply_blendmorph(raw_upstream_ts, raw_downstream_ts,
train_upstream_y=None, train_downstream_y=None,
ref_upstream_y=None, ref_downstream_y=None,
n_smooth_long=None, n_smooth_short=5,
bw=3, xbins=200, ybins=10, rtol=1e-6, atol=1e-8, method='hist', **kwargs):
bw=3, xbins=200, ybins=10, rtol=1e-6, atol=1e-8,
method='hist', train_cdf_min=1e-4, **kwargs):
"""Bias correction performed by blending bmorphed flows on user-defined intervals.
Blendmorph is used to perform spatially consistent bias correction; this function
Expand Down Expand Up @@ -250,8 +252,8 @@ def apply_blendmorph(raw_upstream_ts, raw_downstream_ts,
bc_multipliers = pd.Series([])
bc_totals = pd.Series([])

raw_train_window = slice(*raw_train_window)
apply_window = slice(*apply_window)
raw_train_window = slice(*raw_train_window)
ref_train_window = slice(*ref_train_window)
raw_ts_window = slice(pd.to_datetime(raw_upstream_ts.index.values[0]),
pd.to_datetime(raw_upstream_ts.index.values[-1]))
Expand Down Expand Up @@ -295,25 +297,25 @@ def apply_blendmorph(raw_upstream_ts, raw_downstream_ts,
if run_mdcd:
bc_up_total, bc_up_mult = bmorph.bmorph(
raw_upstream_ts, train_upstream_ts, ref_upstream_ts,
raw_apply_window, raw_cdf_window, raw_train_window, raw_cdf_window,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
raw_upstream_y, ref_upstream_y, train_upstream_y,
n_smooth_short, bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol,
method=method)
method=method, train_cdf_min=train_cdf_min)
bc_down_total, bc_down_mult = bmorph.bmorph(
raw_downstream_ts, train_downstream_ts, ref_downstream_ts,
raw_apply_window, raw_cdf_window, raw_train_window, raw_cdf_window,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
raw_downstream_y, ref_downstream_y, train_downstream_y,
n_smooth_short, bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol,
method=method)
method=method, train_cdf_min=train_cdf_min)
else:
bc_up_total, bc_up_mult = bmorph.bmorph(
raw_upstream_ts, train_upstream_ts, ref_upstream_ts,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
n_smooth_short)
n_smooth_short, train_cdf_min=train_cdf_min)
bc_down_total, bc_down_mult = bmorph.bmorph(
raw_downstream_ts, train_downstream_ts, ref_downstream_ts,
raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
n_smooth_short)
n_smooth_short, train_cdf_min=train_cdf_min)

bc_multiplier = (blend_factor * bc_up_mult) + ((1 - blend_factor) * bc_down_mult)
bc_total = (blend_factor * bc_up_total) + ((1 - blend_factor) * bc_down_total)
Expand All @@ -339,7 +341,7 @@ def apply_blendmorph(raw_upstream_ts, raw_downstream_ts,
return bc_totals[apply_window], bc_multipliers[apply_window]


def _scbc_c_seg(ds, raw_train_window, apply_window, ref_train_window,
def _scbc_c_seg(ds, apply_window, raw_train_window, ref_train_window,
interval, overlap, condition_var, **kwargs):
up_raw_ts = ds['IRFroutedRunoff'].to_series()
up_train_ts = ds['up_raw_flow'].to_series()
Expand All @@ -358,17 +360,17 @@ def _scbc_c_seg(ds, raw_train_window, apply_window, ref_train_window,
up_raw_ts, dn_raw_ts,
up_train_ts, dn_train_ts,
up_ref_ts, dn_ref_ts,
raw_train_window, apply_window, ref_train_window,
apply_window, raw_train_window, ref_train_window,
interval, overlap, blend_factor,
raw_upstream_y=up_cond, raw_downstream_y=dn_cond,
train_upstream_y=up_cond, train_downstream_y=dn_cond,
ref_upstream_y=up_cond, ref_downstream_y=dn_cond)
ref_upstream_y=up_cond, ref_downstream_y=dn_cond, **kwargs)

scbc_c_locals = scbc_c_mults * local_flow.sel(time=scbc_c_mults.index)
return scbc_c_flows, scbc_c_mults, scbc_c_locals


def _scbc_u_seg(ds, raw_train_window, apply_window, ref_train_window,
def _scbc_u_seg(ds, apply_window, raw_train_window, ref_train_window,
interval, overlap, condition_var=None, **kwargs):
up_raw_ts = ds['IRFroutedRunoff'].to_series()
up_train_ts = ds['up_raw_flow'].to_series()
Expand All @@ -385,14 +387,14 @@ def _scbc_u_seg(ds, raw_train_window, apply_window, ref_train_window,
up_raw_ts, dn_raw_ts,
up_train_ts, dn_train_ts,
up_ref_ts, dn_ref_ts,
raw_train_window, apply_window, ref_train_window,
interval, overlap, blend_factor)
apply_window, raw_train_window, ref_train_window,
interval, overlap, blend_factor, **kwargs)

scbc_u_locals = scbc_u_mults * local_flow.sel(time=scbc_u_mults.index)
return scbc_u_flows, scbc_u_mults, scbc_u_locals


def apply_scbc(ds, mizuroute_exe, bmorph_config, client=None):
def apply_scbc(ds, mizuroute_exe, bmorph_config, client=None, save_mults=False):
"""
Applies Spatially Consistent Bias Correction (SCBC) by
bias correcting local flows and re-routing them through
Expand All @@ -413,13 +415,17 @@ def apply_scbc(ds, mizuroute_exe, bmorph_config, client=None):
for descriptions of the options and their choices.
client: dask.Client (optional)
A `client` object to manage parallel computation.
save_mults: boolean (optional)
Whether to save the multipliers from bmorph for diagnosis. If True,
multipliers are saved in the same directory as local flows. Defaults
to False, meaning multipliers are not saved.
Returns
-------
region_totals: xr.Dataset
The rerouted, total, bias corrected flows for the region
"""
def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow'):
def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow', mult_path=None):
flows = [r[0] for r in results]
mults = [r[1] for r in results]
local = [r[2] for r in results]
Expand All @@ -428,8 +434,23 @@ def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow'):
local_ds['time'] = flows[0].index
local_ds.name = out_varname
local_ds = local_ds.where(local_ds >= 1e-4, other=1e-4)
try:
os.remove(file_path)
except OSError:
pass
local_ds.transpose().to_netcdf(file_path)

if mult_path:
try:
os.remove(mult_path)
except OSError:
pass
mult_ds = xr.DataArray(np.vstack(mults), dims=('seg','time'))
mult_ds['seg'] = segs
mult_ds['time'] = flows[0].index
mult_ds.name = 'mults'
mult_ds.transpose().to_netcdf(mult_path)

if 'condition_var' in bmorph_config.keys() and bmorph_config['condition_var']:
scbc_type = 'conditional'
scbc_fun = partial(_scbc_c_seg, **bmorph_config)
Expand All @@ -447,7 +468,12 @@ def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow'):

out_file = (f'{bmorph_config["data_path"]}/input/'
f'{bmorph_config["output_prefix"]}_local_{scbc_type}_scbc.nc')
unpack_and_write_netcdf(results, ds['seg'], out_file)
if save_mults:
mult_file = (f'{bmorph_config["data_path"]}/input/'
f'{bmorph_config["output_prefix"]}_local_mult_{scbc_type}_scbc.nc')
else:
mult_file = None
unpack_and_write_netcdf(results, ds['seg'], out_file, mult_path=mult_file)
config_path, mizuroute_config = mizutil.write_mizuroute_config(
bmorph_config["output_prefix"],
scbc_type, bmorph_config['apply_window'],
Expand Down
40 changes: 25 additions & 15 deletions bmorph/evaluation/plotting.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,8 @@ def organize_nxgraph(topo: nx.Graph):
return pos

def color_code_nxgraph(graph: nx.graph, measure: pd.Series,
cmap = mpl.cm.get_cmap('coolwarm_r'))-> dict:
cmap=mpl.cm.get_cmap('coolwarm_r'),
vmin=None, vmax=None)-> dict:
"""Creates a dictionary mapping of nodes to color values.
Parameters
Expand All @@ -875,6 +876,10 @@ def color_code_nxgraph(graph: nx.graph, measure: pd.Series,
cmap : matplotlib.colors.LinearSegmentedColormap, optional
Colormap to be used for coloring the SimpleRiverNetwork
plot. This defaults to 'coolwarm_r'.
vmin: float, optional
Minimum value for coloring
vmax: float, optional
Maximum value for coloring
Returns
-------
Expand All @@ -891,22 +896,27 @@ def color_code_nxgraph(graph: nx.graph, measure: pd.Series,
maximum = measure.max()

color_vals = (measure.values) / (maximum)
color_bar = plt.cm.ScalarMappable(cmap=cmap, norm = plt.Normalize(vmin = minimum, vmax = maximum))
color_bar = plt.cm.ScalarMappable(cmap=cmap, norm=plt.Normalize(vmin = minimum, vmax = maximum))

color_dict = {f'{seg}' : mpl.colors.to_hex(cmap(i)) for i, seg in zip(color_vals, segs)}
return color_dict, color_bar
else:
#determine colorbar range
extreme = abs(measure.max())
if np.abs(measure.min()) > extreme:
extreme = np.abs(measure.min())

#sets up color values
segs = measure.index
color_vals = (measure.values + extreme) / (2 * extreme)
color_bar = plt.cm.ScalarMappable(cmap=cmap, norm = plt.Normalize(vmin = -extreme, vmax = extreme))

color_dict = {f'{seg}': mpl.colors.to_hex(cmap(i)) for i, seg in zip(color_vals, segs)}
#determine colorbar range
if vmin is None and vmax is None:
extreme = np.max(np.abs([np.min(measure), np.max(measure)]))
vmin = -extreme
vmax = extreme
elif vmin is None:
vmin = measure.min()
elif vmax is None:
vmax = measure.max()

norm = mpl.colors.Normalize(vmin=vmin, vmax=vmax)
color_bar = plt.cm.ScalarMappable(cmap=cmap, norm=norm)

color_dict = {f'{seg}': mpl.colors.to_hex(cmap(norm(i)))
for i, seg in zip(measure.values, segs)}
return color_dict, color_bar

def draw_dataset(topo: xr.Dataset, color_measure: pd.Series, cmap = mpl.cm.get_cmap('coolwarm_r')):
Expand Down Expand Up @@ -1053,12 +1063,12 @@ def plot_reduced_flows(flow_dataset: xr.Dataset, plot_sites: list,
elif interval == 'week':
interval_name = "Week of Year"
raw_flow = flow_dataset[raw_var].groupby(
flow_dataset['time'].dt.week).reduce(reduce_func)
flow_dataset['time'].dt.isocalendar().week).reduce(reduce_func)
reference_flow = flow_dataset[ref_var].groupby(
flow_dataset['time'].dt.week).reduce(reduce_func)
flow_dataset['time'].dt.isocalendar().week).reduce(reduce_func)
bc_flows = list()
for bc_var in bc_vars:
bc_flows.append(flow_dataset[bc_var].groupby(flow_dataset['time'].dt.week).reduce(reduce_func))
bc_flows.append(flow_dataset[bc_var].groupby(flow_dataset['time'].dt.isocalendar().week).reduce(reduce_func))
time = raw_flow['week'].values
elif interval == 'month':
interval_name = "Month"
Expand Down
Loading

0 comments on commit 7fbdaa9

Please sign in to comment.