diff --git a/src/open_gira/disruption.py b/src/open_gira/disruption.py
new file mode 100644
index 00000000..c9a65bd0
--- /dev/null
+++ b/src/open_gira/disruption.py
@@ -0,0 +1,67 @@
+import geopandas as gpd
+import pandas as pd
+import rasterio
+
+import snail.intersection
+
+
+def filter_edges_by_raster(
+    edges: gpd.GeoDataFrame,
+    raster_path: str,
+    failure_threshold: float,
+    band: int = 1
+) -> gpd.GeoDataFrame:
+    """
+    Remove edges from a network that are exposed to gridded values in excess of
+    a given threshold.
+
+    Args:
+        edges: Network edges to consider. Must contain a geometry column of linestrings.
+        raster_path: Path to raster file on disk, openable by rasterio.
+        failure_threshold: Edges experiencing a raster value in excess of this
+            will be removed from the network.
+        band: Band of raster to read data from.
+
+    Returns:
+        Network without edges experiencing raster values in excess of threshold.
+    """
+
+    # we will create a new edge_id column, fail if we would overwrite
+    assert "edge_id" not in edges.columns
+
+    # split out edges without geometry as snail will not handle them gracefully
+    print("Partition edges by existence of geometry...")
+    no_geom_edges = edges.loc[edges.geometry.isna(), :].copy()
+    with_geom_edges = edges.loc[~edges.geometry.isna(), :].copy()
+
+    # we need an id to select edges by
+    with_geom_edges["edge_id"] = range(len(with_geom_edges))
+
+    print("Read raster transform...")
+    grid = snail.intersection.GridDefinition.from_raster(raster_path)
+
+    print("Prepare linestrings...")
+    edges = snail.intersection.prepare_linestrings(with_geom_edges)
+
+    print("Split linestrings...")
+    splits = snail.intersection.split_linestrings(edges, grid)
+
+    print("Lookup raster indices...")
+    splits_with_indices = snail.intersection.apply_indices(splits, grid)
+
+    print("Read raster data into memory...")
+    with rasterio.open(raster_path) as dataset:
+        raster = dataset.read(band)
+
+    print("Lookup raster value for splits...")
+    values = snail.intersection.get_raster_values_for_splits(splits_with_indices, raster)
+    splits_with_values = splits_with_indices.copy()
+    splits_with_values["raster_value"] = values
+
+    print(f"Filter out edges with splits experiencing values in excess of {failure_threshold} threshold...")
+    failed_splits_mask = splits_with_values.raster_value > failure_threshold
+    failed_edge_ids = set(splits_with_values[failed_splits_mask].edge_id.unique())
+    surviving_edges_with_geom = edges.loc[~edges.edge_id.isin(failed_edge_ids), :]
+
+    print("Done filtering edges...")
+    return pd.concat([no_geom_edges, surviving_edges_with_geom]).sort_index().drop(columns=["edge_id"])
\ No newline at end of file
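A minimal usage sketch of `filter_edges_by_raster` for reviewers. The paths and the 0.5 m depth threshold are illustrative assumptions, not values taken from this diff:

```python
import geopandas as gpd

from open_gira.disruption import filter_edges_by_raster

# hypothetical inputs: any linestring edges and rasterio-readable grid will do
edges = gpd.read_parquet("results/multi-modal_network/project-thailand/edges.gpq")
surviving = filter_edges_by_raster(
    edges,
    raster_path="flood_depth_m.tif",  # hypothetical flood depth raster, metres
    failure_threshold=0.5,            # drop edges exposed to depths > 0.5 m
)
print(f"removed {len(edges) - len(surviving)} of {len(edges)} edges")
```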
diff --git a/src/open_gira/plot/utils.py b/src/open_gira/plot/utils.py
index b73c1ecd..c8197d97 100644
--- a/src/open_gira/plot/utils.py
+++ b/src/open_gira/plot/utils.py
@@ -3,6 +3,60 @@
 """
 
+import geopandas as gpd
+import numpy as np
+import pandas as pd
+import shapely
+from shapely.ops import split
+
+
+def chop_at_antimeridian(gdf: gpd.GeoDataFrame, drop_null_geometry: bool = False) -> gpd.GeoDataFrame:
+    """
+    Cut LineStrings either side of the antimeridian, then drop the fragments
+    that intersect with the antimeridian.
+
+    Warning: Will create new rows (split geometries) with duplicate indices.
+
+    Args:
+        gdf: Table with geometry to chop at antimeridian.
+        drop_null_geometry: If true, drop any null geometry rows before plotting.
+
+    Returns:
+        Table, potentially with new rows. No rows in the table should have
+        geometries that cross the antimeridian.
+    """
+    if drop_null_geometry:
+        gdf = gdf.loc[~gdf.geometry.isna(), :]
+
+    assert set(gdf.geometry.type) == {"LineString"}
+
+    def split_on_meridian(gdf: gpd.GeoDataFrame, meridian: shapely.geometry.LineString) -> gpd.GeoDataFrame:
+        return gdf.assign(
+            geometry=gdf.apply(lambda row: split(row.geometry, meridian), axis=1)
+        ).explode(index_parts=False)
+
+    xlim = 179.9
+    ylim = 90
+
+    split_e = split_on_meridian(gdf, shapely.geometry.LineString([(xlim, ylim), (xlim, -ylim)]))
+    split_e_and_w = split_on_meridian(split_e, shapely.geometry.LineString([(-xlim, ylim), (-xlim, -ylim)]))
+
+    def crosses_antimeridian(row: pd.Series) -> bool:
+        """
+        Check if a geometry has longitudes near the antimeridian (i.e. ±180)
+        and on both sides of it. If so, return true.
+        """
+        x, _ = row.geometry.coords.xy
+        x = np.array(x)
+        # "near" means on or beyond the cut meridians at ±xlim
+        longitudes_near_antimeridian = x[np.abs(x) >= xlim]
+        if len(longitudes_near_antimeridian) == 0:
+            return False
+        hemispheres = np.unique(np.sign(longitudes_near_antimeridian))
+        return (-1 in hemispheres) and (1 in hemispheres)
+
+    return split_e_and_w[~split_e_and_w.apply(crosses_antimeridian, axis=1)]
+
+
 def figure_size(
     min_x: float,
     min_y: float,
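For intuition, a toy example of `chop_at_antimeridian`; the geometry and attribute values are invented:

```python
import geopandas as gpd
from shapely.geometry import LineString

from open_gira.plot.utils import chop_at_antimeridian

# a route from just east of the antimeridian to just west of it; in lon/lat
# coordinates shapely draws this the "long way" around, through 0 degrees
gdf = gpd.GeoDataFrame(
    {"name": ["fiji_route"]},
    geometry=[LineString([(179.0, -17.0), (-179.0, -17.5)])],
    crs="EPSG:4326",
)
chopped = chop_at_antimeridian(gdf)
# the spurious middle fragment spanning the map is dropped; the two short
# pieces either side of the ±179.9 cuts survive (with duplicate indices)
print(chopped.geometry.bounds)
```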
diff --git a/src/open_gira/routing.py b/src/open_gira/routing.py
new file mode 100644
index 00000000..740a54e9
--- /dev/null
+++ b/src/open_gira/routing.py
@@ -0,0 +1,222 @@
+"""
+Allocate flows (value and volume) from an origin-destination (OD) file across a network of edges.
+"""
+
+import multiprocessing
+import os
+import tempfile
+import time
+
+import igraph as ig
+import geopandas as gpd
+import pandas as pd
+from tqdm import tqdm
+
+
+# dict containing:
+# 'value_kusd' -> float
+# 'volume_tons' -> float
+# 'edge_indices' -> list[int]
+FlowResult = dict[str, float | list[int]]
+
+# dict with FlowResult values
+# (source node, destination country) -> FlowResult
+# e.g.
+# ('thailand_4_123', 'GID_0_GBR') -> FlowResult dict
+RouteResult = dict[tuple[str, str], FlowResult]
+
+
+# We do not do routing within partner countries; instead we terminate at a port
+# in the destination country. Destination country nodes are connected to ports
+# by special edges. We give these edges a very high cost, so that least-cost
+# route finding will only take them as a last resort (when no other, lower-cost
+# route is available).
+
+# When calculating the total cost of a route, we want to be able to identify
+# these imaginary links and remove them.
+DESTINATION_LINK_COST_USD_T: float = 1E6
+
+
+def init_worker(graph_filepath: str, od_filepath: str) -> None:
+    """
+    Create global variables referencing graph and OD to persist through worker lifetime.
+
+    Args:
+        graph_filepath: Filepath of pickled igraph.Graph to route over.
+        od_filepath: Filepath to table of flows from origin node 'id' to
+            destination country 'partner_GID_0'; should also contain
+            'value_kusd' and 'volume_tons'.
+    """
+    print(f"Process {os.getpid()} initialising...")
+    global graph
+    graph = ig.Graph.Read_Pickle(graph_filepath)
+    global od
+    od = pd.read_parquet(od_filepath)
+    return
+
+
+def route_from_node(from_node: str) -> RouteResult:
+    """
+    Route flows from a single 'from_node' to destinations across the graph.
+    Record value and volume flowing across each edge.
+
+    Args:
+        from_node: Node ID of source node.
+
+    Returns:
+        Mapping from (source node, destination country node) key, to value of
+        flow, volume of flow and list of edge ids of route.
+    """
+    print(f"Process {os.getpid()} routing {from_node}...")
+
+    from_node_od = od[od.id == from_node]
+    destination_nodes: list[str] = [f"GID_0_{iso_a3}" for iso_a3 in from_node_od.partner_GID_0.unique()]
+
+    routes_edge_list = []
+    try:
+        routes_edge_list: list[list[int]] = graph.get_shortest_paths(
+            f"road_{from_node}",
+            destination_nodes,
+            weights="cost_USD_t",
+            output="epath"
+        )
+    except ValueError as error:
+        if "no such vertex" in str(error):
+            print(f"{error}... skipping routing from {from_node}")
+        else:
+            raise error
+
+    routes: RouteResult = {}
+
+    if routes_edge_list:
+        assert len(routes_edge_list) == len(destination_nodes)
+    else:
+        return routes
+
+    # lookup trade value and volume for each pairing of from_node and partner country
+    for i, destination_node in enumerate(destination_nodes):
+        # "GID_0_GBR" -> "GBR"
+        iso_a3 = destination_node.split("_")[-1]
+        route = from_node_od[
+            (from_node_od.id == from_node) & (from_node_od.partner_GID_0 == iso_a3)
+        ]
+        value_kusd, = route.value_kusd
+        volume_tons, = route.volume_tons
+
+        routes[(from_node, destination_node)] = {
+            "value_kusd": value_kusd,
+            "volume_tons": volume_tons,
+            "edge_indices": routes_edge_list[i]
+        }
+
+    print(f"Process {os.getpid()} finished routing {from_node}...")
+    return routes
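At the heart of `route_from_node` is `igraph.Graph.get_shortest_paths` with `output="epath"`. A self-contained sketch on an invented three-edge network, showing why the returned edge indices can later be used to positionally index the edges table:

```python
import igraph as ig

# toy directed network: two road hops, then an imaginary destination link
edge_list = [
    ("road_a", "road_b"),
    ("road_b", "port_1"),
    ("port_1", "GID_0_GBR"),  # terminates at the partner country node
]
graph = ig.Graph.TupleList(edge_list, directed=True)
graph.es["cost_USD_t"] = [10.0, 5.0, 1e6]  # last hop at DESTINATION_LINK_COST_USD_T

# per destination, the edge indices of the least-cost route
epaths = graph.get_shortest_paths(
    "road_a", ["GID_0_GBR"], weights="cost_USD_t", output="epath"
)
print(epaths)  # [[0, 1, 2]] -> positions into the edge table, in input order
```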
+
+
+def route_from_all_nodes(od: pd.DataFrame, edges: gpd.GeoDataFrame, n_cpu: int) -> RouteResult:
+    """
+    Route flows from origins to destinations across graph.
+
+    Args:
+        od: Table of flows from origin node 'id' to destination country
+            'partner_GID_0'; should also contain 'value_kusd' and 'volume_tons'.
+        edges: Table of edges to construct graph from. First column should be
+            source node id and second destination node id.
+        n_cpu: Number of CPUs to use for routing.
+
+    Returns:
+        Mapping from source node, to destination country node, to flow in value
+        and volume along this route and list of edge indices constituting
+        the route.
+    """
+
+    print("Creating graph...")
+    # cannot add vertices as edges reference port493_out, port281_in, etc.,
+    # which are missing from the nodes file
+    # use_vids=False as edges.from_id and edges.to_id are not integers
+    graph = ig.Graph.DataFrame(edges, directed=True, use_vids=False)
+
+    temp_dir = tempfile.TemporaryDirectory()
+
+    print("Writing graph to disk...")
+    graph_filepath = os.path.join(temp_dir.name, "graph.pickle")
+    graph.write_pickle(graph_filepath)
+
+    print("Writing OD to disk...")
+    od_filepath = os.path.join(temp_dir.name, "od.pq")
+    od.to_parquet(od_filepath)
+
+    print("Routing...")
+    start = time.time()
+    from_nodes = od.id.unique()
+    args = ((from_node,) for from_node in from_nodes)
+    # as each process is created, it will load the graph and od from disk in
+    # init_worker and then persist these in memory as globals between chunks
+    with multiprocessing.Pool(
+        processes=n_cpu,
+        initializer=init_worker,
+        initargs=(graph_filepath, od_filepath),
+    ) as pool:
+        routes: list[RouteResult] = pool.starmap(route_from_node, args)
+
+    print("\n")
+    print(f"Routing completed in {time.time() - start:.2f}s")
+
+    temp_dir.cleanup()
+
+    # flatten our list of RouteResult dicts into one dict
+    return {k: v for item in routes for (k, v) in item.items()}
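The returned `RouteResult` mapping serialises naturally to a table with a two-level row index, which is how `allocate.py` (below) writes it to parquet; the values here are invented:

```python
import pandas as pd

routes = {
    ("thailand_4_123", "GID_0_GBR"): {
        "value_kusd": 12.5,
        "volume_tons": 80.0,
        "edge_indices": [0, 1, 2],
    },
}
# transposing turns the (source, destination) keys into a MultiIndex
table = pd.DataFrame(routes).T
print(table)
```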
+
+
+def lookup_route_costs(
+    routes_path: str,
+    edges_path: str,
+    destination_link_cost_USD_t: float = DESTINATION_LINK_COST_USD_T
+) -> pd.DataFrame:
+    """
+    For each route (source -> destination pair), lookup the edges of the least
+    cost route (the route taken) and sum those costs. Store alongside value
+    and volume of route.
+
+    Args:
+        routes_path: Path to routes table, should have multi-index: (source node,
+            destination node) and include value_kusd, volume_tons and
+            edge_indices columns.
+        edges_path: Path to edges table, should have cost_USD_t column which we
+            will positionally index into with edge_indices from the routes table.
+        destination_link_cost_USD_t: Cost of traversing 'destination' links, to
+            partner entities. There should be exactly one of these links in any
+            given route.
+
+    Returns:
+        Routes appended with their total cost in USD per tonne.
+    """
+    routes_with_edge_indices: pd.DataFrame = pd.read_parquet(routes_path)
+    edges: gpd.GeoDataFrame = gpd.read_parquet(edges_path)
+    cost_col_id = edges.columns.get_loc("cost_USD_t")
+    routes = []
+    for index, route_data in tqdm(routes_with_edge_indices.iterrows(), total=len(routes_with_edge_indices)):
+        source_node, destination_node = index
+        cost_including_destination_link_USD_t = edges.iloc[route_data.edge_indices, cost_col_id].sum()
+
+        # the destination link dominates the total, so the modulo strips it out
+        cost_USD_t: float = cost_including_destination_link_USD_t % destination_link_cost_USD_t
+
+        if int(cost_including_destination_link_USD_t // destination_link_cost_USD_t) != 1:
+            # must have exactly 1 destination link, otherwise not a valid route
+            continue
+
+        if cost_USD_t != 0:
+            routes.append(
+                (
+                    source_node,
+                    destination_node.split("_")[-1],
+                    route_data.value_kusd,
+                    route_data.volume_tons,
+                    cost_USD_t
+                )
+            )
+
+    return pd.DataFrame(
+        routes,
+        columns=["source_node", "destination_node", "value_kusd", "volume_tons", "cost_USD_t"]
+    )
diff --git a/workflow/Snakefile b/workflow/Snakefile
index 616db8c0..0b31eb4d 100644
--- a/workflow/Snakefile
+++ b/workflow/Snakefile
@@ -134,6 +134,7 @@ include: "transport/create_overall_bbox.smk"
 include: "transport/join_data.smk"
 include: "transport/maritime/maritime.smk"
 include: "transport/multi-modal/multi_modal.smk"
+include: "transport/flow_allocation/allocate.smk"
 include: "flood/aqueduct.smk"
 include: "flood/trim_hazard_data.smk"
diff --git a/workflow/transport/flow_allocation/allocate.py b/workflow/transport/flow_allocation/allocate.py
new file mode 100644
index 00000000..a5f3d113
--- /dev/null
+++ b/workflow/transport/flow_allocation/allocate.py
@@ -0,0 +1,48 @@
+
+import geopandas as gpd
+import pandas as pd
+from tqdm import tqdm
+
+from open_gira.routing import route_from_all_nodes, RouteResult
+
+
+if __name__ == "__main__":
+
+    print("Reading network...")
+    # read in global multi-modal transport network
+    edges = gpd.read_parquet(snakemake.input.edges)
+    available_destinations = edges[edges["mode"] == "imaginary"].to_id.unique()
+    available_country_destinations = [d.split("_")[-1] for d in available_destinations if d.startswith("GID_")]
+
+    print("Reading OD matrix...")
+    # read in trade OD matrix
+    od = pd.read_parquet(snakemake.input.od)
+    print(f"OD has {len(od):,d} flows")
+
+    # 5t threshold drops THL road -> GID_0 OD from ~21M -> ~2M
+    minimum_flow_volume_tons = snakemake.config["minimum_flow_volume_t"]
+    od = od[od.volume_tons > minimum_flow_volume_tons]
+    print(f"After dropping flows with volume < {minimum_flow_volume_tons}t, OD has {len(od):,d} flows")
+
+    # drop any flows we can't find a route to
+    od = od[od.partner_GID_0.isin(available_country_destinations)]
+    print(f"After dropping unrouteable destination countries, OD has {len(od):,d} flows")
+
+    routes: RouteResult = route_from_all_nodes(od, edges, snakemake.threads)
+
+    print("Writing routes to disk as parquet...")
+    pd.DataFrame(routes).T.to_parquet(snakemake.output.routes)
+
+    print("Assigning route flows to edges...")
+    # initialise as float so the += of float flow values below is dtype-safe
+    edges["value_kusd"] = 0.0
+    edges["volume_tons"] = 0.0
+    value_col_id = edges.columns.get_loc("value_kusd")
+    volume_col_id = edges.columns.get_loc("volume_tons")
+    for (from_node, destination_country), route_data in tqdm(routes.items()):
+        edges.iloc[route_data["edge_indices"], value_col_id] += route_data["value_kusd"]
+        edges.iloc[route_data["edge_indices"], volume_col_id] += route_data["volume_tons"]
+
+    print("Writing edge flows to disk as geoparquet...")
+    edges.to_parquet(snakemake.output.edges_with_flows)
+
+    print("Done")
\ No newline at end of file
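The modulo / floor-division pair in `lookup_route_costs` depends on genuine route costs being far smaller than `DESTINATION_LINK_COST_USD_T`. A worked example of the arithmetic:

```python
DESTINATION_LINK_COST_USD_T = 1e6

# a valid route: real edge costs plus exactly one destination link
total = 42.0 + DESTINATION_LINK_COST_USD_T
real_cost = total % DESTINATION_LINK_COST_USD_T                  # 42.0
n_destination_links = int(total // DESTINATION_LINK_COST_USD_T)  # 1

assert n_destination_links == 1  # 0 or 2+ links means the route is invalid
print(real_cost)
```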
diff --git a/workflow/transport/flow_allocation/allocate.smk b/workflow/transport/flow_allocation/allocate.smk
new file mode 100644
index 00000000..065b3d0a
--- /dev/null
+++ b/workflow/transport/flow_allocation/allocate.smk
@@ -0,0 +1,81 @@
+rule allocate_intact_network:
+    """
+    Allocate a trade OD matrix across a multi-modal transport network.
+
+    Test with:
+    snakemake -c1 -- results/flow_allocation/project-thailand/edges.gpq
+    """
+    input:
+        edges = "{OUTPUT_DIR}/multi-modal_network/{PROJECT_SLUG}/edges.gpq",
+        od = "{OUTPUT_DIR}/input/trade_matrix/{PROJECT_SLUG}/trade_nodes_total.parquet",
+    threads: workflow.cores
+    params:
+        # if this changes, we want to trigger a re-run
+        minimum_flow_volume_t = config["minimum_flow_volume_t"],
+    output:
+        routes = "{OUTPUT_DIR}/flow_allocation/{PROJECT_SLUG}/routes.pq",
+        edges_with_flows = "{OUTPUT_DIR}/flow_allocation/{PROJECT_SLUG}/edges.gpq",
+    script:
+        "./allocate.py"
+
+
+rule allocate_degraded_network:
+    """
+    Allocate a trade OD matrix across a multi-modal transport network which has
+    lost edges as a result of intersection with a hazard map.
+
+    Test with:
+    snakemake -c1 -- results/flow_allocation/project-thailand/hazard-thai-floods-2011-JBA/edges.gpq
+    """
+    input:
+        edges = "{OUTPUT_DIR}/multi-modal_network/{PROJECT_SLUG}/{HAZARD_SLUG}/edges.gpq",
+        od = "{OUTPUT_DIR}/input/trade_matrix/{PROJECT_SLUG}/trade_nodes_total.parquet",
+    threads: workflow.cores
+    params:
+        # if this changes, we want to trigger a re-run
+        minimum_flow_volume_t = config["minimum_flow_volume_t"],
+    output:
+        routes = "{OUTPUT_DIR}/flow_allocation/{PROJECT_SLUG}/{HAZARD_SLUG}/routes.pq",
+        edges_with_flows = "{OUTPUT_DIR}/flow_allocation/{PROJECT_SLUG}/{HAZARD_SLUG}/edges.gpq",
+    script:
+        "./allocate.py"
+
+
+rule accumulate_route_costs_intact:
+    """
+    For each route in the OD (source -> destination pair), lookup the edges of
+    the least cost route (the route taken) and sum those costs. Store alongside
+    value and volume of route.
+
+    Test with:
+    snakemake -c1 -- results/flow_allocation/project-thailand/routes_with_costs.pq
+    """
+    input:
+        routes = "{OUTPUT_DIR}/flow_allocation/{PROJECT_SLUG}/routes.pq",
+        edges_with_flows = "{OUTPUT_DIR}/flow_allocation/{PROJECT_SLUG}/edges.gpq",
+    output:
+        routes_with_costs = "{OUTPUT_DIR}/flow_allocation/{PROJECT_SLUG}/routes_with_costs.pq",
+    run:
+        from open_gira.routing import lookup_route_costs
+
+        lookup_route_costs(input.routes, input.edges_with_flows).to_parquet(output.routes_with_costs)
+
+
+rule accumulate_route_costs_degraded:
+    """
+    For each route in the OD (source -> destination pair), lookup the edges of
+    the least cost route (the route taken) and sum those costs. Store alongside
+    value and volume of route.
+
+    Test with:
+    snakemake -c1 -- results/flow_allocation/project-thailand/hazard-thai-floods-2011-JBA/routes_with_costs.pq
+    """
+    input:
+        routes = "{OUTPUT_DIR}/flow_allocation/{PROJECT_SLUG}/{HAZARD_SLUG}/routes.pq",
+        edges_with_flows = "{OUTPUT_DIR}/flow_allocation/{PROJECT_SLUG}/{HAZARD_SLUG}/edges.gpq",
+    output:
+        routes_with_costs = "{OUTPUT_DIR}/flow_allocation/{PROJECT_SLUG}/{HAZARD_SLUG}/routes_with_costs.pq",
+    run:
+        from open_gira.routing import lookup_route_costs
+
+        lookup_route_costs(input.routes, input.edges_with_flows).to_parquet(output.routes_with_costs)