Added functionality to output to parquet #761
Conversation
In general, I like the idea of adding this as an option for possible output formats. There are some details of the implementation that need to be discussed/considered a bit, I think, however.
Also, a more general note: configuring your local dev environment's git to ignore whitespace changes may be helpful on PRs like this which touch many files. It looks like some auto formatting was applied, and the formatting changes can make review more challenging when trying to see all the places in the code which have changed in functionality. If you want to contribute formatting changes, I would suggest putting them all into a single commit on a PR, or an independent PR.
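For example, whitespace-only noise can be hidden when reviewing locally with git's standard diff flags (nothing here is specific to this repo):

```sh
# show the working-tree diff with whitespace-only changes ignored
git diff -w

# the same flag applies when reviewing a range of commits
git log -p -w master..feature-branch
```

GitHub's "Files changed" view offers the equivalent via its "Hide whitespace" option.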
timeseries_df['units'] = timeseries_df['variable_name'].map(variable_to_units_map)
timeseries_df['reference_time'] = start_datetime.date()
timeseries_df['location_id'] = timeseries_df['location_id'].astype('string')
timeseries_df['location_id'] = 'nex-' + timeseries_df['location_id']
I don't think it is generically appropriate to label all locations in the routing data frame with a nex- prefix. This may be applicable to routing which uses a hy_features network, but wouldn't be applicable to one which uses an nhd network. Is there any guarantee before this point that this function only gets applied to hy_features network results? Also, even within a hy_features network, the routed segments this output relates to are the waterbodies, which typically have a wb- prefix in the hydrofabric identifiers, not nex-. The nexus and waterbody features are related, but distinct, concepts.
See comment below...
df.index.name = 'location_id'
df.reset_index(inplace=True)
timeseries_df = df.melt(id_vars=['location_id'], var_name='var')
I think if you use value_vars=['q', 'v', 'd'] as a kwarg to melt, you might have an easier time extracting from the un-pivoted table. For the method below, it seems like there should be a better way besides casting to string, manipulating the string, and recasting to numeric/datetime types. I'm not sure exactly what the df looks like at the point where it is being manipulated, but I would try to consider different methods for extracting the needed data from it. If for some reason this is the only way, then please comment this implementation to describe the state of the df and why it needs to be manipulated this way.
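For reference, a minimal sketch of the suggested call, assuming a frame with plain 'q'/'v'/'d' columns (which, per the reply below, the real df does not have):

```python
import pandas as pd

# hypothetical frame with one plain column per variable
df = pd.DataFrame({'location_id': [101], 'q': [1.0], 'v': [0.5], 'd': [0.2]})

# melt only the named value columns into long format
timeseries_df = df.melt(id_vars=['location_id'],
                        value_vars=['q', 'v', 'd'],
                        var_name='var')
print(timeseries_df)
```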
I looked into options and this seems to be the only way. I cannot use value_vars=['q', 'v', 'd'] as a kwarg to melt because of the format of the df column names. Also, the column names needed to be manipulated as strings. Please look at the screenshot below. Each column name consists of a time step and a variable name (q, v, or d) in string format. The time step values are used to get the value_time.
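Given that column format, a regex split of the melted names is one way to avoid repeated slice-and-recast steps. A sketch with made-up data, assuming column names like '1q', '1v', '1d' (the real flowveldepth columns may differ):

```python
import pandas as pd

# hypothetical frame whose column names combine a time step and a variable
df = pd.DataFrame(
    {f'{step}{var}': [0.0] for step in (1, 2) for var in ('q', 'v', 'd')},
    index=pd.Index([101], name='location_id'),
)
df.reset_index(inplace=True)

timeseries_df = df.melt(id_vars=['location_id'], var_name='var')

# split '<timestep><variable>' in one pass with named capture groups
parts = timeseries_df['var'].str.extract(r'(?P<timestep>\d+)(?P<variable_name>[qvd])')
timeseries_df = timeseries_df.join(parts)

# derive value_time from the numeric time step; start_datetime and dt
# stand in for the converter's real arguments
start_datetime, dt = pd.Timestamp('2021-01-01'), 300
timeseries_df['value_time'] = (
    start_datetime
    + pd.to_timedelta(timeseries_df['timestep'].astype(int) * dt, unit='s')
)
```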
timeseries_df = _parquet_output_format_converter(flowveldepth, restart_parameters.get("start_datetime"), dt,
                                                 output_parameters["parquet_output"].get("configuration"))

parquet_output_segments_str = ['nex-' + str(segment) for segment in parquet_output_segments]
Another use of the nex- prefix that may not be generically appropriate.
Would suggest we make it a variable with a default argument in the wrapping function and a value in the yaml.
parquet_output_segments_str = [prefix_str + str(segment) for segment in parquet_output_segments]
We would need to do something more comprehensive to update T-Route to comprehend the naming/labeling of the IDs...
I have modified the PR to add a user-defined value in the yaml file for the prefix string.
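A minimal sketch of the resulting plumbing, reusing the prefix_ids name from the diff below; the helper name and configuration key here are illustrative only:

```python
# prefix_ids would default to 'nex-' and be overridden from the yaml,
# e.g. output_parameters['parquet_output'].get('prefix_ids', 'nex-')
def _build_segment_labels(segments, prefix_ids='nex-'):
    # hypothetical helper: prefix each segment ID with the configured string
    return [prefix_ids + str(segment) for segment in segments]

print(_build_segment_labels([101, 102], prefix_ids='wb-'))  # ['wb-101', 'wb-102']
```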
 class StreamOutput(BaseModel):
     # NOTE: required if writing StreamOutput files
     stream_output_directory: Optional[DirectoryPath] = None
     stream_output_time: int = 1
-    stream_output_type: streamOutput_allowedTypes = ".nc"
+    stream_output_type:streamOutput_allowedTypes = ".nc"
Remove this formatting change from the commit?
-lakeids = np.fromiter(crosswalk.keys(), dtype=int)
+lakeids = np.fromiter(crosswalk.keys(), dtype = int)
 idxs = target_df.index.to_numpy()
 lake_index_intersect = np.intersect1d(
     idxs,
     lakeids,
-    return_indices=True
+    return_indices = True
 )

 # replace lake ids with link IDs in the target_df index array
-linkids = np.fromiter(crosswalk.values(), dtype=int)
+linkids = np.fromiter(crosswalk.values(), dtype = int)
 idxs[lake_index_intersect[1]] = linkids[lake_index_intersect[2]]

 # (re) set the target_df index
-target_df.set_index(idxs, inplace=True)
+target_df.set_index(idxs, inplace = True)

 return target_df


-def _parquet_output_format_converter(df, start_datetime, dt, configuration):
+def _parquet_output_format_converter(df, start_datetime, dt, configuration, prefix_ids):
We can drop most of these format-only changes to keep the PR super clean.
I have reverted the format changes.
@hellkite500 @jameshalgren does this PR still need changes in your opinions, or can we review/approve?
Thanks for asking -- please proceed with final review. GitHub still shows a conflict -- do you need us to resolve that?
Apologies for the slow review. Can you rebase this on the master branch to resolve conflicts?
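For reference, a typical rebase flow, assuming the main repository is configured as a remote named upstream (adjust to your remote naming):

```sh
git fetch upstream
git rebase upstream/master
# resolve any conflicts, continue the rebase, then
git push --force-with-lease origin <your-branch>
```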
@@ -4,13 +4,16 @@
 from typing_extensions import Literal
 from .types import FilePath, DirectoryPath

-streamOutput_allowedTypes = Literal['.csv', '.nc', '.pkl']
+streamOutput_allowedTypes = Literal['.csv', '.nc', '.pkl', '.parquet']
streamOutput_allowedTypes is specific to the stream_output class of parameters. It looks like you've created a new class, ParquetOutput, that doesn't use this. Is this change needed?
Thank you for pointing this out. I have removed .parquet from streamOutput_allowedTypes.
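For illustration, a separate parameter class along these lines would keep the parquet options out of StreamOutput entirely; the field names below are guesses, not necessarily those in the final PR:

```python
from typing import Optional
from pydantic import BaseModel, DirectoryPath

class ParquetOutput(BaseModel):
    # NOTE: required if writing parquet output files (hypothetical fields)
    parquet_output_folder: Optional[DirectoryPath] = None
    configuration: Optional[str] = None
    # user-configurable ID prefix, per the discussion above
    prefix_ids: str = 'nex-'
```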
I have rebased the PR to resolve conflicts.
PR to add functionality to output flow, velocity, and depth to a parquet file. The output parquet is timeseries data (details in the Notes section below). It is required as input for TEEHR, which comprises a set of tools for hydrologic model and forecast evaluation. Storing the t-route output in parquet format allows efficient querying of the data, and it will help with automation by connecting TEEHR to the NextGen water model. CIROH, along with Lynker, has developed a containerized version of the NextGen National Water Model, NextGen In A Box (NGIAB). TEEHR uses DuckDB to query the parquet output from NGIAB stored on the cloud.
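For context, DuckDB can scan such a parquet file in place; a minimal sketch (the file name and filter value are illustrative):

```python
import duckdb

# read_parquet queries the file directly, without a separate load step
con = duckdb.connect()
con.sql("""
    SELECT location_id, value_time, variable_name, value
    FROM read_parquet('troute_output.parquet')
    WHERE variable_name = 'q'
    ORDER BY value_time
""").show()
```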
Additions
Changes
Testing
Screenshots
Notes
The flowveldepth dataframe is modified to create a timeseries containing the following variables:

Todos
Checklist
Testing checklist
Target Environment support