From 3b1d55bd4be417833a49b0f1c7bc1cb776457b84 Mon Sep 17 00:00:00 2001 From: AminTorabi-NOAA Date: Wed, 14 Aug 2024 12:48:43 -0400 Subject: [PATCH] reading new geopackage and renaming with regex --- .../troute/HYFeaturesNetwork.py | 113 ++++++++++++------ 1 file changed, 76 insertions(+), 37 deletions(-) diff --git a/src/troute-network/troute/HYFeaturesNetwork.py b/src/troute-network/troute/HYFeaturesNetwork.py index a4b0927ea..4466afcd8 100644 --- a/src/troute-network/troute/HYFeaturesNetwork.py +++ b/src/troute-network/troute/HYFeaturesNetwork.py @@ -13,54 +13,93 @@ from datetime import datetime from pprint import pformat import os - +import fiona import troute.nhd_io as nhd_io #FIXME from troute.nhd_network import reverse_dict, extract_connections, reverse_network, reachable from .rfc_lake_gage_crosswalk import get_rfc_lake_gage_crosswalk, get_great_lakes_climatology - +import re __verbose__ = False __showtiming__ = False +def find_layer_name(layers, pattern): + """ + Find a layer name in the list of layers that matches the regex pattern. + """ + for layer in layers: + if re.search(pattern, layer, re.IGNORECASE): + return layer + return None + def read_geopkg(file_path, compute_parameters, waterbody_parameters, cpu_pool): - # Establish which layers we will read. We'll always need the flowpath tables - layers = ['flowpaths','flowpath_attributes'] - - # If waterbodies are being simulated, read lakes table - if waterbody_parameters.get('break_network_at_waterbodies',False): - layers.extend(['lakes', 'nexus']) - - # If any DA is activated, read network table as well for gage information - data_assimilation_parameters = compute_parameters.get('data_assimilation_parameters',{}) - streamflow_nudging = data_assimilation_parameters.get('streamflow_da',{}).get('streamflow_nudging',False) - usgs_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_persistence_usgs',False) - usace_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_persistence_usace',False) - rfc_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_rfc_da',{}).get('reservoir_rfc_forecasts',False) - if any([streamflow_nudging, usgs_da, usace_da, rfc_da]): - layers.append('network') + # Retrieve available layers from the GeoPackage + available_layers = fiona.listlayers(file_path) + + # patterns for the layers we want to find + layer_patterns = { + 'flowpaths': r'flow[-_]?paths?', + 'flowpath_attributes': r'flow[-_]?path[-_]?attributes?', + 'lakes': r'lakes?', + 'nexus': r'nexus?', + 'network': r'network' + } + + # Match available layers to the patterns + matched_layers = {key: find_layer_name(available_layers, pattern) + for key, pattern in layer_patterns.items()} - # If diffusive is activated, read nexus table for lat/lon information - hybrid_parameters = compute_parameters.get('hybrid_parameters',{}) - hybrid_routing = hybrid_parameters.get('run_hybrid_routing', False) - if hybrid_routing & ('nexus' not in layers): - layers.append('nexus') - - # Define a function to read a layer from the geopackage - def read_layer(layer): - try: - return gpd.read_file(file_path, layer=layer) - except Exception as e: - return pd.DataFrame() - - # Retrieve geopackage information: + layers_to_read = ['flowpaths', 'flowpath_attributes'] + + if waterbody_parameters.get('break_network_at_waterbodies', False): + layers_to_read.extend(['lakes', 'nexus']) + + data_assimilation_parameters = compute_parameters.get('data_assimilation_parameters', {}) + if any([ + data_assimilation_parameters.get('streamflow_da', {}).get('streamflow_nudging', False), + data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_persistence_usgs', False), + data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_persistence_usace', False), + data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_rfc_da', {}).get('reservoir_rfc_forecasts', False) + ]): + layers_to_read.append('network') + + hybrid_parameters = compute_parameters.get('hybrid_parameters', {}) + if hybrid_parameters.get('run_hybrid_routing', False) and 'nexus' not in layers_to_read: + layers_to_read.append('nexus') + + # Function that read a layer from the geopackage + def read_layer(layer_name): + if layer_name: + try: + return gpd.read_file(file_path, layer=layer_name) + except Exception as e: + print(f"Error reading {layer_name}: {e}") + return pd.DataFrame() + return pd.DataFrame() + + # Retrieve geopackage information using matched layer names if cpu_pool > 1: - with Parallel(n_jobs=min(cpu_pool, len(layers))) as parallel: - gpkg_list = parallel(delayed(read_layer)(layer) for layer in layers) - - table_dict = {layers[i]: gpkg_list[i] for i in range(len(layers))} + with Parallel(n_jobs=min(cpu_pool, len(layers_to_read))) as parallel: + gpkg_list = parallel(delayed(read_layer)(matched_layers[layer]) for layer in layers_to_read) + + table_dict = {layers_to_read[i]: gpkg_list[i] for i in range(len(layers_to_read))} else: - table_dict = {layer: read_layer(layer) for layer in layers} + table_dict = {layer: read_layer(matched_layers[layer]) for layer in layers_to_read} + + # Handle different key column names between flowpaths and flowpath_attributes + flowpaths_df = table_dict.get('flowpaths', pd.DataFrame()) + flowpath_attributes_df = table_dict.get('flowpath_attributes', pd.DataFrame()) + + # Check if 'link' column exists and rename it to 'id' + if 'link' in flowpath_attributes_df.columns: + flowpath_attributes_df.rename(columns={'link': 'id'}, inplace=True) + + # Merge flowpaths and flowpath_attributes + flowpaths = pd.merge( + flowpaths_df, + flowpath_attributes_df, + on='id', + how='inner' + ) - flowpaths = pd.merge(table_dict.get('flowpaths', pd.DataFrame()), table_dict.get('flowpath_attributes', pd.DataFrame()), on='id', how='inner') lakes = table_dict.get('lakes', pd.DataFrame()) network = table_dict.get('network', pd.DataFrame()) nexus = table_dict.get('nexus', pd.DataFrame())