diff --git a/gridfinder/__init__.py b/gridfinder/__init__.py index 055f0c9..cb2345a 100644 --- a/gridfinder/__init__.py +++ b/gridfinder/__init__.py @@ -9,7 +9,6 @@ from importlib.metadata import version -from ._util import * # NoQA from .gridfinder import * # NoQA from .post import * # NoQA from .prepare import * # NoQA diff --git a/gridfinder/_util.py b/gridfinder/_util.py index 5fd9959..85bb3f4 100644 --- a/gridfinder/_util.py +++ b/gridfinder/_util.py @@ -1,45 +1,47 @@ """ Utility module used internally. - -Functions: - -- save_raster -- clip_line_poly -- clip_raster """ -from pathlib import Path import json +from pathlib import Path +from typing import Optional, Tuple, Union import geopandas as gpd +import numpy as np import rasterio +from affine import Affine +from geopandas import GeoDataFrame +from pyproj import Proj +from rasterio.io import DatasetReader from rasterio.mask import mask -def save_raster(path, raster, affine, crs=None, nodata=0): +def save_raster( + path: Union[str, Path], + raster: np.ndarray, + affine: Affine, + crs: Optional[Union[str, Proj]] = None, + nodata: int = 0, +) -> None: """Save a raster to the specified file. Parameters ---------- - file : str - Output file path - raster : numpy.array - 2D numpy array containing raster values - affine: affine.Affine - Affine transformation for the raster - crs: str, proj.Proj, optional (default EPSG4326) - CRS for the raster + file : Output file path + raster : 2D numpy array containing raster values + affine : Affine transformation for the raster + crs: CRS for the raster (default EPSG4326) """ - path = Path(path) - if not path.parents[0].exists(): - path.parents[0].mkdir(parents=True, exist_ok=True) + p_path = Path(path) + if not p_path.parents[0].exists(): + p_path.parents[0].mkdir(parents=True, exist_ok=True) if not crs: crs = "+proj=latlong" filtered_out = rasterio.open( - path, + p_path, "w", driver="GTiff", height=raster.shape[0], @@ -54,59 +56,48 @@ def save_raster(path, raster, affine, crs=None, nodata=0): filtered_out.close() -# clip_raster is copied from openelec.clustering -def clip_raster(raster, boundary, boundary_layer=None): +def clip_raster( + raster: Union[str, Path, DatasetReader], + boundary: Union[str, Path, GeoDataFrame], + boundary_layer: Optional[str] = None, +) -> Tuple[ + np.ndarray, + Affine, + dict, +]: """Clip the raster to the given administrative boundary. Parameters ---------- - raster : string, pathlib.Path or rasterio.io.DataSetReader - Location of or already opened raster. - boundary : string, pathlib.Path or geopandas.GeoDataFrame - The polygon by which to clip the raster. - boundary_layer : string, optional - For multi-layer files (like GeoPackage), specify the layer to be used. - + raster : Location of or already opened raster. + boundary : The polygon by which to clip the raster. + boundary_layer : For multi-layer files (like GeoPackage), specify layer to be used. Returns ------- - tuple - Three elements: - clipped : numpy.ndarray - Contents of clipped raster. - affine : affine.Affine() - Information for mapping pixel coordinates - to a coordinate system. - crs : dict - Dict of the form {'init': 'epsg:4326'} defining the coordinate - reference system of the raster. - + clipped : Contents of clipped raster. + affine : The affine + crs : form {'init': 'epsg:4326'} """ - if isinstance(raster, Path): - raster = str(raster) - if isinstance(raster, str): - raster = rasterio.open(raster) + raster_ds = raster if isinstance(raster, DatasetReader) else rasterio.open(raster) - if isinstance(boundary, Path): - boundary = str(boundary) - if isinstance(boundary, str): - if ".gpkg" in boundary: - driver = "GPKG" - else: - driver = None # default to shapefile - boundary_layer = "" # because shapefiles have no layers - - boundary = gpd.read_file(boundary, layer=boundary_layer, driver=driver) + boundary_gdf: GeoDataFrame = ( + boundary + if isinstance(boundary, GeoDataFrame) + else gpd.read_file(boundary, layer=boundary_layer) + ) - if not (boundary.crs == raster.crs or boundary.crs == raster.crs.data): - boundary = boundary.to_crs(crs=raster.crs) - coords = [json.loads(boundary.to_json())["features"][0]["geometry"]] + if not ( + boundary_gdf.crs == raster_ds.crs or boundary_gdf.crs == raster_ds.crs.data + ): + boundary_gdf = boundary_gdf.to_crs(crs=raster_ds.crs) # type: ignore + coords = [json.loads(boundary_gdf.to_json())["features"][0]["geometry"]] # mask/clip the raster using rasterio.mask - clipped, affine = mask(dataset=raster, shapes=coords, crop=True) + clipped, affine = mask(dataset=raster_ds, shapes=coords, crop=True) if len(clipped.shape) >= 3: clipped = clipped[0] - return clipped, affine, raster.crs + return clipped, affine, raster_ds.crs diff --git a/gridfinder/gridfinder.py b/gridfinder/gridfinder.py index 5f39a11..6968e48 100644 --- a/gridfinder/gridfinder.py +++ b/gridfinder/gridfinder.py @@ -1,48 +1,40 @@ """ Implements Dijkstra's algorithm on a cost-array to create an MST. - -Functions: - -- get_targets_costs -- estimate_mem_use -- optimise """ import os +import pickle import sys +from heapq import heapify, heappop, heappush from math import sqrt -from heapq import heapify, heappush, heappop -import pickle +from typing import Optional, Tuple import numpy as np import rasterio -from IPython.display import display, Markdown +from affine import Affine +from IPython.display import Markdown, display from gridfinder._util import save_raster sys.setrecursionlimit(100000) -def get_targets_costs(targets_in, costs_in): +def get_targets_costs( + targets_in: str, costs_in: str +) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int], Affine]: """Load the targets and costs arrays from the given file paths. Parameters ---------- - targets_in : str - Path for targets raster. - costs_in : str - Path for costs raster. + targets_in : Path for targets raster. + costs_in : Path for costs raster. Returns ------- - targets : numpy array - 2D array of targets - costs: numpy array - 2D array of costs - start: tuple - Two-element tuple with row, col of starting point. - affine : affine.Affine - Affine transformation for the rasters. + targets : 2D array of targets + costs: 2D array of costs + start: tuple with row, col of starting point. + affine : Affine transformation for the rasters. """ targets_ra = rasterio.open(targets_in) @@ -61,20 +53,17 @@ def get_targets_costs(targets_in, costs_in): return targets, costs, start, affine -def estimate_mem_use(targets, costs): +def estimate_mem_use(targets: np.ndarray, costs: np.ndarray) -> float: """Estimate memory usage in GB, probably not very accurate. Parameters ---------- - targets : numpy array - 2D array of targets. - costs : numpy array - 2D array of costs. + targets : 2D array of targets. + costs : 2D array of costs. Returns ------- - est_mem : float - Estimated memory requirement in GB. + est_mem : Estimated memory requirement in GB. """ # make sure these match the ones used in optimise below @@ -89,31 +78,27 @@ def estimate_mem_use(targets, costs): def optimise( - targets, - costs, - start, - jupyter=False, - animate=False, - affine=None, - animate_path=None, - silent=False, -): + targets: np.ndarray, + costs: np.ndarray, + start: Tuple[int, int], + jupyter: bool = False, + animate: bool = False, + affine: Optional[Affine] = None, + animate_path: Optional[str] = None, + silent: bool = False, +) -> np.ndarray: """Run the Dijkstra algorithm for the supplied arrays. Parameters ---------- - targets : numpy array - 2D array of targets. - costs : numpy array - 2D array of costs. - start : tuple - Two-element tuple with row, col of starting point. - jupyter : boolean, optional (default False) - Whether the code is being run from a Jupyter Notebook. + targets : 2D array of targets. + costs : 2D array of costs. + start : tuple with row, col of starting point. + jupyter : Whether the code is being run from a Jupyter Notebook. Returns ------- - dist : numpy array + dist : 2D array with the distance (in cells) of each point from a 'found' on-grid point. Values of 0 imply that cell is part of an MV grid line. """ @@ -135,13 +120,12 @@ def optimise( queue = [[0, start]] heapify(queue) - def zero_and_heap_path(loc): + def zero_and_heap_path(loc: Tuple[int, int]) -> None: """Zero the location's distance value and follow upstream doing same. Parameters ---------- - loc : tuple - row, col of current point. + loc : row, col of current point. """ if not dist[loc] == 0: diff --git a/gridfinder/post.py b/gridfinder/post.py index a206304..f758551 100644 --- a/gridfinder/post.py +++ b/gridfinder/post.py @@ -1,48 +1,45 @@ """ Post-processing for gridfinder package. - -Functions: - -- threshold -- thin -- raster_to_lines -- accuracy -- true_positives -- false_negatives -- flip_arr_values """ from pathlib import Path +from typing import Optional, Tuple, Union +import geopandas as gpd import numpy as np import pandas as pd -import geopandas as gpd -from skimage.morphology import skeletonize -import shapely.wkt -from shapely.geometry import Point, LineString import rasterio +import shapely.wkt +from affine import Affine from rasterio.features import rasterize from rasterio.transform import xy +from shapely.geometry import LineString, Point +from skimage.morphology import skeletonize -def threshold(dists_in, cutoff=0.0): +def threshold( + dists_in: Union[str, Path, np.ndarray], cutoff: float = 0.0 +) -> Tuple[np.ndarray, Optional[Affine]]: """Convert distance array into binary array of connected locations. Parameters ---------- - dists_in : path-like or numpy array - 2D array output from gridfinder algorithm. - cutoff : float, optional (default 0.5.) - Cutoff value below which consider the cells to be grid. + dists_in : 2D array output from gridfinder algorithm. + cutoff : Cutoff value below which consider the cells to be grid. Returns ------- - guess : numpy array - Binary representation of input array. - affine: affine.Affine - Affine transformation for raster. + guess : Binary representation of input array. + affine: Affine transformation for raster. """ - if isinstance(dists_in, (str, Path)): + if isinstance(dists_in, np.ndarray): + guess = dists_in.copy() + guess[dists_in > cutoff] = 0 + guess[dists_in <= cutoff] = 1 + + return guess, None + + else: dists_rd = rasterio.open(dists_in) dists_r = dists_rd.read(1) affine = dists_rd.transform @@ -53,35 +50,26 @@ def threshold(dists_in, cutoff=0.0): return guess, affine - elif isinstance(dists_in, np.ndarray): - guess = dists_in.copy() - guess[dists_in > cutoff] = 0 - guess[dists_in <= cutoff] = 1 - - return guess - - else: - raise ValueError - -def thin(guess_in): +def thin(guess_in: Union[str, Path, np.ndarray]) -> Tuple[np.ndarray, Optional[Affine]]: """ Use scikit-image skeletonize to 'thin' the guess raster. Parameters ---------- - guess_in : path-like or 2D array - Output from threshold(). + guess_in : Output from threshold(). Returns ------- - guess_skel : numpy array - Thinned version. - affine : Affine - Only if path-like supplied. + guess_skel : Thinned version. + affine : Only if path-like supplied. """ - if isinstance(guess_in, (str, Path)): + if isinstance(guess_in, np.ndarray): + guess_skel = skeletonize(guess_in) + guess_skel = guess_skel.astype("int32") + return guess_skel, None + else: guess_rd = rasterio.open(guess_in) guess_arr = guess_rd.read(1) affine = guess_rd.transform @@ -91,29 +79,18 @@ def thin(guess_in): return guess_skel, affine - elif isinstance(guess_in, np.ndarray): - guess_skel = skeletonize(guess_in) - guess_skel = guess_skel.astype("int32") - - return guess_skel - - else: - raise ValueError - -def raster_to_lines(guess_skel_in): +def raster_to_lines(guess_skel_in: Union[str, Path]) -> gpd.GeoDataFrame: """ Convert thinned raster to linestring geometry. Parameters ---------- - guess_skel_in : path-like - Output from thin(). + guess_skel_in : Output from thin(). Returns ------- - guess_gdf : GeoDataFrame - Converted to geometry. + guess_gdf : Converted to geometry. """ rast = rasterio.open(guess_skel_in) @@ -172,19 +149,20 @@ def raster_to_lines(guess_skel_in): return guess_gdf -def accuracy(grid_in, guess_in, aoi_in, buffer_amount=0.01): +def accuracy( + grid_in: Union[str, Path], + guess_in: Union[str, Path], + aoi_in: Union[str, Path], + buffer_amount: float = 0.01, +) -> Tuple[float, float]: """Measure accuracy against a specified grid 'truth' file. Parameters ---------- - grid_in : str, Path - Path to vector truth file. - guess_in : str, Path - Path to guess output from guess2geom. - aoi_in : str, Path - Path to AOI feature. - buffer_amount : float, optional (default 0.01.) - Leeway in decimal degrees in calculating equivalence. + grid_in : Path to vector truth file. + guess_in : Path to guess output from guess2geom. + aoi_in : Path to AOI feature. + buffer_amount : Leeway in decimal degrees in calculating equivalence. 0.01 DD equals approximately 1 mile at the equator. """ @@ -229,20 +207,17 @@ def accuracy(grid_in, guess_in, aoi_in, buffer_amount=0.01): return tp, fn -def true_positives(guesses, truths): +def true_positives(guesses: np.ndarray, truths: np.ndarray) -> float: """Calculate true positives, used by accuracy(). Parameters ---------- - guesses : numpy array - Output from model. - truths : numpy array - Truth feature converted to array. + guesses : Output from model. + truths : Truth feature converted to array. Returns ------- - tp : float - Ratio of true positives. + tp : Ratio of true positives. """ yes_guesses = 0 @@ -264,20 +239,17 @@ def true_positives(guesses, truths): return tp -def false_negatives(guesses, truths): +def false_negatives(guesses: np.ndarray, truths: np.ndarray) -> float: """Calculate false negatives, used by accuracy(). Parameters ---------- - guesses : numpy array - Output from model. - truths : numpy array - Truth feature converted to array. + guesses : Output from model. + truths : Truth feature converted to array. Returns ------- - fn : float - Ratio of false negatives. + fn : Ratio of false negatives. """ actual_grid = 0 @@ -318,7 +290,7 @@ def false_negatives(guesses, truths): return fn -def flip_arr_values(arr): +def flip_arr_values(arr: np.ndarray) -> np.ndarray: """Simple helper function used by accuracy()""" arr[arr == 1] = 2 diff --git a/gridfinder/prepare.py b/gridfinder/prepare.py index b96411b..120a770 100644 --- a/gridfinder/prepare.py +++ b/gridfinder/prepare.py @@ -1,47 +1,40 @@ """ Prepare input layers for gridfinder. - -Functions: - -- clip_rasters -- merge_rasters -- filter_func -- create_filter -- prepare_ntl -- drop_zero_pop -- prepare_roads """ +import json import os from math import sqrt -import json from pathlib import Path - -import numpy as np -from scipy import signal +from typing import Optional, Tuple, Union import fiona +import geopandas as gpd +import numpy as np import rasterio -from rasterio.mask import mask -from rasterio.features import rasterize +from geopandas.geodataframe import GeoDataFrame from rasterio import Affine -from rasterio.warp import reproject, Resampling +from rasterio.features import rasterize +from rasterio.mask import mask +from rasterio.warp import Resampling, reproject +from scipy import signal -import geopandas as gpd -from gridfinder._util import save_raster, clip_raster +from gridfinder._util import clip_raster, save_raster -def clip_rasters(folder_in, folder_out, aoi_in, debug=False): +def clip_rasters( + folder_in: Union[str, Path], + folder_out: Union[str, Path], + aoi_in: Union[str, Path], + debug: bool = False, +) -> None: """Read continental rasters one at a time, clip to AOI and save Parameters ---------- - folder_in : str, Path - Path to directory containing rasters. - folder_out : str, Path - Path to directory to save clipped rasters. - aoi_in : str, Path - Path to an AOI file (readable by Fiona) to use for clipping. + folder_in : Path to directory containing rasters. + folder_out : Path to directory to save clipped rasters. + aoi_in : Path to an AOI file (readable by Fiona) to use for clipping. """ if isinstance(aoi_in, gpd.GeoDataFrame): @@ -61,28 +54,27 @@ def clip_rasters(folder_in, folder_out, aoi_in, debug=False): if ntl.ndim == 3: ntl = ntl[0] - save_raster(folder_out / file_path, ntl, affine) + save_raster(Path(folder_out) / file_path, ntl, affine) -def merge_rasters(folder, percentile=70): +def merge_rasters( + folder: Union[str, Path], + percentile: int = 70, +) -> Tuple[np.ndarray, Optional[Affine]]: """Merge a set of monthly rasters keeping the nth percentile value. Used to remove transient features from time-series data. Parameters ---------- - folder : str, Path - Folder containing rasters to be merged. - percentile : int, optional (default 70.) - Percentile value to use when merging using np.nanpercentile. + folder : Folder containing rasters to be merged. + percentile : Percentile value to use when merging using np.nanpercentile. Lower values will result in lower values/brightness. Returns ------- - raster_merged : numpy array - The merged array. - affine : affine.Affine - The affine transformation for the merged raster. + raster_merged : The merged array. + affine : The affine transformation for the merged raster. """ affine = None @@ -103,12 +95,12 @@ def merge_rasters(folder, percentile=70): return raster_merged, affine -def filter_func(i, j): +def filter_func(i: float, j: float) -> float: """Function used in creating raster filter.""" d_rows = abs(i - 20) d_cols = abs(j - 20) - d = sqrt(d_rows ** 2 + d_cols ** 2) + d = sqrt(d_rows**2 + d_cols**2) if d == 0: return 0.0 @@ -116,7 +108,7 @@ def filter_func(i, j): return 1 / (1 + d / 2) ** 3 -def create_filter(): +def create_filter() -> np.ndarray: """Create and return a numpy array filter to be applied to the raster.""" vec_filter_func = np.vectorize(filter_func) ntl_filter = np.fromfunction(vec_filter_func, (41, 41), dtype=float) @@ -126,32 +118,31 @@ def create_filter(): return ntl_filter -def prepare_ntl(ntl_in, aoi_in, ntl_filter=None, threshold=0.1, upsample_by=2): +def prepare_ntl( + ntl_in: Union[str, Path], + aoi_in: Union[str, Path], + ntl_filter: Optional[np.ndarray] = None, + threshold: float = 0.1, + upsample_by: int = 2, +) -> Tuple[np.ndarray, Affine]: """Convert the supplied NTL raster and output an array of electrified cells as targets for the algorithm. Parameters ---------- - ntl_in : str, Path - Path to an NTL raster file. - aoi_in : str, Path - Path to a Fiona-readable AOI file. - ntl_filter : numpy array, optional (defaults to create_filter()) - The filter will be convolved over the raster. - threshold : float, optional (default 0.1.) - The threshold to apply after filtering, values above + ntl_in : Path to an NTL raster file. + aoi_in : Path to a Fiona-readable AOI file. + ntl_filter : The filter will be convolved over the raster. + threshold : The threshold to apply after filtering, values above are considered electrified. - upsample_by : int, optional (default 2.) - The factor by which to upsample the input raster, applied to both axes + upsample_by : The factor by which to upsample the input raster, applied to both axes (so a value of 2 results in a raster 4 times bigger). This is to allow the roads detail to be captured in higher resolution. Returns ------- - ntl_thresh : numpy array - Array of cells of value 0 (not electrified) or 1 (electrified). - newaff : affine.Affine - Affine raster transformation for the returned array. + ntl_thresh : Array of cells of value 0 (not electrified) or 1 (electrified). + newaff : Affine raster transformation for the returned array. """ if isinstance(aoi_in, gpd.GeoDataFrame): @@ -212,22 +203,22 @@ def prepare_ntl(ntl_in, aoi_in, ntl_filter=None, threshold=0.1, upsample_by=2): return ntl_thresh, newaff -def drop_zero_pop(targets_in, pop_in, aoi): +def drop_zero_pop( + targets_in: Union[str, Path], + pop_in: Union[str, Path], + aoi: Union[str, Path, GeoDataFrame], +) -> np.ndarray: """Drop electrified cells with no other evidence of human activity. Parameters ---------- - targets_in : str, Path - Path to output from prepare_ntl() - pop_in : str, Path - Path to a population raster such as GHS or HRSL. - aoi : str, Path or GeoDataFrame - An AOI to use to clip the population raster. + targets_in : Path to output from prepare_ntl() + pop_in : Path to a population raster such as GHS or HRSL. + aoi : An AOI to use to clip the population raster. Returns ------- - targets : numpy array - Array with zero population sites dropped. + Array with zero population sites dropped. """ if isinstance(aoi, (str, Path)): @@ -265,7 +256,7 @@ def drop_zero_pop(targets_in, pop_in, aoi): max_i = targets.shape[0] max_j = targets.shape[1] - def add_around(blob, cell): + def add_around(blob: list, cell: Tuple[int, int]) -> list: blob.append(cell) skip.append(cell) @@ -305,26 +296,25 @@ def add_around(blob, cell): return targets -def prepare_roads(roads_in, aoi_in, ntl_in): +def prepare_roads( + roads_in: Union[str, Path], + aoi_in: Union[str, Path, GeoDataFrame], + ntl_in: Union[str, Path], +) -> Tuple[np.ndarray, Affine]: """Prepare a roads feature layer for use in algorithm. Parameters ---------- - roads_in : str, Path - Path to a roads feature layer. This implementation is specific to + roads_in : Path to a roads feature layer. This implementation is specific to OSM data and won't assign proper weights to other data inputs. - aoi_in : str, Path or GeoDataFrame - AOI to clip roads. - ntl_in : str, Path - Path to a raster file, only used for correct shape and + aoi_in : AOI to clip roads. + ntl_in : Path to a raster file, only used for correct shape and affine of roads raster. Returns ------- - roads_raster : numpy array - Roads as a raster array with the value being the cost of traversing. - affine : affine.Affine - Affine raster transformation for the new raster (same as ntl_in). + roads_raster : Roads as a raster array with the value being the cost of traversing. + affine : Affine raster transformation for the new raster (same as ntl_in). """ ntl_rd = rasterio.open(ntl_in) diff --git a/gridfinder/py.typed b/gridfinder/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/pyproject.toml b/pyproject.toml index 6e14662..46b1826 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,7 @@ dev = [ "descartes ~= 1.1.0", "folium ~= 0.15", "matplotlib ~= 3.8", + "ruff ~= 0.2", "seaborn ~= 0.13", ] diff --git a/test.sh b/test.sh index d423709..8fa681b 100755 --- a/test.sh +++ b/test.sh @@ -12,3 +12,5 @@ sed -i -e 's/plt.imshow(.*)//g' example.py # run script python example.py + +rm example.py example.py-e