From 0a5046c99e6b6b3583227c25e4fc777975d625af Mon Sep 17 00:00:00 2001 From: karnesh Date: Fri, 26 Jul 2024 14:24:35 -0400 Subject: [PATCH] added missing parameter in nwm_output_generator function and fixed formatting --- .../troute/config/output_parameters.py | 4 +- src/troute-nwm/src/nwm_routing/output.py | 94 +++++++++---------- 2 files changed, 49 insertions(+), 49 deletions(-) diff --git a/src/troute-config/troute/config/output_parameters.py b/src/troute-config/troute/config/output_parameters.py index 0b5919dfb..b367582d2 100644 --- a/src/troute-config/troute/config/output_parameters.py +++ b/src/troute-config/troute/config/output_parameters.py @@ -49,7 +49,7 @@ class CsvOutput(BaseModel): csv_output_segments: Optional[List[str]] = None -class ParquetOutput(BaseModel, extra='forbid'): +class ParquetOutput(BaseModel): # NOTE: required if writing results to parquet parquet_output_folder: Optional[DirectoryPath] = None parquet_output_segments: Optional[List[str]] = None @@ -57,7 +57,7 @@ class ParquetOutput(BaseModel, extra='forbid'): prefix_ids: Optional[str] = None -class ChrtoutOutput(BaseModel, extra='forbid'): +class ChrtoutOutput(BaseModel): # NOTE: mandatory if writing results to CHRTOUT. wrf_hydro_channel_output_source_folder: Optional[DirectoryPath] = None diff --git a/src/troute-nwm/src/nwm_routing/output.py b/src/troute-nwm/src/nwm_routing/output.py index 1ab22d2b0..08bef8c78 100644 --- a/src/troute-nwm/src/nwm_routing/output.py +++ b/src/troute-nwm/src/nwm_routing/output.py @@ -7,8 +7,8 @@ from build_tests import parity_check import logging -LOG = logging.getLogger('') +LOG = logging.getLogger('') def _reindex_lake_to_link_id(target_df, crosswalk): ''' @@ -29,20 +29,20 @@ def _reindex_lake_to_link_id(target_df, crosswalk): # evaluate intersection of lake ids and target_df index values # i.e. what are the index positions of lake ids that need replacing? 
- lakeids = np.fromiter(crosswalk.keys(), dtype=int) + lakeids = np.fromiter(crosswalk.keys(), dtype = int) idxs = target_df.index.to_numpy() lake_index_intersect = np.intersect1d( - idxs, - lakeids, + idxs, + lakeids, return_indices = True ) # replace lake ids with link IDs in the target_df index array - linkids = np.fromiter(crosswalk.values(), dtype=int) + linkids = np.fromiter(crosswalk.values(), dtype = int) idxs[lake_index_intersect[1]] = linkids[lake_index_intersect[2]] # (re) set the target_df index - target_df.set_index(idxs, inplace=True) + target_df.set_index(idxs, inplace = True) return target_df @@ -102,6 +102,7 @@ def nwm_output_generator( cpu_pool, waterbodies_df, waterbody_types_df, + duplicate_ids_df, data_assimilation_parameters=False, lastobs_df=None, link_gage_df=None, @@ -145,7 +146,7 @@ def nwm_output_generator( ) ################### Output Handling - + start_time = time.time() LOG.info(f"Handling output ...") @@ -187,6 +188,7 @@ def nwm_output_generator( ) if wbdyo and not waterbodies_df.empty: + # create waterbody dataframe for output to netcdf file i_columns = pd.MultiIndex.from_product( [range(nts), ["i"]] @@ -211,20 +213,20 @@ def nwm_output_generator( timestep_index = np.where(((np.array(list(set(list(timestep)))) + 1) * dt) % (dt * qts_subdivisions) == 0) ts_set = set(timestep_index[0].tolist()) flow_df_col_index = [i for i, e in enumerate(timestep) if e in ts_set] - flow_df = flow_df.iloc[:, flow_df_col_index] + flow_df = flow_df.iloc[:,flow_df_col_index] timestep, variable = zip(*wbdy.columns.tolist()) timestep_index = np.where(((np.array(list(set(list(timestep)))) + 1) * dt) % (dt * qts_subdivisions) == 0) ts_set = set(timestep_index[0].tolist()) wbdy_col_index = [i for i, e in enumerate(timestep) if e in ts_set] - i_df = wbdy.iloc[:, wbdy_col_index] - q_df = flow_df.iloc[:, 0::3] - d_df = flow_df.iloc[:, 2::3] + i_df = wbdy.iloc[:,wbdy_col_index] + q_df = flow_df.iloc[:,0::3] + d_df = flow_df.iloc[:,2::3] # replace waterbody lake_ids with outlet link ids if (link_lake_crosswalk): flowveldepth = _reindex_lake_to_link_id(flowveldepth, link_lake_crosswalk) - + # todo: create a unit test by saving FVD array to disk and then checking that # it matches FVD array from parent branch or other configurations. # flowveldepth.to_pickle(output_parameters['test_output']) @@ -240,14 +242,14 @@ def nwm_output_generator( ], copy=False, ) - + # replace waterbody lake_ids with outlet link ids if link_lake_crosswalk: # (re) set the flowveldepth index - courant.set_index(fvdidxs, inplace=True) - + courant.set_index(fvdidxs, inplace = True) + LOG.debug("Constructing the FVD DataFrame took %s seconds." 
% (time.time() - start)) - + if stream_output: stream_output_directory = stream_output['stream_output_directory'] stream_output_timediff = stream_output['stream_output_time'] @@ -256,7 +258,6 @@ def nwm_output_generator( nudge = np.concatenate([r[8] for r in results]) usgs_positions_id = np.concatenate([r[3][0] for r in results]) - nhd_io.write_flowveldepth( Path(stream_output_directory), flowveldepth, @@ -282,10 +283,10 @@ def nwm_output_generator( preRunLog.write("Output of FVD data every "+str(fCalc)+" minutes\n") preRunLog.write("Writing "+str(nTimeBins)+" time bins for "+str(len(flowveldepth.index))+" segments per FVD output file\n") preRunLog.close() - + if test: flowveldepth.to_pickle(Path(test)) - + if wbdyo and not waterbodies_df.empty: time_index, tmp_variable = map(list,zip(*i_df.columns.tolist())) @@ -295,12 +296,11 @@ def nwm_output_generator( else: output_waterbodies_df = waterbodies_df output_waterbody_types_df = waterbody_types_df - LOG.info("- writing t-route flow results to LAKEOUT files") start = time.time() - for i in range(i_df.shape[1]): + for i in range(i_df.shape[1]): nhd_io.write_waterbody_netcdf( - wbdyo, + wbdyo, i_df.iloc[:,[i]], q_df.iloc[:,[i]], d_df.iloc[:,[i]], @@ -319,14 +319,14 @@ def nwm_output_generator( preRunLog.write("Output of waterbody files into folder: "+str(wbdyo)+"\n") preRunLog.write("-----\n") preRunLog.close() - + LOG.debug("writing LAKEOUT files took a total time of %s seconds." % (time.time() - start)) - + if rsrto: LOG.info("- writing restart files") start = time.time() - + wrf_hydro_restart_dir = rsrto.get( "wrf_hydro_channel_restart_source_directory", None ) @@ -364,12 +364,10 @@ def nwm_output_generator( preRunLog.close() else: - LOG.critical( - 'Did not find any restart files in wrf_hydro_channel_restart_source_directory. Aborting restart write sequence.') + LOG.critical('Did not find any restart files in wrf_hydro_channel_restart_source_directory. Aborting restart write sequence.') else: - LOG.critical( - 'wrf_hydro_channel_restart_source_directory not specified in configuration file. Aborting restart write sequence.') + LOG.critical('wrf_hydro_channel_restart_source_directory not specified in configuration file. Aborting restart write sequence.') LOG.debug("writing restart files took %s seconds." % (time.time() - start)) @@ -377,12 +375,13 @@ def nwm_output_generator( LOG.info("- writing t-route flow results to CHRTOUT files") start = time.time() - + chrtout_read_folder = chrto.get( "wrf_hydro_channel_output_source_folder", None ) if chrtout_read_folder: + chrtout_files = sorted( Path(chrtout_read_folder) / f for f in run["qlat_files"] ) @@ -404,8 +403,8 @@ def nwm_output_generator( LOG.debug("writing CHRTOUT files took a total time of %s seconds." 
% (time.time() - start)) - if csv_output_folder: - + if csv_output_folder: + LOG.info("- writing flow, velocity, and depth results to .csv") start = time.time() @@ -413,10 +412,10 @@ def nwm_output_generator( # TO DO: create more descriptive filenames if supernetwork_parameters.get("title_string", None): filename_fvd = ( - "flowveldepth_" + supernetwork_parameters["title_string"] + ".csv" + "flowveldepth_" + supernetwork_parameters["title_string"] + ".csv" ) filename_courant = ( - "courant_" + supernetwork_parameters["title_string"] + ".csv" + "courant_" + supernetwork_parameters["title_string"] + ".csv" ) else: run_time_stamp = datetime.now().isoformat() @@ -428,7 +427,7 @@ def nwm_output_generator( # no csv_output_segments are specified, then write results for all segments if not csv_output_segments: csv_output_segments = flowveldepth.index - + flowveldepth = flowveldepth.sort_index() flowveldepth.loc[csv_output_segments].to_csv(output_path.joinpath(filename_fvd)) @@ -488,7 +487,7 @@ def nwm_output_generator( LOG.info("- writing t-route flow results at gage locations to CHANOBS file") start = time.time() - + # replace waterbody lake_ids with outlet link ids if link_lake_crosswalk: link_gage_df = _reindex_lake_to_link_id(link_gage_df, link_lake_crosswalk) @@ -499,15 +498,15 @@ def nwm_output_generator( nhd_io.write_chanobs( Path(chano['chanobs_output_directory'] + chano['chanobs_filepath']), - flowveldepth, - link_gage_df, - t0, - dt, + flowveldepth, + link_gage_df, + t0, + dt, nts, # TODO allow user to pass a list of segments at which they would like to print results # rather than just printing at gages. ) - + if (not logFileName == 'NONE'): with open(logFileName, 'a') as preRunLog: preRunLog.write("\n") @@ -518,7 +517,7 @@ def nwm_output_generator( LOG.debug("writing flow data to CHANOBS took %s seconds." % (time.time() - start)) - if lastobso: + if lastobso: # Write out LastObs as netcdf when using main_v04 or troute_model with HYfeature. # This is only needed if 1) streamflow nudging is ON and 2) a lastobs output # folder is provided by the user. @@ -555,22 +554,23 @@ def nwm_output_generator( LOG.debug("writing lastobs files took %s seconds." % (time.time() - start)) - if 'flowveldepth' in locals(): - LOG.debug(flowveldepth) + # if 'flowveldepth' in locals(): + # LOG.debug(flowveldepth) LOG.debug("output complete in %s seconds." % (time.time() - start_time)) ################### Parity Check if parity_set: + LOG.info( "conducting parity check, comparing WRF Hydro results against t-route results" ) - + start_time = time.time() - + parity_check( parity_set, results, ) - + LOG.debug("parity check complete in %s seconds." % (time.time() - start_time))
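
Note (added in review, not part of the patch): dropping extra='forbid' from ParquetOutput and ChrtoutOutput changes how Pydantic treats unrecognized keys in those config blocks. With extra='forbid', an unknown key raises a ValidationError; with the default ('ignore'), the key is silently dropped. A minimal sketch of the difference, assuming Pydantic v2 and using a plain string field in place of DirectoryPath so it runs anywhere:

    from typing import Optional
    from pydantic import BaseModel, ValidationError

    class Strict(BaseModel, extra='forbid'):
        parquet_output_folder: Optional[str] = None

    class Lenient(BaseModel):  # same default behavior as the patched models
        parquet_output_folder: Optional[str] = None

    try:
        # 'parquet_output_segmnets' is a deliberately misspelled key
        Strict(parquet_output_folder='output/', parquet_output_segmnets=['123'])
    except ValidationError as err:
        print('forbid rejects the unknown key:', err.errors()[0]['type'])  # extra_forbidden

    lenient = Lenient(parquet_output_folder='output/', parquet_output_segmnets=['123'])
    print('default silently ignores it:', lenient.model_dump())

One practical consequence: misspelled keys in a YAML config that previously failed fast will now pass validation unnoticed.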
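
Note (added in review, not part of the patch): the functional change in output.py is the new duplicate_ids_df argument to nwm_output_generator, inserted between waterbody_types_df and the keyword defaults; it has no default value, so existing call sites must be updated to pass it, ideally by keyword to avoid positional drift. The remaining hunks are whitespace and formatting cleanup plus commenting out the flowveldepth debug log. For context on the _reindex_lake_to_link_id helper whose formatting is touched above, here is a standalone sketch of the np.intersect1d(..., return_indices=True) pattern it relies on, using toy lake and link ids rather than real t-route data:

    import numpy as np
    import pandas as pd

    crosswalk = {401: 1101, 402: 1102}   # toy lake-id -> outlet link-id map
    target_df = pd.DataFrame({'q': [1.0, 2.0, 3.0]}, index=[401, 500, 402])

    lakeids = np.fromiter(crosswalk.keys(), dtype=int)
    linkids = np.fromiter(crosswalk.values(), dtype=int)
    idxs = target_df.index.to_numpy()

    # return_indices=True also returns where the common values sit in each input:
    # positions in idxs say which index entries to overwrite, positions in
    # lakeids say which link id to overwrite them with.
    _, idx_positions, lake_positions = np.intersect1d(idxs, lakeids, return_indices=True)
    idxs[idx_positions] = linkids[lake_positions]
    target_df.set_index(idxs, inplace=True)

    print(target_df.index.tolist())      # [1101, 500, 1102]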