Skip to content

Commit

Permalink
added missing parameter in nwm_output_generator function and fixed formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
karnesh authored and shorvath-noaa committed Jul 26, 2024
1 parent c959cb3 commit 0a5046c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 49 deletions.
4 changes: 2 additions & 2 deletions src/troute-config/troute/config/output_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ class CsvOutput(BaseModel):
csv_output_segments: Optional[List[str]] = None


class ParquetOutput(BaseModel, extra='forbid'):
class ParquetOutput(BaseModel):
# NOTE: required if writing results to parquet
parquet_output_folder: Optional[DirectoryPath] = None
parquet_output_segments: Optional[List[str]] = None
configuration: Optional[str] = None
prefix_ids: Optional[str] = None


class ChrtoutOutput(BaseModel, extra='forbid'):
class ChrtoutOutput(BaseModel):
# NOTE: mandatory if writing results to CHRTOUT.
wrf_hydro_channel_output_source_folder: Optional[DirectoryPath] = None

Expand Down
94 changes: 47 additions & 47 deletions src/troute-nwm/src/nwm_routing/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from build_tests import parity_check
import logging

LOG = logging.getLogger('')

LOG = logging.getLogger('')

def _reindex_lake_to_link_id(target_df, crosswalk):
'''
Expand All @@ -29,20 +29,20 @@ def _reindex_lake_to_link_id(target_df, crosswalk):

# evaluate intersection of lake ids and target_df index values
# i.e. what are the index positions of lake ids that need replacing?
lakeids = np.fromiter(crosswalk.keys(), dtype=int)
lakeids = np.fromiter(crosswalk.keys(), dtype = int)
idxs = target_df.index.to_numpy()
lake_index_intersect = np.intersect1d(
idxs,
lakeids,
idxs,
lakeids,
return_indices = True
)

# replace lake ids with link IDs in the target_df index array
linkids = np.fromiter(crosswalk.values(), dtype=int)
linkids = np.fromiter(crosswalk.values(), dtype = int)
idxs[lake_index_intersect[1]] = linkids[lake_index_intersect[2]]

# (re) set the target_df index
target_df.set_index(idxs, inplace=True)
target_df.set_index(idxs, inplace = True)

return target_df

Expand Down Expand Up @@ -102,6 +102,7 @@ def nwm_output_generator(
cpu_pool,
waterbodies_df,
waterbody_types_df,
duplicate_ids_df,
data_assimilation_parameters=False,
lastobs_df=None,
link_gage_df=None,
Expand Down Expand Up @@ -145,7 +146,7 @@ def nwm_output_generator(
)

################### Output Handling

start_time = time.time()

LOG.info(f"Handling output ...")
Expand Down Expand Up @@ -187,6 +188,7 @@ def nwm_output_generator(
)

if wbdyo and not waterbodies_df.empty:

# create waterbody dataframe for output to netcdf file
i_columns = pd.MultiIndex.from_product(
[range(nts), ["i"]]
Expand All @@ -211,20 +213,20 @@ def nwm_output_generator(
timestep_index = np.where(((np.array(list(set(list(timestep)))) + 1) * dt) % (dt * qts_subdivisions) == 0)
ts_set = set(timestep_index[0].tolist())
flow_df_col_index = [i for i, e in enumerate(timestep) if e in ts_set]
flow_df = flow_df.iloc[:, flow_df_col_index]
flow_df = flow_df.iloc[:,flow_df_col_index]

timestep, variable = zip(*wbdy.columns.tolist())
timestep_index = np.where(((np.array(list(set(list(timestep)))) + 1) * dt) % (dt * qts_subdivisions) == 0)
ts_set = set(timestep_index[0].tolist())
wbdy_col_index = [i for i, e in enumerate(timestep) if e in ts_set]
i_df = wbdy.iloc[:, wbdy_col_index]
q_df = flow_df.iloc[:, 0::3]
d_df = flow_df.iloc[:, 2::3]
i_df = wbdy.iloc[:,wbdy_col_index]
q_df = flow_df.iloc[:,0::3]
d_df = flow_df.iloc[:,2::3]

# replace waterbody lake_ids with outlet link ids
if (link_lake_crosswalk):
flowveldepth = _reindex_lake_to_link_id(flowveldepth, link_lake_crosswalk)

# todo: create a unit test by saving FVD array to disk and then checking that
# it matches FVD array from parent branch or other configurations.
# flowveldepth.to_pickle(output_parameters['test_output'])
Expand All @@ -240,14 +242,14 @@ def nwm_output_generator(
],
copy=False,
)

# replace waterbody lake_ids with outlet link ids
if link_lake_crosswalk:
# (re) set the flowveldepth index
courant.set_index(fvdidxs, inplace=True)

courant.set_index(fvdidxs, inplace = True)
LOG.debug("Constructing the FVD DataFrame took %s seconds." % (time.time() - start))

if stream_output:
stream_output_directory = stream_output['stream_output_directory']
stream_output_timediff = stream_output['stream_output_time']
Expand All @@ -256,7 +258,6 @@ def nwm_output_generator(

nudge = np.concatenate([r[8] for r in results])
usgs_positions_id = np.concatenate([r[3][0] for r in results])

nhd_io.write_flowveldepth(
Path(stream_output_directory),
flowveldepth,
Expand All @@ -282,10 +283,10 @@ def nwm_output_generator(
preRunLog.write("Output of FVD data every "+str(fCalc)+" minutes\n")
preRunLog.write("Writing "+str(nTimeBins)+" time bins for "+str(len(flowveldepth.index))+" segments per FVD output file\n")
preRunLog.close()

if test:
flowveldepth.to_pickle(Path(test))

if wbdyo and not waterbodies_df.empty:

time_index, tmp_variable = map(list,zip(*i_df.columns.tolist()))
Expand All @@ -295,12 +296,11 @@ def nwm_output_generator(
else:
output_waterbodies_df = waterbodies_df
output_waterbody_types_df = waterbody_types_df

LOG.info("- writing t-route flow results to LAKEOUT files")
start = time.time()
for i in range(i_df.shape[1]):
for i in range(i_df.shape[1]):
nhd_io.write_waterbody_netcdf(
wbdyo,
wbdyo,
i_df.iloc[:,[i]],
q_df.iloc[:,[i]],
d_df.iloc[:,[i]],
Expand All @@ -319,14 +319,14 @@ def nwm_output_generator(
preRunLog.write("Output of waterbody files into folder: "+str(wbdyo)+"\n")
preRunLog.write("-----\n")
preRunLog.close()

LOG.debug("writing LAKEOUT files took a total time of %s seconds." % (time.time() - start))

if rsrto:

LOG.info("- writing restart files")
start = time.time()

wrf_hydro_restart_dir = rsrto.get(
"wrf_hydro_channel_restart_source_directory", None
)
Expand Down Expand Up @@ -364,25 +364,24 @@ def nwm_output_generator(
preRunLog.close()

else:
LOG.critical(
'Did not find any restart files in wrf_hydro_channel_restart_source_directory. Aborting restart write sequence.')
LOG.critical('Did not find any restart files in wrf_hydro_channel_restart_source_directory. Aborting restart write sequence.')

else:
LOG.critical(
'wrf_hydro_channel_restart_source_directory not specified in configuration file. Aborting restart write sequence.')
LOG.critical('wrf_hydro_channel_restart_source_directory not specified in configuration file. Aborting restart write sequence.')

LOG.debug("writing restart files took %s seconds." % (time.time() - start))

if chrto:

LOG.info("- writing t-route flow results to CHRTOUT files")
start = time.time()

chrtout_read_folder = chrto.get(
"wrf_hydro_channel_output_source_folder", None
)

if chrtout_read_folder:

chrtout_files = sorted(
Path(chrtout_read_folder) / f for f in run["qlat_files"]
)
Expand All @@ -404,19 +403,19 @@ def nwm_output_generator(

LOG.debug("writing CHRTOUT files took a total time of %s seconds." % (time.time() - start))

if csv_output_folder:

if csv_output_folder:
LOG.info("- writing flow, velocity, and depth results to .csv")
start = time.time()

# create filenames
# TO DO: create more descriptive filenames
if supernetwork_parameters.get("title_string", None):
filename_fvd = (
"flowveldepth_" + supernetwork_parameters["title_string"] + ".csv"
"flowveldepth_" + supernetwork_parameters["title_string"] + ".csv"
)
filename_courant = (
"courant_" + supernetwork_parameters["title_string"] + ".csv"
"courant_" + supernetwork_parameters["title_string"] + ".csv"
)
else:
run_time_stamp = datetime.now().isoformat()
Expand All @@ -428,7 +427,7 @@ def nwm_output_generator(
# no csv_output_segments are specified, then write results for all segments
if not csv_output_segments:
csv_output_segments = flowveldepth.index

flowveldepth = flowveldepth.sort_index()
flowveldepth.loc[csv_output_segments].to_csv(output_path.joinpath(filename_fvd))

Expand Down Expand Up @@ -488,7 +487,7 @@ def nwm_output_generator(

LOG.info("- writing t-route flow results at gage locations to CHANOBS file")
start = time.time()

# replace waterbody lake_ids with outlet link ids
if link_lake_crosswalk:
link_gage_df = _reindex_lake_to_link_id(link_gage_df, link_lake_crosswalk)
Expand All @@ -499,15 +498,15 @@ def nwm_output_generator(

nhd_io.write_chanobs(
Path(chano['chanobs_output_directory'] + chano['chanobs_filepath']),
flowveldepth,
link_gage_df,
t0,
dt,
flowveldepth,
link_gage_df,
t0,
dt,
nts,
# TODO allow user to pass a list of segments at which they would like to print results
# rather than just printing at gages.
)

if (not logFileName == 'NONE'):
with open(logFileName, 'a') as preRunLog:
preRunLog.write("\n")
Expand All @@ -518,7 +517,7 @@ def nwm_output_generator(

LOG.debug("writing flow data to CHANOBS took %s seconds." % (time.time() - start))

if lastobso:
if lastobso:
# Write out LastObs as netcdf when using main_v04 or troute_model with HYfeature.
# This is only needed if 1) streamflow nudging is ON and 2) a lastobs output
# folder is provided by the user.
Expand Down Expand Up @@ -555,22 +554,23 @@ def nwm_output_generator(

LOG.debug("writing lastobs files took %s seconds." % (time.time() - start))

if 'flowveldepth' in locals():
LOG.debug(flowveldepth)
# if 'flowveldepth' in locals():
# LOG.debug(flowveldepth)

LOG.debug("output complete in %s seconds." % (time.time() - start_time))

################### Parity Check

if parity_set:

LOG.info(
"conducting parity check, comparing WRF Hydro results against t-route results"
)

start_time = time.time()

parity_check(
parity_set, results,
)

LOG.debug("parity check complete in %s seconds." % (time.time() - start_time))

0 comments on commit 0a5046c

Please sign in to comment.