Skip to content

Commit

Permalink
Optimizing read_geopkg function
Browse files Browse the repository at this point in the history
  • Loading branch information
AminTorabi-NOAA committed Aug 7, 2024
1 parent 5865910 commit f007e01
Showing 1 changed file with 18 additions and 32 deletions.
50 changes: 18 additions & 32 deletions src/troute-network/troute/HYFeaturesNetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ def read_geopkg(file_path, compute_parameters, waterbody_parameters, cpu_pool):

# If waterbodies are being simulated, read lakes table
if waterbody_parameters.get('break_network_at_waterbodies',False):
layers.append('lakes')
layers.append('nexus')
layers.extend(['lakes', 'nexus'])

# If any DA is activated, read network table as well for gage information
data_assimilation_parameters = compute_parameters.get('data_assimilation_parameters',{})
Expand All @@ -44,40 +43,27 @@ def read_geopkg(file_path, compute_parameters, waterbody_parameters, cpu_pool):
hybrid_routing = hybrid_parameters.get('run_hybrid_routing', False)
if hybrid_routing & ('nexus' not in layers):
layers.append('nexus')


# Define a function to read a layer from the geopackage
def read_layer(layer):
try:
return gpd.read_file(file_path, layer=layer)
except Exception as e:
return pd.DataFrame()

# Retrieve geopackage information:
if cpu_pool > 1:
with Parallel(n_jobs=len(layers)) as parallel:
jobs = []
for layer in layers:
jobs.append(
delayed(gpd.read_file)
#(f, qlat_file_value_col, gw_bucket_col, terrain_ro_col)
#delayed(nhd_io.get_ql_from_csv)
(filename=file_path, layer=layer)
)
gpkg_list = parallel(jobs)
with Parallel(n_jobs=min(cpu_pool, len(layers))) as parallel:
gpkg_list = parallel(delayed(read_layer)(layer) for layer in layers)

table_dict = {layers[i]: gpkg_list[i] for i in range(len(layers))}
flowpaths = pd.merge(table_dict.get('flowpaths'), table_dict.get('flowpath_attributes'), on='id')
lakes = table_dict.get('lakes', pd.DataFrame())
network = table_dict.get('network', pd.DataFrame())
nexus = table_dict.get('nexus', pd.DataFrame())
else:
flowpaths = gpd.read_file(file_path, layer='flowpaths')
flowpath_attributes = gpd.read_file(file_path, layer='flowpath_attributes')
flowpaths = pd.merge(flowpaths, flowpath_attributes, on='id')
# If waterbodies are being simulated, read lakes table
lakes = pd.DataFrame()
if 'lakes' in layers:
lakes = gpd.read_file(file_path, layer='lakes')
# If any DA is activated, read network table as well for gage information
network = pd.DataFrame()
if 'network' in layers:
network = gpd.read_file(file_path, layer='network')
# If diffusive is activated, read nexus table for lat/lon information
nexus = pd.DataFrame()
if 'nexus' in layers:
nexus = gpd.read_file(file_path, layer='nexus')
table_dict = {layer: read_layer(layer) for layer in layers}

flowpaths = pd.merge(table_dict.get('flowpaths', pd.DataFrame()), table_dict.get('flowpath_attributes', pd.DataFrame()), on='id', how='inner')
lakes = table_dict.get('lakes', pd.DataFrame())
network = table_dict.get('network', pd.DataFrame())
nexus = table_dict.get('nexus', pd.DataFrame())

return flowpaths, lakes, network, nexus

Expand Down

0 comments on commit f007e01

Please sign in to comment.