diff --git a/src/python_framework_v02/troute/nhd_io.py b/src/python_framework_v02/troute/nhd_io.py
index 36948b0eb..c9ab522a3 100644
--- a/src/python_framework_v02/troute/nhd_io.py
+++ b/src/python_framework_v02/troute/nhd_io.py
@@ -8,6 +8,9 @@ import numpy as np
 from toolz import compose
 import dask.array as da
+import sys
+import math
+from datetime import *
 
 
 def read_netcdf(geo_file_path):
@@ -186,7 +189,6 @@ def get_ql_from_wrf_hydro_mf(qlat_files, index_col="feature_id", value_col="q_la
     2018-01-01 13:00:00  4186117  41.233807 -75.413895  0.006496
     ```
     """
-    filter_list = None
 
     with xr.open_mfdataset(
         qlat_files,
@@ -339,6 +341,92 @@ def preprocess_time_station_index(xd):
     )
 
 
+def build_last_obs_df(lastobsfile, routelink, wrf_last_obs_flag):
+    # Open the routelink file and extract the gage-to-segment crosswalk.
+    ds1 = xr.open_dataset(routelink)
+    df = ds1.to_dataframe()
+    df2 = df.loc[df["gages"] != b" "]
+    df2["gages"] = df2["gages"].astype("int")
+    df2 = df2[["gages", "to"]]
+    df2 = df2.reset_index()
+    df2 = df2.set_index("gages")
+
+    with xr.open_dataset(lastobsfile) as ds:
+        df_model_discharges = ds["model_discharge"].to_dataframe()
+        df_discharges = ds["discharge"].to_dataframe()
+        last_ts = df_model_discharges.index.get_level_values("timeInd")[-1]
+        model_discharge_last_ts = df_model_discharges[
+            df_model_discharges.index.get_level_values("timeInd") == last_ts
+        ]
+        discharge_last_ts = df_discharges[
+            df_discharges.index.get_level_values("timeInd") == last_ts
+        ]
+        df1 = ds["stationId"].to_dataframe()
+        df1 = df1.astype(int)
+        model_discharge_last_ts = model_discharge_last_ts.join(df1)
+        model_discharge_last_ts = model_discharge_last_ts.join(discharge_last_ts)
+        model_discharge_last_ts = model_discharge_last_ts.loc[
+            model_discharge_last_ts["model_discharge"] != -9999.0
+        ]
+        model_discharge_last_ts = model_discharge_last_ts.reset_index().set_index(
+            "stationId"
+        )
+        model_discharge_last_ts = model_discharge_last_ts.drop(
+            ["stationIdInd", "timeInd"], axis=1
+        )
+        model_discharge_last_ts["discharge"] = model_discharge_last_ts[
+            "discharge"
+        ].to_frame()
+
+        # If predicting from the last_obs file, use the last_obs file results.
+        # if last_obs_file == "error-based":
+        # elif last_obs_file == "obs-based":  # the wrf-hydro default
+        if wrf_last_obs_flag:
+            model_discharge_last_ts["last_nudge"] = (
+                model_discharge_last_ts["discharge"]
+                - model_discharge_last_ts["model_discharge"]
+            )
+        final_df = df2.join(model_discharge_last_ts["discharge"])
+        final_df = final_df.reset_index()
+        final_df = final_df.set_index("to")
+        final_df = final_df.drop(["feature_id", "gages"], axis=1)
+        final_df = final_df.dropna()
+
+        # Otherwise, predict from the t-route model outputs; if the indexes do not
+        # match, interrupt the computation, as the results would not be valid.
+        # else:
+        #     fvd_df = fvd_df
+        #     if len(model_discharge_last_ts.index) == len(fvd_df.index):
+        #         model_discharge_last_ts["last_nudge"] = (
+        #             model_discharge_last_ts["discharge"] - fvd_df[fvd_df.columns[0]]
+        #         )
+        #     else:
+        #         print("THE NUDGING FILE IDS DO NOT MATCH THE FLOWVELDEPTH IDS")
+        #         sys.exit()
+        # # Predictions created with continuously decreasing deltas until near-zero difference
+        # a = 120
+        # prediction_df = pd.DataFrame(index=model_discharge_last_ts.index)
+
+        # for time in range(0, 720, 5):
+        #     weight = math.exp(time / -a)
+        #     delta = pd.DataFrame(model_discharge_last_ts["last_nudge"] / weight)
+
+        #     if time == 0:
+        #         prediction_df[str(time)] = model_discharge_last_ts["last_nudge"]
+        #         weight_diff = prediction_df[str(time)] - prediction_df[str(time)]
+        #     else:
+        #         if weight > 0.1:
+        #             prediction_df[str(time)] = (
+        #                 delta["last_nudge"] + model_discharge_last_ts["model_discharge"]
+        #             )
+        #         elif weight < -0.1:
+        #             prediction_df[str(time)] = (
+        #                 delta["last_nudge"] + model_discharge_last_ts["model_discharge"]
+        #             )
+        # prediction_df["0"] = model_discharge_last_ts["model_discharge"]
+    return final_df
+
+
 def get_usgs_from_time_slices_csv(routelink_subset_file, usgs_csv):
 
     df2 = pd.read_csv(usgs_csv, index_col=0)
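For orientation, a minimal sketch of the nudge computation that build_last_obs_df performs once the last timestep is isolated; the station IDs and values below are invented stand-ins for the model_discharge and discharge variables in a WRF-Hydro nudgingLastObs file:

import pandas as pd

# Hypothetical last-timestep slice: one row per gage station,
# with the modeled value and the observed value.
last_ts = pd.DataFrame(
    {"model_discharge": [10.0, 5.2, -9999.0], "discharge": [11.5, 4.9, 3.0]},
    index=pd.Index([101, 202, 303], name="stationId"),
)

# Drop the no-data sentinel, exactly as build_last_obs_df does.
last_ts = last_ts.loc[last_ts["model_discharge"] != -9999.0]

# "error-based" nudge: observation minus model at the last observed timestep.
last_ts["last_nudge"] = last_ts["discharge"] - last_ts["model_discharge"]
print(last_ts)
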
@@ -421,32 +509,36 @@ def get_usgs_from_time_slices_folder(
     usgs_df = usgs_df.drop(["gages", "ascendingIndex", "to"], axis=1)
     columns_list = usgs_df.columns
 
-    for i in range(0, (len(columns_list) * 3) - 12, 12):
-        original_string = usgs_df.columns[i]
-        original_string_shortened = original_string[:-5]
-        temp_name1 = original_string_shortened + str("05:00")
-        temp_name2 = original_string_shortened + str("10:00")
-        temp_name3 = original_string_shortened + str("20:00")
-        temp_name4 = original_string_shortened + str("25:00")
-        temp_name5 = original_string_shortened + str("35:00")
-        temp_name6 = original_string_shortened + str("40:00")
-        temp_name7 = original_string_shortened + str("50:00")
-        temp_name8 = original_string_shortened + str("55:00")
-        usgs_df.insert(i + 1, temp_name1, np.nan)
-        usgs_df.insert(i + 2, temp_name2, np.nan)
-        usgs_df.insert(i + 4, temp_name3, np.nan)
-        usgs_df.insert(i + 5, temp_name4, np.nan)
-        usgs_df.insert(i + 7, temp_name5, np.nan)
-        usgs_df.insert(i + 8, temp_name6, np.nan)
-        usgs_df.insert(i + 10, temp_name7, np.nan)
-        usgs_df.insert(i + 11, temp_name8, np.nan)
+    original_string_first = usgs_df.columns[0]
+    date_time_str = original_string_first[:10] + " " + original_string_first[11:]
+    date_time_obj_start = datetime.strptime(date_time_str, "%Y-%m-%d %H:%M:%S")
+
+    original_string_last = usgs_df.columns[-1]
+    date_time_str = original_string_last[:10] + " " + original_string_last[11:]
+    date_time_obj_end = datetime.strptime(date_time_str, "%Y-%m-%d %H:%M:%S")
+
+    dates = []
+    # for j in pd.date_range(date_time_obj_start, date_time_obj_end + timedelta(1), freq="5min"):
+    for j in pd.date_range(date_time_obj_start, date_time_obj_end, freq="5min"):
+        dates.append(j.strftime("%Y-%m-%d_%H:%M:00"))
+
+    """
+    # dates_to_drop = ~usgs_df.columns.isin(dates)
+    OR
+    # dates_to_drop = usgs_df.columns.difference(dates)
+    # dates_to_add = pd.Index(dates).difference(usgs_df.columns)
+    """
+
+    usgs_df = usgs_df.reindex(columns=dates)
 
     usgs_df = usgs_df.interpolate(method="linear", axis=1)
+    usgs_df = usgs_df.interpolate(method="linear", axis=1, limit_direction="backward")
 
     usgs_df.drop(usgs_df[usgs_df.iloc[:, 0] == -999999.000000].index, inplace=True)
 
     return usgs_df
 
 
+# TODO: Move channel restart above usgs to keep the order used in the execution script.
 def get_channel_restart_from_csv(
     channel_initial_states_file,
     index_col=0,
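The replacement of the hand-built insert loop with reindex plus a two-pass interpolation can be exercised on a toy frame; the column labels follow the %Y-%m-%d_%H:%M:00 pattern the time slices use, but the link ID and values are made up:

import pandas as pd

obs = pd.DataFrame(
    {"2018-09-18_00:00:00": [1.0], "2018-09-18_00:15:00": [4.0]},
    index=pd.Index([8777381], name="link"),
)

# Build the complete 5-minute label set between the first and last columns...
dates = [
    t.strftime("%Y-%m-%d_%H:%M:00")
    for t in pd.date_range("2018-09-18 00:00:00", "2018-09-18 00:15:00", freq="5min")
]

# ...then reindex: missing 5-minute columns appear as NaN and are filled by
# the forward and backward linear-interpolation passes.
obs = obs.reindex(columns=dates)
obs = obs.interpolate(method="linear", axis=1)
obs = obs.interpolate(method="linear", axis=1, limit_direction="backward")
print(obs)  # 1.0, 2.0, 3.0, 4.0
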
diff --git a/src/python_framework_v02/troute/nhd_network.py b/src/python_framework_v02/troute/nhd_network.py
index 7682bac6d..ea762c097 100644
--- a/src/python_framework_v02/troute/nhd_network.py
+++ b/src/python_framework_v02/troute/nhd_network.py
@@ -88,6 +88,31 @@ def reverse_network(N):
     return rg
 
 
+def find_tw_for_node(reaches_bytw, node):
+    # TODO: extend this function (or write a new one) to handle a list of nodes.
+    # Such functionality might be useful for finding networks corresponding to a
+    # list of gages, for instance.
+    """
+    reaches_bytw is a dictionary of lists of the form
+
+    tw 1:
+      [ [ seg1, seg2, seg3, ... segn ],  # reach 1
+        [ sega, segb, segc, ... segz ],  # reach 2
+        .
+        .
+        .
+        [ ... ] ]  # reach n
+    tw 2:
+      etc.
+    """
+    for tw, rs in reaches_bytw.items():
+        for r in rs:
+            if node in r:
+                return tw
+
+    return None  # Node not in reach set.
+
+
 def junctions(N):
     c = Counter(chain.from_iterable(N.values()))
     return {k for k, v in c.items() if v > 1}
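A small usage sketch for find_tw_for_node; the two-tailwater reach set and segment IDs are hypothetical, and the import assumes the package layout above:

from troute.nhd_network import find_tw_for_node

reaches_bytw = {
    10975909: [[1, 2, 3], [4, 5]],  # tailwater 1 and its reaches
    8777535: [[6, 7], [8, 9, 11]],  # tailwater 2 and its reaches
}

assert find_tw_for_node(reaches_bytw, 5) == 10975909
assert find_tw_for_node(reaches_bytw, 11) == 8777535
assert find_tw_for_node(reaches_bytw, 9999) is None  # node not in any reach
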
diff --git a/src/python_framework_v02/troute/nhd_network_utilities_v02.py b/src/python_framework_v02/troute/nhd_network_utilities_v02.py
index 268e30b30..58cb0254d 100644
--- a/src/python_framework_v02/troute/nhd_network_utilities_v02.py
+++ b/src/python_framework_v02/troute/nhd_network_utilities_v02.py
@@ -419,10 +419,31 @@ def build_connections(supernetwork_parameters, dt):
     param_df["dt"] = dt
     param_df = param_df.rename(columns=reverse_dict(cols))
 
+    wbodies = {}
+    if "waterbody" in cols:
+        wbodies = build_waterbodies(
+            param_df[["waterbody"]], supernetwork_parameters, "waterbody"
+        )
+        param_df = param_df.drop("waterbody", axis=1)
+
+    gages = {}
+    if "gages" in cols:
+        gages = build_gages(param_df[["gages"]])
+        param_df = param_df.drop("gages", axis=1)
+
     param_df = param_df.astype("float32")
 
     # datasub = data[['dt', 'bw', 'tw', 'twcc', 'dx', 'n', 'ncc', 'cs', 's0']]
-    return connections, param_df
+    return connections, param_df, wbodies, gages
+
+
+def build_gages(segment_gage_df,):
+    gage_list = list(map(bytes.strip, segment_gage_df.gages.values))
+    gage_mask = list(map(bytes.isdigit, gage_list))
+    gages = segment_gage_df.loc[gage_mask, "gages"].to_dict()
+
+    return gages
 
 
 def build_waterbodies(
@@ -569,20 +590,42 @@ def build_data_assimilation(data_assimilation_parameters):
     data_assimilation_csv = data_assimilation_parameters.get(
         "data_assimilation_csv", None
     )
-    data_assimilation_filter = data_assimilation_parameters.get(
-        "data_assimilation_filter", None
+    data_assimilation_folder = data_assimilation_parameters.get(
+        "data_assimilation_timeslices_folder", None
     )
+
+    # TODO: Fix the logic here according to the following.
+    # If there are any observations for data assimilation, there
+    # needs to be a complete set in the first time set or else
+    # there must be a "LastObs". If there is a complete set in
+    # the first time step, the LastObs is optional. If there are
+    # no observations for assimilation, there can be a LastObs
+    # with an empty usgs dataframe.
+
+    last_obs_file = data_assimilation_parameters.get("wrf_hydro_last_obs_file", None)
+    last_obs_type = data_assimilation_parameters.get("wrf_last_obs_type", "error-based")
+    last_obs_crosswalk_file = data_assimilation_parameters.get(
+        "wrf_hydro_da_channel_ID_crosswalk_file", None
+    )
+
+    last_obs_df = pd.DataFrame()
+
+    if last_obs_file:
+        last_obs_df = nhd_io.build_last_obs_df(
+            last_obs_file, last_obs_crosswalk_file, last_obs_type,
+        )
+
     if data_assimilation_csv:
         usgs_df = build_data_assimilation_csv(data_assimilation_parameters)
-    elif data_assimilation_filter:
+    elif data_assimilation_folder:
         usgs_df = build_data_assimilation_folder(data_assimilation_parameters)
 
-    return usgs_df
+    return usgs_df, last_obs_df
 
 
 def build_data_assimilation_csv(data_assimilation_parameters):
 
     usgs_df = nhd_io.get_usgs_from_time_slices_csv(
-        data_assimilation_parameters["data_assimilation_parameters_file"],
+        data_assimilation_parameters["wrf_hydro_da_channel_ID_crosswalk_file"],
         data_assimilation_parameters["data_assimilation_csv"],
     )
 
@@ -597,7 +640,7 @@ def build_data_assimilation_folder(data_assimilation_parameters):
     ).resolve()
 
     usgs_df = nhd_io.get_usgs_from_time_slices_folder(
-        data_assimilation_parameters["data_assimilation_parameters_file"],
+        data_assimilation_parameters["wrf_hydro_da_channel_ID_crosswalk_file"],
        usgs_timeslices_folder,
         data_assimilation_parameters["data_assimilation_filter"],
     )
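The byte-string handling in build_gages can be checked in isolation; the segment IDs and gage codes here are invented:

import pandas as pd

segment_gage_df = pd.DataFrame(
    {"gages": [b"08777381        ", b"                ", b"08777535        "]},
    index=pd.Index([101, 202, 303], name="link"),
)

# Strip padding, keep only rows whose gage field is all digits,
# and return a segment -> gage mapping, as build_gages does.
gage_list = list(map(bytes.strip, segment_gage_df.gages.values))
gage_mask = list(map(bytes.isdigit, gage_list))
gages = segment_gage_df.loc[gage_mask, "gages"].to_dict()
print(gages)  # rows 101 and 303 survive; the blank gage field is dropped
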
diff --git a/src/python_routing_v02/compute_nhd_routing_SingleSeg_v02.py b/src/python_routing_v02/compute_nhd_routing_SingleSeg_v02.py
index 5b0fa23a0..c5efc3072 100644
--- a/src/python_routing_v02/compute_nhd_routing_SingleSeg_v02.py
+++ b/src/python_routing_v02/compute_nhd_routing_SingleSeg_v02.py
@@ -328,6 +328,7 @@ def compute_nhd_routing_v02(
     q0,
     qlats,
     usgs_df,
+    last_obs_df,
     assume_short_ts,
     return_courant,
     waterbodies_df,
@@ -455,9 +456,12 @@ def compute_nhd_routing_v02(
                 usgs_df_sub = pd.DataFrame()
                 nudging_positions_list = []
 
+            last_obs_sub = pd.DataFrame()
+
             qlat_sub = qlats.loc[param_df_sub.index]
             q0_sub = q0.loc[param_df_sub.index]
 
+            # TODO: Wire in the proper reservoir distinction
             # At present, in by-subnetwork-jit/jit-clustered, these next two lines
             # only produce a dummy list, but...
             # Eventually, the wiring for reservoir simulation needs to be added.
@@ -486,6 +490,7 @@ def compute_nhd_routing_v02(
                     usgs_df_sub.values.astype("float32"),
                     # flowveldepth_interorder,  # obtain keys and values from this dataset
                     np.array(nudging_positions_list, dtype="int32"),
+                    last_obs_sub.values.astype("float32"),
                     {
                         us: fvd
                         for us, fvd in flowveldepth_interorder.items()
@@ -599,6 +604,8 @@ def compute_nhd_routing_v02(
                 usgs_df_sub = pd.DataFrame()
                 nudging_positions_list = []
 
+            last_obs_sub = pd.DataFrame()
+
             qlat_sub = qlats.loc[param_df_sub.index]
             q0_sub = q0.loc[param_df_sub.index]
 
@@ -720,6 +727,8 @@ def compute_nhd_routing_v02(
                 usgs_df_sub = pd.DataFrame()
                 nudging_positions_list = []
 
+            last_obs_sub = pd.DataFrame()
+
             reaches_list_with_type = []
 
             for reaches in reach_list:
@@ -758,6 +767,7 @@ def compute_nhd_routing_v02(
                     waterbodies_df_sub.values,
                     usgs_df_sub.values.astype("float32"),
                     np.array(nudging_positions_list, dtype="int32"),
+                    last_obs_sub.values.astype("float32"),
                     {},
                     assume_short_ts,
                     return_courant,
@@ -818,6 +828,15 @@ def compute_nhd_routing_v02(
                 usgs_df_sub = pd.DataFrame()
                 nudging_positions_list = []
 
+            if not last_obs_df.empty:
+                last_obs_sub = pd.DataFrame()  # placeholder until the wiring below is enabled
+                # lastobs_segs = list(last_obs_df.index.intersection(param_df_sub.index))
+                # nudging_positions_list = param_df_sub.index.get_indexer(lastobs_segs)
+                # last_obs_sub = last_obs_df.loc[lastobs_segs]
+            else:
+                last_obs_sub = pd.DataFrame()
+                # nudging_positions_list = []
+
             # qlat_sub = qlats.loc[common_segs].sort_index()
             # q0_sub = q0.loc[common_segs].sort_index()
             qlat_sub = qlats.loc[param_df_sub.index]
@@ -862,6 +881,7 @@ def compute_nhd_routing_v02(
                     waterbodies_df_sub.values,
                     usgs_df_sub.values.astype("float32"),
                     np.array(nudging_positions_list, dtype="int32"),
+                    last_obs_sub.values.astype("float32"),
                     {},
                     assume_short_ts,
                     return_courant,
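The commented-out wiring above aligns the last-obs frame to each subnetwork; a minimal sketch of that alignment, with made-up segment IDs:

import pandas as pd

param_df_sub = pd.DataFrame({"n": [0.03, 0.05, 0.04]}, index=[11, 22, 33])
last_obs_df = pd.DataFrame({"discharge": [7.5]}, index=[22])

# Segments of this subnetwork that carry a last observation...
lastobs_segs = list(last_obs_df.index.intersection(param_df_sub.index))
# ...and their integer positions within the subnetwork's parameter table,
# which is the form nudging_positions_list takes for the kernel call.
nudging_positions_list = param_df_sub.index.get_indexer(lastobs_segs)
last_obs_sub = last_obs_df.loc[lastobs_segs]
print(nudging_positions_list)  # [1]
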
@@ -1038,6 +1058,7 @@ def main():
     break_network_at_waterbodies = run_parameters.get(
         "break_network_at_waterbodies", False
     )
+    break_network_at_gages = run_parameters.get("break_network_at_gages", False)
 
     if showtiming:
         main_start_time = time.time()
@@ -1047,9 +1068,14 @@ def main():
     if showtiming:
         start_time = time.time()
 
-    # STEP 1: Build basic network connections graph
-    connections, param_df = nnu.build_connections(supernetwork_parameters, dt,)
-    wbodies = nnu.build_waterbodies(param_df, supernetwork_parameters, "waterbody")
+    # STEP 1: Build the basic network connections graph,
+    # read network parameters, and identify waterbodies and gages, if any.
+    connections, param_df, wbodies, gages = nnu.build_connections(
+        supernetwork_parameters, dt,
+    )
+    if not wbodies:
+        break_network_at_waterbodies = False
+
     if break_network_at_waterbodies:
         connections = nhd_network.replace_waterbodies_connections(connections, wbodies)
 
@@ -1085,11 +1111,14 @@ def main():
     if verbose:
         print("organizing connections into reaches ...")
 
+    network_break_segments = set()
+    if break_network_at_waterbodies:
+        network_break_segments = network_break_segments.union(wbodies.values())
+    if break_network_at_gages:
+        network_break_segments = network_break_segments.union(gages.keys())
+
     independent_networks, reaches_bytw, rconn = nnu.organize_independent_networks(
-        connections,
-        list(waterbodies_df_reduced.index.values)
-        if break_network_at_waterbodies
-        else None,
+        connections, network_break_segments,
     )
     if verbose:
         print("reach organization complete")
@@ -1178,16 +1207,17 @@ def main():
     data_assimilation_csv = data_assimilation_parameters.get(
         "data_assimilation_csv", None
     )
-    data_assimilation_filter = data_assimilation_parameters.get(
-        "data_assimilation_filter", None
+    data_assimilation_folder = data_assimilation_parameters.get(
+        "data_assimilation_timeslices_folder", None
     )
 
-    if data_assimilation_csv or data_assimilation_filter:
+    last_obs_file = data_assimilation_parameters.get("wrf_hydro_last_obs_file", None)
+    if data_assimilation_csv or data_assimilation_folder or last_obs_file:
         if showtiming:
             start_time = time.time()
         if verbose:
             print("creating usgs time_slice data array ...")
 
-        usgs_df = nnu.build_data_assimilation(data_assimilation_parameters)
+        usgs_df, last_obs_df = nnu.build_data_assimilation(data_assimilation_parameters)
 
         if verbose:
             print("usgs array complete")
@@ -1196,6 +1226,7 @@ def main():
 
     else:
         usgs_df = pd.DataFrame()
+        last_obs_df = pd.DataFrame()
 
     # STEP 7
     coastal_boundary_elev = coastal_parameters.get("coastal_boundary_elev_data", None)
@@ -1251,6 +1282,7 @@ def main():
         q0,
         qlats,
         usgs_df,
+        last_obs_df,
         run_parameters.get("assume_short_ts", False),
         run_parameters.get("return_courant", False),
         waterbodies_df_reduced,
diff --git a/src/python_routing_v02/fast_reach/diffusive.pyx b/src/python_routing_v02/fast_reach/diffusive.pyx
index ed866fe69..b5aa6dde4 100644
--- a/src/python_routing_v02/fast_reach/diffusive.pyx
+++ b/src/python_routing_v02/fast_reach/diffusive.pyx
@@ -118,6 +118,7 @@ cpdef object compute_diffusive_tst(
     const double[:,:] wbody_cols,
     const float[:,:] usgs_values,
     const int[:] usgs_positions_list,
+    const float[:,:] lastobs_values,
     dict upstream_results={},
     bint assume_short_ts=False,
     bint return_courant=False,
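A toy illustration of how main() assembles the break-segment set before reach organization; the IDs are hypothetical:

wbodies = {1001: 9001, 1002: 9002}  # segment -> waterbody
gages = {2001: b"08777381"}         # segment -> gage code

break_network_at_waterbodies = True
break_network_at_gages = True

network_break_segments = set()
if break_network_at_waterbodies:
    network_break_segments = network_break_segments.union(wbodies.values())
if break_network_at_gages:
    network_break_segments = network_break_segments.union(gages.keys())
print(network_break_segments)  # {9001, 9002, 2001}
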
diff --git a/src/python_routing_v02/fast_reach/mc_reach.pyx b/src/python_routing_v02/fast_reach/mc_reach.pyx
index ed837e26a..a888f6ac4 100644
--- a/src/python_routing_v02/fast_reach/mc_reach.pyx
+++ b/src/python_routing_v02/fast_reach/mc_reach.pyx
@@ -3,11 +3,13 @@ import numpy as np
 from itertools import chain
 from operator import itemgetter
-from numpy cimport ndarray
 from array import array
+from numpy cimport ndarray  # TODO: Do we need to import numpy and ndarray separately?
+from libc.math cimport exp
 cimport numpy as np
 cimport cython
 from libc.stdlib cimport malloc, free
+# from libc.stdio cimport printf
 #Note may get slightly better performance using cython mem module (pulls from python's heap)
 #from cpython.mem cimport PyMem_Malloc, PyMem_Free
 from troute.network.musking.mc_reach cimport MC_Segment, MC_Reach, _MC_Segment, get_mc_segment
@@ -168,6 +170,7 @@ cpdef object compute_network(
     const double[:,:] wbody_cols,
     const float[:,:] usgs_values,
     const int[:] usgs_positions_list,
+    const float[:,:] lastobs_values,
     # const float[:] wbody_idx,
     # object[:] wbody_cols,
     # const float[:, :] wbody_vals,
@@ -203,15 +206,24 @@ cpdef object compute_network(
     # flowveldepth is 2D float array that holds results
     # columns: flow (qdc), velocity (velc), and depth (depthc) for each timestep
     # rows: indexed by data_idx
-    cdef float[:,::1] flowveldepth = np.zeros((data_idx.shape[0], nsteps * 3), dtype='float32')
+    cdef int qvd_ts_w = 3  # There are 3 values per timestep (corresponding to 3 columns per timestep)
+    cdef float[:,::1] flowveldepth = np.zeros((data_idx.shape[0], (nsteps + 1) * qvd_ts_w), dtype='float32')
 
     # courant is a 2D float array that holds courant results
     # columns: courant number (cn), kinematic celerity (ck), x parameter(X) for each timestep
     # rows: indexed by data_idx
     cdef float[:,::1] courant = np.zeros((data_idx.shape[0], nsteps * 3), dtype='float32')
 
-    cdef int gages_size = len(usgs_positions_list)
+    cdef int gages_size = usgs_positions_list.shape[0]
     cdef int gage_i, usgs_position_i
+    cdef int gage_maxtimestep = usgs_values.shape[1]
+
+    flowveldepth[:,0] = initial_conditions[:,1]  # Populate initial flows
+    flowveldepth[:,2] = initial_conditions[:,2]  # Populate initial depths
+    for gage_i in range(gages_size):
+        usgs_position_i = usgs_positions_list[gage_i]
+        flowveldepth[usgs_position_i, 0] = usgs_values[gage_i, 0]
+        # TODO: handle the instance where there are no values, only gage positions
 
     # Pseudocode: LOOP ON Upstream Inflowers
     # to pre-fill FlowVelDepth
@@ -230,7 +242,7 @@ cpdef object compute_network(
             tmp = upstream_results[upstream_tw_id]
             fill_index = tmp["position_index"]
             fill_index_mask[fill_index] = False
-            for idx, val in enumerate(tmp["results"]):
+            for idx, val in enumerate(tmp["results"], qvd_ts_w):
                 flowveldepth[fill_index, idx] = val
 
     cdef:
@@ -282,6 +294,7 @@ cpdef object compute_network(
     ireach_cache = 0
     iusreach_cache = 0
 
+    # copy reaches into an array
     for ireach in range(len(reaches)):
         reachlen = reach_sizes[ireach]
 
@@ -307,13 +320,16 @@ cpdef object compute_network(
     buf = np.empty((maxreachlen, buf_cols), dtype='float32')
 
     if return_courant:
-        out_buf = np.empty((maxreachlen, 6), dtype='float32')
+        out_buf = np.empty((maxreachlen, qvd_ts_w + 3), dtype='float32')
     else:
-        out_buf = np.empty((maxreachlen, 3), dtype='float32')
+        out_buf = np.empty((maxreachlen, qvd_ts_w), dtype='float32')
 
     drows_tmp = np.arange(maxreachlen, dtype=np.intp)
     cdef Py_ssize_t[:] drows
     cdef float qup, quc
+    cdef float a, da_weight, da_decay_time
+    cdef int lastobs_timestep
+    cdef float dt = 300.0  # TODO: pull this value from the param_df dt (see line 153)
     cdef int timestep = 0
     cdef int ts_offset
 
@@ -329,7 +345,7 @@ cpdef object compute_network(
     with nogil:
         while timestep < nsteps:
-            ts_offset = timestep * 3
+            ts_offset = (timestep + 1) * qvd_ts_w
 
             ireach_cache = 0
             iusreach_cache = 0
@@ -358,11 +374,8 @@ cpdef object compute_network(
                     # upstream flow in the previous timestep is equal to the sum of flows
                     # in upstream segments, previous timestep
-                    if timestep > 0:
-                        qup += flowveldepth[usreach_cache[iusreach_cache + i], ts_offset - 3]
-                    else:
-                        # sum of qd0 (flow out of each segment) over all upstream reaches
-                        qup += initial_conditions[usreach_cache[iusreach_cache + i],1]
+                    qup += flowveldepth[usreach_cache[iusreach_cache + i], ts_offset - qvd_ts_w]
+                    # Remember, we have filled the first position in flowveldepth with qd0
 
                 buf_view = buf[:reachlen, :]
                 out_view = out_buf[:reachlen, :]
@@ -386,20 +399,9 @@ cpdef object compute_network(
                     fill_buffer_column(srows, scols[i], drows, i + 1, data_values, buf_view)
 
                 # fill buffer with qdp, depthp, velp
-                if timestep > 0:
-                    fill_buffer_column(srows, ts_offset - 3, drows, 10, flowveldepth, buf_view)
-                    fill_buffer_column(srows, ts_offset - 2, drows, 11, flowveldepth, buf_view)
-                    fill_buffer_column(srows, ts_offset - 1, drows, 12, flowveldepth, buf_view)
-                else:
-                    '''
-                    Changed made to accomodate initial conditions:
-                    when timestep == 0, qdp, and depthp are taken from the initial_conditions array,
-                    using srows to properly index
-                    '''
-                    for i in range(drows.shape[0]):
-                        buf_view[drows[i], 10] = initial_conditions[srows[i],1] #qdp = qd0
-                        buf_view[drows[i], 11] = 0.0 # the velp argmument is never used, set to whatever
-                        buf_view[drows[i], 12] = initial_conditions[srows[i],2] #hdp = h0
+                fill_buffer_column(srows, ts_offset - qvd_ts_w, drows, 10, flowveldepth, buf_view)
+                fill_buffer_column(srows, ts_offset - (qvd_ts_w - 1), drows, 11, flowveldepth, buf_view)
+                fill_buffer_column(srows, ts_offset - (qvd_ts_w - 2), drows, 12, flowveldepth, buf_view)
 
                 if assume_short_ts:
                     quc = qup
@@ -407,21 +409,37 @@ cpdef object compute_network(
                 compute_reach_kernel(qup, quc, reachlen, buf_view, out_view, assume_short_ts, return_courant)
 
                 # copy out_buf results back to flowdepthvel
-                for i in range(3):
+                for i in range(qvd_ts_w):
                     fill_buffer_column(drows, i, srows, ts_offset + i, out_view, flowveldepth)
 
                 # copy out_buf results back to courant
                 if return_courant:
-                    for i in range(3,6):
-                        fill_buffer_column(drows, i, srows, ts_offset + (i-3), out_view, courant)
+                    for i in range(qvd_ts_w, qvd_ts_w + 3):
+                        fill_buffer_column(drows, i, srows, ts_offset + (i - qvd_ts_w), out_view, courant)
 
                 # Update indexes to point to next reach
                 ireach_cache += reachlen
                 iusreach_cache += usreachlen
 
-            if gages_size:
+            if gages_size:
+                # TODO: This loops over all gages for all reaches.
+                # We should have a membership test at the reach loop level
+                # so that we only enter this process for reaches where the
+                # gage actually exists. We have the filter in place to
+                # filter the gage list so that only relevant gages for a
+                # particular network are present in the function call ---
+                # adding the reach-based filter would be the next level.
                 for gage_i in range(gages_size):
                     usgs_position_i = usgs_positions_list[gage_i]
-                    flowveldepth[usgs_position_i, timestep * 3] = usgs_values[gage_i, timestep]
+                    if timestep < gage_maxtimestep:
+                        # TODO: It is possible to remove this branching logic if we just loop
+                        # over the timesteps during DA and post-DA, if that is a major performance
+                        # optimization. On the flip side, it would probably introduce unwanted
+                        # code complexity.
+                        flowveldepth[usgs_position_i, ts_offset] = usgs_values[gage_i, timestep]
+                        # TODO: add/update lastobs_timestep and/or decay_timestep
+                    else:
+                        a = 120  # TODO: pull this a value from the config file somehow
+                        da_decay_time = (timestep - lastobs_timestep) * dt
+                        da_weight = exp(da_decay_time/-a)  # TODO: This could be pre-calculated,
+                        # knowing when obs finish relative to simulation time
+                        # replacement_value = f(lastobs_value, da_weight)  # TODO: we need to be
+                        # able to export these values to compute the 'Nudge'
+                        # printf("decaying from timestep: %d %d\t", timestep, gages_size)
+                        # flowveldepth[usgs_position_i, timestep * qvd_ts_w] = replacement_value
 
             timestep += 1
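The decay applied once the observation record runs out, restated as plain Python; a = 120 follows the hardcoded constant above, and the nudge values are invented:

from math import exp

def decayed_nudge(last_nudge, timestep, lastobs_timestep, dt=300.0, a=120.0):
    # Shrink the last known nudge as simulation time moves past the final
    # observation, mirroring the exp(da_decay_time / -a) weight above.
    da_decay_time = (timestep - lastobs_timestep) * dt
    return last_nudge * exp(da_decay_time / -a)

# The further past the last observation, the smaller the correction.
print(decayed_nudge(1.5, timestep=10, lastobs_timestep=8))  # ~0.010
print(decayed_nudge(1.5, timestep=20, lastobs_timestep=8))  # ~0
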
@@ -429,9 +447,9 @@ cpdef object compute_network(
     # The upstream keys have empty results because they are not part of any reaches
     # so we need to delete the null values that return
     if return_courant:
-        return np.asarray(data_idx, dtype=np.intp)[fill_index_mask], np.asarray(flowveldepth, dtype='float32')[fill_index_mask], np.asarray(courant, dtype='float32')[fill_index_mask]
+        return np.asarray(data_idx, dtype=np.intp)[fill_index_mask], np.asarray(flowveldepth[:,qvd_ts_w:], dtype='float32')[fill_index_mask], np.asarray(courant, dtype='float32')[fill_index_mask]
     else:
-        return np.asarray(data_idx, dtype=np.intp)[fill_index_mask], np.asarray(flowveldepth, dtype='float32')[fill_index_mask]
+        return np.asarray(data_idx, dtype=np.intp)[fill_index_mask], np.asarray(flowveldepth[:,qvd_ts_w:], dtype='float32')[fill_index_mask]
 
 #---------------------------------------------------------------------------------------------------------------#
 #---------------------------------------------------------------------------------------------------------------#
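The effect of the (nsteps + 1) * qvd_ts_w layout change, sketched with NumPy: column block 0 now holds the initial conditions, results start at offset qvd_ts_w, and the initial block is sliced off before returning. The routing kernel is replaced by a trivial stand-in here:

import numpy as np

nsegs, nsteps, qvd_ts_w = 2, 3, 3
flowveldepth = np.zeros((nsegs, (nsteps + 1) * qvd_ts_w), dtype="float32")

# Hypothetical initial conditions: columns are (index, q0, h0).
initial_conditions = np.array([[0, 1.0, 0.5], [1, 2.0, 0.7]], dtype="float32")
flowveldepth[:, 0] = initial_conditions[:, 1]  # initial flow
flowveldepth[:, 2] = initial_conditions[:, 2]  # initial depth

for timestep in range(nsteps):
    ts_offset = (timestep + 1) * qvd_ts_w
    # qup for this step reads the *previous* block, which at timestep 0
    # is the initial-condition block -- no timestep == 0 special case needed.
    qup = flowveldepth[:, ts_offset - qvd_ts_w].sum()
    flowveldepth[:, ts_offset] = qup  # stand-in for the routing kernel

print(flowveldepth[:, qvd_ts_w:])  # initial block trimmed, as in the return
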
@@ -704,6 +722,7 @@ cpdef object compute_network_structured_obj(
     const double[:,:] wbody_cols,
     const float[:,:] usgs_values,
     const int[:] usgs_positions_list,
+    const float[:,:] lastobs_values,
     dict upstream_results={},
     bint assume_short_ts=False,
     bint return_courant=False,
@@ -835,7 +854,7 @@ cpdef object compute_network_structured_obj(
                 #if isinstance(reservoir_object, lp_kernel):
                 #TODO: dt is currently held by the segment. Need to find better place to hold dt
-                routing_period = 300.0
+                routing_period = 300.0  # TODO: Fix this hardcoded value to pull from dt
 
                 reservoir_outflow, water_elevation = r.run(upstream_flows, 0.0, routing_period)
 
@@ -888,6 +907,20 @@ cpdef object compute_network_structured_obj(
                     assume_short_ts)#,
                     #timestep,
                     #nsteps)
+
+                # #a = 120
+                # #weight = math.exp(timestep/-a)
+                # #lastobs = 1
+                # for i, id in enumerate(segment_ids):
+                #     flowveldepth[id, timestep, 0] = out_buf[i, 0]
+                #     #for pos, loid in enumerate(lastobs_ids):
+                #     #    if loid == id:
+                #     #        lasterror = flowveldepth[id, timestep, 0] - lastobs_values[pos]
+                #     #        delta = weight * lasterror
+                #     #        flowveldepth[id, timestep, 0] = flowveldepth[id, timestep, 0] + delta
+                #     flowveldepth[id, timestep, 1] = out_buf[i, 1]
+                #     flowveldepth[id, timestep, 2] = out_buf[i, 2]
+
                 #Copy the output out
                 for i, id in enumerate(segment_ids):
                     flowveldepth[id, timestep, 0] = out_buf[i, 0]
@@ -1056,7 +1089,7 @@ cpdef object compute_network_structured(
                 upstream_flows = previous_upstream_flows
 
                 if r.type == compute_type.RESERVOIR_LP:
-                    run(r, upstream_flows, 0.0, 300, &lp_outflow, &lp_water_elevation)
+                    run(r, upstream_flows, 0.0, 300, &lp_outflow, &lp_water_elevation)  # TODO: Need to replace this hardcoded 300 with dt
                     flowveldepth[r.id, timestep, 0] = lp_outflow
                     flowveldepth[r.id, timestep, 1] = 0.0
                     flowveldepth[r.id, timestep, 2] = lp_water_elevation
diff --git a/test/input/geo/last_obs/nudgingLastObs.2018-08-02_00_00_00.nc b/test/input/geo/last_obs/nudgingLastObs.2018-08-02_00_00_00.nc
new file mode 100644
index 000000000..8e34a5f79
Binary files /dev/null and b/test/input/geo/last_obs/nudgingLastObs.2018-08-02_00_00_00.nc differ
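The commented-out nudging hook in compute_network_structured_obj, restated as a plain-Python sketch; the sign convention follows the commented code verbatim, the weight would come from the decay sketched earlier, and all values here are invented:

def apply_lastobs_delta(simulated_flow, lastobs_value, weight):
    # Mirrors the commented-out hook above: delta = weight * (simulated - observed),
    # added back onto the simulated flow.
    lasterror = simulated_flow - lastobs_value
    delta = weight * lasterror
    return simulated_flow + delta

print(apply_lastobs_delta(10.0, 12.0, weight=0.9))  # 8.2
print(apply_lastobs_delta(10.0, 12.0, weight=0.1))  # 9.8
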
diff --git a/test/input/yaml/Florence_Benchmark_da.yaml b/test/input/yaml/Florence_Benchmark_da.yaml
new file mode 100644
index 000000000..72107b70a
--- /dev/null
+++ b/test/input/yaml/Florence_Benchmark_da.yaml
@@ -0,0 +1,131 @@
+---
+# initial input parameters
+run_parameters:
+    parallel_compute_method: by-subnetwork-jit-clustered  # OPTIONS: , "by-network", "by-subnetwork-jit", "by-subnetwork-jit-clustered"
+    subnetwork_target_size: 100  # by-subnetwork* requires a value here to identify the target subnetwork size.
+    cpu_pool: 8
+    # verbose: true  # verbose output (leave blank for quiet output.)
+    # showtiming: true  # set the showtiming (omit flag for no timing information.)
+    # debuglevel: 1  # set the debuglevel for additional console output.
+    # break_network_at_waterbodies: true  # replace waterbodies in the route-link dataset with segments representing the reservoirs and divide the computation there (leave blank for no splitting.)
+    # WARNING: `break_network_at_waterbodies: true` will only work if compute_method is set to "V02-structured-obj" and parallel_compute_method is unset (serial execution) or set to "by-network".
+    break_network_at_gages: true  # Ensures gages are in a reach by themselves.
+    # compute_method: V02-structured-obj  # OPTIONS: "V02-caching", "V02-structured-obj", "V02-structured"
+    assume_short_ts: true  # use the previous timestep value for both current and previous flow.
+    qts_subdivisions: 12  # number of timesteps per forcing (qlateral) timestep.
+    dt: 300  # default timestep length, seconds
+    nts: 85  # number of timesteps to simulate. If used with ql_file or ql_folder, nts must be less than the number of ql inputs x qts_subdivisions.
+    # nts: 288  # number of timesteps to simulate. If used with ql_file or ql_folder, nts must be less than the number of ql inputs x qts_subdivisions.
+    return_courant: false  # WARNING: true will only work with compute_method "V02-caching", therefore not currently compatible with simulation of waterbodies.
+# output file parameters
+output_parameters: {}
+# data column assignment inside supernetwork_parameters
+supernetwork_parameters:
+    title_string: "Florence_FullRes"
+    geo_file_path: "../../test/input/florence_fullres/florenceNudgingChannelOnly/DOMAIN/Route_Link.nc"
+    mask_file_path: "../../test/input/florence_fullres/florenceNudgingChannelOnly/florence_fullres_mask_tw10975909_gage8777381.txt"
+    # mask_file_path: "../../test/input/florence_fullres/florenceNudgingChannelOnly/florence_fullres_mask_tw8777381_gage8777381.txt"
+    # mask_file_path: "../../test/input/florence_fullres/florenceNudgingChannelOnly/florence_fullres_mask_tw8777535_gage8777381.txt"
+    mask_layer_string: ""
+    mask_driver_string: "csv"
+    mask_key: 0
+    columns:
+        key: "link"
+        downstream: "to"
+        dx: "Length"
+        n: "n"  # TODO: rename to `manningn`
+        ncc: "nCC"  # TODO: rename to `manningncc`
+        s0: "So"  # TODO: rename to `bedslope`
+        bw: "BtmWdth"  # TODO: rename to `bottomwidth`
+        waterbody: "NHDWaterbodyComID"
+        gages: "gages"
+        tw: "TopWdth"  # TODO: rename to `topwidth`
+        twcc: "TopWdthCC"  # TODO: rename to `topwidthcc`
+        alt: "alt"
+        musk: "MusK"
+        musx: "MusX"
+        cs: "ChSlp"  # TODO: rename to `sideslope`
+    waterbody_null_code: -9999
+    terminal_code: 0
+    driver_string: NetCDF
+    layer_string: 0
+
+# waterbody parameters and assignments from lake parm file
+waterbody_parameters:
+    level_pool:
+        # WRF-Hydro lake parm file
+        level_pool_waterbody_parameter_file_path: "../../test/input/florence_fullres/florenceNudgingChannelOnly/DOMAIN/LAKEPARM.nc"
+        level_pool_waterbody_id: lake_id
+        level_pool_waterbody_area: LkArea
+        level_pool_weir_elevation: WeirE
+        level_pool_waterbody_max_elevation: LkMxE
+        level_pool_outfall_weir_coefficient: WeirC
+        level_pool_outfall_weir_length: WeirL
+        level_pool_overall_dam_length: DamL
+        level_pool_orifice_elevation: OrificeE
+        level_pool_orifice_coefficient: OrificeC
+        level_pool_orifice_area: OrificeA
+# WRF-Hydro output file
+forcing_parameters:
+    qlat_input_folder: "../../test/input/florence_fullres/florenceNudgingChannelOnly/CHRTOUT/"
+    qlat_file_pattern_filter: "201809180[1-8]*.CHRTOUT_DOMAIN1"
+    # qlat_input_file: "florence_qlat_test.csv.hourly"
+    qlat_file_index_col: feature_id
+    qlat_file_value_col: q_lateral
+# WRF-Hydro restart files
+restart_parameters:
+    # WRF-Hydro channels restart file
+    wrf_hydro_channel_restart_file: "../../test/input/florence_fullres/florenceNudgingChannelOnly/HYDRO_RST/HYDRO_RST.2018-09-18_00:00_DOMAIN1"
+    # wrf_hydro_channel_restart_file: "../../test/input/florence_fullres/florenceNudgingChannelOnly/RESTART_open-loop/HYDRO_RST.2018-09-10_00:00_DOMAIN1"
+    # WRF-Hydro channels ID crosswalk file
+    # florence_testcase/florence_933020089/DOMAIN
+    wrf_hydro_channel_ID_crosswalk_file: "../../test/input/florence_fullres/florenceNudgingChannelOnly/DOMAIN/Route_Link.nc"
+    wrf_hydro_channel_ID_crosswalk_file_field_name: link
+    wrf_hydro_channel_restart_upstream_flow_field_name: qlink1
+    wrf_hydro_channel_restart_downstream_flow_field_name: qlink2
+    wrf_hydro_channel_restart_depth_flow_field_name: hlink
+    # WRF-Hydro waterbodies restart file
+    wrf_hydro_waterbody_restart_file: "../../test/input/florence_fullres/florenceNudgingChannelOnly/HYDRO_RST/HYDRO_RST.2018-09-18_00:00_DOMAIN1"
+    # WRF-Hydro waterbody ID crosswalk file
+    wrf_hydro_waterbody_ID_crosswalk_file: "../../test/input/florence_fullres/florenceNudgingChannelOnly/DOMAIN/LAKEPARM.nc"
+    wrf_hydro_waterbody_ID_crosswalk_file_field_name: lake_id
+    # WRF-Hydro waterbody crosswalk filter file
+    wrf_hydro_waterbody_crosswalk_filter_file: "../../test/input/florence_933020089/DOMAIN/Route_Link.nc"
+    wrf_hydro_waterbody_crosswalk_filter_file_field_name: NHDWaterbodyComID
"../../test/input/florence_fullres/florenceNudgingChannelOnly/DOMAIN/LAKEPARM.nc" + wrf_hydro_waterbody_ID_crosswalk_file_field_name: lake_id + # #WRF-Hydro waterbody crosswalk filter file + wrf_hydro_waterbody_crosswalk_filter_file: "../../test/input/florence_933020089/DOMAIN/Route_Link.nc" + wrf_hydro_waterbody_crosswalk_filter_file_field_name: NHDWaterbodyComID +#WRF-Hydro data assimilation files +data_assimilation_parameters: + # florence_testcase/florence_933020089/DOMAIN + data_assimilation_timeslices_folder: "../../test/input/florence_fullres/florenceNudgingChannelOnly/nudgingTimeSliceObs_calibration" + data_assimilation_filter: "2018-09-18_0[0-9]*.usgsTimeSlice.ncdf" + wrf_hydro_da_channel_ID_crosswalk_file: "../../test/input/florence_933020089/DOMAIN/Route_Link.nc" + # wrf_hydro_last_obs_file: "../../test/input/florence_fullres/florenceNudgingChannelOnly/nudgingLastObs/nudgingLastObs.2018-09-18_00:00:00.nc" + wrf_last_obs_type: "obs-based" + # 2018-12-31_23:45:00.15min.usgsTimeSlice.ncdf + # data_assimilation_csv: "../../test/input/geo/usgs_files/usgs_files.csv" + # florence_testcase/florenceNudgingChannelOnly/FORCING_AnA_channel-only/24timeslices +parity_parameters: + # florence_testcase/florenceNudgingChannelOnly/CHRTOUT + parity_check_input_folder: "../../test/input/florence_fullres/florenceNudgingChannelOnly/CHRTOUT/" + # parity_check_input_folder: "../../test/input/florence_fullres/florenceNudgingChannelOnly/FORCING_AnA_channel-only/" + parity_check_file_pattern_filter: "201809180[1-8]*.CHRTOUT_DOMAIN1" + # parity_check_file: "florence_streamflow_test.csv.hourly" + parity_check_file_index_col: feature_id + parity_check_file_value_col: streamflow + # parity_check_compare_node: 933020089 # contains only Nan in WRF simulations including waterbodies + # parity_check_compare_node: 8835386 + parity_check_compare_node: 10975909 + # parity_check_compare_node: 8778771 # Headwater + # parity_check_compare_node: 8778459 # Midwater + # parity_check_compare_node: 8778201 # Midwater + # parity_check_compare_node: 8777277 # Just above gage + # parity_check_compare_node: 8777275 # Just above gage + # parity_check_compare_node: 8777381 # At gage + # parity_check_compare_node: 8777409 # Just above 8777353 + # parity_check_compare_node: 8777353 # Just above 8777387 -- next upstreams [8777369, 8777409] + # parity_check_compare_node: 8777351 # Just above 8777387 + # parity_check_compare_node: 8777451 # Just above 8777387 -- next upstreams [8777455, 8777489] + # parity_check_compare_node: 8777387 # Just above 8777383 -- next upstreams [8777351, 8777353] + # parity_check_compare_node: 8777383 # Below gage + # parity_check_compare_node: 8777419 # Below gage + # parity_check_compare_node: 8777523 # Below gage + # parity_check_compare_node: 8777535 # Below gage