Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move SQL functions out of this project and into goat-core for consistency #7

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pyarrow = "^14.0.1"
celery = "^5.3.6"
redis = "^5.0.1"
tqdm = "^4.66.1"
sentry-sdk = {extras = ["celery", "fastapi"], version = "^2.14.0"}


[tool.poetry.group.dev.dependencies]
Expand Down
15 changes: 10 additions & 5 deletions src/core/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any, Dict, Optional
from uuid import UUID

from pydantic import BaseSettings, PostgresDsn, validator

Expand All @@ -13,7 +14,9 @@ class SyncPostgresDsn(PostgresDsn):


class Settings(BaseSettings):
DEBUG_MODE: bool = True
# Monitoring
SENTRY_DSN: Optional[str] = None
ENVIRONMENT: Optional[str] = "dev"

CUSTOMER_SCHEMA: str = "customer"
USER_DATA_SCHEMA: str = "user_data"
Expand All @@ -22,13 +25,15 @@ class Settings(BaseSettings):
PROJECT_NAME: Optional[str] = "GOAT Routing API"
CACHE_DIR: str = "/app/src/cache"

STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID = 36126
STREET_NETWORK_NODE_DEFAULT_LAYER_PROJECT_ID = 37319

NETWORK_REGION_TABLE = "basic.geofence_active_mobility"

CATCHMENT_AREA_CAR_BUFFER_DEFAULT_SPEED = 80 # km/h
CATCHMENT_AREA_HOLE_THRESHOLD_SQM = 10000 # 100m x 100m
CATCHMENT_AREA_HOLE_THRESHOLD_SQM = 200000 # 20 hectares, ~450m x 450m

BASE_STREET_NETWORK: Optional[UUID] = "903ecdca-b717-48db-bbce-0219e41439cf"
DEFAULT_STREET_NETWORK_NODE_LAYER_PROJECT_ID = (
37319 # Hardcoded until node layers are added to GOAT projects by default
)

DATA_INSERT_BATCH_SIZE = 800

Expand Down
45 changes: 17 additions & 28 deletions src/core/street_network/street_network_cache.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from uuid import UUID

import polars as pl
from polars import DataFrame
Expand All @@ -15,56 +16,50 @@ def __init__(self):

def _get_edge_cache_file_name(
self,
edge_layer_project_id: int,
edge_layer_id: UUID,
h3_short: str,
):
"""Get edge cache file path for the specified H3_3 cell."""

return os.path.join(
settings.CACHE_DIR,
f"{edge_layer_project_id}_{h3_short}_edge.parquet",
f"{str(edge_layer_id)}_{h3_short}_edge.parquet",
)

def _get_node_cache_file_name(
self,
node_layer_project_id: int,
node_layer_id: UUID,
h3_short: str,
):
"""Get node cache file path for the specified H3_3 cell."""

return os.path.join(
settings.CACHE_DIR,
f"{node_layer_project_id}_{h3_short}_node.parquet",
f"{node_layer_id}_{h3_short}_node.parquet",
)

def edge_cache_exists(self, edge_layer_project_id: int, h3_short: str):
def edge_cache_exists(self, edge_layer_id: UUID, h3_short: str):
"""Check if edge data for the specified H3_3 cell is cached."""

edge_cache_file = self._get_edge_cache_file_name(
edge_layer_project_id, h3_short
)
edge_cache_file = self._get_edge_cache_file_name(edge_layer_id, h3_short)
return os.path.exists(edge_cache_file)

def node_cache_exists(self, node_layer_project_id: int, h3_short: str):
def node_cache_exists(self, node_layer_id: UUID, h3_short: str):
"""Check if node data for the specified H3_3 cell is cached."""

node_cache_file = self._get_node_cache_file_name(
node_layer_project_id, h3_short
)
node_cache_file = self._get_node_cache_file_name(node_layer_id, h3_short)
return os.path.exists(node_cache_file)

def read_edge_cache(
self,
edge_layer_project_id: int,
edge_layer_id: UUID,
h3_short: str,
):
"""Read edge data for the specified H3_3 cell from cache."""

edge_df: DataFrame = None

edge_cache_file = self._get_edge_cache_file_name(
edge_layer_project_id, h3_short
)
edge_cache_file = self._get_edge_cache_file_name(edge_layer_id, h3_short)

try:
with open(edge_cache_file, "rb") as file:
Expand All @@ -78,16 +73,14 @@ def read_edge_cache(

def read_node_cache(
self,
node_layer_project_id: int,
node_layer_id: UUID,
h3_short: str,
):
"""Read node data for the specified H3_3 cell from cache."""

node_df: DataFrame = None

node_cache_file = self._get_node_cache_file_name(
node_layer_project_id, h3_short
)
node_cache_file = self._get_node_cache_file_name(node_layer_id, h3_short)

try:
with open(node_cache_file, "rb") as file:
Expand All @@ -101,15 +94,13 @@ def read_node_cache(

def write_edge_cache(
self,
edge_layer_project_id: int,
edge_layer_id: UUID,
h3_short: str,
edge_df: DataFrame,
):
"""Write edge data for the specified H3_3 cell into cache."""

edge_cache_file = self._get_edge_cache_file_name(
edge_layer_project_id, h3_short
)
edge_cache_file = self._get_edge_cache_file_name(edge_layer_id, h3_short)

try:
with open(edge_cache_file, "wb") as file:
Expand All @@ -124,15 +115,13 @@ def write_edge_cache(

def write_node_cache(
self,
node_layer_project_id: int,
node_layer_id: UUID,
h3_short: str,
node_df: DataFrame,
):
"""Write node data for the specified H3_3 cell into cache."""

node_cache_file = self._get_node_cache_file_name(
node_layer_project_id, h3_short
)
node_cache_file = self._get_node_cache_file_name(node_layer_id, h3_short)

try:
with open(node_cache_file, "wb") as file:
Expand Down
102 changes: 36 additions & 66 deletions src/core/street_network/street_network_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,12 @@ class StreetNetworkUtil:
def __init__(self, db_connection: AsyncSession):
self.db_connection = db_connection

async def _get_layer_and_user_id(self, layer_project_id: int):
"""Get the layer ID and user ID of the specified layer project ID."""
async def _get_user_id(self, layer_id: UUID):
"""Get the user ID of the specified layer ID."""

layer_id: UUID = None
user_id: UUID = None

try:
# Get the associated layer ID
result = await self.db_connection.execute(
text(
f"""SELECT layer_id
FROM {settings.CUSTOMER_SCHEMA}.layer_project
WHERE id = {layer_project_id};"""
)
)
layer_id = UUID(str(result.fetchone()[0]))

# Get the user ID of the layer
result = await self.db_connection.execute(
text(
Expand All @@ -41,55 +30,47 @@ async def _get_layer_and_user_id(self, layer_project_id: int):
)
user_id = UUID(str(result.fetchone()[0]))
except Exception:
raise ValueError(
f"Could not fetch layer and user ID for layer project ID {layer_project_id}."
)
raise ValueError(f"Could not fetch user ID for layer ID {layer_id}.")

return layer_id, user_id
return user_id

async def _get_street_network_tables(
self,
street_network_edge_layer_project_id: int,
street_network_node_layer_project_id: int,
edge_layer_id: UUID,
node_layer_id: UUID,
):
"""Get table names and layer IDs of the edge and node tables."""

edge_table: str = None
edge_layer_id: UUID = None
node_table: str = None
node_layer_id: UUID = None

# Get edge table name if a layer project ID is specified
if street_network_edge_layer_project_id:
# Get edge table name if a layer ID is specified
if edge_layer_id:
try:
# Get the edge layer ID and associated user ID
edge_layer_id, user_id = await self._get_layer_and_user_id(
street_network_edge_layer_project_id
)
user_id = await self._get_user_id(edge_layer_id)

# Produce the edge table name
edge_table = f"{settings.USER_DATA_SCHEMA}.street_network_line_{str(user_id).replace('-', '')}"
except Exception:
raise ValueError(
f"Could not fetch edge table name for layer project ID {street_network_edge_layer_project_id}."
f"Could not fetch edge table name for layer ID {edge_layer_id}."
)

# Get node table name if a layer project ID is specified
if street_network_node_layer_project_id:
# Get node table name if a layer ID is specified
if node_layer_id:
try:
# Get the node layer ID and associated user ID
node_layer_id, user_id = await self._get_layer_and_user_id(
street_network_node_layer_project_id
)
user_id = await self._get_user_id(node_layer_id)

# Produce the node table name
node_table = f"{settings.USER_DATA_SCHEMA}.street_network_point_{str(user_id).replace('-', '')}"
except Exception:
raise ValueError(
f"Could not fetch node table name for layer project ID {street_network_node_layer_project_id}."
f"Could not fetch node table name for layer ID {node_layer_id}."
)

return edge_table, edge_layer_id, node_table, node_layer_id
return edge_table, node_table

async def _get_street_network_region_h3_3_cells(self, region_geofence_table: str):
"""Get list of H3_3 cells covering the street network region."""
Expand Down Expand Up @@ -118,8 +99,8 @@ async def _get_street_network_region_h3_3_cells(self, region_geofence_table: str

async def fetch(
self,
edge_layer_project_id: int,
node_layer_project_id: int,
edge_layer_id: UUID,
node_layer_id: UUID,
region_geofence_table: str,
):
"""Fetch street network from specified layer and load into Polars dataframes."""
Expand All @@ -139,28 +120,22 @@ async def fetch(
# Get table names and layer IDs of the edge and node tables
(
street_network_edge_table,
street_network_edge_layer_id,
street_network_node_table,
street_network_node_layer_id,
) = await self._get_street_network_tables(
edge_layer_project_id, node_layer_project_id
)
) = await self._get_street_network_tables(edge_layer_id, node_layer_id)

# Initialize cache
street_network_cache = StreetNetworkCache()

try:
for h3_short in street_network_region_h3_3_cells:
if edge_layer_project_id is not None:
if street_network_cache.edge_cache_exists(
edge_layer_project_id, h3_short
):
if edge_layer_id is not None:
if street_network_cache.edge_cache_exists(edge_layer_id, h3_short):
# Read edge data from cache
edge_df = street_network_cache.read_edge_cache(
edge_layer_project_id, h3_short
edge_layer_id, h3_short
)
else:
if settings.DEBUG_MODE:
if settings.ENVIRONMENT == "dev":
print(
f"Fetching street network edge data for H3_3 cell {h3_short}"
)
Expand All @@ -174,7 +149,7 @@ async def fetch(
maxspeed_backward, source, target, h3_3, h3_6
FROM {street_network_edge_table}
WHERE h3_3 = {h3_short}
AND layer_id = '{str(street_network_edge_layer_id)}'
AND layer_id = '{str(edge_layer_id)}'
""",
uri=settings.POSTGRES_DATABASE_URI,
schema_overrides=SEGMENT_DATA_SCHEMA,
Expand All @@ -185,22 +160,20 @@ async def fetch(

# Write edge data into cache
street_network_cache.write_edge_cache(
edge_layer_project_id, h3_short, edge_df
edge_layer_id, h3_short, edge_df
)
# Update street network edge dictionary and memory usage
street_network_edge[h3_short] = edge_df
street_network_size += edge_df.estimated_size("gb")

if node_layer_project_id is not None:
if street_network_cache.node_cache_exists(
node_layer_project_id, h3_short
):
if node_layer_id is not None:
if street_network_cache.node_cache_exists(node_layer_id, h3_short):
# Read node data from cache
node_df = street_network_cache.read_node_cache(
node_layer_project_id, h3_short
node_layer_id, h3_short
)
else:
if settings.DEBUG_MODE:
if settings.ENVIRONMENT == "dev":
print(
f"Fetching street network node data for H3_3 cell {h3_short}"
)
Expand All @@ -211,15 +184,15 @@ async def fetch(
SELECT node_id AS id, h3_3, h3_6
FROM {street_network_node_table}
WHERE h3_3 = {h3_short}
AND layer_id = '{str(street_network_node_layer_id)}'
AND layer_id = '{str(node_layer_id)}'
""",
uri=settings.POSTGRES_DATABASE_URI,
schema_overrides=CONNECTOR_DATA_SCHEMA,
)

# Write node data into cache
street_network_cache.write_node_cache(
node_layer_project_id, h3_short, node_df
node_layer_id, h3_short, node_df
)

# Update street network node dictionary and memory usage
Expand All @@ -231,23 +204,20 @@ async def fetch(
)

# Raise error if an edge layer project ID is specified but no edge data is fetched
if edge_layer_project_id is not None and len(street_network_edge) == 0:
if edge_layer_id is not None and len(street_network_edge) == 0:
raise RuntimeError(
f"Failed to fetch street network edge data for layer project ID {edge_layer_project_id}."
f"Failed to fetch street network edge data for layer project ID {edge_layer_id}."
)

# Raise error if a node layer project ID is specified but no node data is fetched
if node_layer_project_id is not None and len(street_network_node) == 0:
if node_layer_id is not None and len(street_network_node) == 0:
raise RuntimeError(
f"Failed to fetch street network node data for layer project ID {node_layer_project_id}."
f"Failed to fetch street network node data for layer project ID {node_layer_id}."
)

end_time = time.time()

if settings.DEBUG_MODE:
print(
f"Street network load time: {round((end_time - start_time) / 60, 1)} min"
)
print(f"Street network in-memory size: {round(street_network_size, 1)} GB")
print(f"Street network load time: {round((end_time - start_time) / 60, 1)} min")
print(f"Street network in-memory size: {round(street_network_size, 1)} GB")

return street_network_edge, street_network_node
Loading