Skip to content

Commit

Permalink
updating nexus on parquet file
Browse files Browse the repository at this point in the history
  • Loading branch information
AminTorabi-NOAA committed Aug 2, 2024
1 parent c2cd1b4 commit 13e020b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
12 changes: 11 additions & 1 deletion src/troute-network/troute/NHDNetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,22 @@ def __init__(
self._flowpath_dict = {}
self._gl_climatology_df = pd.DataFrame()
self._canadian_gage_link_df = pd.DataFrame(columns=['gages','link']).set_index('link')

self.crosswalk_nex_flowpath_poi()
super().__init__()

# Create empty dataframe for coastal_boundary_depth_df. This way we can check if
# it exists, and only read in SCHISM data during 'assemble_forcings' if it doesn't
self._coastal_boundary_depth_df = pd.DataFrame()

def crosswalk_nex_flowpath_poi(self):
    """Build the nexus-to-flowpath crosswalk and store it in ``self._nexus_dict``.

    Iterates over ``self._connections`` (a mapping of ``key`` to an iterable
    of ``value`` ids) and inverts it into a dictionary keyed by the string
    ``"nex-<value>"`` whose entries are lists of ``"wb-<key>"`` strings, in
    the order the connections are encountered.

    The mapping is accumulated on the private attribute only, so the method
    does not depend on any public ``nexus_dict`` accessor being available
    when it is called from ``__init__``.

    Returns
    -------
    None
        The result is stored on ``self._nexus_dict``.
    """
    # Bind the attribute up front so it exists (empty) even if iteration
    # over the connections fails part-way, as in the original code.
    crosswalk = {}
    self._nexus_dict = crosswalk

    for key, values in self._connections.items():
        # The flowpath label depends only on the outer key; build it once.
        flowpath_label = f"wb-{key}"
        for value in values:
            crosswalk.setdefault(f"nex-{value}", []).append(flowpath_label)

def extract_waterbody_connections(rows, target_col, waterbody_null=-9999):
"""Extract waterbody mapping from dataframe.
Expand Down
14 changes: 11 additions & 3 deletions src/troute-nwm/src/nwm_routing/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import troute.nhd_io as nhd_io
from build_tests import parity_check
import logging

from troute.nhd_io import updated_flowveldepth

LOG = logging.getLogger('')

Expand Down Expand Up @@ -47,7 +47,7 @@ def _reindex_lake_to_link_id(target_df, crosswalk):
return target_df


def _parquet_output_format_converter(df, start_datetime, dt, configuration, prefix_ids):
def _parquet_output_format_converter(df, start_datetime, dt, configuration, prefix_ids, nexus_dict):
'''
Utility function to convert the flowveldepth dataframe
to a timeseries matching the parquet input format
Expand All @@ -64,6 +64,14 @@ def _parquet_output_format_converter(df, start_datetime, dt, configuration, pref
--------
- timeseries_df (DataFrame): Converted timeseries data frame
'''
nex_id = {}
if prefix_ids == 'nex':
for key, val in nexus_dict.items():
nex_key = int(key.split('-')[-1])
nex_id[nex_key] = [int(v.split('-')[-1]) for v in val]

df = updated_flowveldepth(df, nex_id, seg_id = list(), mask_list = None)
df = df.reset_index().drop('Type', axis=1).set_index('featureID')
variable_to_name_map = {"q": "streamflow", "d": "depth", "v": "velocity"}
variable_to_units_map = {"streamflow": "m3/s", "velocity": "m/s", "depth": "m"}

Expand Down Expand Up @@ -502,7 +510,7 @@ def nwm_output_generator(
configuration = output_parameters["parquet_output"].get("configuration")
prefix_ids = output_parameters["parquet_output"].get("prefix_ids")
timeseries_df = _parquet_output_format_converter(flowveldepth, restart_parameters.get("start_datetime"), dt,
configuration, prefix_ids)
configuration, prefix_ids, nexus_dict)

parquet_output_segments_str = [prefix_ids + '-' + str(segment) for segment in parquet_output_segments]
timeseries_df.loc[timeseries_df['location_id'].isin(parquet_output_segments_str)].to_parquet(
Expand Down
6 changes: 1 addition & 5 deletions test/LowerColorado_TX/test_AnA_V4_NHD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,6 @@ output_parameters:
# stream_output_time: 1 #[hr]
# stream_output_type: '.nc' #please select only between netcdf '.nc' or '.csv' or '.pkl'
# stream_output_internal_frequency: 30 #[min] it should be order of 5 minutes. For instance if you want to output every hour put 60
parquet_output:
#---------
parquet_output_folder: output/
configuration: short_range
prefix_ids: nex



0 comments on commit 13e020b

Please sign in to comment.