diff --git a/config/config.yaml b/config/config.yaml index c61cc2d2..0361825f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,6 +1,6 @@ -########################## -### TRANSPORT WORKFLOW ### -########################## +################################## +### TRANSPORT DAMAGES WORKFLOW ### +################################## # OSM datasets in PBF format, principally from: https://download.geofabrik.de/ # infrastructure_datasets: @@ -66,6 +66,37 @@ keep_tags: # Number of slices to cut dataset into -- must be a square number slice_count: 64 +##################################### +### TRANSPORT DISRUPTION WORKFLOW ### +##################################### + +# country for which trade OD has been prepared, and we are to route land trade flows +study_country_iso_a3: "THA" + +# transport cost information +# road +road_cost_USD_t_km: 0.05 +road_cost_USD_t_h: 0.48 +road_default_speed_limit_km_h: 80 +# rail +rail_cost_USD_t_km: 0.05 +rail_cost_USD_t_h: 0.38 +rail_average_freight_speed_km_h: 40 + +# cost of changing transport mode in USD per tonne +# from mistral/ccg-critical-minerals/processed_data/transport_costs/intermodal.xlsx, 20240611 +intermodal_cost_USD_t: + road_rail: 5 + maritime_road: 4 + maritime_rail: 5 + +# drop trade flows with less volume than this (accelerate flow allocation) +# N.B. 50t threshold preserves 91% of total volume and 88% of total value +# for combined agriculture & manufacturing flows between Thailand road nodes -> partner countries +minimum_flow_volume_t: 50 + +# if disrupting a network, remove edges experiencing hazard values in excess of this +edge_failure_threshold: 0.5 ######################### ### FLOODING WORKFLOW ### diff --git a/environment.yml b/environment.yml index f6539593..0def2eaf 100644 --- a/environment.yml +++ b/environment.yml @@ -3,13 +3,12 @@ name: open-gira channels: - conda-forge # majority of dependencies - - bioconda # snakemake - defaults dependencies: - python=3.10 - pip - pip: # delegate to pip for non-conda packages - - nismod-snail==0.3.1 # vector-raster intersections + - nismod-snail==0.5.2 # vector-raster intersections - osmium==3.2.0 # OpenStreetMap protobuf handling - snkit==1.8.1 # spatial network cleaning # required zenodo_get version not available via conda-forge @@ -23,8 +22,9 @@ dependencies: - dask<2024.3.0 # larger-than-memory computing - flake8 # linter - gdal>=3.3 # command-line tools for spatial data - - geopandas==0.14.1 # geospatial dataframes + - geopandas==0.14.4 # geospatial dataframes - geopy # geocoding client + - igraph # graph algorithms and data structures - ipykernel # notebook support - jupyter # notebook support - jq # JSON processing tool @@ -50,7 +50,7 @@ dependencies: - rioxarray # xarray datasets from raster files - scipy # scientific computing library - spatialpandas # plotting large datasets - - snakemake==7.18.2 # workflow management + - bioconda::snakemake==7.18.2 # workflow management # https://github.com/snakemake/snakemake/issues/1891 - tabulate==0.8.10 # snakemake dependency with bug in 9.0.0, pin previous - tqdm==4.62.3 # progress bars diff --git a/src/open_gira/network.py b/src/open_gira/network.py deleted file mode 100644 index 56030867..00000000 --- a/src/open_gira/network.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -Functionality for creating and manipulating networks. -""" - -import logging -import warnings - -import geopandas as gpd -import snkit - - -def create_network( - edges: gpd.GeoDataFrame, - nodes: gpd.GeoDataFrame = None, - id_prefix: str = "" -) -> snkit.network.Network: - """ - Create snkit network from edges and (optional) nodes and clean the result. - - Arguments: - edges (gpd.GeoDataFrame): Expected to contain geometry column of linestrings - nodes (gpd.GeoDataFrame): Optional nodes to include. snkit will try to snap to edges - - Returns: - snkit.network.Network: Built network - """ - - logging.info("Starting network creation") - - # drop edges with no geometry - empty_idx = edges.geometry.apply(lambda e: e is None or e.is_empty) - if empty_idx.sum(): - empty_edges = edges[empty_idx] - logging.info(f"Found {len(empty_edges)} empty edges.") - logging.info(empty_edges) - edges = edges[~empty_idx].copy() - - logging.info("Creating network") - network = snkit.Network(nodes, edges) - - logging.info("Splitting multilines") - network = snkit.network.split_multilinestrings(network) - - if nodes is not None and not nodes.empty: - # check we have only point nodes - assert set(network.nodes.geometry.type.values) == {"Point"} - - logging.info("Dropping duplicate geometries") - # silence shapely.ops.split ShapelyDeprecationWarning regarding: - # shapley.ops.split failure on split by empty geometry collection - # this is currently caught by snkit.network.split_edge_at_points, - # but won't be for shapely==2.0 - warnings.filterwarnings( - "ignore", - message=( - ".*GeometryTypeError will derive from ShapelyError " - "and not TypeError or ValueError in Shapely 2.0*" - ) - ) - network.nodes = snkit.network.drop_duplicate_geometries(network.nodes) - - logging.info("Snapping nodes to edges") - network = snkit.network.snap_nodes(network) - - logging.info("Adding endpoints") - network = snkit.network.add_endpoints(network) - - logging.info("Splitting edges at nodes") - network = snkit.network.split_edges_at_nodes(network) - - # check we have only linestrings - assert set(network.edges.geometry.type.values) == {"LineString"} - - logging.info("Renaming nodes and edges") - network = snkit.network.add_ids(network, edge_prefix=id_prefix, node_prefix=id_prefix) - - logging.info("Creating network topology") - network = snkit.network.add_topology(network, id_col="id") - - return network diff --git a/src/open_gira/network_creation.py b/src/open_gira/network_creation.py new file mode 100644 index 00000000..74e92ff9 --- /dev/null +++ b/src/open_gira/network_creation.py @@ -0,0 +1,423 @@ +import logging +import warnings + +import geopandas as gpd +import numpy as np +import pandas as pd +import pyproj +from scipy.spatial import cKDTree +from shapely.geometry import LineString + +import snkit + + +def create_network( + edges: gpd.GeoDataFrame, + nodes: gpd.GeoDataFrame = None, + id_prefix: str = "" +) -> snkit.network.Network: + """ + Create snkit network from edges and (optional) nodes and clean the result. + + Arguments: + edges (gpd.GeoDataFrame): Expected to contain geometry column of linestrings + nodes (gpd.GeoDataFrame): Optional nodes to include. snkit will try to snap to edges + + Returns: + snkit.network.Network: Built network + """ + + logging.info("Starting network creation") + + # drop edges with no geometry + empty_idx = edges.geometry.apply(lambda e: e is None or e.is_empty) + if empty_idx.sum(): + empty_edges = edges[empty_idx] + logging.info(f"Found {len(empty_edges)} empty edges.") + logging.info(empty_edges) + edges = edges[~empty_idx].copy() + + logging.info("Creating network") + network = snkit.Network(nodes, edges) + + logging.info("Splitting multilines") + network = snkit.network.split_multilinestrings(network) + + if nodes is not None and not nodes.empty: + # check we have only point nodes + assert set(network.nodes.geometry.type.values) == {"Point"} + + logging.info("Dropping duplicate geometries") + # silence shapely.ops.split ShapelyDeprecationWarning regarding: + # shapley.ops.split failure on split by empty geometry collection + # this is currently caught by snkit.network.split_edge_at_points, + # but won't be for shapely==2.0 + warnings.filterwarnings( + "ignore", + message=( + ".*GeometryTypeError will derive from ShapelyError " + "and not TypeError or ValueError in Shapely 2.0*" + ) + ) + network.nodes = snkit.network.drop_duplicate_geometries(network.nodes) + + logging.info("Snapping nodes to edges") + network = snkit.network.snap_nodes(network) + + logging.info("Adding endpoints") + network = snkit.network.add_endpoints(network) + + logging.info("Splitting edges at nodes") + network = snkit.network.split_edges_at_nodes(network) + + # check we have only linestrings + assert set(network.edges.geometry.type.values) == {"LineString"} + + logging.info("Renaming nodes and edges") + network = snkit.network.add_ids(network, edge_prefix=id_prefix, node_prefix=id_prefix) + + logging.info("Creating network topology") + network = snkit.network.add_topology(network, id_col="id") + + return network + + +def duplicate_reverse_and_append_edges(edges: pd.DataFrame) -> pd.DataFrame: + """ + Given edges with `from_id`, `to_id`, `from_iso_a4` and `to_iso_a3` columns, + create duplicate edges with direction reversed. + + Args: + edges: Table of edges to reverse and append to + + Returns: + Table consisting of original edges and their reversed duplicates. + """ + reversed_edges = edges.copy() + reversed_edges.from_id = edges.to_id + reversed_edges.to_id = edges.from_id + reversed_edges.from_iso_a3 = edges.to_iso_a3 + reversed_edges.to_iso_a3 = edges.from_iso_a3 + return pd.concat([edges, reversed_edges]) + + +def clean_maxspeed(value: str, default_km_h: float, min_km_h = 20, max_km_h = 140) -> float: + """ + Cast, check and return the value of OSM maxspeed tag. + + Args: + value: Speed limit value to clean + default_km_h: Where data is missing or obviously wrong, return this value. + + Returns: + Hopefully a sensible numeric speed limit value in km h-1 + """ + + try: + speed_km_h = float(value) + except ValueError: + return default_km_h + + if np.isnan(speed_km_h): + return default_km_h + elif (speed_km_h < min_km_h) or (speed_km_h > max_km_h): + return default_km_h + elif (speed_km_h >= min_km_h) and (speed_km_h <= max_km_h): + return speed_km_h + else: + raise RuntimeError(f"Unforeseen consequences with {speed_km_h=}") + + +def preprocess_road_network( + nodes_path: str, + edges_path: str, + filter_iso_a3: set[str], + cost_USD_t_km: float, + cost_USD_t_h: float, + directional: float, + default_max_speed_km_h: float, +) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: + """ + Preprocess road network data into a suitable format for multi-modal routing. + Find length of edges, calculate cost. Relabel IDs to include mode so they're unique across networks. + + Args: + nodes_path: Path to nodes geoparquet file on disk + edges_path: Path to edges geoparquet file on disk + filter_iso_a3: ISO A3 codes of countries to keep features for + cost_USD_t_km: Cost of transporting goods in USD per tonne km (broadly, cost of fuel) + cost_USD_t_h: Cost of transporting goods in USD per tonne h (broadly, wages) + directional: Whether to duplicate, reverse and append edges (from_id and to_id switched) + default_max_speed_km_h: Value to gap fill speed limit data with + + Returns: + Nodes and edges as two GeoDataFrames. + """ + + edges = gpd.read_parquet(edges_path) + # at least one foot in the countries in question + edges = edges[edges.from_iso_a3.isin(filter_iso_a3) | edges.to_iso_a3.isin(filter_iso_a3)] + edges["distance_km"] = edges.geometry.to_crs(edges.estimate_utm_crs()).length / 1_000 + + edges["mode"] = "road" + edges["max_speed_km_h"] = edges.tag_maxspeed.apply(clean_maxspeed, args=(default_max_speed_km_h,)) + edges["avg_speed_km_h"] = edges.max_speed_km_h.apply(lambda x: np.clip(2/3 * x, None, default_max_speed_km_h)) + + edges["cost_USD_t"] = cost_USD_t_km * edges["distance_km"] + cost_USD_t_h * edges["distance_km"] * 1 / edges["avg_speed_km_h"] + edges["id"] = edges.apply(lambda row: f"{row['mode']}_{row['id']}", axis=1) + edges["to_id"] = edges.apply(lambda row: f"{row['mode']}_{row['to_id']}", axis=1) + edges["from_id"] = edges.apply(lambda row: f"{row['mode']}_{row['from_id']}", axis=1) + + if directional: + edges = duplicate_reverse_and_append_edges(edges) + + nodes = gpd.read_parquet(nodes_path) + nodes["mode"] = "road" + nodes["id"] = nodes.apply(lambda row: f"{row['mode']}_{row['id']}", axis=1) + + return nodes, edges + + +def preprocess_rail_network( + nodes_path: str, + edges_path: str, + filter_iso_a3: set[str], + cost_USD_t_km: float, + cost_USD_t_h: float, + directional: float, + avg_speed_km_h: float, +) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: + """ + Preprocess rail network data into a suitable format for multi-modal routing. + Find length of edges, calculate cost. Relabel IDs to include mode so they're unique across networks. + + Args: + nodes_path: Path to nodes geoparquet file on disk + edges_path: Path to edges geoparquet file on disk + filter_iso_a3: ISO A3 codes of countries to keep features for + cost_USD_t_km: Cost of transporting goods in USD per tonne km (broadly, cost of fuel) + cost_USD_t_h: Cost of transporting goods in USD per tonne h (broadly, wages) + directional: Whether to duplicate, reverse and append edges (from_id and to_id switched) + avg_speed_km_h: Average speed of trains across network, used for all edges + + Returns: + Nodes and edges as two GeoDataFrames. + """ + + edges = gpd.read_parquet(edges_path) + # at least one foot in the countries in question + edges = edges[edges.from_iso_a3.isin(filter_iso_a3) | edges.to_iso_a3.isin(filter_iso_a3)] + edges["distance_km"] = edges.geometry.to_crs(edges.estimate_utm_crs()).length / 1_000 + + edges["mode"] = "rail" + + edges["cost_USD_t"] = cost_USD_t_km * edges["distance_km"] + cost_USD_t_h * edges["distance_km"] * 1 / avg_speed_km_h + edges["id"] = edges.apply(lambda row: f"{row['mode']}_{row['id']}", axis=1) + edges["to_id"] = edges.apply(lambda row: f"{row['mode']}_{row['to_id']}", axis=1) + edges["from_id"] = edges.apply(lambda row: f"{row['mode']}_{row['from_id']}", axis=1) + + if directional: + edges = duplicate_reverse_and_append_edges(edges) + + nodes = gpd.read_parquet(nodes_path) + nodes["mode"] = "rail" + nodes["id"] = nodes.apply(lambda row: f"{row['mode']}_{row['id']}", axis=1) + + return nodes, edges + + +def preprocess_maritime_network(nodes_path: str, edges_path: str) -> tuple[gpd.GeoDataFrame, pd.DataFrame]: + """ + Preprocess maritime network data into a suitable format for multi-modal routing. + Relabel IDs so they're unique across networks. + + Args: + nodes_path: Path to nodes geoparquet file on disk + edges_path: Path to edges parquet file on disk + + Returns: + Nodes GeoDataFrame and edges DataFrame. + """ + edges = pd.read_parquet(edges_path).rename(columns={"from_iso3": "from_iso_a3", "to_iso3": "to_iso_a3"}) + edges["mode"] = "maritime" + edges["cost_USD_t"] = edges["distance_km"] * edges["cost_USD_t_km"] + + nodes = gpd.read_parquet(nodes_path) + nodes = nodes.rename(columns={"iso3": "iso_a3"}) + nodes = nodes.drop(columns=["Continent_Code"]) + ports_mask = nodes.infra == "port" + + # we want to connect our road and rail nodes to the port_land node of the port_in, port_out, port_land trifecta + nodes.loc[ports_mask, "id"] = nodes.loc[ports_mask, :].apply(lambda row: f"{row.id}_land", axis=1) + + return nodes, edges + + +def find_nearest_points( + a: gpd.GeoDataFrame, + b: gpd.GeoDataFrame, + b_id_col: str, +) -> gpd.GeoDataFrame: + """ + Given two GeoDataFrames of point locations, `a` and `b`, for each point in `a`, find the closest in `b`. + + Modified from: + https://gis.stackexchange.com/questions/222315/finding-nearest-point-in-other-geodataframe-using-geopandas + + a: Table of points to start from, we run over every row here + b: Table of candidate closest points + b_id_col: Name of column in b identifying points + + Returns: + `a`, joined with the values from `b_id_col` from the points of `b` which are closest + """ + + # find nearest point in b for each and every point in a + tree = cKDTree(b.geometry.get_coordinates().to_numpy()) + distances, indicies = tree.query(a.geometry.get_coordinates().to_numpy(), k=1) + nearest_points = b.iloc[indicies][[b_id_col, "geometry"]] \ + .reset_index(drop=True).rename(columns={"geometry": "nearest_node_geometry"}) + + return pd.concat( + [ + a.reset_index(drop=True), + nearest_points, + pd.Series(data=distances, name='distance') + ], + axis=1 + ) + + +def create_edges_to_nearest_nodes( + a: gpd.GeoDataFrame, + b: gpd.GeoDataFrame, + max_distance_m: float, + projected_coordinate_system: pyproj.crs.crs.CRS, +) -> gpd.GeoDataFrame: + """ + Given two sets of nodes, a and b, loop through nodes in a, finding the closest + node in b (which is less than `max_distance_m` away). Create a linear linestring + connecting these points. + + Args: + a: Table of nodes to connect from, containing GeoSeries of point locations. + Must contain "id", "iso_a3" and "geometry" columns. + b: Table of candidate notes to connect to, containing Geoseries of point locations. + Must contain "id" and "geometry" columns. + max_distance_m: Edges only created if their span in metres is equal to or less than this value. + projected_coordinate_system: Project points to this CRS (must use metres!) before estimating distances. + + Returns: + Table of linking edges. + """ + + point_pairs = find_nearest_points( + a.to_crs(projected_coordinate_system), + b.to_crs(projected_coordinate_system).rename(columns={"id": "nearest_node_id"}), + "nearest_node_id" + ).rename(columns={"distance": "distance_m"}) + + point_pairs = point_pairs[point_pairs.distance_m < max_distance_m] + + edges = point_pairs.apply( + lambda row: { + "from_id": row.id, + "to_id": row.nearest_node_id, + "from_iso_a3": row.iso_a3, + "to_iso_a3": row.iso_a3, # assume link does not cross a border + "geometry": LineString([row.geometry, row.nearest_node_geometry]), + "distance_m": row.distance_m + }, + axis=1, + result_type="expand" + ) + + return gpd.GeoDataFrame(edges).reset_index(drop=True).set_crs(projected_coordinate_system) + + +def find_importing_node_id(row: pd.Series, exporting_country: str) -> str: + """ + Return the node id lying in the importing country + + Args: + row: Table row with columns from_iso_a3, to_iso_a3, from_id and to_id + exporting_country: ISO A3 code of exporting country + + Returns + node id + """ + if row.from_iso_a3 == exporting_country and row.to_iso_a3 != exporting_country: + return row.to_id + elif row.from_iso_a3 != exporting_country and row.to_iso_a3 == exporting_country: + return row.from_id + else: + raise RuntimeError + + +def create_edges_to_destination_countries( + origin_nodes: gpd.GeoDataFrame, + destination_country_nodes: gpd.GeoDataFrame, + cost_USD_t: float = 1E6, +) -> gpd.GeoDataFrame: + """ + Create edges between nodes within the same country of zero cost. + + Args: + origin_nodes: Table of origin nodes to create edges from + destination_country_nodes: Table of destination nodes to connect to, one per country + cost_USD_t: Cost of traversing this edge, in USD per tonne. Defaults to a very high + value, so that route allocations will only use these edges as a final connection + to the destination, rather than a general means of traversal. + + Returns: + Table of edges of same length as origin_nodes, connecting these to destination_country_nodes + """ + + assert len(destination_country_nodes.iso_a3) == len(destination_country_nodes.iso_a3.unique()) + assert origin_nodes.crs == destination_country_nodes.crs + + def make_edge(row: pd.Series) -> dict: + destination = destination_country_nodes.set_index("iso_a3").loc[row.iso_a3] + return { + "from_id": row.id, + "to_id": destination.id, + "from_iso_a3": row.iso_a3, + "to_iso_a3": row.iso_a3, + "mode": "imaginary", + "geometry": LineString( + [ + row.geometry, + destination.geometry + ] + ), + "cost_USD_t": cost_USD_t + } + + edges = origin_nodes.apply( + make_edge, + axis=1, + result_type="expand" + ) + + return gpd.GeoDataFrame(edges).reset_index(drop=True).set_crs(origin_nodes.crs) + + +def path_edges_from_ordered_id_list(path_node_ids: list[str], edges: gpd.GeoDataFrame) -> gpd.GeoDataFrame: + """ + If `path_node_ids` are sequential nodes forming a path through a graph made of `edges`, + return the subset of ordered edges connecting these nodes. + + Args: + path_node_ids: Sequential node ids of some path through graph + edges: Set of edges containing possible path edges + + Returns: + Ordered subset of `edges` corresponding to path prescribed by `path_node_ids` + """ + route_edges = [] + for i, from_node_id in enumerate(path_node_ids[:-1]): + to_node_id = path_node_ids[i + 1] + edge = edges[(edges.from_id == from_node_id) & (edges.to_id == to_node_id)] + assert len(edge) == 1 + route_edges.append(edge) + return pd.concat(route_edges) diff --git a/workflow/Snakefile b/workflow/Snakefile index 30c76f5b..616db8c0 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -76,27 +76,29 @@ if not config["best_estimate_windspeed_failure_threshold"] in config["transmissi # Constrain wildcards to NOT use _ or / wildcard_constraints: + ADMIN_SLUG="admin-level-[0-4]", + AGG_FUNC_SLUG="agg-sum", + CHUNK_SLUG="chunk-[\d]+", + COST_OR_FRACTION="cost|fraction", + DATASET="[^_/]+", + DIRECT_DAMAGE_TYPES="fraction_per_RP|cost_per_RP|EAD|EAD_and_cost_per_RP", + EVENTS_OR_FIXED="events|fixed", + FILTER_SLUG="filter-[^_/]+", + FILENAME="[^/]+", + HAZARD_SLUG="hazard-[^_/]+", + IRIS_SCENARIO="PRESENT|SSP1-2050|SSP2-2050|SSP5-2050", # the output dir must end in 'results' # e.g. '/data/open-gira/results', './results', '/my-results', etc. # this prevents us matching past it into other folders for an incorrect OUTPUT_DIR, # but is more flexible than reading a value from, for example, config.yaml OUTPUT_DIR="^.*results", - DATASET="[^_/]+", + PROJECT_SLUG="project-[^_/]+", SLICE_SLUG="slice-[0-9]+", - FILTER_SLUG="filter-[^_/]+", - HAZARD_SLUG="hazard-[^_/]+", - ADMIN_SLUG="admin-level-[0-4]", - AGG_FUNC_SLUG="agg-sum", - FILENAME="[^/]+", STORM_BASIN="EP|NA|NI|SI|SP|WP", - STORM_RP="[0-9]+", - IRIS_SCENARIO="PRESENT|SSP1-2050|SSP2-2050|SSP5-2050", STORM_MODEL="constant|CMCC-CM2-VHR4|CNRM-CM6-1-HR|EC-Earth3P-HR|HadGEM3-GC31-HM", STORM_MODEL_FUTURE="CMCC-CM2-VHR4|CNRM-CM6-1-HR|EC-Earth3P-HR|HadGEM3-GC31-HM", + STORM_RP="[0-9]+", STORM_SET="(?:IBTrACS|STORM|IRIS)[^\/]*", - EVENTS_OR_FIXED="events|fixed", - COST_OR_FRACTION="cost|fraction", - DIRECT_DAMAGE_TYPES="fraction_per_RP|cost_per_RP|EAD|EAD_and_cost_per_RP", SAMPLE="\d+", # may be upper or lower, one 'f' or two TIFF_FILE="[^\/\.\s]+\.[tT][iI][fF][fF]?", @@ -130,6 +132,8 @@ include: "transport/create_network.smk" include: "transport/osm_to_geoparquet.smk" include: "transport/create_overall_bbox.smk" include: "transport/join_data.smk" +include: "transport/maritime/maritime.smk" +include: "transport/multi-modal/multi_modal.smk" include: "flood/aqueduct.smk" include: "flood/trim_hazard_data.smk" diff --git a/workflow/transport/create_rail_network.py b/workflow/transport/create_rail_network.py index fa230a62..19376a94 100644 --- a/workflow/transport/create_rail_network.py +++ b/workflow/transport/create_rail_network.py @@ -14,7 +14,7 @@ from open_gira.admin import get_administrative_data from open_gira.assets import RailAssets from open_gira.io import write_empty_frames -from open_gira.network import create_network +from open_gira.network_creation import create_network from open_gira.utils import str_to_bool diff --git a/workflow/transport/create_road_network.py b/workflow/transport/create_road_network.py index 44e07f35..3a76afc6 100644 --- a/workflow/transport/create_road_network.py +++ b/workflow/transport/create_road_network.py @@ -17,7 +17,7 @@ from open_gira.admin import get_administrative_data from open_gira.assets import RoadAssets from open_gira.io import write_empty_frames -from open_gira.network import create_network +from open_gira.network_creation import create_network from open_gira.utils import str_to_bool diff --git a/workflow/transport/maritime/maritime.smk b/workflow/transport/maritime/maritime.smk new file mode 100644 index 00000000..2e50a09b --- /dev/null +++ b/workflow/transport/maritime/maritime.smk @@ -0,0 +1,155 @@ +rule create_maritime_network: + input: + nodes = "{OUTPUT_DIR}/input/networks/maritime/nodes.gpq", + edges_no_geom = "{OUTPUT_DIR}/input/networks/maritime/edges_by_cargo/maritime_base_network_general_cargo.pq", + edges_visualisation = "{OUTPUT_DIR}/input/networks/maritime/edges.gpq", + output: + nodes = "{OUTPUT_DIR}/maritime_network/nodes.gpq", + edges = "{OUTPUT_DIR}/maritime_network/edges.gpq", + run: + import igraph as ig + import geopandas as gpd + from shapely.geometry import Point + from shapely.ops import linemerge + from tqdm import tqdm + + from open_gira.network_creation import preprocess_maritime_network + + # possible cargo types = ("container", "dry_bulk", "general_cargo", "roro", "tanker") + # for now, just use 'general_cargo' + maritime_nodes, maritime_edges_no_geom = preprocess_maritime_network( + input.nodes, + input.edges_no_geom + ) + + if config["study_country_iso_a3"] == "THA": + # put Bangkok port in the right place... + maritime_nodes.loc[maritime_nodes.name == "Bangkok_Thailand", "geometry"] = Point((100.5753, 13.7037)) + + # %% + # Jasper's maritime edges in 'edges_by_cargo' do not contain geometry + # this is because the AIS data that they were derived from only contain origin and destination port, not route + # this is a pain for visualisation, so we will create a geometry for each from `maritime_vis_edges` + + maritime_vis_edges = gpd.read_parquet(input.edges_visualisation) + vis_graph = ig.Graph.DataFrame(maritime_vis_edges, directed=True, use_vids=False) + + maritime_edges = maritime_edges_no_geom.copy() + change_of_port_mask = maritime_edges_no_geom.from_port != maritime_edges_no_geom.to_port + port_pairs_to_generate_geom_for = maritime_edges_no_geom[change_of_port_mask] + for index, row in tqdm(port_pairs_to_generate_geom_for.iterrows(), total=len(port_pairs_to_generate_geom_for)): + edge_list = vis_graph.get_shortest_path(row.from_port, row.to_port, weights="distance", output="epath") + route_edges = maritime_vis_edges.iloc[edge_list] + route_linestring = linemerge(list(route_edges.geometry)) + maritime_edges.loc[index, "geometry"] = route_linestring + + maritime_edges = gpd.GeoDataFrame(maritime_edges).set_crs(epsg=4326) + + maritime_nodes.to_parquet(output.nodes) + maritime_edges.to_parquet(output.edges) + + +rule plot_maritime_network: + input: + nodes = "{OUTPUT_DIR}/maritime_network/nodes.gpq", + edges = "{OUTPUT_DIR}/maritime_network/edges.gpq", + output: + edges_plot = "{OUTPUT_DIR}/maritime_network/edges.png", + run: + import geopandas as gpd + import matplotlib + import matplotlib.pyplot as plt + import numpy as np + + from open_gira.plot.utils import chop_at_antimeridian + + matplotlib.use("Agg") + plt.style.use("bmh") + + world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres')) + world.geometry = world.geometry.boundary + + maritime_nodes = gpd.read_parquet(input.nodes) + maritime_edges = gpd.read_parquet(input.edges) + + # whole network + f, ax = plt.subplots(figsize=(16, 7)) + chop_at_antimeridian(maritime_edges, drop_null_geometry=True).plot( + ax=ax, + linewidth=0.5, + alpha=0.8 + ) + world.plot(ax=ax, lw=0.5, alpha=0.2) + ax.set_xticks(np.linspace(-180, 180, 13)) + ax.set_yticks([-60, -30, 0, 30, 60]) + ax.set_ylim(-65, 85) + ax.set_xlim(-180, 180) + ax.grid(alpha=0.3) + ax.set_xlabel("Longitude [deg]") + ax.set_ylabel("Latitude [deg]") + f.savefig(output.edges_plot) + + +rule plot_port_connections: + input: + nodes = "{OUTPUT_DIR}/maritime_network/nodes.gpq", + edges = "{OUTPUT_DIR}/maritime_network/edges.gpq", + output: + port_trade_plots = directory("{OUTPUT_DIR}/maritime_network/port_trade_plots"), + run: + import os + + import geopandas as gpd + import matplotlib + import matplotlib.pyplot as plt + from tqdm import tqdm + + from open_gira.plot.utils import chop_at_antimeridian + + matplotlib.use("Agg") + plt.style.use("bmh") + + maritime_nodes = gpd.read_parquet(input.nodes) + maritime_edges = gpd.read_parquet(input.edges) + + # disambiguate the global view and plot the routes from each port, one port at a time + world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres')) + world.geometry = world.geometry.boundary + + ports = maritime_edges.from_port.unique() + os.makedirs(output.port_trade_plots) + for port_id in tqdm(ports): + filepath = os.path.join(output.port_trade_plots, f"{port_id}.png") + f, ax = plt.subplots(figsize=(10,10)) + port = maritime_nodes[maritime_nodes.id == f"{port_id}_land"] + routes = maritime_edges[maritime_edges.from_port == port_id] + if all(routes.geometry.isna()): + continue + try: + chop_at_antimeridian(routes, drop_null_geometry=True).plot( + column="to_port", + categorical=True, + ax=ax + ) + except ValueError: + print(f"Failed to plot {port_id}, skipping...") + continue + maritime_nodes[maritime_nodes.id == f"{port_id}_land"].plot( + ax=ax, + markersize=500, + marker="*", + facecolor="none", + color="r" + ) + xmin, xmax = ax.get_xlim() + ymin, ymax = ax.get_ylim() + world.plot(ax=ax, linewidth=0.5, alpha=0.4) + ax.set_xlim(xmin, xmax) + ax.set_ylim(ymin, ymax) + port_name, = port.name + ax.set_title(f"{port_id} ({port_name.replace('_', ', ')}) estimated routes") + ax.get_xaxis().set_visible(False) + ax.get_yaxis().set_visible(False) + + f.savefig(filepath) + plt.close(f) \ No newline at end of file diff --git a/workflow/transport/multi-modal/multi_modal.py b/workflow/transport/multi-modal/multi_modal.py new file mode 100644 index 00000000..47290319 --- /dev/null +++ b/workflow/transport/multi-modal/multi_modal.py @@ -0,0 +1,202 @@ + +import geopandas as gpd +import numpy as np +import matplotlib +import matplotlib.patches as mpatches +import matplotlib.pyplot as plt +import pandas as pd + +from open_gira.network_creation import ( + duplicate_reverse_and_append_edges, preprocess_road_network, + preprocess_rail_network, create_edges_to_nearest_nodes, + find_importing_node_id, create_edges_to_destination_countries +) +from open_gira.routing import DESTINATION_LINK_COST_USD_T + +matplotlib.use("Agg") + +if __name__ == "__main__": + + study_country: str = snakemake.config["study_country_iso_a3"] + + print("Preprocessing road network...") + road_nodes, road_edges = preprocess_road_network( + snakemake.input.road_network_nodes, + snakemake.input.road_network_edges, + {study_country,}, + snakemake.config["road_cost_USD_t_km"], + snakemake.config["road_cost_USD_t_h"], + True, + snakemake.config["road_default_speed_limit_km_h"] + ) + + print("Preprocessing rail network...") + rail_nodes, rail_edges = preprocess_rail_network( + snakemake.input.rail_network_nodes, + snakemake.input.rail_network_edges, + {study_country,}, + snakemake.config["rail_cost_USD_t_km"], + snakemake.config["rail_cost_USD_t_h"], + True, + snakemake.config["rail_average_freight_speed_km_h"] + ) + + print("Reading maritime network...") + maritime_nodes = gpd.read_parquet(snakemake.input.maritime_nodes) + maritime_edges = gpd.read_parquet(snakemake.input.maritime_edges) + + maximum_intermodal_connection_metres = 2_000 + + print("Making intermodal connections...") + # road-rail + rail_road_edges = create_edges_to_nearest_nodes( + rail_nodes.loc[rail_nodes.station == True, ["id", "iso_a3", "geometry"]], + road_nodes.loc[:, ["id", "geometry"]], + maximum_intermodal_connection_metres, + rail_nodes.estimate_utm_crs() + ).to_crs(epsg=4326) + rail_road_edges["mode"] = "road_rail" + + # road-maritime + maritime_road_edges = create_edges_to_nearest_nodes( + maritime_nodes.loc[ + (maritime_nodes.infra == "port") & (maritime_nodes.iso_a3 == study_country), + ["id", "iso_a3", "geometry"] + ], + road_nodes.loc[:, ["id", "geometry"]], + maximum_intermodal_connection_metres, + road_nodes.estimate_utm_crs() + ).to_crs(epsg=4326) + maritime_road_edges["mode"] = "maritime_road" + + # rail-maritime + maritime_rail_edges = create_edges_to_nearest_nodes( + maritime_nodes.loc[ + (maritime_nodes.infra == "port") & (maritime_nodes.iso_a3 == study_country), + ["id", "iso_a3", "geometry"] + ], + rail_nodes.loc[rail_nodes.station == True, ["id", "geometry"]], + maximum_intermodal_connection_metres, + road_nodes.estimate_utm_crs() + ).to_crs(epsg=4326) + maritime_rail_edges["mode"] = "maritime_rail" + + intermodal_edges = pd.concat( + [ + rail_road_edges, + maritime_road_edges, + maritime_rail_edges + ] + ).to_crs(epsg=4326) + # as the maritime edges are directional, we're making road, rail and intermodal directional too (so duplicate) + intermodal_edges = duplicate_reverse_and_append_edges(intermodal_edges) + + intermodal_cost_USD_t = snakemake.config["intermodal_cost_USD_t"] + intermodal_edges["cost_USD_t"] = intermodal_edges["mode"].map(intermodal_cost_USD_t) + + # concatenate different kinds of nodes and edges + node_cols = ["id", "iso_a3", "geometry"] + nodes = gpd.GeoDataFrame( + pd.concat( + [ + road_nodes.loc[:, node_cols], + rail_nodes.loc[:, node_cols], + maritime_nodes.loc[:, node_cols] + ] + ), + crs=4326 + ) + + edge_cols = ["from_id", "to_id", "from_iso_a3", "to_iso_a3", "mode", "cost_USD_t", "geometry"] + edges = pd.concat( + [ + intermodal_edges.loc[:, edge_cols], + road_edges.loc[:, edge_cols], + rail_edges.loc[:, edge_cols], + maritime_edges.loc[:, edge_cols] + ] + ) + # add nodes for destination countries (not null, not origin country) + # neighbouring countries will have destination node connected to border crossings + countries = nodes.iso_a3.unique() + countries = countries[countries != np.array(None)] + countries = set(countries) + countries.remove(study_country) + + admin_boundaries = gpd.read_parquet(snakemake.input.admin_boundaries) + country_nodes = admin_boundaries.set_index("GID_0").loc[list(countries), ["geometry"]] \ + .sort_index().reset_index().rename(columns={"GID_0": "iso_a3"}) + country_nodes["id"] = country_nodes.apply(lambda row: f"GID_0_{row.iso_a3}", axis=1) + country_nodes.geometry = country_nodes.geometry.representative_point() + destination_country_nodes = country_nodes.loc[:, ["id", "iso_a3", "geometry"]] + + nodes = pd.concat( + [ + nodes, + destination_country_nodes + ] + ) + + # find nodes which lie on far side of border crossing + border_crossing_mask = \ + (edges.from_iso_a3 != edges.to_iso_a3) \ + & ((edges.from_iso_a3 == study_country) | (edges.to_iso_a3 == study_country)) \ + & ((edges["mode"] == "road") | (edges["mode"] == "rail")) \ + + importing_node_ids = edges[border_crossing_mask].apply(find_importing_node_id, exporting_country=study_country, axis=1) + importing_nodes = nodes.set_index("id").loc[importing_node_ids].reset_index() + # two importing nodes are labelled as THA, drop these + importing_nodes = importing_nodes[importing_nodes.iso_a3 != study_country] + + # connect these nodes to their containing country + land_border_to_importing_country_edges = \ + create_edges_to_destination_countries(importing_nodes, destination_country_nodes) + land_border_to_importing_country_edges.plot() + + print("Plotting land border crossings for inspection...") + # plot THA land border crossing points for sanity + to_plot = importing_nodes + country_ints, labels = pd.factorize(to_plot["iso_a3"]) + unique_country_ints = [] + cmap = plt.get_cmap("viridis") + colours = [cmap(x) for x in country_ints / max(country_ints)] + [unique_country_ints.append(c) for c in colours if c not in unique_country_ints] + colour_map = dict(zip(labels, unique_country_ints)) + f, ax = plt.subplots() + to_plot.plot(color=colours, ax=ax) + ax.set_title("Land border crossing points") + patches = [mpatches.Patch(color=colour, label=label) for label, colour in colour_map.items()] + ax.legend(handles=patches) + f.savefig(snakemake.output.border_crossing_plot) + + print("Making terminal connections to destination countries...") + # connect foreign ports to their country with new edges + foreign_ports = maritime_nodes[maritime_nodes.infra=="port"] + foreign_ports = foreign_ports[foreign_ports.iso_a3 != study_country] + port_to_importing_countries_edges = create_edges_to_destination_countries( + foreign_ports, + destination_country_nodes, + DESTINATION_LINK_COST_USD_T + ) + + # add in edges connecting destination countries to THA land borders and foreign ports + edges = pd.concat( + [ + edges.loc[:, edge_cols], + duplicate_reverse_and_append_edges(land_border_to_importing_country_edges.loc[:, edge_cols]), + duplicate_reverse_and_append_edges(port_to_importing_countries_edges.loc[:, edge_cols]), + ] + ).reset_index(drop=True) + + # there are duplicate edges (repeated from_id -> to_id pairs), drop these here + edges["unique_edge_id"] = edges.apply(lambda row: f"{row.from_id}_{row.to_id}", axis=1) + edges = edges[~edges.unique_edge_id.duplicated(keep="first")].drop(columns=["unique_edge_id"]) + + print("Write out network to disk as geoparquet...") + # write out global multi-modal transport network to disk + # reset indicies to 0-start integers + # these will correspond to igraph's internal edge/vertex ids + nodes = nodes.reset_index(drop=True) + nodes.to_parquet(snakemake.output.nodes) + edges = edges.reset_index(drop=True) + edges.to_parquet(snakemake.output.edges) \ No newline at end of file diff --git a/workflow/transport/multi-modal/multi_modal.smk b/workflow/transport/multi-modal/multi_modal.smk new file mode 100644 index 00000000..ff3061de --- /dev/null +++ b/workflow/transport/multi-modal/multi_modal.smk @@ -0,0 +1,75 @@ +rule create_multi_modal_network: + """ + Take previously created road, rail and maritime networks and combine them + into a single multi-modal network with intermodal connections within + distance limit of: any road node, any rail station and any maritime port. + """ + input: + admin_boundaries = "{OUTPUT_DIR}/input/admin-boundaries/admin-level-0.geoparquet", + road_network_nodes = "{OUTPUT_DIR}/input/networks/road/{PROJECT_SLUG}/nodes.gpq", + road_network_edges = "{OUTPUT_DIR}/input/networks/road/{PROJECT_SLUG}/edges.gpq", + rail_network_nodes = "{OUTPUT_DIR}/input/networks/rail/{PROJECT_SLUG}/nodes.gpq", + rail_network_edges = "{OUTPUT_DIR}/input/networks/rail/{PROJECT_SLUG}/edges.gpq", + maritime_nodes = "{OUTPUT_DIR}/maritime_network/nodes.gpq", + maritime_edges = "{OUTPUT_DIR}/maritime_network/edges.gpq", + output: + border_crossing_plot = "{OUTPUT_DIR}/multi-modal_network/{PROJECT_SLUG}/border_crossings.png", + nodes = "{OUTPUT_DIR}/multi-modal_network/{PROJECT_SLUG}/nodes.gpq", + edges = "{OUTPUT_DIR}/multi-modal_network/{PROJECT_SLUG}/edges.gpq", + script: + "./multi_modal.py" + + +rule remove_edges_in_excess_of_threshold: + """ + Take part of multi-modal network and remove edges that experience hazard + values in excess of a given threshold. + """ + input: + edges = "{OUTPUT_DIR}/multi-modal_network/{PROJECT_SLUG}/edges.gpq", + raster = "{OUTPUT_DIR}/hazard/{HAZARD_SLUG}.tif", + output: + edges = "{OUTPUT_DIR}/multi-modal_network/{PROJECT_SLUG}/{HAZARD_SLUG}/chunks/{CHUNK_SLUG}/edges.gpq" + run: + import geopandas as gpd + import numpy as np + + from open_gira.disruption import filter_edges_by_raster + + i = int(wildcards.CHUNK_SLUG.split("-")[-1]) + edges = gpd.read_parquet(input.edges) + edges = edges.loc[edges["mode"].isin({"road", "rail"}), :] + chunk_size = int(np.ceil(len(edges) / workflow.cores)) + print(f"Chunk {i}: intersecting edges [{i * chunk_size}: {(i + 1) * chunk_size}]") + + surviving_edges = filter_edges_by_raster( + edges.iloc[i * chunk_size: (i + 1) * chunk_size, :], + input.raster, + float(config["edge_failure_threshold"]) + ) + + surviving_edges.to_parquet(output.edges) + + +rule join_intersection_results: + """ + Pull together chunks of intersection operation to remove edges exceeding threshold value. + """ + input: + all_edges = "{OUTPUT_DIR}/multi-modal_network/{PROJECT_SLUG}/edges.gpq", + intersected_edges = expand( + "{{OUTPUT_DIR}}/multi-modal_network/{{PROJECT_SLUG}}/{{HAZARD_SLUG}}/chunks/chunk-{chunk}/edges.gpq", + chunk=range(0, workflow.cores) + ) + output: + edges = "{OUTPUT_DIR}/multi-modal_network/{PROJECT_SLUG}/{HAZARD_SLUG}/edges.gpq", + run: + import geopandas as gpd + import pandas as pd + + all_edges = gpd.read_parquet(input.all_edges) + + intersected_edges_post_hazard = pd.concat([gpd.read_parquet(path) for path in input.intersected_edges]) + edges = pd.concat([all_edges.loc[~all_edges["mode"].isin({"road", "rail"}), :], intersected_edges_post_hazard]) + + edges.to_parquet(output.edges) \ No newline at end of file