Skip to content

Commit

Permalink
chore: add type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
carderne committed Feb 16, 2024
1 parent f4dcd99 commit c12061b
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 265 deletions.
1 change: 0 additions & 1 deletion gridfinder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from importlib.metadata import version

from ._util import * # NoQA
from .gridfinder import * # NoQA
from .post import * # NoQA
from .prepare import * # NoQA
Expand Down
109 changes: 50 additions & 59 deletions gridfinder/_util.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,47 @@
"""
Utility module used internally.
Functions:
- save_raster
- clip_line_poly
- clip_raster
"""

from pathlib import Path
import json
from pathlib import Path
from typing import Optional, Tuple, Union

import geopandas as gpd
import numpy as np
import rasterio
from affine import Affine
from geopandas import GeoDataFrame
from pyproj import Proj
from rasterio.io import DatasetReader
from rasterio.mask import mask


def save_raster(path, raster, affine, crs=None, nodata=0):
def save_raster(
path: Union[str, Path],
raster: np.ndarray,
affine: Affine,
crs: Optional[Union[str, Proj]] = None,
nodata: int = 0,
) -> None:
"""Save a raster to the specified file.
Parameters
----------
file : str
Output file path
raster : numpy.array
2D numpy array containing raster values
affine: affine.Affine
Affine transformation for the raster
crs: str, proj.Proj, optional (default EPSG4326)
CRS for the raster
file : Output file path
raster : 2D numpy array containing raster values
affine : Affine transformation for the raster
crs: CRS for the raster (default EPSG4326)
"""

path = Path(path)
if not path.parents[0].exists():
path.parents[0].mkdir(parents=True, exist_ok=True)
p_path = Path(path)
if not p_path.parents[0].exists():
p_path.parents[0].mkdir(parents=True, exist_ok=True)

if not crs:
crs = "+proj=latlong"

filtered_out = rasterio.open(
path,
p_path,
"w",
driver="GTiff",
height=raster.shape[0],
Expand All @@ -54,59 +56,48 @@ def save_raster(path, raster, affine, crs=None, nodata=0):
filtered_out.close()


# clip_raster is copied from openelec.clustering
def clip_raster(raster, boundary, boundary_layer=None):
def clip_raster(
raster: Union[str, Path, DatasetReader],
boundary: Union[str, Path, GeoDataFrame],
boundary_layer: Optional[str] = None,
) -> Tuple[
np.ndarray,
Affine,
dict,
]:
"""Clip the raster to the given administrative boundary.
Parameters
----------
raster : string, pathlib.Path or rasterio.io.DataSetReader
Location of or already opened raster.
boundary : string, pathlib.Path or geopandas.GeoDataFrame
The polygon by which to clip the raster.
boundary_layer : string, optional
For multi-layer files (like GeoPackage), specify the layer to be used.
raster : Location of or already opened raster.
boundary : The polygon by which to clip the raster.
boundary_layer : For multi-layer files (like GeoPackage), specify layer to be used.
Returns
-------
tuple
Three elements:
clipped : numpy.ndarray
Contents of clipped raster.
affine : affine.Affine()
Information for mapping pixel coordinates
to a coordinate system.
crs : dict
Dict of the form {'init': 'epsg:4326'} defining the coordinate
reference system of the raster.
clipped : Contents of clipped raster.
affine : The affine
crs : form {'init': 'epsg:4326'}
"""

if isinstance(raster, Path):
raster = str(raster)
if isinstance(raster, str):
raster = rasterio.open(raster)
raster_ds = raster if isinstance(raster, DatasetReader) else rasterio.open(raster)

if isinstance(boundary, Path):
boundary = str(boundary)
if isinstance(boundary, str):
if ".gpkg" in boundary:
driver = "GPKG"
else:
driver = None # default to shapefile
boundary_layer = "" # because shapefiles have no layers

boundary = gpd.read_file(boundary, layer=boundary_layer, driver=driver)
boundary_gdf: GeoDataFrame = (
boundary
if isinstance(boundary, GeoDataFrame)
else gpd.read_file(boundary, layer=boundary_layer)
)

if not (boundary.crs == raster.crs or boundary.crs == raster.crs.data):
boundary = boundary.to_crs(crs=raster.crs)
coords = [json.loads(boundary.to_json())["features"][0]["geometry"]]
if not (
boundary_gdf.crs == raster_ds.crs or boundary_gdf.crs == raster_ds.crs.data
):
boundary_gdf = boundary_gdf.to_crs(crs=raster_ds.crs) # type: ignore
coords = [json.loads(boundary_gdf.to_json())["features"][0]["geometry"]]

# mask/clip the raster using rasterio.mask
clipped, affine = mask(dataset=raster, shapes=coords, crop=True)
clipped, affine = mask(dataset=raster_ds, shapes=coords, crop=True)

if len(clipped.shape) >= 3:
clipped = clipped[0]

return clipped, affine, raster.crs
return clipped, affine, raster_ds.crs
84 changes: 34 additions & 50 deletions gridfinder/gridfinder.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,40 @@
"""
Implements Dijkstra's algorithm on a cost-array to create an MST.
Functions:
- get_targets_costs
- estimate_mem_use
- optimise
"""

import os
import pickle
import sys
from heapq import heapify, heappop, heappush
from math import sqrt
from heapq import heapify, heappush, heappop
import pickle
from typing import Optional, Tuple

import numpy as np
import rasterio
from IPython.display import display, Markdown
from affine import Affine
from IPython.display import Markdown, display

from gridfinder._util import save_raster

sys.setrecursionlimit(100000)


def get_targets_costs(targets_in, costs_in):
def get_targets_costs(
targets_in: str, costs_in: str
) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int], Affine]:
"""Load the targets and costs arrays from the given file paths.
Parameters
----------
targets_in : str
Path for targets raster.
costs_in : str
Path for costs raster.
targets_in : Path for targets raster.
costs_in : Path for costs raster.
Returns
-------
targets : numpy array
2D array of targets
costs: numpy array
2D array of costs
start: tuple
Two-element tuple with row, col of starting point.
affine : affine.Affine
Affine transformation for the rasters.
targets : 2D array of targets
costs: 2D array of costs
start: tuple with row, col of starting point.
affine : Affine transformation for the rasters.
"""

targets_ra = rasterio.open(targets_in)
Expand All @@ -61,20 +53,17 @@ def get_targets_costs(targets_in, costs_in):
return targets, costs, start, affine


def estimate_mem_use(targets, costs):
def estimate_mem_use(targets: np.ndarray, costs: np.ndarray) -> float:
"""Estimate memory usage in GB, probably not very accurate.
Parameters
----------
targets : numpy array
2D array of targets.
costs : numpy array
2D array of costs.
targets : 2D array of targets.
costs : 2D array of costs.
Returns
-------
est_mem : float
Estimated memory requirement in GB.
est_mem : Estimated memory requirement in GB.
"""

# make sure these match the ones used in optimise below
Expand All @@ -89,31 +78,27 @@ def estimate_mem_use(targets, costs):


def optimise(
targets,
costs,
start,
jupyter=False,
animate=False,
affine=None,
animate_path=None,
silent=False,
):
targets: np.ndarray,
costs: np.ndarray,
start: Tuple[int, int],
jupyter: bool = False,
animate: bool = False,
affine: Optional[Affine] = None,
animate_path: Optional[str] = None,
silent: bool = False,
) -> np.ndarray:
"""Run the Dijkstra algorithm for the supplied arrays.
Parameters
----------
targets : numpy array
2D array of targets.
costs : numpy array
2D array of costs.
start : tuple
Two-element tuple with row, col of starting point.
jupyter : boolean, optional (default False)
Whether the code is being run from a Jupyter Notebook.
targets : 2D array of targets.
costs : 2D array of costs.
start : tuple with row, col of starting point.
jupyter : Whether the code is being run from a Jupyter Notebook.
Returns
-------
dist : numpy array
dist :
2D array with the distance (in cells) of each point from a 'found'
on-grid point. Values of 0 imply that cell is part of an MV grid line.
"""
Expand All @@ -135,13 +120,12 @@ def optimise(
queue = [[0, start]]
heapify(queue)

def zero_and_heap_path(loc):
def zero_and_heap_path(loc: Tuple[int, int]) -> None:
"""Zero the location's distance value and follow upstream doing same.
Parameters
----------
loc : tuple
row, col of current point.
loc : row, col of current point.
"""

if not dist[loc] == 0:
Expand Down
Loading

0 comments on commit c12061b

Please sign in to comment.