Skip to content

Commit

Permalink
Merge branch 'master' into masking_output
Browse files Browse the repository at this point in the history
  • Loading branch information
AminTorabi-NOAA authored Jul 26, 2024
2 parents 5cb1197 + 0a5046c commit 9054838
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 34 deletions.
12 changes: 12 additions & 0 deletions src/troute-config/troute/config/output_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

streamOutput_allowedTypes = Literal['.csv', '.nc', '.pkl']


class OutputParameters(BaseModel):
chanobs_output: Optional["ChanobsOutput"] = None
# NOTE: this appears to be optional. See nwm_routing/input.py ~:477
csv_output: Optional["CsvOutput"] = None
# NOTE: this appears to be optional. See nwm_routing/input.py ~:496
parquet_output: Optional["ParquetOutput"] = None
# NOTE: this appears to be optional. See nwm_routing/input.py ~:563
chrtout_output: Optional["ChrtoutOutput"] = None
lite_restart: Optional["LiteRestart"] = None
# NOTE: this appears to be optional. See nwm_routing/input.py ~:520
Expand Down Expand Up @@ -46,6 +49,14 @@ class CsvOutput(BaseModel):
csv_output_segments: Optional[List[str]] = None


class ParquetOutput(BaseModel):
    # Configuration for writing routing results in parquet format.
    # NOTE: required if writing results to parquet
    parquet_output_folder: Optional[DirectoryPath] = None
    # Segment ids to include in the output; when None, results for all
    # domain segments are written (see check_inputs debug message).
    parquet_output_segments: Optional[List[str]] = None
    # Configuration label stamped onto each output row
    # (for instance: short_range, medium_range).
    configuration: Optional[str] = None
    # Prefix joined with '-' to each segment id to build the
    # location_id column of the parquet output (e.g. 'nex' -> 'nex-123').
    prefix_ids: Optional[str] = None


class ChrtoutOutput(BaseModel):
# NOTE: mandatory if writing results to CHRTOUT.
wrf_hydro_channel_output_source_folder: Optional[DirectoryPath] = None
Expand Down Expand Up @@ -79,6 +90,7 @@ class WrfHydroParityCheck(BaseModel):
class ParityCheckCompareFileSet(BaseModel):
    # Set of files used as the comparison baseline for the parity check
    # (presumably WRF-Hydro outputs — confirm against parity_check caller).
    validation_files: List[FilePath]


class StreamOutput(BaseModel):
# NOTE: required if writing StreamOutput files
stream_output_directory: Optional[DirectoryPath] = None
Expand Down
19 changes: 19 additions & 0 deletions src/troute-nwm/src/nwm_routing/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,25 @@ def check_inputs(

else:
LOG.debug('No csv output folder specified. Results will NOT be written to csv')

if output_parameters.get('parquet_output', None):

if output_parameters['parquet_output'].get('parquet_output_folder', None):

_does_path_exist(
'parquet_output_folder',
output_parameters['parquet_output']['parquet_output_folder']
)

output_segs = output_parameters['parquet_output'].get('parquet_output_segments', None)
if not output_segs:
LOG.debug('No parquet output segments specified. Results for all domain segments will be written')

else:
LOG.debug('No parquet output folder specified. Results will NOT be written in parquet format')

else:
LOG.debug('No parquet output folder specified. Results will NOT be written in parquet format')

if output_parameters.get('chrtout_output', None):

Expand Down
133 changes: 113 additions & 20 deletions src/troute-nwm/src/nwm_routing/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,52 @@ def _reindex_lake_to_link_id(target_df, crosswalk):

# (re) set the target_df index
target_df.set_index(idxs, inplace = True)

return target_df


def _parquet_output_format_converter(df, start_datetime, dt, configuration, prefix_ids):
'''
Utility function for convert flowveldepth dataframe
to a timeseries and to match parquet input format
of TEEHR
Arguments:
----------
- df (DataFrame): Data frame to be converted
- start_datetime: Date time from restart parameters
- dt: Time step
- configuration: configuration (for instance- short_range, medium_range)
Returns:
--------
- timeseries_df (DataFrame): Converted timeseries data frame
'''

df.index.name = 'location_id'
df.reset_index(inplace=True)
timeseries_df = df.melt(id_vars=['location_id'], var_name='var')
timeseries_df['var'] = timeseries_df['var'].astype('string')
timeseries_df[['timestep', 'variable']] = timeseries_df['var'].str.strip("()").str.split(",", n=1, expand=True)
timeseries_df['variable'] = timeseries_df['variable'].str.strip().str.replace("'", "")
timeseries_df['timestep'] = timeseries_df['timestep'].astype('int')
timeseries_df['value_time'] = (start_datetime + pd.to_timedelta(timeseries_df['timestep'] * dt, unit='s'))
variable_to_name_map = {"q": "streamflow", "d": "depth", "v": "velocity"}
timeseries_df["variable_name"] = timeseries_df["variable"].map(variable_to_name_map)
timeseries_df.drop(['var', 'timestep', 'variable'], axis=1, inplace=True)
timeseries_df['configuration'] = configuration
variable_to_units_map = {"streamflow": "m3/s", "velocity": "m/s", "depth": "m"}
timeseries_df['units'] = timeseries_df['variable_name'].map(variable_to_units_map)
timeseries_df['reference_time'] = start_datetime.date()
timeseries_df['location_id'] = timeseries_df['location_id'].astype('string')
timeseries_df['location_id'] = prefix_ids + '-' + timeseries_df['location_id']
timeseries_df['value'] = timeseries_df['value'].astype('double')
timeseries_df['reference_time'] = timeseries_df['reference_time'].astype('datetime64[us]')
timeseries_df['value_time'] = timeseries_df['value_time'].astype('datetime64[us]')

return timeseries_df


def nwm_output_generator(
run,
results,
Expand All @@ -68,7 +111,7 @@ def nwm_output_generator(
poi_crosswalk = None,
logFileName='NONE'
):

dt = run.get("dt")
nts = run.get("nts")
t0 = run.get("t0")
Expand Down Expand Up @@ -113,6 +156,8 @@ def nwm_output_generator(

csv_output = output_parameters.get("csv_output", None)
csv_output_folder = None
parquet_output = output_parameters.get("parquet_output", None)
parquet_output_folder = None
rsrto = output_parameters.get("hydro_rst_output", None)
chrto = output_parameters.get("chrtout_output", None)
test = output_parameters.get("test_output", None)
Expand All @@ -127,8 +172,14 @@ def nwm_output_generator(
)
csv_output_segments = csv_output.get("csv_output_segments", None)

if csv_output_folder or rsrto or chrto or chano or test or wbdyo or stream_output:

if parquet_output:
parquet_output_folder = output_parameters["parquet_output"].get(
"parquet_output_folder", None
)
parquet_output_segments = parquet_output.get("parquet_output_segments", None)

if csv_output_folder or parquet_output_folder or rsrto or chrto or chano or test or wbdyo or stream_output:

start = time.time()
qvd_columns = pd.MultiIndex.from_product(
[range(nts), ["q", "v", "d"]]
Expand Down Expand Up @@ -197,7 +248,6 @@ def nwm_output_generator(

# replace waterbody lake_ids with outlet link ids
if link_lake_crosswalk:

# (re) set the flowveldepth index
courant.set_index(fvdidxs, inplace = True)

Expand Down Expand Up @@ -257,7 +307,7 @@ def nwm_output_generator(
output_waterbody_types_df = waterbody_types_df
LOG.info("- writing t-route flow results to LAKEOUT files")
start = time.time()
for i in range(i_df.shape[1]):
for i in range(i_df.shape[1]):
nhd_io.write_waterbody_netcdf(
wbdyo,
i_df.iloc[:,[i]],
Expand All @@ -278,7 +328,7 @@ def nwm_output_generator(
preRunLog.write("Output of waterbody files into folder: "+str(wbdyo)+"\n")
preRunLog.write("-----\n")
preRunLog.close()

LOG.debug("writing LAKEOUT files took a total time of %s seconds." % (time.time() - start))

if rsrto:
Expand Down Expand Up @@ -324,23 +374,23 @@ def nwm_output_generator(

else:
LOG.critical('Did not find any restart files in wrf_hydro_channel_restart_source_directory. Aborting restart write sequence.')

else:
LOG.critical('wrf_hydro_channel_restart_source_directory not specified in configuration file. Aborting restart write sequence.')

LOG.debug("writing restart files took %s seconds." % (time.time() - start))

if chrto:

LOG.info("- writing t-route flow results to CHRTOUT files")
start = time.time()

chrtout_read_folder = chrto.get(
"wrf_hydro_channel_output_source_folder", None
)

if chrtout_read_folder:

if chrtout_read_folder:

chrtout_files = sorted(
Path(chrtout_read_folder) / f for f in run["qlat_files"]
)
Expand Down Expand Up @@ -397,7 +447,51 @@ def nwm_output_generator(
LOG.debug("writing CSV file took %s seconds." % (time.time() - start))
# usgs_df_filtered = usgs_df[usgs_df.index.isin(csv_output_segments)]
# usgs_df_filtered.to_csv(output_path.joinpath("usgs_df.csv"))


if parquet_output_folder:

LOG.info("- writing flow, velocity, and depth results to .parquet")
start = time.time()

# create filenames
# TO DO: create more descriptive filenames
if supernetwork_parameters.get("title_string", None):
filename_fvd = (
"flowveldepth_" + supernetwork_parameters["title_string"] + ".parquet"
)
filename_courant = (
"courant_" + supernetwork_parameters["title_string"] + ".parquet"
)
else:
run_time_stamp = datetime.now().isoformat()
filename_fvd = "flowveldepth_" + run_time_stamp + ".parquet"
filename_courant = "courant_" + run_time_stamp + ".parquet"

output_path = Path(parquet_output_folder).resolve()

# no parquet_output_segments are specified, then write results for all segments
if not parquet_output_segments:
parquet_output_segments = flowveldepth.index

flowveldepth = flowveldepth.sort_index()
configuration = output_parameters["parquet_output"].get("configuration")
prefix_ids = output_parameters["parquet_output"].get("prefix_ids")
timeseries_df = _parquet_output_format_converter(flowveldepth, restart_parameters.get("start_datetime"), dt,
configuration, prefix_ids)

parquet_output_segments_str = [prefix_ids + '-' + str(segment) for segment in parquet_output_segments]
timeseries_df.loc[timeseries_df['location_id'].isin(parquet_output_segments_str)].to_parquet(
output_path.joinpath(filename_fvd), allow_truncated_timestamps=True)

if return_courant:
courant = courant.sort_index()
timeseries_courant = _parquet_output_format_converter(courant, restart_parameters.get("start_datetime"), dt,
configuration, prefix_ids)
timeseries_courant.loc[timeseries_courant['location_id'].isin(parquet_output_segments_str)].to_parquet(
output_path.joinpath(filename_courant), allow_truncated_timestamps=True)

LOG.debug("writing parquet file took %s seconds." % (time.time() - start))

if chano:

LOG.info("- writing t-route flow results at gage locations to CHANOBS file")
Expand All @@ -412,7 +506,7 @@ def nwm_output_generator(
chano['chanobs_filepath'] = str(chano['chanobs_filepath'])

nhd_io.write_chanobs(
Path(chano['chanobs_output_directory'] + chano['chanobs_filepath']),
Path(chano['chanobs_output_directory'] + chano['chanobs_filepath']),
flowveldepth,
link_gage_df,
t0,
Expand All @@ -432,7 +526,7 @@ def nwm_output_generator(

LOG.debug("writing flow data to CHANOBS took %s seconds." % (time.time() - start))

if lastobso:
if lastobso:
# Write out LastObs as netcdf when using main_v04 or troute_model with HYfeature.
# This is only needed if 1) streamflow nudging is ON and 2) a lastobs output
# folder is provided by the user.
Expand Down Expand Up @@ -469,24 +563,23 @@ def nwm_output_generator(

LOG.debug("writing lastobs files took %s seconds." % (time.time() - start))


# if 'flowveldepth' in locals():
# LOG.debug(flowveldepth)
# LOG.debug(flowveldepth)

LOG.debug("output complete in %s seconds." % (time.time() - start_time))

################### Parity Check

if parity_set:

LOG.info(
"conducting parity check, comparing WRF Hydro results against t-route results"
)

start_time = time.time()

parity_check(
parity_set, results,
)

LOG.debug("parity check complete in %s seconds." % (time.time() - start_time))
LOG.debug("parity check complete in %s seconds." % (time.time() - start_time))
13 changes: 0 additions & 13 deletions src/troute-routing/troute/routing/utils.py

This file was deleted.

7 changes: 6 additions & 1 deletion test/LowerColorado_TX/test_AnA_V4_NHD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ compute_parameters:
reservoir_rfc_forecasts_lookback_hours : 48

#--------------------------------------------------------------------------------
# output_parameters:
output_parameters:
# #----------
# test_output: output/lcr_flowveldepth.pkl
# lite_restart:
Expand All @@ -124,5 +124,10 @@ compute_parameters:
# stream_output_time: 1 #[hr]
# stream_output_type: '.nc' #please select only between netcdf '.nc' or '.csv' or '.pkl'
# stream_output_internal_frequency: 30 #[min] it should be order of 5 minutes. For instance if you want to output every hour put 60
parquet_output:
#---------
parquet_output_folder: output/
configuration: short_range
prefix_ids: nex


0 comments on commit 9054838

Please sign in to comment.