diff --git a/bmorph/core/bmorph.py b/bmorph/core/bmorph.py index 96eb8fa..f4dab64 100644 --- a/bmorph/core/bmorph.py +++ b/bmorph/core/bmorph.py @@ -51,7 +51,8 @@ def marginalize_cdf(y_raw, z_raw, vals): def cqm(raw_x: pd.Series, train_x: pd.Series, ref_x: pd.Series, raw_y: pd.Series, train_y: pd.Series, ref_y: pd.Series=None, - method='hist', xbins=200, ybins=10, bw=3, rtol=1e-7, atol=0, nsmooth=5) -> pd.Series: + method='hist', xbins=200, ybins=10, bw=3, rtol=1e-7, atol=0, + nsmooth=5, train_cdf_min=1e-4) -> pd.Series: """Conditional Quantile Mapping Multidimensional conditional equidistant CDF matching function: @@ -89,7 +90,7 @@ def cqm(raw_x: pd.Series, train_x: pd.Series, ref_x: pd.Series, # Lookup the associated flow values in the CDFs for the reference and training periods mapped_ref = x_ref[np.argmin(np.abs(u_t - ref_cdf[nx_raw, :]), axis=1)] mapped_train = x_train[np.argmin(np.abs(u_t - train_cdf[nx_raw, :]), axis=1)] - mapped_train[mapped_train < 1e-6] = 1e-6 + mapped_train[mapped_train < train_cdf_min] = train_cdf_min multipliers = pd.Series(mapped_ref / mapped_train, index=raw_x.index, name='multiplier') if method == 'hist': # Do some smoothing just to reduce effects of histogram bin edges @@ -97,7 +98,7 @@ def cqm(raw_x: pd.Series, train_x: pd.Series, ref_x: pd.Series, return multipliers -def edcdfm(raw_x, raw_cdf, train_cdf, ref_cdf): +def edcdfm(raw_x, raw_cdf, train_cdf, ref_cdf, train_cdf_min=1e-4): '''Calculate multipliers using an adapted version of the EDCDFm technique This routine implements part of the PresRat bias correction method from @@ -156,7 +157,7 @@ def edcdfm(raw_x, raw_cdf, train_cdf, ref_cdf): # Given u_t and train_cdf determine train_x train_x = np.percentile(train_cdf, u_t) - train_x[train_x < 1e-6] = 1e-6 + train_x[train_x < train_cdf_min] = train_cdf_min # Given u_t and ref_cdf determine ref_x ref_x = np.percentile(ref_cdf, u_t) @@ -169,7 +170,8 @@ def edcdfm(raw_x, raw_cdf, train_cdf, ref_cdf): def bmorph(raw_ts, train_ts, ref_ts, raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window, raw_y=None, ref_y=None, train_y=None, - nsmooth=12, bw=3, xbins=200, ybins=10, rtol=1e-7, atol=0, method='hist'): + nsmooth=12, bw=3, xbins=200, ybins=10, rtol=1e-7, atol=0, method='hist', + smooth_multipliers=True, train_cdf_min=1e-4): '''Morph `raw_ts` based on differences between `ref_ts` and `train_ts` bmorph is an adaptation of the PresRat bias correction procedure from @@ -220,6 +222,10 @@ def bmorph(raw_ts, train_ts, ref_ts, Bins for the flow time series ybins : int Bins for the second time series + train_cdf_min : float (optional) + Minimum percentile allowed for train cdf. Defaults as 1e-4 to help handle + data spikes in corrections caused by multipliers being too large from + the ratio between reference and training flows being large. Returns ------- @@ -252,7 +258,7 @@ def bmorph(raw_ts, train_ts, ref_ts, # Calculate the bmorph multipliers based on the smoothed time series and # PDFs bmorph_multipliers = edcdfm(raw_smoothed_ts[raw_apply_window], - raw_cdf, train_cdf, ref_cdf) + raw_cdf, train_cdf, ref_cdf, train_cdf_min) # Apply the bmorph multipliers to the raw time series bmorph_ts = bmorph_multipliers * raw_ts[raw_apply_window] @@ -276,7 +282,10 @@ def bmorph(raw_ts, train_ts, ref_ts, raw_smoothed_y[raw_cdf_window], train_smoothed_y, ref_smoothed_y, bw=bw, xbins=xbins, ybins=ybins, - rtol=rtol, atol=atol, method=method) + rtol=rtol, atol=atol, method=method, + train_cdf_min=train_cdf_min) + if smooth_multipliers: + bmorph_multipliers = bmorph_multipliers.rolling(window=nsmooth, min_periods=1, center=True).mean() bmorph_ts = bmorph_multipliers[raw_apply_window] * raw_ts[raw_apply_window] return bmorph_ts, bmorph_multipliers diff --git a/bmorph/core/workflows.py b/bmorph/core/workflows.py index bbfe119..8aedbbc 100644 --- a/bmorph/core/workflows.py +++ b/bmorph/core/workflows.py @@ -5,6 +5,7 @@ from functools import partial from tqdm.autonotebook import tqdm from bmorph.util import mizuroute_utils as mizutil +import os def apply_bmorph(raw_ts, train_ts, ref_ts, apply_window, raw_train_window, ref_train_window, @@ -13,7 +14,7 @@ def apply_bmorph(raw_ts, train_ts, ref_ts, interval=pd.DateOffset(years=1), overlap=60, n_smooth_long=None, n_smooth_short=5, bw=3, xbins=200, ybins=10, - rtol=1e-6, atol=1e-8, method='hist', **kwargs): + rtol=1e-6, atol=1e-8, method='hist', train_cdf_min=1e-4, **kwargs): """Bias correction is performed by bmorph on user-defined intervals. Parameters @@ -122,7 +123,7 @@ def apply_bmorph(raw_ts, train_ts, ref_ts, raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window, raw_y, ref_y, train_y, n_smooth_short, bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol, - method=method) + method=method, train_cdf_min=train_cdf_min) bmorph_ts = bmorph_ts.append(bc_total) bmorph_multipliers = bmorph_multipliers.append(bc_mult) @@ -150,7 +151,8 @@ def apply_blendmorph(raw_upstream_ts, raw_downstream_ts, train_upstream_y=None, train_downstream_y=None, ref_upstream_y=None, ref_downstream_y=None, n_smooth_long=None, n_smooth_short=5, - bw=3, xbins=200, ybins=10, rtol=1e-6, atol=1e-8, method='hist', **kwargs): + bw=3, xbins=200, ybins=10, rtol=1e-6, atol=1e-8, + method='hist', train_cdf_min=1e-4, **kwargs): """Bias correction performed by blending bmorphed flows on user defined intervals. Blendmorph is used to perform spatially consistent bias correction, this function @@ -250,8 +252,8 @@ def apply_blendmorph(raw_upstream_ts, raw_downstream_ts, bc_multipliers = pd.Series([]) bc_totals = pd.Series([]) - raw_train_window = slice(*raw_train_window) apply_window = slice(*apply_window) + raw_train_window = slice(*raw_train_window) ref_train_window = slice(*ref_train_window) raw_ts_window = slice(pd.to_datetime(raw_upstream_ts.index.values[0]), pd.to_datetime(raw_upstream_ts.index.values[-1])) @@ -295,25 +297,25 @@ def apply_blendmorph(raw_upstream_ts, raw_downstream_ts, if run_mdcd: bc_up_total, bc_up_mult = bmorph.bmorph( raw_upstream_ts, train_upstream_ts, ref_upstream_ts, - raw_apply_window, raw_cdf_window, raw_train_window, raw_cdf_window, + raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window, raw_upstream_y, ref_upstream_y, train_upstream_y, n_smooth_short, bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol, - method=method) + method=method, train_cdf_min=train_cdf_min) bc_down_total, bc_down_mult = bmorph.bmorph( raw_downstream_ts, train_downstream_ts, ref_downstream_ts, - raw_apply_window, raw_cdf_window, raw_train_window, raw_cdf_window, + raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window, raw_downstream_y, ref_downstream_y, train_downstream_y, n_smooth_short, bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol, - method=method) + method=method, train_cdf_min=train_cdf_min) else: bc_up_total, bc_up_mult = bmorph.bmorph( raw_upstream_ts, train_upstream_ts, ref_upstream_ts, raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window, - n_smooth_short) + n_smooth_short, train_cdf_min=train_cdf_min) bc_down_total, bc_down_mult = bmorph.bmorph( raw_downstream_ts, train_downstream_ts, ref_downstream_ts, raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window, - n_smooth_short) + n_smooth_short, train_cdf_min=train_cdf_min) bc_multiplier = (blend_factor * bc_up_mult) + ((1 - blend_factor) * bc_down_mult) bc_total = (blend_factor * bc_up_total) + ((1 - blend_factor) * bc_down_total) @@ -339,7 +341,7 @@ def apply_blendmorph(raw_upstream_ts, raw_downstream_ts, return bc_totals[apply_window], bc_multipliers[apply_window] -def _scbc_c_seg(ds, raw_train_window, apply_window, ref_train_window, +def _scbc_c_seg(ds, apply_window, raw_train_window, ref_train_window, interval, overlap, condition_var, **kwargs): up_raw_ts = ds['IRFroutedRunoff'].to_series() up_train_ts = ds['up_raw_flow'].to_series() @@ -358,17 +360,17 @@ def _scbc_c_seg(ds, raw_train_window, apply_window, ref_train_window, up_raw_ts, dn_raw_ts, up_train_ts, dn_train_ts, up_ref_ts, dn_ref_ts, - raw_train_window, apply_window, ref_train_window, + apply_window, raw_train_window, ref_train_window, interval, overlap, blend_factor, raw_upstream_y=up_cond, raw_downstream_y=dn_cond, train_upstream_y=up_cond, train_downstream_y=dn_cond, - ref_upstream_y=up_cond, ref_downstream_y=dn_cond) + ref_upstream_y=up_cond, ref_downstream_y=dn_cond, **kwargs) scbc_c_locals = scbc_c_mults * local_flow.sel(time=scbc_c_mults.index) return scbc_c_flows, scbc_c_mults, scbc_c_locals -def _scbc_u_seg(ds, raw_train_window, apply_window, ref_train_window, +def _scbc_u_seg(ds, apply_window, raw_train_window, ref_train_window, interval, overlap, condition_var=None, **kwargs): up_raw_ts = ds['IRFroutedRunoff'].to_series() up_train_ts = ds['up_raw_flow'].to_series() @@ -385,14 +387,14 @@ def _scbc_u_seg(ds, raw_train_window, apply_window, ref_train_window, up_raw_ts, dn_raw_ts, up_train_ts, dn_train_ts, up_ref_ts, dn_ref_ts, - raw_train_window, apply_window, ref_train_window, - interval, overlap, blend_factor) + apply_window, raw_train_window, ref_train_window, + interval, overlap, blend_factor, **kwargs) scbc_u_locals = scbc_u_mults * local_flow.sel(time=scbc_u_mults.index) return scbc_u_flows, scbc_u_mults, scbc_u_locals -def apply_scbc(ds, mizuroute_exe, bmorph_config, client=None): +def apply_scbc(ds, mizuroute_exe, bmorph_config, client=None, save_mults=False): """ Applies Spatially Consistent Bias Correction (SCBC) by bias correcting local flows and re-routing them through @@ -413,13 +415,17 @@ def apply_scbc(ds, mizuroute_exe, bmorph_config, client=None): for descriptions of the options and their choices. client: dask.Client (optional) A `client` object to manage parallel computation. + save_mults: boolean (optional) + Whether to save multipliers from bmorph for diagnosis. If True, + multipliers are saved in the same directory as local flows. Defaults + as False to not save multipliers. Returns ------- region_totals: xr.Dataset The rerouted, total, bias corrected flows for the region """ - def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow'): + def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow', mult_path=None): flows = [r[0] for r in results] mults = [r[1] for r in results] local = [r[2] for r in results] @@ -428,8 +434,23 @@ def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow'): local_ds['time'] = flows[0].index local_ds.name = out_varname local_ds = local_ds.where(local_ds >= 1e-4, other=1e-4) + try: + os.remove(file_path) + except OSError: + pass local_ds.transpose().to_netcdf(file_path) + if mult_path: + try: + os.remove(mult_path) + except OSError: + pass + mult_ds = xr.DataArray(np.vstack(mults), dims=('seg','time')) + mult_ds['seg'] = segs + mult_ds['time'] = flows[0].index + mult_ds.name = 'mults' + mult_ds.transpose().to_netcdf(mult_path) + if 'condition_var' in bmorph_config.keys() and bmorph_config['condition_var']: scbc_type = 'conditional' scbc_fun = partial(_scbc_c_seg, **bmorph_config) @@ -447,7 +468,12 @@ def unpack_and_write_netcdf(results, segs, file_path, out_varname='scbc_flow'): out_file = (f'{bmorph_config["data_path"]}/input/' f'{bmorph_config["output_prefix"]}_local_{scbc_type}_scbc.nc') - unpack_and_write_netcdf(results, ds['seg'], out_file) + if save_mults: + mult_file = (f'{bmorph_config["data_path"]}/input/' + f'{bmorph_config["output_prefix"]}_local_mult_{scbc_type}_scbc.nc') + else: + mult_file = None + unpack_and_write_netcdf(results, ds['seg'], out_file, mult_path=mult_file) config_path, mizuroute_config = mizutil.write_mizuroute_config( bmorph_config["output_prefix"], scbc_type, bmorph_config['apply_window'], diff --git a/bmorph/evaluation/plotting.py b/bmorph/evaluation/plotting.py index 344b4d6..34ed1ca 100644 --- a/bmorph/evaluation/plotting.py +++ b/bmorph/evaluation/plotting.py @@ -862,7 +862,8 @@ def organize_nxgraph(topo: nx.Graph): return pos def color_code_nxgraph(graph: nx.graph, measure: pd.Series, - cmap = mpl.cm.get_cmap('coolwarm_r'))-> dict: + cmap=mpl.cm.get_cmap('coolwarm_r'), + vmin=None, vmax=None)-> dict: """Creates a dictionary mapping of nodes to color values. Parameters @@ -875,6 +876,10 @@ def color_code_nxgraph(graph: nx.graph, measure: pd.Series, cmap : matplotlib.colors.LinearSegmentedColormap, optional Colormap to be used for coloring the SimpleRiverNewtork plot. This defaults as 'coolwarm_r'. + vmin: float, optional + Minimum value for coloring + vmax: float, optional + Maximum value for coloring Returns ------- @@ -891,22 +896,27 @@ def color_code_nxgraph(graph: nx.graph, measure: pd.Series, maximum = measure.max() color_vals = (measure.values) / (maximum) - color_bar = plt.cm.ScalarMappable(cmap=cmap, norm = plt.Normalize(vmin = minimum, vmax = maximum)) + color_bar = plt.cm.ScalarMappable(cmap=cmap, norm=plt.Normalize(vmin = minimum, vmax = maximum)) color_dict = {f'{seg}' : mpl.colors.to_hex(cmap(i)) for i, seg in zip(color_vals, segs)} return color_dict, color_bar else: - #determine colorbar range - extreme = abs(measure.max()) - if np.abs(measure.min()) > extreme: - extreme = np.abs(measure.min()) - - #sets up color values segs = measure.index - color_vals = (measure.values + extreme) / (2 * extreme) - color_bar = plt.cm.ScalarMappable(cmap=cmap, norm = plt.Normalize(vmin = -extreme, vmax = extreme)) - - color_dict = {f'{seg}': mpl.colors.to_hex(cmap(i)) for i, seg in zip(color_vals, segs)} + #determine colorbar range + if vmin is None and vmax is None: + extreme = np.max(np.abs([np.min(measure), np.max(measure)])) + vmin = -extreme + vmax = extreme + elif vmin is None: + vmin = measure.min() + elif vmax is None: + vmax = measure.max() + + norm = mpl.colors.Normalize(vmin=vmin, vmax=vmax) + color_bar = plt.cm.ScalarMappable(cmap=cmap, norm=norm) + + color_dict = {f'{seg}': mpl.colors.to_hex(cmap(norm(i))) + for i, seg in zip(measure.values, segs)} return color_dict, color_bar def draw_dataset(topo: xr.Dataset, color_measure: pd.Series, cmap = mpl.cm.get_cmap('coolwarm_r')): @@ -1053,12 +1063,12 @@ def plot_reduced_flows(flow_dataset: xr.Dataset, plot_sites: list, elif interval == 'week': interval_name = "Week of Year" raw_flow = flow_dataset[raw_var].groupby( - flow_dataset['time'].dt.week).reduce(reduce_func) + flow_dataset['time'].dt.isocalendar().week).reduce(reduce_func) reference_flow = flow_dataset[ref_var].groupby( - flow_dataset['time'].dt.week).reduce(reduce_func) + flow_dataset['time'].dt.isocalendar().week).reduce(reduce_func) bc_flows = list() for bc_var in bc_vars: - bc_flows.append(flow_dataset[bc_var].groupby(flow_dataset['time'].dt.week).reduce(reduce_func)) + bc_flows.append(flow_dataset[bc_var].groupby(flow_dataset['time'].dt.isocalendar().week).reduce(reduce_func)) time = raw_flow['week'].values elif interval == 'month': interval_name = "Month" diff --git a/bmorph/evaluation/simple_river_network.py b/bmorph/evaluation/simple_river_network.py index 6bdaf73..5f5b3e0 100644 --- a/bmorph/evaluation/simple_river_network.py +++ b/bmorph/evaluation/simple_river_network.py @@ -1007,7 +1007,7 @@ def pfaf_aggregate(self): self.network_graph = plotting.create_nxgraph(self.adj_mat) self.network_positions = plotting.organize_nxgraph(self.network_graph) - def color_network_graph(self, measure, cmap): + def color_network_graph(self, measure, cmap, vmax=None, vmin=None): """Creats a dictionary and colorbar depicting `measure`. Parameters @@ -1022,6 +1022,10 @@ def color_network_graph(self, measure, cmap): cmap : matplotlib.colors.LinearSegmentedColormap Colormap to be used for coloring the SimpleRiverNewtork plot. + vmin: float, optional + Minimum value for coloring + vmax: float, optional + Maximum value for coloring Returns ------- @@ -1033,12 +1037,14 @@ def color_network_graph(self, measure, cmap): for plotting in draw_network. """ if type(measure) != type(None): - return plotting.color_code_nxgraph(self.network_graph, measure, cmap) + return plotting.color_code_nxgraph(self.network_graph, measure, cmap, + vmax=vmax, vmin=vmin) else: color_bar = None segs = np.arange(0,len(self.seg_id_values)) color_vals = segs/len(segs) - color_dict = {f'{seg}': mpl.colors.to_hex(cmap(i)) for i, seg in zip(color_vals, segs)} + color_dict = {f'{seg}': mpl.colors.to_hex(cmap(i)) + for i, seg in zip(color_vals, segs)} return color_dict, color_bar def size_network_graph(self, measure): @@ -1052,6 +1058,7 @@ def draw_network(self, label_map=[], color_measure=None, cmap = mpl.cm.get_cmap( node_size = 200, font_size = 8, font_weight = 'bold', node_shape = 's', linewidths = 2, font_color = 'w', node_color = None, with_labels=False, with_cbar=False, with_background=True, cbar_labelsize=10, + vmin=None, vmax=None, edge_color='k', alpha=1, cbar_title = '', cbar_label_pad=40, ax = None): """Plots the river network through networkx. @@ -1107,6 +1114,10 @@ def draw_network(self, label_map=[], color_measure=None, cmap = mpl.cm.get_cmap( cbar_labelsize : float, optional Font size of the labels on the colorbar that can be attached in `with_cbar` being set to True, defaulted as 10. + vmin: float, optional + Minimum value for coloring + vmax: float, optional + Maximum value for coloring edge_color : str, optional Node outline color of each node, defaulted as 'k' for black. alpha : float @@ -1128,7 +1139,7 @@ def draw_network(self, label_map=[], color_measure=None, cmap = mpl.cm.get_cmap( elif color_measure.size != len(self.seg_id_values): raise Exception("Color_measure size does not match number of nodes, double check measure aggregation.") - network_color_dict, network_color_cbar = self.color_network_graph(color_measure,cmap) + network_color_dict, network_color_cbar = self.color_network_graph(color_measure,cmap, vmin=vmin, vmax=vmax) # we need to make sure that if the nodes have been relabeled by a previous # draw_network call, that we then restore them to their original labels @@ -1154,7 +1165,6 @@ def draw_network(self, label_map=[], color_measure=None, cmap = mpl.cm.get_cmap( # if we want to relabel the nodes in this function call, # then we will do so here - if len(label_map) > 0: new_network_color_dict = dict() for key in network_color_dict.keys(): diff --git a/bmorph/util/mizuroute_utils.py b/bmorph/util/mizuroute_utils.py index 697e0ed..b9a5fad 100644 --- a/bmorph/util/mizuroute_utils.py +++ b/bmorph/util/mizuroute_utils.py @@ -70,7 +70,7 @@ def run_mizuroute(mizuroute_exe, mizuroute_config): return p -def find_up(ds, seg): +def find_up(ds, seg, sel_method='first', sel_var='IRFroutedRunoff'): """ Finds the segment directly upstream of seg given seg is not a headwater segment, (in which case np.nan is returned). @@ -83,6 +83,13 @@ def find_up(ds, seg): each seg in 'down_seg'. seg: int River segment designation to search from. + sel_method: str + Method to use when selecting among multiple upstream + segments. + sel_var: str + Variable used when comparing segments amonth multiple + upstream segments. Can be 'forward_fill', 'r2', or 'kge'. + Returns ------- @@ -92,7 +99,23 @@ def find_up(ds, seg): """ if ds.sel(seg=seg)['is_headwaters']: return np.nan - up_idx = np.argwhere(ds['down_seg'].values == seg).flatten()[0] + up_idxs = np.argwhere(ds['down_seg'].values == seg).flatten() + if sel_method == 'first' or sel_method == 'forward_fill': + up_idx = up_idxs[0] + elif sel_method == 'r2': + idx_of_up_idx = np.argmax([ + np.corrcoef(ds[sel_var].sel(seg=seg), ds[sel_var].isel(seg=i))[0, 1]**2 + for i in up_idxs]) + up_idx = up_idxs[idx_of_up_idx] + elif sel_method == 'kge': + idx_of_up_idx = np.argmax([ + kling_gupta_efficiency(ds[sel_var].sel(seg=seg), ds[sel_var].isel(seg=i)) + for i in up_idxs]) + up_idx = up_idxs[idx_of_up_idx] + elif sel_method == 'kldiv': + raise NotImplementedError('kldiv has not been implemented, please select ', + 'forward_fill, r2, or kge') + up_seg = ds['seg'].isel(seg=up_idx).values[()] return up_seg @@ -311,13 +334,9 @@ def find_max_kge(ds, curr_seg_flow): """ max_kge = -np.inf max_kge_ref_seg = -1 - for ref_seg in ds['seg'].values: - ref_flow = ds.sel(seg=ref_seg).values - curr_ref_kge = kling_gupta_efficiency(curr_seg_flow, ref_flow) - if curr_ref_kge > max_kge: - max_kge = curr_ref_kge - max_kge_ref_seg = ref_seg - return max_kge, max_kge_ref_seg + all_kge = [kling_gupta_efficiency(curr_seg_flow, ds.sel(seg=ref_seg).values) + for ref_seg in ds['seg'].values] + return np.max(all_kge), ds['seg'].values[np.argmax(all_kge)] def trim_time(dataset_list: list): @@ -386,8 +405,8 @@ def map_segs_topology(routed: xr.Dataset, topology: xr.Dataset): def map_ref_sites(routed: xr.Dataset, gauge_reference: xr.Dataset, - gauge_sites=None, route_var = 'IRFroutedRunoff', - fill_method='kldiv'): + gauge_sites=None, route_var='IRFroutedRunoff', + fill_method='r2', min_kge=-0.41): """ Assigns segs within routed boolean 'is_gauge' "identifiers" and what each seg's upstream and downstream reference seg designations are. @@ -448,7 +467,7 @@ def map_ref_sites(routed: xr.Dataset, gauge_reference: xr.Dataset, routed['down_ref_seg'] = np.nan * routed['seg'] routed['up_ref_seg'] = np.nan * routed['seg'] routed['up_seg'] = 0 * routed['is_headwaters'] - routed['up_seg'].values = [find_up(routed, s) for s in routed['seg'].values] + routed['up_seg'].values = [find_up(routed, s, sel_method=fill_method) for s in routed['seg'].values] for s in routed['seg']: if s in list(gauge_segs): routed['is_gauge'].loc[{'seg':s}] = True @@ -569,9 +588,9 @@ def map_ref_sites(routed: xr.Dataset, gauge_reference: xr.Dataset, fill_up_isegs = np.where(np.isnan(routed['up_ref_seg'].values))[0] fill_down_isegs = np.where(np.isnan(routed['down_ref_seg'].values))[0] + routed['kge_up_gauge'] = min_kge + 0.0 * routed['is_gauge'] + routed['kge_down_gauge'] = min_kge + 0.0 * routed['is_gauge'] - routed['kge_up_gauge'] = 0 * routed['is_gauge'] - routed['kge_down_gauge'] = 0 * routed['is_gauge'] for curr_seg in routed['seg'].values: up_ref_seg = np.nan @@ -597,6 +616,8 @@ def map_ref_sites(routed: xr.Dataset, gauge_reference: xr.Dataset, # this seg has already been filled in, but kge still needs to be calculated ref_flow = routed[route_var].sel(seg=routed['down_ref_seg'].sel(seg=curr_seg)).values down_ref_kge = kling_gupta_efficiency(curr_seg_flow, ref_flow) + if down_ref_kge < min_kge: + down_ref_kge, down_ref_seg = find_max_kge(routed[route_var].sel(seg=gauge_segs), curr_seg_flow) routed['kge_down_gauge'].loc[{'seg':curr_seg}] = down_ref_kge else: raise ValueError('Invalid method provided for "fill_method"') @@ -631,7 +652,7 @@ def map_headwater_sites(routed: xr.Dataset): def calculate_cdf_blend_factor(routed: xr.Dataset, gauge_reference: xr.Dataset, - gauge_sites=None, fill_method='kldiv'): + gauge_sites=None, fill_method='kldiv', min_kge=-0.41): """ Calculates the cumulative distribution function blend factor based on distance to a seg's nearest up gauge site with respect to the total distance between @@ -701,19 +722,17 @@ def calculate_cdf_blend_factor(routed: xr.Dataset, gauge_reference: xr.Dataset, / (routed['r2_up_gauge'] + routed['r2_down_gauge'])).values elif fill_method == 'kge': - # since kge can be negative, the blend factor ratios - # will use kge squared to ensure they don't cancel out - raise Exception('kge is not currently supported, please select a different method') - routed['cdf_blend_factor'].values = ((routed['kge_up_gauge']**2) - / ((routed['kge_up_gauge']**2) - + (routed['kge_down_gauge']**2))).values + # since kge can be negative, the blend factor needs scaling + lower_bound = np.min([routed['kge_up_gauge'], routed['kge_down_gauge']]) + upper_bound = np.max([routed['kge_up_gauge'], routed['kge_down_gauge']]) + routed['cdf_blend_factor'].values = ((routed['kge_up_gauge'] - lower_bound) / (upper_bound - lower_bound)) routed['cdf_blend_factor'] = routed['cdf_blend_factor'].where(~np.isnan(routed['cdf_blend_factor']), other=0.0) return routed def calculate_blend_vars(routed: xr.Dataset, topology: xr.Dataset, reference: xr.Dataset, gauge_sites = None, route_var = 'IRFroutedRunoff', - fill_method='kldiv'): + fill_method='kldiv', min_kge=-0.41): """ Calculates a number of variables used in blendmorph and map_var_to_seg. @@ -779,10 +798,10 @@ def calculate_blend_vars(routed: xr.Dataset, topology: xr.Dataset, reference: xr routed = map_segs_topology(routed=routed, topology=topology) routed = map_headwater_sites(routed=routed) routed = map_ref_sites(routed=routed, gauge_reference=reference, - gauge_sites = gauge_sites, route_var = route_var, - fill_method = fill_method) + gauge_sites=gauge_sites, route_var=route_var, + fill_method=fill_method, min_kge=min_kge) routed = calculate_cdf_blend_factor(routed=routed, gauge_reference=reference, - gauge_sites = gauge_sites, fill_method = fill_method) + gauge_sites=gauge_sites, fill_method=fill_method, min_kge=min_kge) for seg in routed['seg']: # if one of the refernece sites has been left null or determined @@ -800,7 +819,7 @@ def calculate_blend_vars(routed: xr.Dataset, topology: xr.Dataset, reference: xr def map_var_to_segs(routed: xr.Dataset, map_var: xr.DataArray, var_label: str, - gauge_segs = None): + var_key: str, gauge_segs = None): """ Splits the variable into its up and down components to be used in blendmorph. @@ -814,6 +833,8 @@ def map_var_to_segs(routed: xr.Dataset, map_var: xr.DataArray, var_label: str, the same as routed, (must also contain the dimension 'seg') var_label: str suffix of the up and down parts of the variable + var_key: str + variable name to access the variable to be split in map_var gauge_segs: list, optional List of the gauge segs that identify the reaches that are gauge sites, pulled from routed if None. @@ -831,6 +852,8 @@ def map_var_to_segs(routed: xr.Dataset, map_var: xr.DataArray, var_label: str, map_var.load() routed[down_var] = np.nan * map_var routed[up_var] = np.nan * map_var + # and need to make certain dimensions all line up + map_var = xr.merge([map_var, routed])[var_key] for seg in routed['seg'].values: up_seg = routed['up_ref_seg'].sel(seg=seg) @@ -893,8 +916,7 @@ def map_met_hru_to_seg(met_hru, topo): def to_bmorph(topo: xr.Dataset, routed: xr.Dataset, reference: xr.Dataset, met_hru: xr.Dataset=None, route_var: str='IRFroutedRunoff', - fill_method = 'kldiv'): - + fill_method = 'r2', min_kge=None): ''' Prepare mizuroute output for bias correction via the blendmorph algorithm. This allows an optional dataset of hru meteorological data to be given for conditional @@ -906,10 +928,11 @@ def to_bmorph(topo: xr.Dataset, routed: xr.Dataset, reference: xr.Dataset, Topology dataset for running mizuRoute. We expect this to have ``seg`` and ``hru`` dimensions. routed: xr.Dataset - The initially routed dataset from mizuRoute + The initially routed dataset from mizuRoute. reference: xr.Dataset A dataset containing reference flows for bias correction. - We expect this to have ``site`` and ``time`` dimensions. + We expect this to have ``site`` and ``time`` dimensions with + flows being stored in ``reference_flow``. met_hru: xr.Dataset, optional A dataset of meteorological data to be mapped onto the stream segments to facilitate conditioning. @@ -967,17 +990,16 @@ def to_bmorph(topo: xr.Dataset, routed: xr.Dataset, reference: xr.Dataset, met_seg = map_met_hru_to_seg(met_hru, topo) # Get the longest overlapping time period between all datasets - routed, reference, met_seg = trim_time([routed, reference, met_seg]) routed = calculate_blend_vars(routed, topo, reference, route_var = route_var, fill_method = fill_method) # Put all data on segments seg_ref = xr.Dataset({'reference_flow':(('time','seg'), reference['reference_flow'].values)}, coords = {'time': reference['time'].values, 'seg': ref_segs},) - routed = map_var_to_segs(routed, routed[route_var], 'raw_flow') - routed = map_var_to_segs(routed, seg_ref['reference_flow'], 'ref_flow') + routed = map_var_to_segs(routed, routed[route_var], 'raw_flow', route_var) + routed = map_var_to_segs(routed, seg_ref['reference_flow'], 'ref_flow', 'reference_flow') for v in met_vars: - routed = map_var_to_segs(routed, met_seg[v], v) + routed = map_var_to_segs(routed, met_seg[v], v, v) # Merge it all together met_seg = xr.merge([met_seg, routed])