From b1c78722c2e33d1ef4c9ce60b6968440e83ceed4 Mon Sep 17 00:00:00 2001 From: Toru Seo <34780089+toruseo@users.noreply.github.com> Date: Tue, 17 Sep 2024 15:10:37 +0900 Subject: [PATCH] relocate 'get_shortest_path_*()' to `Utilities` submodule --- tests/test_other_functions.py | 104 +++++++++++++++++++--------------- uxsim/Utilities/Utilities.py | 73 +++++++++++++++++++++++- uxsim/uxsim.py | 66 +-------------------- 3 files changed, 131 insertions(+), 112 deletions(-) diff --git a/tests/test_other_functions.py b/tests/test_other_functions.py index dabdf47..44bca2c 100644 --- a/tests/test_other_functions.py +++ b/tests/test_other_functions.py @@ -3,6 +3,7 @@ """ import pytest +from numpy import * from uxsim import * from uxsim.Utilities import * @@ -405,7 +406,7 @@ def test_shortest_path_costs(): W.analyzer.cumulative_curves(figsize=(4,2)) - spd = W.get_shortest_path_distance_between_all_nodes() + spd = get_shortest_path_distance_between_all_nodes(W) assert spd["orig", "dest"] == 2000 assert spd["orig", "mid1"] == 1000 assert spd["orig", "mid2"] == 1000 @@ -415,11 +416,11 @@ def test_shortest_path_costs(): assert spd[orig, dest] == 2000 assert spd[dest, orig] == np.Inf - spd = W.get_shortest_path_distance_between_all_nodes(return_matrix=True) + spd = get_shortest_path_distance_between_all_nodes(W, return_matrix=True) assert spd[0, 3] == 2000 assert spd[3, 0] == np.Inf - spt = W.get_shortest_path_instantaneous_travel_time_between_all_nodes() + spt = get_shortest_path_instantaneous_travel_time_between_all_nodes(W) assert equal_tolerance(spt["orig", "dest"], 150) assert equal_tolerance(spt["orig", "mid1"], 50, rel_tol=0.2) assert equal_tolerance(spt["orig", "mid2"], 150) @@ -429,7 +430,7 @@ def test_shortest_path_costs(): assert equal_tolerance(spt[orig, dest], 150) assert spt[dest, orig] == np.Inf - spt = W.get_shortest_path_instantaneous_travel_time_between_all_nodes(return_matrix=True) + spt = get_shortest_path_instantaneous_travel_time_between_all_nodes(W, return_matrix=True) assert equal_tolerance(spt[0, 3], 150) assert spt[3, 0] == np.Inf @@ -569,55 +570,66 @@ def test_area2area_demand_and_stats(): @pytest.mark.flaky(reruns=10) def test_area_stats(): - W = World( - name="", - deltan=10, - tmax=3000, - print_mode=1, save_mode=1, show_mode=0, - random_seed=None, - ) - - n_nodes = 4 - imax = n_nodes - jmax = n_nodes - nodes = {} - for i in range(imax): - for j in range(jmax): - nodes[i,j] = W.addNode(f"n{(i,j)}", i, j, flow_capacity=1.6) - - links = {} - for i in range(imax): - for j in range(jmax): - if i != imax-1: - links[i,j,i+1,j] = W.addLink(f"l{(i,j,i+1,j)}", nodes[i,j], nodes[i+1,j], length=1000) - if i != 0: - links[i,j,i-1,j] = W.addLink(f"l{(i,j,i-1,j)}", nodes[i,j], nodes[i-1,j], length=1000) - if j != jmax-1: - links[i,j,i,j+1] = W.addLink(f"l{(i,j,i,j+1)}", nodes[i,j], nodes[i,j+1], length=1000) - if j != 0: - links[i,j,i,j-1] = W.addLink(f"l{(i,j,i,j-1)}", nodes[i,j], nodes[i,j-1], length=1000) + rec_volume_areaN = [] + rec_volume_areaS = [] + rec_ttt_areaN = [] + rec_delay_areaN = [] - area_dict = { - "areaN": [nodes[0,i] for i in range(n_nodes)], - "areaS": [nodes[n_nodes-1,i] for i in range(n_nodes)], - "areaNW": [nodes[0,0]], - "areaSE": [nodes[n_nodes-1, n_nodes-1]] - } + for i in range(10): + W = World( + name="", + deltan=10, + tmax=3000, + print_mode=1, save_mode=1, show_mode=0, + random_seed=None, + ) - W.adddemand_nodes2nodes(area_dict["areaN"], area_dict["areaS"], 0, 3000, volume=7000) + n_nodes = 4 + imax = n_nodes + jmax = n_nodes + nodes = {} + for i in range(imax): + for j in range(jmax): + nodes[i,j] = W.addNode(f"n{(i,j)}", i, j, flow_capacity=1.6) + + links = {} + for i in range(imax): + for j in range(jmax): + if i != imax-1: + links[i,j,i+1,j] = W.addLink(f"l{(i,j,i+1,j)}", nodes[i,j], nodes[i+1,j], length=1000) + if i != 0: + links[i,j,i-1,j] = W.addLink(f"l{(i,j,i-1,j)}", nodes[i,j], nodes[i-1,j], length=1000) + if j != jmax-1: + links[i,j,i,j+1] = W.addLink(f"l{(i,j,i,j+1)}", nodes[i,j], nodes[i,j+1], length=1000) + if j != 0: + links[i,j,i,j-1] = W.addLink(f"l{(i,j,i,j-1)}", nodes[i,j], nodes[i,j-1], length=1000) + + + area_dict = { + "areaN": [nodes[0,i] for i in range(n_nodes)], + "areaS": [nodes[n_nodes-1,i] for i in range(n_nodes)], + "areaNW": [nodes[0,0]], + "areaSE": [nodes[n_nodes-1, n_nodes-1]] + } + + W.adddemand_nodes2nodes(area_dict["areaN"], area_dict["areaS"], 0, 3000, volume=7000) - W.exec_simulation() - W.analyzer.print_simple_stats() + W.exec_simulation() + W.analyzer.print_simple_stats() - df = W.analyzer.area_to_pandas(list(area_dict.values()), list(area_dict.keys()), border_include=True) - print(df) + df = W.analyzer.area_to_pandas(list(area_dict.values()), list(area_dict.keys()), border_include=True) + print(df) - assert equal_tolerance(df["traffic_volume"][df["area"] == "areaN"].values[0], 6900) - assert equal_tolerance(df["traffic_volume"][df["area"] == "areaS"].values[0], 6300) - assert equal_tolerance(df["total_travel_time"][df["area"] == "areaN"].values[0], 800000, rel_tol=0.3) - assert equal_tolerance(df["average_delay"][df["area"] == "areaN"].values[0], 0.73, abs_tol=0.2) + rec_volume_areaN.append(df["traffic_volume"][df["area"] == "areaN"].values[0]) + rec_volume_areaS.append(df["traffic_volume"][df["area"] == "areaS"].values[0]) + rec_ttt_areaN.append(df["total_travel_time"][df["area"] == "areaN"].values[0]) + rec_delay_areaN.append(df["average_delay"][df["area"] == "areaN"].values[0]) + assert equal_tolerance(average(rec_volume_areaN), 6880) + assert equal_tolerance(average(rec_volume_areaS), 6380) + assert equal_tolerance(average(rec_ttt_areaN), 840000) + assert equal_tolerance(average(rec_delay_areaN), 0.77, abs_tol=0.1) @pytest.mark.flaky(reruns=10) def test_vehicle_group_stats(): diff --git a/uxsim/Utilities/Utilities.py b/uxsim/Utilities/Utilities.py index ee6f900..9dca6f4 100644 --- a/uxsim/Utilities/Utilities.py +++ b/uxsim/Utilities/Utilities.py @@ -3,6 +3,9 @@ This contains functions that are not essential for simulation but useful to specific analysis. """ import networkx as nx +import numpy as np +from scipy.sparse import csr_matrix +from scipy.sparse.csgraph import dijkstra def generate_grid_network(W, imax, jmax, **kwargs): """ @@ -152,4 +155,72 @@ def enumerate_k_shortest_routes_on_t(W, source, target, t, k=1, cost_function=la if return_cost: return routes, costs else: - return routes \ No newline at end of file + return routes + +def get_shortest_path_distance_between_all_nodes(W, return_matrix=False): + """ + Get the shortest distances (in meters) between all node pairs based on link lengths + + Parameters + ---------- + W : World + The World object. + return_matrix : bool, optional + Whether to return the distance matrix as a numpy array. Default is False. + + Returns + ------- + dict or numpy array + Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False. + Returns a numpy array of distances between nodes whose index is node.id pair if `return_matrix` is True. + """ + num_nodes = len(W.NODES) + distances = np.full((num_nodes, num_nodes), np.inf) # Initialize with infinity + + # Fill in the distances based on the link lengths + for link in W.LINKS: + i = link.start_node.id + j = link.end_node.id + distances[i, j] = min(distances[i, j], link.length) + + # Use Dijkstra algorithm to compute shortest distances + distances = dijkstra(csr_matrix(distances), directed=True, return_predecessors=False) + + if return_matrix == True: + return distances + else: + distances_dict = dict() + for node1 in W.NODES: + for node2 in W.NODES: + distances_dict[node1, node2] = distances[node1.id, node2.id] + distances_dict[node1.name, node2.name] = distances[node1.id, node2.id] + return distances_dict + +def get_shortest_path_instantaneous_travel_time_between_all_nodes(W, return_matrix=False): + """ + Get the shortest instantaneous travel time (in seconds) between all node pairs based on the current instantaneous travel time of each link. + + Parameters + ---------- + W : World + The World object. + return_matrix : bool, optional + Whether to return the distance matrix as a numpy array. Default is False. + + Returns + ------- + dict or numpy array + Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False. + Returns a numpy array of distances between nodes whose index is node.id pair if `return_matrix` is True. + """ + distances = W.ROUTECHOICE.dist + + if return_matrix == True: + return distances + else: + distances_dict = dict() + for node1 in W.NODES: + for node2 in W.NODES: + distances_dict[node1, node2] = distances[node1.id, node2.id] + distances_dict[node1.name, node2.name] = distances[node1.id, node2.id] + return distances_dict \ No newline at end of file diff --git a/uxsim/uxsim.py b/uxsim/uxsim.py index a4726cb..b3aeada 100644 --- a/uxsim/uxsim.py +++ b/uxsim/uxsim.py @@ -10,7 +10,7 @@ import numpy as np import matplotlib.pyplot as plt from scipy.sparse import csr_matrix -from scipy.sparse.csgraph import floyd_warshall, dijkstra +from scipy.sparse.csgraph import dijkstra import dill as pickle from .analyzer import * @@ -2208,70 +2208,6 @@ def get_nodes_in_area(W, x, y, r): if (node.x-x)**2 + (node.y-y)**2 < r**2: nodes.append(node) return nodes - - def get_shortest_path_distance_between_all_nodes(W, return_matrix=False): - """ - Get the shortest distances (in meters) between all node pairs based on link lengths - - Parameters - ---------- - return_matrix : bool, optional - Whether to return the distance matrix as a numpy array. Default is False. - - Returns - ------- - dict or numpy array - Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False. - Returns a numpy array of distances between nodes whose index is node.id pair if `return_matrix` is True. - """ - num_nodes = len(W.NODES) - distances = np.full((num_nodes, num_nodes), np.inf) # Initialize with infinity - - # Fill in the distances based on the link lengths - for link in W.LINKS: - i = link.start_node.id - j = link.end_node.id - distances[i, j] = min(distances[i, j], link.length) - - # Use Dijkstra algorithm to compute shortest distances - distances = dijkstra(csr_matrix(distances), directed=True, return_predecessors=False) - - if return_matrix == True: - return distances - else: - distances_dict = dict() - for node1 in W.NODES: - for node2 in W.NODES: - distances_dict[node1, node2] = distances[node1.id, node2.id] - distances_dict[node1.name, node2.name] = distances[node1.id, node2.id] - return distances_dict - - def get_shortest_path_instantaneous_travel_time_between_all_nodes(W, return_matrix=False): - """ - Get the shortest instantaneous travel time (in seconds) between all node pairs based on the current instantaneous travel time of each link. - - Parameters - ---------- - return_matrix : bool, optional - Whether to return the distance matrix as a numpy array. Default is False. - - Returns - ------- - dict or numpy array - Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False. - Returns a numpy array of distances between nodes whose index is node.id pair if `return_matrix` is True. - """ - distances = W.ROUTECHOICE.dist - - if return_matrix == True: - return distances - else: - distances_dict = dict() - for node1 in W.NODES: - for node2 in W.NODES: - distances_dict[node1, node2] = distances[node1.id, node2.id] - distances_dict[node1.name, node2.name] = distances[node1.id, node2.id] - return distances_dict def load_scenario_from_csv(W, fname_node, fname_link, fname_demand, tmax=None): """