Merge pull request #137 from toruseo/develop
relocate 'get_shortest_path_*()' to `Utilities` submodule
toruseo authored Sep 17, 2024
2 parents a32f4d5 + b1c7872 commit dc35cd3
Showing 3 changed files with 131 additions and 112 deletions.
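
The change is purely a relocation of public helpers: `W.get_shortest_path_distance_between_all_nodes()` and `W.get_shortest_path_instantaneous_travel_time_between_all_nodes()` move out of `World` and become module-level functions in `uxsim.Utilities` that take the `World` instance as their first argument. A minimal before/after sketch of the call site; the two-node toy network and parameter values below are illustrative and not part of this commit:

from uxsim import World
from uxsim.Utilities import get_shortest_path_distance_between_all_nodes

# Illustrative toy network; values are not taken from this commit.
W = World(name="", deltan=5, tmax=1200, print_mode=0, save_mode=0, show_mode=0)
orig = W.addNode("orig", 0, 0)
dest = W.addNode("dest", 1, 0)
W.addLink("link", orig, dest, length=1000)

# before this commit: spd = W.get_shortest_path_distance_between_all_nodes()
# after this commit: a function in uxsim.Utilities taking W as the first argument
spd = get_shortest_path_distance_between_all_nodes(W)
print(spd["orig", "dest"])  # 1000.0
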
104 changes: 58 additions & 46 deletions tests/test_other_functions.py
@@ -3,6 +3,7 @@
"""

import pytest
from numpy import *
from uxsim import *
from uxsim.Utilities import *

@@ -405,7 +406,7 @@ def test_shortest_path_costs():

W.analyzer.cumulative_curves(figsize=(4,2))

spd = W.get_shortest_path_distance_between_all_nodes()
spd = get_shortest_path_distance_between_all_nodes(W)
assert spd["orig", "dest"] == 2000
assert spd["orig", "mid1"] == 1000
assert spd["orig", "mid2"] == 1000
@@ -415,11 +416,11 @@ def test_shortest_path_costs():
assert spd[orig, dest] == 2000
assert spd[dest, orig] == np.Inf

spd = W.get_shortest_path_distance_between_all_nodes(return_matrix=True)
spd = get_shortest_path_distance_between_all_nodes(W, return_matrix=True)
assert spd[0, 3] == 2000
assert spd[3, 0] == np.Inf

spt = W.get_shortest_path_instantaneous_travel_time_between_all_nodes()
spt = get_shortest_path_instantaneous_travel_time_between_all_nodes(W)
assert equal_tolerance(spt["orig", "dest"], 150)
assert equal_tolerance(spt["orig", "mid1"], 50, rel_tol=0.2)
assert equal_tolerance(spt["orig", "mid2"], 150)
@@ -429,7 +430,7 @@ def test_shortest_path_costs():
assert equal_tolerance(spt[orig, dest], 150)
assert spt[dest, orig] == np.Inf

spt = W.get_shortest_path_instantaneous_travel_time_between_all_nodes(return_matrix=True)
spt = get_shortest_path_instantaneous_travel_time_between_all_nodes(W, return_matrix=True)
assert equal_tolerance(spt[0, 3], 150)
assert spt[3, 0] == np.Inf

@@ -569,55 +570,66 @@ def test_area2area_demand_and_stats():

@pytest.mark.flaky(reruns=10)
def test_area_stats():
W = World(
name="",
deltan=10,
tmax=3000,
print_mode=1, save_mode=1, show_mode=0,
random_seed=None,
)

n_nodes = 4
imax = n_nodes
jmax = n_nodes
nodes = {}
for i in range(imax):
for j in range(jmax):
nodes[i,j] = W.addNode(f"n{(i,j)}", i, j, flow_capacity=1.6)

links = {}
for i in range(imax):
for j in range(jmax):
if i != imax-1:
links[i,j,i+1,j] = W.addLink(f"l{(i,j,i+1,j)}", nodes[i,j], nodes[i+1,j], length=1000)
if i != 0:
links[i,j,i-1,j] = W.addLink(f"l{(i,j,i-1,j)}", nodes[i,j], nodes[i-1,j], length=1000)
if j != jmax-1:
links[i,j,i,j+1] = W.addLink(f"l{(i,j,i,j+1)}", nodes[i,j], nodes[i,j+1], length=1000)
if j != 0:
links[i,j,i,j-1] = W.addLink(f"l{(i,j,i,j-1)}", nodes[i,j], nodes[i,j-1], length=1000)

rec_volume_areaN = []
rec_volume_areaS = []
rec_ttt_areaN = []
rec_delay_areaN = []

area_dict = {
"areaN": [nodes[0,i] for i in range(n_nodes)],
"areaS": [nodes[n_nodes-1,i] for i in range(n_nodes)],
"areaNW": [nodes[0,0]],
"areaSE": [nodes[n_nodes-1, n_nodes-1]]
}
for i in range(10):
W = World(
name="",
deltan=10,
tmax=3000,
print_mode=1, save_mode=1, show_mode=0,
random_seed=None,
)

W.adddemand_nodes2nodes(area_dict["areaN"], area_dict["areaS"], 0, 3000, volume=7000)
n_nodes = 4
imax = n_nodes
jmax = n_nodes
nodes = {}
for i in range(imax):
for j in range(jmax):
nodes[i,j] = W.addNode(f"n{(i,j)}", i, j, flow_capacity=1.6)

links = {}
for i in range(imax):
for j in range(jmax):
if i != imax-1:
links[i,j,i+1,j] = W.addLink(f"l{(i,j,i+1,j)}", nodes[i,j], nodes[i+1,j], length=1000)
if i != 0:
links[i,j,i-1,j] = W.addLink(f"l{(i,j,i-1,j)}", nodes[i,j], nodes[i-1,j], length=1000)
if j != jmax-1:
links[i,j,i,j+1] = W.addLink(f"l{(i,j,i,j+1)}", nodes[i,j], nodes[i,j+1], length=1000)
if j != 0:
links[i,j,i,j-1] = W.addLink(f"l{(i,j,i,j-1)}", nodes[i,j], nodes[i,j-1], length=1000)


area_dict = {
"areaN": [nodes[0,i] for i in range(n_nodes)],
"areaS": [nodes[n_nodes-1,i] for i in range(n_nodes)],
"areaNW": [nodes[0,0]],
"areaSE": [nodes[n_nodes-1, n_nodes-1]]
}

W.adddemand_nodes2nodes(area_dict["areaN"], area_dict["areaS"], 0, 3000, volume=7000)

W.exec_simulation()
W.analyzer.print_simple_stats()
W.exec_simulation()
W.analyzer.print_simple_stats()

df = W.analyzer.area_to_pandas(list(area_dict.values()), list(area_dict.keys()), border_include=True)
print(df)
df = W.analyzer.area_to_pandas(list(area_dict.values()), list(area_dict.keys()), border_include=True)
print(df)

assert equal_tolerance(df["traffic_volume"][df["area"] == "areaN"].values[0], 6900)
assert equal_tolerance(df["traffic_volume"][df["area"] == "areaS"].values[0], 6300)
assert equal_tolerance(df["total_travel_time"][df["area"] == "areaN"].values[0], 800000, rel_tol=0.3)
assert equal_tolerance(df["average_delay"][df["area"] == "areaN"].values[0], 0.73, abs_tol=0.2)
rec_volume_areaN.append(df["traffic_volume"][df["area"] == "areaN"].values[0])
rec_volume_areaS.append(df["traffic_volume"][df["area"] == "areaS"].values[0])
rec_ttt_areaN.append(df["total_travel_time"][df["area"] == "areaN"].values[0])
rec_delay_areaN.append(df["average_delay"][df["area"] == "areaN"].values[0])

assert equal_tolerance(average(rec_volume_areaN), 6880)
assert equal_tolerance(average(rec_volume_areaS), 6380)
assert equal_tolerance(average(rec_ttt_areaN), 840000)
assert equal_tolerance(average(rec_delay_areaN), 0.77, abs_tol=0.1)

@pytest.mark.flaky(reruns=10)
def test_vehicle_group_stats():
73 changes: 72 additions & 1 deletion uxsim/Utilities/Utilities.py
@@ -3,6 +3,9 @@
This contains functions that are not essential for simulation but useful to specific analysis.
"""
import networkx as nx
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse.csgraph import dijkstra

def generate_grid_network(W, imax, jmax, **kwargs):
"""
@@ -152,4 +155,72 @@ def enumerate_k_shortest_routes_on_t(W, source, target, t, k=1, cost_function=la
if return_cost:
return routes, costs
else:
return routes
return routes

def get_shortest_path_distance_between_all_nodes(W, return_matrix=False):
"""
Get the shortest distances (in meters) between all node pairs based on link lengths
Parameters
----------
W : World
The World object.
return_matrix : bool, optional
Whether to return the distance matrix as a numpy array. Default is False.
Returns
-------
dict or numpy array
Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False.
Returns a numpy array of distances between nodes whose index is node.id pair if `return_matrix` is True.
"""
num_nodes = len(W.NODES)
distances = np.full((num_nodes, num_nodes), np.inf) # Initialize with infinity

# Fill in the distances based on the link lengths
for link in W.LINKS:
i = link.start_node.id
j = link.end_node.id
distances[i, j] = min(distances[i, j], link.length)

# Use Dijkstra algorithm to compute shortest distances
distances = dijkstra(csr_matrix(distances), directed=True, return_predecessors=False)

if return_matrix == True:
return distances
else:
distances_dict = dict()
for node1 in W.NODES:
for node2 in W.NODES:
distances_dict[node1, node2] = distances[node1.id, node2.id]
distances_dict[node1.name, node2.name] = distances[node1.id, node2.id]
return distances_dict

def get_shortest_path_instantaneous_travel_time_between_all_nodes(W, return_matrix=False):
"""
Get the shortest instantaneous travel time (in seconds) between all node pairs based on the current instantaneous travel time of each link.
Parameters
----------
W : World
The World object.
return_matrix : bool, optional
Whether to return the distance matrix as a numpy array. Default is False.
Returns
-------
dict or numpy array
Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False.
Returns a numpy array of distances between nodes whose index is node.id pair if `return_matrix` is True.
"""
distances = W.ROUTECHOICE.dist

if return_matrix == True:
return distances
else:
distances_dict = dict()
for node1 in W.NODES:
for node2 in W.NODES:
distances_dict[node1, node2] = distances[node1.id, node2.id]
distances_dict[node1.name, node2.name] = distances[node1.id, node2.id]
return distances_dict
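
For reference, a hedged usage sketch of the two return modes documented above: the dict form is keyed by both Node-object pairs and name pairs, while the matrix form is indexed by `node.id`. The one-way three-node chain and parameter values below are illustrative and not part of this commit.

import numpy as np
from uxsim import World
from uxsim.Utilities import get_shortest_path_distance_between_all_nodes

# Illustrative toy network (one-way chain a -> b -> c); not part of this commit.
W = World(name="", deltan=5, tmax=1200, print_mode=0, save_mode=0, show_mode=0)
a = W.addNode("a", 0, 0)
b = W.addNode("b", 1, 0)
c = W.addNode("c", 2, 0)
W.addLink("ab", a, b, length=1000)
W.addLink("bc", b, c, length=1500)

# Dict form: keyed by (Node, Node) pairs and by (name, name) pairs.
spd = get_shortest_path_distance_between_all_nodes(W)
assert spd[a, c] == spd["a", "c"] == 2500
assert spd["c", "a"] == np.inf  # unreachable: no reverse links in this toy network

# Matrix form: a numpy array indexed by node.id.
spd_mat = get_shortest_path_distance_between_all_nodes(W, return_matrix=True)
assert spd_mat[a.id, c.id] == 2500
assert np.isinf(spd_mat[c.id, a.id])
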
66 changes: 1 addition & 65 deletions uxsim/uxsim.py
@@ -10,7 +10,7 @@
import numpy as np
import matplotlib.pyplot as plt
from scipy.sparse import csr_matrix
from scipy.sparse.csgraph import floyd_warshall, dijkstra
from scipy.sparse.csgraph import dijkstra
import dill as pickle

from .analyzer import *
@@ -2208,70 +2208,6 @@ def get_nodes_in_area(W, x, y, r):
if (node.x-x)**2 + (node.y-y)**2 < r**2:
nodes.append(node)
return nodes

def get_shortest_path_distance_between_all_nodes(W, return_matrix=False):
"""
Get the shortest distances (in meters) between all node pairs based on link lengths
Parameters
----------
return_matrix : bool, optional
Whether to return the distance matrix as a numpy array. Default is False.
Returns
-------
dict or numpy array
Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False.
Returns a numpy array of distances between nodes whose index is node.id pair if `return_matrix` is True.
"""
num_nodes = len(W.NODES)
distances = np.full((num_nodes, num_nodes), np.inf) # Initialize with infinity

# Fill in the distances based on the link lengths
for link in W.LINKS:
i = link.start_node.id
j = link.end_node.id
distances[i, j] = min(distances[i, j], link.length)

# Use Dijkstra algorithm to compute shortest distances
distances = dijkstra(csr_matrix(distances), directed=True, return_predecessors=False)

if return_matrix == True:
return distances
else:
distances_dict = dict()
for node1 in W.NODES:
for node2 in W.NODES:
distances_dict[node1, node2] = distances[node1.id, node2.id]
distances_dict[node1.name, node2.name] = distances[node1.id, node2.id]
return distances_dict

def get_shortest_path_instantaneous_travel_time_between_all_nodes(W, return_matrix=False):
"""
Get the shortest instantaneous travel time (in seconds) between all node pairs based on the current instantaneous travel time of each link.
Parameters
----------
return_matrix : bool, optional
Whether to return the distance matrix as a numpy array. Default is False.
Returns
-------
dict or numpy array
Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False.
Returns a numpy array of distances between nodes whose index is node.id pair if `return_matrix` is True.
"""
distances = W.ROUTECHOICE.dist

if return_matrix == True:
return distances
else:
distances_dict = dict()
for node1 in W.NODES:
for node2 in W.NODES:
distances_dict[node1, node2] = distances[node1.id, node2.id]
distances_dict[node1.name, node2.name] = distances[node1.id, node2.id]
return distances_dict

def load_scenario_from_csv(W, fname_node, fname_link, fname_demand, tmax=None):
"""
