reading new geopackage and renaming with regex
AminTorabi-NOAA committed Aug 14, 2024
1 parent f007e01 commit 3b1d55b
Showing 1 changed file with 76 additions and 37 deletions.
113 changes: 76 additions & 37 deletions src/troute-network/troute/HYFeaturesNetwork.py
@@ -13,54 +13,93 @@
 from datetime import datetime
 from pprint import pformat
 import os
 
+import fiona
 import troute.nhd_io as nhd_io #FIXME
 from troute.nhd_network import reverse_dict, extract_connections, reverse_network, reachable
 from .rfc_lake_gage_crosswalk import get_rfc_lake_gage_crosswalk, get_great_lakes_climatology
 
+import re
 __verbose__ = False
 __showtiming__ = False
 
+def find_layer_name(layers, pattern):
+    """
+    Find a layer name in the list of layers that matches the regex pattern.
+    """
+    for layer in layers:
+        if re.search(pattern, layer, re.IGNORECASE):
+            return layer
+    return None
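As a quick illustration of the new helper, here is a minimal, self-contained sketch of how find_layer_name resolves layer names case-insensitively. The layer names below are hypothetical, not taken from a real hydrofabric GeoPackage.

import re

def find_layer_name(layers, pattern):
    """Return the first layer whose name matches the regex pattern, else None."""
    for layer in layers:
        if re.search(pattern, layer, re.IGNORECASE):
            return layer
    return None

# Hypothetical layer names; in read_geopkg they come from fiona.listlayers(file_path)
available_layers = ['Flow_Paths', 'flowpath-attributes', 'lakes', 'nexus']

print(find_layer_name(available_layers, r'flow[-_]?paths?'))  # 'Flow_Paths'
print(find_layer_name(available_layers, r'network'))          # None -- no such layer

Note that re.search matches anywhere in the name, so the first matching layer in listing order wins.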

 def read_geopkg(file_path, compute_parameters, waterbody_parameters, cpu_pool):
-    # Establish which layers we will read. We'll always need the flowpath tables
-    layers = ['flowpaths','flowpath_attributes']
-
-    # If waterbodies are being simulated, read lakes table
-    if waterbody_parameters.get('break_network_at_waterbodies',False):
-        layers.extend(['lakes', 'nexus'])
-
-    # If any DA is activated, read network table as well for gage information
-    data_assimilation_parameters = compute_parameters.get('data_assimilation_parameters',{})
-    streamflow_nudging = data_assimilation_parameters.get('streamflow_da',{}).get('streamflow_nudging',False)
-    usgs_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_persistence_usgs',False)
-    usace_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_persistence_usace',False)
-    rfc_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_rfc_da',{}).get('reservoir_rfc_forecasts',False)
-    if any([streamflow_nudging, usgs_da, usace_da, rfc_da]):
-        layers.append('network')
+    # Retrieve available layers from the GeoPackage
+    available_layers = fiona.listlayers(file_path)
+
+    # Patterns for the layers we want to find
+    layer_patterns = {
+        'flowpaths': r'flow[-_]?paths?',
+        'flowpath_attributes': r'flow[-_]?path[-_]?attributes?',
+        'lakes': r'lakes?',
+        'nexus': r'nexus?',
+        'network': r'network'
+    }
+
+    # Match available layers to the patterns
+    matched_layers = {key: find_layer_name(available_layers, pattern)
+                      for key, pattern in layer_patterns.items()}
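Putting the patterns together, matched_layers maps each canonical key to whatever name the file actually uses, or None when a layer is absent. A sketch with the same hypothetical names and helper as above:

layer_patterns = {
    'flowpaths': r'flow[-_]?paths?',
    'flowpath_attributes': r'flow[-_]?path[-_]?attributes?',
    'lakes': r'lakes?',
    'nexus': r'nexus?',
    'network': r'network'
}

available_layers = ['Flow_Paths', 'flowpath-attributes', 'lakes', 'nexus']
matched_layers = {key: find_layer_name(available_layers, pattern)
                  for key, pattern in layer_patterns.items()}
# {'flowpaths': 'Flow_Paths', 'flowpath_attributes': 'flowpath-attributes',
#  'lakes': 'lakes', 'nexus': 'nexus', 'network': None}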

-    # If diffusive is activated, read nexus table for lat/lon information
-    hybrid_parameters = compute_parameters.get('hybrid_parameters',{})
-    hybrid_routing = hybrid_parameters.get('run_hybrid_routing', False)
-    if hybrid_routing & ('nexus' not in layers):
-        layers.append('nexus')
-
-    # Define a function to read a layer from the geopackage
-    def read_layer(layer):
-        try:
-            return gpd.read_file(file_path, layer=layer)
-        except Exception as e:
-            return pd.DataFrame()
-
-    # Retrieve geopackage information:
+    layers_to_read = ['flowpaths', 'flowpath_attributes']
+
+    if waterbody_parameters.get('break_network_at_waterbodies', False):
+        layers_to_read.extend(['lakes', 'nexus'])
+
+    data_assimilation_parameters = compute_parameters.get('data_assimilation_parameters', {})
+    if any([
+        data_assimilation_parameters.get('streamflow_da', {}).get('streamflow_nudging', False),
+        data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_persistence_usgs', False),
+        data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_persistence_usace', False),
+        data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_rfc_da', {}).get('reservoir_rfc_forecasts', False)
+    ]):
+        layers_to_read.append('network')
+
+    hybrid_parameters = compute_parameters.get('hybrid_parameters', {})
+    if hybrid_parameters.get('run_hybrid_routing', False) and 'nexus' not in layers_to_read:
+        layers_to_read.append('nexus')
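The layer list is driven entirely by nested configuration lookups, so missing keys default to False and the optional tables are simply skipped. A simplified sketch with a hypothetical configuration (only the streamflow-nudging DA check is shown; the reservoir checks follow the same pattern):

compute_parameters = {
    'data_assimilation_parameters': {
        'streamflow_da': {'streamflow_nudging': True}
    },
    'hybrid_parameters': {'run_hybrid_routing': True},
}
waterbody_parameters = {}  # break_network_at_waterbodies not set -> no lakes table

layers_to_read = ['flowpaths', 'flowpath_attributes']
if waterbody_parameters.get('break_network_at_waterbodies', False):
    layers_to_read.extend(['lakes', 'nexus'])
da = compute_parameters.get('data_assimilation_parameters', {})
if da.get('streamflow_da', {}).get('streamflow_nudging', False):
    layers_to_read.append('network')
if compute_parameters.get('hybrid_parameters', {}).get('run_hybrid_routing', False) \
        and 'nexus' not in layers_to_read:
    layers_to_read.append('nexus')

print(layers_to_read)  # ['flowpaths', 'flowpath_attributes', 'network', 'nexus']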

+    # Function that reads a layer from the geopackage
+    def read_layer(layer_name):
+        if layer_name:
+            try:
+                return gpd.read_file(file_path, layer=layer_name)
+            except Exception as e:
+                print(f"Error reading {layer_name}: {e}")
+                return pd.DataFrame()
+        return pd.DataFrame()
+
+    # Retrieve geopackage information using matched layer names
     if cpu_pool > 1:
-        with Parallel(n_jobs=min(cpu_pool, len(layers))) as parallel:
-            gpkg_list = parallel(delayed(read_layer)(layer) for layer in layers)
-
-        table_dict = {layers[i]: gpkg_list[i] for i in range(len(layers))}
+        with Parallel(n_jobs=min(cpu_pool, len(layers_to_read))) as parallel:
+            gpkg_list = parallel(delayed(read_layer)(matched_layers[layer]) for layer in layers_to_read)
+        table_dict = {layers_to_read[i]: gpkg_list[i] for i in range(len(layers_to_read))}
     else:
-        table_dict = {layer: read_layer(layer) for layer in layers}
+        table_dict = {layer: read_layer(matched_layers[layer]) for layer in layers_to_read}
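The parallel path follows the standard Parallel/delayed pattern; assuming these come from joblib (imported elsewhere in this module), a generic, runnable sketch of the same idea with a stand-in reader:

from joblib import Parallel, delayed

def read_layer(layer_name):
    # Stand-in for the real gpd.read_file call; just echoes its input
    return f"table for {layer_name}"

layers_to_read = ['flowpaths', 'flowpath_attributes']
matched = {'flowpaths': 'Flow_Paths', 'flowpath_attributes': 'flowpath-attributes'}

# One job per layer, capped at the number of layers
with Parallel(n_jobs=min(2, len(layers_to_read))) as parallel:
    results = parallel(delayed(read_layer)(matched[layer]) for layer in layers_to_read)

# joblib returns results in submission order, so pairing with the layer list is safe
table_dict = dict(zip(layers_to_read, results))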

-    flowpaths = pd.merge(table_dict.get('flowpaths', pd.DataFrame()), table_dict.get('flowpath_attributes', pd.DataFrame()), on='id', how='inner')
+    # Handle different key column names between flowpaths and flowpath_attributes
+    flowpaths_df = table_dict.get('flowpaths', pd.DataFrame())
+    flowpath_attributes_df = table_dict.get('flowpath_attributes', pd.DataFrame())
+
+    # Check if 'link' column exists and rename it to 'id'
+    if 'link' in flowpath_attributes_df.columns:
+        flowpath_attributes_df.rename(columns={'link': 'id'}, inplace=True)
+
+    # Merge flowpaths and flowpath_attributes
+    flowpaths = pd.merge(
+        flowpaths_df,
+        flowpath_attributes_df,
+        on='id',
+        how='inner'
+    )
     lakes = table_dict.get('lakes', pd.DataFrame())
     network = table_dict.get('network', pd.DataFrame())
     nexus = table_dict.get('nexus', pd.DataFrame())
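The renaming half of the commit guards against hydrofabric files whose flowpath_attributes table is keyed by 'link' rather than 'id'. A toy sketch with made-up frames and values:

import pandas as pd

flowpaths_df = pd.DataFrame({'id': ['wb-1', 'wb-2'], 'toid': ['nex-1', 'nex-2']})
# Some files key this table by 'link' instead of 'id'
flowpath_attributes_df = pd.DataFrame({'link': ['wb-1', 'wb-2'], 'n': [0.06, 0.05]})

# Without the rename, pd.merge(..., on='id') would raise KeyError: 'id'
if 'link' in flowpath_attributes_df.columns:
    flowpath_attributes_df.rename(columns={'link': 'id'}, inplace=True)

flowpaths = pd.merge(flowpaths_df, flowpath_attributes_df, on='id', how='inner')
print(flowpaths)
#      id   toid     n
# 0  wb-1  nex-1  0.06
# 1  wb-2  nex-2  0.05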
