From 3850d58a0a373d216b64c0e3d40632b2339e591d Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 24 Jul 2024 09:27:58 -0700 Subject: [PATCH 01/37] Adds geospatial infra for reading rasters/vectors from objectstore in an easy way --- .vscode/settings.json | 4 + api/app/.env.example | 1 + api/app/utils/geospatial.py | 165 ++++++++++++++++++++++++++++++++++++ 3 files changed, 170 insertions(+) create mode 100644 api/app/utils/geospatial.py diff --git a/.vscode/settings.json b/.vscode/settings.json index ac35de154..3a857d490 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -65,7 +65,10 @@ "firezone", "GDPS", "GEOGCS", + "geopackage", "geospatial", + "geotiff", + "gpkg", "grib", "gribs", "hourlies", @@ -83,6 +86,7 @@ "PRIMEM", "PROJCS", "RDPS", + "reproject", "rocketchat", "sfms", "sqlalchemy", diff --git a/api/app/.env.example b/api/app/.env.example index d9f8cfebf..ec235c083 100644 --- a/api/app/.env.example +++ b/api/app/.env.example @@ -60,4 +60,5 @@ OBJECT_STORE_SECRET=object_store_secret OBJECT_STORE_BUCKET=object_store_bucket DEM_NAME=dem_mosaic_250_max.tif TPI_DEM_NAME=bc_dem_50m_tpi.tif +CLASSIFIED_TPI_DEM_NAME=bc_dem_50m_tpi_win100_classified.tif SENTRY_DSN=some_dsn \ No newline at end of file diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py new file mode 100644 index 000000000..4e55bcda7 --- /dev/null +++ b/api/app/utils/geospatial.py @@ -0,0 +1,165 @@ +import os +from dataclasses import dataclass +import logging +from typing import Any, Optional +from aiobotocore.client import AioBaseClient +from botocore.exceptions import ClientError +from osgeo import gdal + +logger = logging.getLogger(__name__) + + +@dataclass +class Vector2RasterOptions: + """ + Inputs required for burning vector geospatial data into a raster + """ + + source_geotransform: Any + source_projection: Any + source_x_size: int + source_y_size: int + + +@dataclass +class GeospatialOptions: + """ + Options supplied by caller to declare optional outputs + """ + + vector_options: Optional[Vector2RasterOptions] = None + include_geotransform: Optional[bool] = True + include_projection: Optional[bool] = True + include_x_size: Optional[bool] = None + include_y_size: Optional[bool] = None + + +@dataclass +class GeospatialReadResult: + """ + Raster data array and caller requested metadata + """ + + data_array: Any + data_geotransform: Optional[Any] + data_projection: Optional[Any] + data_x_size: Optional[int] + data_y_size: Optional[int] + + +def get_geospatial_metadata(data_source, options: Optional[GeospatialOptions]): + """ + Extracts metadata defined by the options parameter from the data_source raster + + :param data_source: input raster to extract metadata from + :param options: defines the metadata the caller is interested in extracting from the raster + :return: optional metadata of geotransform, project, x and y sizes from the raster + """ + geotransform = None + projection = None + x_size = None + y_size = None + if options is None: + return (geotransform, projection.x_size, y_size) + + if options.include_geotransform: + geotransform = data_source.GetGeoTransform() + + if options.include_projection: + projection = data_source.GetProjection() + + if options.include_x_size: + x_size = data_source.RasterXSize + + if options.include_y_size: + y_size = data_source.RasterYSize + + return (geotransform, projection, x_size, y_size) + + +def read_raster_data(data_source): + """ + Read raster data and return as array from the data source + + :param data_source: data source to read the 
data from + :return: raster data as array + """ + data_band = data_source.GetRasterBand(1) + data_array = data_band.ReadAsArray() + return data_array + + +def get_raster_data(data_source, options: Optional[GeospatialOptions]) -> GeospatialReadResult: + """ + Returns the raster data and optional metadata the caller is interested defined by the options parameter + + :param data_source: raster data source + :param options: options defining the metadata the caller is interested in + :return: raster data array and potential metadata + """ + (data_geotransform, data_projection, data_x_size, data_y_size) = get_geospatial_metadata(data_source, options) + data_array = read_raster_data(data_source) + data_source = None + del data_source + return GeospatialReadResult(data_array=data_array, data_geotransform=data_geotransform, data_projection=data_projection, data_x_size=data_x_size, data_y_size=data_y_size) + + +def _get_raster_data_source(key: str, s3_data, options: Optional[GeospatialOptions]): + """ + Retrieves geospatial file from S3. If the options include Vector2RasterOptions, it will transform + the vector into a raster and return the raster datasource. If no Vector2RasterOptions are included, + it will assume the file is a raster and try to read it as a raster. The caller is expected to know whether + the geospatial file is a raster or vector and supply the appropriate options. + + :param key: the object store key pointing to the desired geospatial file + :param s3_data: the object store content blob + :param options: options defining the desired metadata as well as vector options if the file is a vector + :return: raster data source + """ + mem_path = f"/vsimem/{key}" + gdal.FileFromMemBuffer(mem_path, s3_data) + raster_ds = None + if options is not None and options.vector_options is not None: + vector_ds = gdal.OpenEx(mem_path, gdal.OF_VECTOR) + # peel off path, then extension, then attach .tif extension + filename = ((key.split("/")[-1]).split(".")[0]) + ".tif" + target_path = os.path.join(os.getcwd(), f"{filename}") + + # Create the output raster + driver = gdal.GetDriverByName("GTiff") + output_raster_ds = driver.Create(target_path, options.vector_options.source_x_size, options.vector_options.source_y_size, 1, gdal.GDT_Float32) + # Set the geotransform and projection on the output raster + output_raster_ds.SetGeoTransform(options.vector_options.source_geotransform) + output_raster_ds.SetProjection(options.vector_options.source_projection) + gdal.Rasterize(output_raster_ds, vector_ds, bands=[1]) + raster_ds = output_raster_ds + else: + raster_ds = gdal.Open(mem_path, gdal.GA_ReadOnly) + + gdal.Unlink(mem_path) + return raster_ds + + +async def read_raster_into_memory(client: AioBaseClient, bucket: str, key: str, options: Optional[GeospatialOptions]) -> Optional[GeospatialReadResult]: + """ + Reads in the desired geospatial data as raw content bytes then returns the content as a data array and metadata + defined by the options the caller is interested in. 
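To make the intended call pattern concrete, here is a minimal usage sketch. It leans on app.utils.s3.get_client exactly as the callers later in this series do; the object key and the wrapper function name example_read are invented for illustration:

    from app.utils.s3 import get_client
    from app.utils.geospatial import GeospatialOptions, read_raster_into_memory

    async def example_read():
        # Hypothetical key; any raster object in the bucket would do.
        options = GeospatialOptions(include_x_size=True, include_y_size=True)
        async with get_client() as (client, bucket):
            result = await read_raster_into_memory(client, bucket, "dem/example.tif", options)
            if result is not None:
                print(result.data_array.shape, result.data_x_size, result.data_y_size)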
+ + :param client: object store client + :param bucket: the bucket the data is in + :param key: the key identifying the geospatial data in the bucket + :param options: options defining the desired metadata the user is interested in retrieving and vector + options if the geospatial file is a vector file + :return: raster data array and optional metadata + """ + try: + s3_source = await client.get_object(Bucket=bucket, Key=key) + s3_data = await s3_source["Body"].read() + data_source = _get_raster_data_source(key, s3_data, options) + return get_raster_data(data_source, options) + except ClientError as ex: + if ex.response["Error"]["Code"] == "NoSuchKey": + logger.info("No object found for key: %s", key) + return None + else: + raise From 354806a8fa5e2e42c280daa99a5abc7536500e5f Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 24 Jul 2024 12:43:38 -0700 Subject: [PATCH 02/37] Make rasterization in memory --- api/app/utils/geospatial.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index 4e55bcda7..bbcc38fe7 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -123,11 +123,11 @@ def _get_raster_data_source(key: str, s3_data, options: Optional[GeospatialOptio vector_ds = gdal.OpenEx(mem_path, gdal.OF_VECTOR) # peel off path, then extension, then attach .tif extension filename = ((key.split("/")[-1]).split(".")[0]) + ".tif" - target_path = os.path.join(os.getcwd(), f"{filename}") + mem_path = f"/vsimem/{filename}" # Create the output raster driver = gdal.GetDriverByName("GTiff") - output_raster_ds = driver.Create(target_path, options.vector_options.source_x_size, options.vector_options.source_y_size, 1, gdal.GDT_Float32) + output_raster_ds = driver.Create(mem_path, options.vector_options.source_x_size, options.vector_options.source_y_size, 1, gdal.GDT_Byte) # Set the geotransform and projection on the output raster output_raster_ds.SetGeoTransform(options.vector_options.source_geotransform) output_raster_ds.SetProjection(options.vector_options.source_projection) From 2e76f0a0d6654117aef1526c2b8ff6262c5c9dad Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 24 Jul 2024 12:53:03 -0700 Subject: [PATCH 03/37] Put code in a module with parameters --- .../auto_spatial_advisory/tpi_area_summary.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 api/app/auto_spatial_advisory/tpi_area_summary.py diff --git a/api/app/auto_spatial_advisory/tpi_area_summary.py b/api/app/auto_spatial_advisory/tpi_area_summary.py new file mode 100644 index 000000000..d503dfd62 --- /dev/null +++ b/api/app/auto_spatial_advisory/tpi_area_summary.py @@ -0,0 +1,45 @@ +from datetime import date +import numpy as np +from app.auto_spatial_advisory.run_type import RunType +from app.utils.s3 import get_client +from app import config +from app.utils.geospatial import GeospatialOptions, Vector2RasterOptions, read_raster_into_memory + + +async def summarize_advisory_area_by_tpi(forecast_or_actual: RunType, for_date: date): + """ + Masks classified HFI against classified TPI and counts frequency of TPI classes with classified HFI pixels. 
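The counting logic below is easiest to see on toy arrays. In this invented example, 255 marks pixels at or above the 4000 kW/m HFI threshold, mirroring what the classified pmtiles layer produces:

    import numpy as np

    # Invented 3x3 rasters: TPI classes 1..3 and classified HFI pixels (255 = HFI >= 4000).
    tpi = np.array([[1, 2, 3], [2, 2, 1], [3, 3, 2]])
    hfi = np.array([[0, 255, 255], [0, 255, 0], [255, 0, 0]])

    hfi_binary = np.where(hfi == 255, 1, hfi)      # 255 -> 1, other values unchanged
    hfi_masked_tpi = np.multiply(tpi, hfi_binary)  # keeps the TPI class only where HFI is high
    tpi_classes, counts = np.unique(hfi_masked_tpi, return_counts=True)
    print({int(k): int(v) for k, v in zip(tpi_classes, counts)})  # {0: 5, 2: 2, 3: 2}

Class 0 collects the pixels without high HFI; the real pipeline additionally drops the raster's no-data class.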
+ + :return: Mapping of TPI class to HFI pixel count + """ + pmtiles_filename = f'hfi{for_date.strftime("%Y%m%d")}.pmtiles' + iso_date = for_date.isoformat() + + async with get_client() as (client, bucket): + tpi_result = await read_raster_into_memory( + client, + bucket, + f'dem/tpi/{config.get("CLASSIFIED_TPI_DEM_NAME")}', + GeospatialOptions(include_geotransform=True, include_projection=True, include_x_size=True, include_y_size=True), + ) + hfi_result = await read_raster_into_memory( + client, + bucket, + f"psu/pmtiles/hfi/{forecast_or_actual.value}/{iso_date}/{pmtiles_filename}", + GeospatialOptions( + vector_options=Vector2RasterOptions( + source_geotransform=tpi_result.data_geotransform, + source_projection=tpi_result.data_projection, + source_x_size=tpi_result.data_x_size, + source_y_size=tpi_result.data_y_size, + ) + ), + ) + # transforms all pixels that are 255 to 1, this is what we get from pmtiles + hfi_4k_or_above = np.where(hfi_result.data_array == 255, 1, hfi_result.data_array) + hfi_masked_tpi = np.multiply(tpi_result.data_array, hfi_4k_or_above) + # Get unique values and their counts + tpi_classes, counts = np.unique(hfi_masked_tpi, return_counts=True) + tpi_class_freq_dist = dict(zip(tpi_classes, counts)) + + return tpi_class_freq_dist From cedd2d0b81e52006026bfe2f12c1dde3030437c9 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 25 Jul 2024 12:57:09 -0700 Subject: [PATCH 04/37] Update api/app/utils/geospatial.py Co-authored-by: dgboss --- api/app/utils/geospatial.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index bbcc38fe7..a3029de5e 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -60,7 +60,7 @@ def get_geospatial_metadata(data_source, options: Optional[GeospatialOptions]): x_size = None y_size = None if options is None: - return (geotransform, projection.x_size, y_size) + return (geotransform, projection, x_size, y_size) if options.include_geotransform: geotransform = data_source.GetGeoTransform() From 615e2b1bb8966d2ca1986253abfe559532bf5796 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 25 Jul 2024 13:53:11 -0700 Subject: [PATCH 05/37] Parameterize raster band --- api/app/utils/geospatial.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index a3029de5e..980792822 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -1,4 +1,3 @@ -import os from dataclasses import dataclass import logging from typing import Any, Optional @@ -77,14 +76,15 @@ def get_geospatial_metadata(data_source, options: Optional[GeospatialOptions]): return (geotransform, projection, x_size, y_size) -def read_raster_data(data_source): +def read_raster_data(data_source, band=1): """ Read raster data and return as array from the data source :param data_source: data source to read the data from + :param band: the band from the data source to read from :return: raster data as array """ - data_band = data_source.GetRasterBand(1) + data_band = data_source.GetRasterBand(band) data_array = data_band.ReadAsArray() return data_array From 7e10bbd8b280a8d1abc84b66a30a863c98d75126 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 25 Jul 2024 14:13:07 -0700 Subject: [PATCH 06/37] Use separate path variables for in memory s3 blob and in memory generated raster --- api/app/utils/geospatial.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git 
a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index 980792822..1e7ffc259 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -116,27 +116,28 @@ def _get_raster_data_source(key: str, s3_data, options: Optional[GeospatialOptio :param options: options defining the desired metadata as well as vector options if the file is a vector :return: raster data source """ - mem_path = f"/vsimem/{key}" - gdal.FileFromMemBuffer(mem_path, s3_data) + s3_data_mem_path = f"/vsimem/{key}" + gdal.FileFromMemBuffer(s3_data_mem_path, s3_data) raster_ds = None if options is not None and options.vector_options is not None: - vector_ds = gdal.OpenEx(mem_path, gdal.OF_VECTOR) + vector_ds = gdal.OpenEx(s3_data_mem_path, gdal.OF_VECTOR) # peel off path, then extension, then attach .tif extension filename = ((key.split("/")[-1]).split(".")[0]) + ".tif" - mem_path = f"/vsimem/{filename}" + raster_mem_path = f"/vsimem/{filename}" # Create the output raster driver = gdal.GetDriverByName("GTiff") - output_raster_ds = driver.Create(mem_path, options.vector_options.source_x_size, options.vector_options.source_y_size, 1, gdal.GDT_Byte) + output_raster_ds = driver.Create(raster_mem_path, options.vector_options.source_x_size, options.vector_options.source_y_size, 1, gdal.GDT_Byte) # Set the geotransform and projection on the output raster output_raster_ds.SetGeoTransform(options.vector_options.source_geotransform) output_raster_ds.SetProjection(options.vector_options.source_projection) gdal.Rasterize(output_raster_ds, vector_ds, bands=[1]) raster_ds = output_raster_ds + gdal.Unlink(raster_mem_path) else: - raster_ds = gdal.Open(mem_path, gdal.GA_ReadOnly) + raster_ds = gdal.Open(s3_data_mem_path, gdal.GA_ReadOnly) - gdal.Unlink(mem_path) + gdal.Unlink(s3_data_mem_path) return raster_ds From c57f4b3a5f1c8a3ffb08a19240935504b6311e1b Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 25 Jul 2024 14:44:10 -0700 Subject: [PATCH 07/37] Add advisory TPI table --- .vscode/settings.json | 5 +- .../6a635a12a71d_add_advisory_tpi_table.py | 73 ++++++++ api/app/db/models/auto_spatial_advisory.py | 158 ++++++++++-------- 3 files changed, 163 insertions(+), 73 deletions(-) create mode 100644 api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py diff --git a/.vscode/settings.json b/.vscode/settings.json index 3a857d490..c404f0eef 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -54,6 +54,7 @@ ], "typescript.preferences.importModuleSpecifier": "non-relative", "cSpell.words": [ + "actuals", "aiobotocore", "Albers", "allclose", @@ -62,8 +63,10 @@ "botocore", "cffdrs", "excinfo", + "FBAN", "firezone", "GDPS", + "geoalchemy", "GEOGCS", "geopackage", "geospatial", @@ -73,10 +76,10 @@ "gribs", "hourlies", "HRDPS", + "luxon", "maxx", "maxy", "miny", - "luxon", "ndarray", "numba", "osgeo", diff --git a/api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py b/api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py new file mode 100644 index 000000000..bf1ea58a5 --- /dev/null +++ b/api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py @@ -0,0 +1,73 @@ +"""add advisory tpi table + +Revision ID: 6a635a12a71d +Revises: be128a7bb4fd +Create Date: 2024-07-25 14:42:55.305830 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. 
+revision = "6a635a12a71d" +down_revision = "be128a7bb4fd" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic ### + op.create_table( + "advisory_tpi_stats", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("advisory_shape_id", sa.Integer(), nullable=False), + sa.Column("threshold", sa.Integer(), nullable=False), + sa.Column("run_parameters", sa.Integer(), nullable=False), + sa.Column("valley_bottom", sa.Integer(), nullable=False), + sa.Column("mid_slope", sa.Integer(), nullable=False), + sa.Column("upper_slope", sa.Integer(), nullable=False), + sa.Column("raster_height", sa.Integer(), nullable=False), + sa.Column("raster_width", sa.Integer(), nullable=False), + sa.Column("pixel_size_metres", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["advisory_shape_id"], + ["advisory_shapes.id"], + ), + sa.ForeignKeyConstraint( + ["run_parameters"], + ["run_parameters.id"], + ), + sa.ForeignKeyConstraint( + ["threshold"], + ["advisory_hfi_classification_threshold.id"], + ), + sa.PrimaryKeyConstraint("id"), + comment="Elevation TPI stats per fire shape", + ) + op.create_index(op.f("ix_advisory_tpi_stats_advisory_shape_id"), "advisory_tpi_stats", ["advisory_shape_id"], unique=False) + op.create_index(op.f("ix_advisory_tpi_stats_id"), "advisory_tpi_stats", ["id"], unique=False) + op.create_index(op.f("ix_advisory_tpi_stats_mid_slope"), "advisory_tpi_stats", ["mid_slope"], unique=False) + op.create_index(op.f("ix_advisory_tpi_stats_pixel_size_metres"), "advisory_tpi_stats", ["pixel_size_metres"], unique=False) + op.create_index(op.f("ix_advisory_tpi_stats_raster_height"), "advisory_tpi_stats", ["raster_height"], unique=False) + op.create_index(op.f("ix_advisory_tpi_stats_raster_width"), "advisory_tpi_stats", ["raster_width"], unique=False) + op.create_index(op.f("ix_advisory_tpi_stats_run_parameters"), "advisory_tpi_stats", ["run_parameters"], unique=False) + op.create_index(op.f("ix_advisory_tpi_stats_upper_slope"), "advisory_tpi_stats", ["upper_slope"], unique=False) + op.create_index(op.f("ix_advisory_tpi_stats_valley_bottom"), "advisory_tpi_stats", ["valley_bottom"], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic ### + op.drop_index(op.f("ix_advisory_tpi_stats_valley_bottom"), table_name="advisory_tpi_stats") + op.drop_index(op.f("ix_advisory_tpi_stats_upper_slope"), table_name="advisory_tpi_stats") + op.drop_index(op.f("ix_advisory_tpi_stats_run_parameters"), table_name="advisory_tpi_stats") + op.drop_index(op.f("ix_advisory_tpi_stats_raster_width"), table_name="advisory_tpi_stats") + op.drop_index(op.f("ix_advisory_tpi_stats_raster_height"), table_name="advisory_tpi_stats") + op.drop_index(op.f("ix_advisory_tpi_stats_pixel_size_metres"), table_name="advisory_tpi_stats") + op.drop_index(op.f("ix_advisory_tpi_stats_mid_slope"), table_name="advisory_tpi_stats") + op.drop_index(op.f("ix_advisory_tpi_stats_id"), table_name="advisory_tpi_stats") + op.drop_index(op.f("ix_advisory_tpi_stats_advisory_shape_id"), table_name="advisory_tpi_stats") + op.drop_table("advisory_tpi_stats") + # ### end Alembic commands ### diff --git a/api/app/db/models/auto_spatial_advisory.py b/api/app/db/models/auto_spatial_advisory.py index 940233699..b3cd8bf9e 100644 --- a/api/app/db/models/auto_spatial_advisory.py +++ b/api/app/db/models/auto_spatial_advisory.py @@ -1,5 +1,5 @@ import enum -from sqlalchemy import (Integer, Date, String, Float, Column, Index, ForeignKey, Enum, 
UniqueConstraint) +from sqlalchemy import Integer, Date, String, Float, Column, Index, ForeignKey, Enum, UniqueConstraint from app.db.models.common import TZTimeStamp from geoalchemy2 import Geometry from app.db.models import Base @@ -7,40 +7,43 @@ from app.geospatial import NAD83_BC_ALBERS from sqlalchemy.dialects import postgresql + class ShapeTypeEnum(enum.Enum): - """ Define different shape types. e.g. "Zone", "Fire Centre" - later we may add - "Incident"/"Fire", "Custom" etc. etc. """ + """Define different shape types. e.g. "Zone", "Fire Centre" - later we may add + "Incident"/"Fire", "Custom" etc. etc.""" + fire_centre = 1 fire_zone = 2 fire_zone_unit = 3 class RunTypeEnum(enum.Enum): - """ Define different run types. e.g. "Forecast", "Actual" """ + """Define different run types. e.g. "Forecast", "Actual" """ + forecast = "forecast" actual = "actual" class ShapeType(Base): - """ Identify some kind of area type, e.g. "Zone", or "Fire" """ - __tablename__ = 'advisory_shape_types' - __table_args__ = ( - {'comment': 'Identify kind of advisory area (e.g. Zone, Fire etc.)'} - ) + """Identify some kind of area type, e.g. "Zone", or "Fire" """ + + __tablename__ = "advisory_shape_types" + __table_args__ = {"comment": "Identify kind of advisory area (e.g. Zone, Fire etc.)"} id = Column(Integer, primary_key=True) name = Column(Enum(ShapeTypeEnum), nullable=False, unique=True, index=True) class Shape(Base): - """ Identify some area of interest with respect to advisories. """ - __tablename__ = 'advisory_shapes' + """Identify some area of interest with respect to advisories.""" + + __tablename__ = "advisory_shapes" __table_args__ = ( # we may have to re-visit this constraint - but for the time being, the idea is # that for any given type of area, it has to be unique for the kind of thing that # it is. e.g. a zone has some id. - UniqueConstraint('source_identifier', 'shape_type'), - {'comment': 'Record identifying some area of interest with respect to advisories'} + UniqueConstraint("source_identifier", "shape_type"), + {"comment": "Record identifying some area of interest with respect to advisories"}, ) id = Column(Integer, primary_key=True) @@ -52,23 +55,22 @@ class Shape(Base): # Have to make this column nullable to start because the table already exists. Will be # modified in subsequent migration to nullable=False combustible_area = Column(Float, nullable=True) - geom = Column(Geometry('MULTIPOLYGON', spatial_index=False, srid=NAD83_BC_ALBERS), nullable=False) + geom = Column(Geometry("MULTIPOLYGON", spatial_index=False, srid=NAD83_BC_ALBERS), nullable=False) label = Column(String, nullable=True, index=False) fire_centre = Column(Integer, ForeignKey(FireCentre.id), nullable=True, index=True) # Explict creation of index due to issue with alembic + geoalchemy. -Index('idx_advisory_shapes_geom', Shape.geom, postgresql_using='gist') +Index("idx_advisory_shapes_geom", Shape.geom, postgresql_using="gist") class HfiClassificationThreshold(Base): - __tablename__ = 'advisory_hfi_classification_threshold' - __table_args__ = ( - {'comment': 'The Operational Safe Works Standards specifies that an hfi of greater than ' - '4000 should result in an advisory. However in order for an FBAN to create ' - 'useful information, there are other thresholds of concern. E.g. > 10000' - } - ) + __tablename__ = "advisory_hfi_classification_threshold" + __table_args__ = { + "comment": "The Operational Safe Works Standards specifies that an hfi of greater than " + "4000 should result in an advisory. 
However in order for an FBAN to create " + "useful information, there are other thresholds of concern. E.g. > 10000" + } id = Column(Integer, primary_key=True, index=True) # e.g. '4000 < hfi < 10000' or 'hfi >= 10000' description = Column(String, nullable=False) @@ -77,45 +79,44 @@ class HfiClassificationThreshold(Base): class ClassifiedHfi(Base): - """ HFI classified into different groups. + """HFI classified into different groups. NOTE: In actual fact, forecasts and actuals can be run multiple times per day, - but we only care about the most recent one, so we only store the date, not the timesamp. + but we only care about the most recent one, so we only store the date, not the timestamp. """ - __tablename__ = 'advisory_classified_hfi' - __table_args__ = ( - {'comment': 'HFI classification for some forecast/advisory run on some day, for some date'} - ) + + __tablename__ = "advisory_classified_hfi" + __table_args__ = {"comment": "HFI classification for some forecast/advisory run on some day, for some date"} id = Column(Integer, primary_key=True, index=True) threshold = Column(Integer, ForeignKey(HfiClassificationThreshold.id), nullable=False, index=True) run_type = Column(Enum(RunTypeEnum), nullable=False, index=True) run_datetime = Column(TZTimeStamp, nullable=False) for_date = Column(Date, nullable=False) - geom = Column(Geometry('POLYGON', spatial_index=False, srid=NAD83_BC_ALBERS)) + geom = Column(Geometry("POLYGON", spatial_index=False, srid=NAD83_BC_ALBERS)) -# Explict creation of index due to issue with alembic + geoalchemy. -Index('idx_advisory_classified_hfi_geom', ClassifiedHfi.geom, postgresql_using='gist') +# Explicit creation of index due to issue with alembic + geoalchemy. +Index("idx_advisory_classified_hfi_geom", ClassifiedHfi.geom, postgresql_using="gist") class FuelType(Base): - """ Identify some kind of fuel type. """ - __tablename__ = 'advisory_fuel_types' - __table_args__ = ( - {'comment': 'Identify some kind of fuel type'} - ) + """Identify some kind of fuel type.""" + + __tablename__ = "advisory_fuel_types" + __table_args__ = {"comment": "Identify some kind of fuel type"} id = Column(Integer, primary_key=True, index=True) fuel_type_id = Column(Integer, nullable=False, index=True) - geom = Column(Geometry('POLYGON', spatial_index=False, srid=NAD83_BC_ALBERS)) + geom = Column(Geometry("POLYGON", spatial_index=False, srid=NAD83_BC_ALBERS)) # Explict creation of index due to issue with alembic + geoalchemy. 
-Index('idx_advisory_fuel_types_geom', FuelType.geom, postgresql_using='gist') +Index("idx_advisory_fuel_types_geom", FuelType.geom, postgresql_using="gist") class SFMSFuelType(Base): - """ Fuel types used by SFMS system """ - __tablename__ = 'sfms_fuel_types' - __table_args__ = ({'comment': 'Fuel types used by SFMS to calculate HFI spatially'}) + """Fuel types used by SFMS system""" + + __tablename__ = "sfms_fuel_types" + __table_args__ = {"comment": "Fuel types used by SFMS to calculate HFI spatially"} id = Column(Integer, primary_key=True, index=True) fuel_type_id = Column(Integer, nullable=False, index=True) fuel_type_code = Column(String, nullable=False) @@ -123,27 +124,25 @@ class SFMSFuelType(Base): class RunParameters(Base): - """ Combination of type of run (actual vs forecast), run datetime and for date.""" - __tablename__ = 'run_parameters' - __table_args__ = ( - UniqueConstraint('run_type', 'run_datetime', 'for_date'), - {'comment': 'A combination of run type, run datetime and for date.'} - ) + """Combination of type of run (actual vs forecast), run datetime and for date.""" + + __tablename__ = "run_parameters" + __table_args__ = (UniqueConstraint("run_type", "run_datetime", "for_date"), {"comment": "A combination of run type, run datetime and for date."}) id = Column(Integer, primary_key=True, index=True) - run_type = Column(postgresql.ENUM('actual', 'forecast', name='runtypeenum', - create_type=False), nullable=False, index=True) + run_type = Column(postgresql.ENUM("actual", "forecast", name="runtypeenum", create_type=False), nullable=False, index=True) run_datetime = Column(TZTimeStamp, nullable=False, index=True) for_date = Column(Date, nullable=False, index=True) + class HighHfiArea(Base): - """ Area exceeding HFI thresholds per fire zone. """ - __tablename__ = 'high_hfi_area' - __table_args__ = ( - {'comment': 'Area under advisory/warning per fire zone. advisory_area refers to the total area ' - 'in a fire zone with HFI values between 4000 - 10000 and warn_area refers to the total ' - 'area in a fire zone with HFI values exceeding 10000.' - } - ) + """Area exceeding HFI thresholds per fire zone.""" + + __tablename__ = "high_hfi_area" + __table_args__ = { + "comment": "Area under advisory/warning per fire zone. advisory_area refers to the total area " + "in a fire zone with HFI values between 4000 - 10000 and warn_area refers to the total " + "area in a fire zone with HFI values exceeding 10000." + } id = Column(Integer, primary_key=True, index=True) advisory_shape_id = Column(Integer, ForeignKey(Shape.id), nullable=False) @@ -153,16 +152,13 @@ class HighHfiArea(Base): class AdvisoryElevationStats(Base): - """ + """ Summary statistics about the elevation of area with high hfi (4k-10k and >10k) per fire shape based on the set run_type, for_date and run_datetime. 
""" - __tablename__ = 'advisory_elevation_stats' - __table_args__ = ( - { - 'comment': 'Elevation stats per fire shape by advisory threshold' - } - ) + + __tablename__ = "advisory_elevation_stats" + __table_args__ = {"comment": "Elevation stats per fire shape by advisory threshold"} id = Column(Integer, primary_key=True, index=True) advisory_shape_id = Column(Integer, ForeignKey(Shape.id), nullable=False, index=True) threshold = Column(Integer, ForeignKey(HfiClassificationThreshold.id), nullable=False) @@ -175,20 +171,38 @@ class AdvisoryElevationStats(Base): class AdvisoryFuelStats(Base): - """ + """ Summary statistics about the fuel type of area with high hfi (4k-10k and >10k) per fire shape based on the set run_type, for_date and run_datetime. """ - __tablename__ = 'advisory_fuel_stats' - __table_args__ = ( - UniqueConstraint('advisory_shape_id', 'threshold', 'run_parameters', 'fuel_type'), - { - 'comment': 'Fuel type stats per fire shape by advisory threshold' - } - ) + + __tablename__ = "advisory_fuel_stats" + __table_args__ = (UniqueConstraint("advisory_shape_id", "threshold", "run_parameters", "fuel_type"), {"comment": "Fuel type stats per fire shape by advisory threshold"}) id = Column(Integer, primary_key=True, index=True) advisory_shape_id = Column(Integer, ForeignKey(Shape.id), nullable=False, index=True) threshold = Column(Integer, ForeignKey(HfiClassificationThreshold.id), nullable=False) run_parameters = Column(Integer, ForeignKey(RunParameters.id), nullable=False, index=True) fuel_type = Column(Integer, ForeignKey(SFMSFuelType.id), nullable=False, index=True) area = Column(Float, nullable=False) + + +class AdvisoryTPIStats(Base): + """ + Summary statistics about the elevation based on a classified Topographic Position Index (TPI). + Classification of the TPI are bucketed into 1 = valley bottom, 2 = mid slope, 3 = upper slope. + Each record includes the raster pixel count of the above classifications as well as the raster pixel size and resolution in metres + and an advisory shape the stats are related to. 
+ """ + + __tablename__ = "advisory_tpi_stats" + __table_args__ = {"comment": "Elevation TPI stats per fire shape"} + id = Column(Integer, primary_key=True, index=True) + advisory_shape_id = Column(Integer, ForeignKey(Shape.id), nullable=False, index=True) + threshold = Column(Integer, ForeignKey(HfiClassificationThreshold.id), nullable=False) + run_parameters = Column(Integer, ForeignKey(RunParameters.id), nullable=False, index=True) + valley_bottom = Column(Integer, nullable=False, index=True) + mid_slope = Column(Integer, nullable=False, index=True) + upper_slope = Column(Integer, nullable=False, index=True) + raster_height = Column(Integer, nullable=False, index=True) + raster_width = Column(Integer, nullable=False, index=True) + pixel_size_metres = Column(Integer, nullable=False, index=True) From 0d291ea2748a62bed12f8fb16ad3df126862888f Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 25 Jul 2024 16:05:37 -0700 Subject: [PATCH 08/37] Introduce cutline warp to intersect rasters by fire zone shape --- .vscode/settings.json | 2 ++ api/app/auto_spatial_advisory/elevation.py | 2 +- api/app/utils/geospatial.py | 23 +++++++++++++++++++--- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index c404f0eef..8ad49569e 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -62,6 +62,7 @@ "APCP", "botocore", "cffdrs", + "cutline", "excinfo", "FBAN", "firezone", @@ -80,6 +81,7 @@ "maxx", "maxy", "miny", + "morecast", "ndarray", "numba", "osgeo", diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index e97b4fdeb..8acd98b40 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -205,7 +205,7 @@ def intersect_raster_by_firezone(threshold: int, advisory_shape_id: int, source_ :param threshold: The current threshold being processed, 1 = 4k-10k, 2 = > 10k :param advisory_shape_id: The id of the fire zone (aka advisory_shape object) to clip with - :param source_identifier: The source identifer of the fire zone. + :param source_identifier: The source identifier of the fire zone. :param raster_path: The path to the raster to be clipped. 
:param temp_dir: A temporary location for storing intermediate files """ diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index 1e7ffc259..f7b1c638f 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -5,6 +5,8 @@ from botocore.exceptions import ClientError from osgeo import gdal +from app.db.database import DB_READ_STRING + logger = logging.getLogger(__name__) @@ -40,6 +42,7 @@ class GeospatialReadResult: """ data_array: Any + data_source: Any data_geotransform: Optional[Any] data_projection: Optional[Any] data_x_size: Optional[int] @@ -99,9 +102,9 @@ def get_raster_data(data_source, options: Optional[GeospatialOptions]) -> Geospa """ (data_geotransform, data_projection, data_x_size, data_y_size) = get_geospatial_metadata(data_source, options) data_array = read_raster_data(data_source) - data_source = None - del data_source - return GeospatialReadResult(data_array=data_array, data_geotransform=data_geotransform, data_projection=data_projection, data_x_size=data_x_size, data_y_size=data_y_size) + return GeospatialReadResult( + data_array=data_array, data_source=data_source, data_geotransform=data_geotransform, data_projection=data_projection, data_x_size=data_x_size, data_y_size=data_y_size + ) def _get_raster_data_source(key: str, s3_data, options: Optional[GeospatialOptions]): @@ -164,3 +167,17 @@ async def read_raster_into_memory(client: AioBaseClient, bucket: str, key: str, return None else: raise + + +def cut_raster_by_shape_id(advisory_shape_id: int, source_identifier: str, data_source: Any, options: Optional[GeospatialOptions]) -> GeospatialReadResult: + """ + Given a raster dataset and a fire zone id, use gdal.Warp to clip out a fire zone from which we can retrieve stats. + + :param advisory_shape_id: The id of the fire zone (aka advisory_shape object) to clip with + :param source_identifier: The source identifier of the fire zone. + :param data_source: The source raster to be clipped. 
+ """ + output_path = f"/vsimem/firezone_{source_identifier}.tif" + warp_options = gdal.WarpOptions(format="GTiff", cutlineDSName=DB_READ_STRING, cutlineSQL=f"SELECT geom FROM advisory_shapes WHERE id={advisory_shape_id}", cropToCutline=True) + output_dataset = gdal.Warp(output_path, data_source, options=warp_options) + return get_raster_data(output_dataset, options) From 16ccfadee732db52f03d4f260fa000836e3cdea1 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 25 Jul 2024 16:26:34 -0700 Subject: [PATCH 09/37] Add TPI processing logic --- api/app/auto_spatial_advisory/elevation.py | 57 ++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index 8acd98b40..d9b7b0d98 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -5,6 +5,7 @@ import logging import os import tempfile +from typing import Dict import numpy as np from osgeo import gdal from sqlalchemy.ext.asyncio import AsyncSession @@ -17,6 +18,7 @@ from app.db.database import get_async_read_session_scope, get_async_write_session_scope, DB_READ_STRING from app.db.models.auto_spatial_advisory import AdvisoryElevationStats from app.utils.s3 import get_client +from app.utils.geospatial import GeospatialOptions, Vector2RasterOptions, cut_raster_by_shape_id, read_raster_into_memory logger = logging.getLogger(__name__) @@ -178,6 +180,61 @@ def apply_threshold_mask_to_dem(threshold: int, mask_path: str, temp_dir: str): return masked_dem_path +async def process_tpi_by_firezone(run_parameters_id: int): + """ + Given run parameters, lookup associated snow-masked HFI and static classified TPI geospatial data. + Cut out each fire zone shape from the above and intersect the TPI and HFI pixels, counting each pixel contributing to the TPI class. + Capture all fire zone stats keyed by its source_identifier. + + :param run_parameters_id: The RunParameter object id associated with this run_type, for_date and run_datetime + :return: dictionary of {source_identifier: {1: X, 2: Y, 3: Z}}, where 1 = valley bottom, 2 = mid slope, 3 = upper slope + and X, Y, Z are pixel counts at each of those elevation classes respectively. 
+ """ + raster_options = GeospatialOptions(include_geotransform=True, include_projection=True, include_x_size=True, include_y_size=True) + fire_zone_stats: Dict[int, Dict[int, int]] = {} + + async with get_client() as (client, bucket): + tpi_result = await read_raster_into_memory( + client, + bucket, + f'dem/tpi/{config.get("CLASSIFIED_TPI_DEM_NAME")}', + raster_options, + ) + hfi_result = await read_raster_into_memory( + client, + bucket, + "psu/pmtiles/hfi/actual/2024-07-18/hfi20240718.pmtiles", # TODO inform hfi vector lookup by run_parameters_id + GeospatialOptions( + vector_options=Vector2RasterOptions( + source_geotransform=tpi_result.data_geotransform, + source_projection=tpi_result.data_projection, + source_x_size=tpi_result.data_x_size, + source_y_size=tpi_result.data_y_size, + ) + ), + ) + async with get_async_write_session_scope() as session: + stmt = text("SELECT id, source_identifier FROM advisory_shapes;") + result = await session.execute(stmt) + rows = result.all() + + for row in rows: + cut_tpi_result = cut_raster_by_shape_id(row[0], row[1], tpi_result.data_source, raster_options) + cut_hfi_result = cut_raster_by_shape_id(row[0], row[1], hfi_result.data_source, raster_options) + # transforms all pixels that are 255 to 1, this is what we get from pmtiles + hfi_4k_or_above = np.where(cut_hfi_result.data_array == 255, 1, cut_hfi_result.data_array) + hfi_masked_tpi = np.multiply(cut_tpi_result.data_array, hfi_4k_or_above) + # Get unique values and their counts + tpi_classes, counts = np.unique(hfi_masked_tpi, return_counts=True) + tpi_class_freq_dist = dict(zip(tpi_classes, counts)) + + # Drop TPI class 4, this is the no data value from the TPI raster + tpi_class_freq_dist.pop(4, None) + fire_zone_stats[row[1]] = tpi_class_freq_dist + + return fire_zone_stats + + async def process_elevation_by_firezone(threshold: int, masked_dem_path: str, run_parameters_id: int): """ Given a tif that only contains elevations values at pixels where HFI exceeds the threshold, calculate statistics From c79722fd93d7182f71c503dff1e2e26474256831 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Mon, 29 Jul 2024 17:54:22 -0700 Subject: [PATCH 10/37] Remove pixel height and width from table --- api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py | 6 ------ api/app/db/models/auto_spatial_advisory.py | 2 -- 2 files changed, 8 deletions(-) diff --git a/api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py b/api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py index bf1ea58a5..c5120ae11 100644 --- a/api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py +++ b/api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py @@ -28,8 +28,6 @@ def upgrade(): sa.Column("valley_bottom", sa.Integer(), nullable=False), sa.Column("mid_slope", sa.Integer(), nullable=False), sa.Column("upper_slope", sa.Integer(), nullable=False), - sa.Column("raster_height", sa.Integer(), nullable=False), - sa.Column("raster_width", sa.Integer(), nullable=False), sa.Column("pixel_size_metres", sa.Integer(), nullable=False), sa.ForeignKeyConstraint( ["advisory_shape_id"], @@ -50,8 +48,6 @@ def upgrade(): op.create_index(op.f("ix_advisory_tpi_stats_id"), "advisory_tpi_stats", ["id"], unique=False) op.create_index(op.f("ix_advisory_tpi_stats_mid_slope"), "advisory_tpi_stats", ["mid_slope"], unique=False) op.create_index(op.f("ix_advisory_tpi_stats_pixel_size_metres"), "advisory_tpi_stats", ["pixel_size_metres"], unique=False) - op.create_index(op.f("ix_advisory_tpi_stats_raster_height"), "advisory_tpi_stats", 
["raster_height"], unique=False) - op.create_index(op.f("ix_advisory_tpi_stats_raster_width"), "advisory_tpi_stats", ["raster_width"], unique=False) op.create_index(op.f("ix_advisory_tpi_stats_run_parameters"), "advisory_tpi_stats", ["run_parameters"], unique=False) op.create_index(op.f("ix_advisory_tpi_stats_upper_slope"), "advisory_tpi_stats", ["upper_slope"], unique=False) op.create_index(op.f("ix_advisory_tpi_stats_valley_bottom"), "advisory_tpi_stats", ["valley_bottom"], unique=False) @@ -63,8 +59,6 @@ def downgrade(): op.drop_index(op.f("ix_advisory_tpi_stats_valley_bottom"), table_name="advisory_tpi_stats") op.drop_index(op.f("ix_advisory_tpi_stats_upper_slope"), table_name="advisory_tpi_stats") op.drop_index(op.f("ix_advisory_tpi_stats_run_parameters"), table_name="advisory_tpi_stats") - op.drop_index(op.f("ix_advisory_tpi_stats_raster_width"), table_name="advisory_tpi_stats") - op.drop_index(op.f("ix_advisory_tpi_stats_raster_height"), table_name="advisory_tpi_stats") op.drop_index(op.f("ix_advisory_tpi_stats_pixel_size_metres"), table_name="advisory_tpi_stats") op.drop_index(op.f("ix_advisory_tpi_stats_mid_slope"), table_name="advisory_tpi_stats") op.drop_index(op.f("ix_advisory_tpi_stats_id"), table_name="advisory_tpi_stats") diff --git a/api/app/db/models/auto_spatial_advisory.py b/api/app/db/models/auto_spatial_advisory.py index b3cd8bf9e..539ca790a 100644 --- a/api/app/db/models/auto_spatial_advisory.py +++ b/api/app/db/models/auto_spatial_advisory.py @@ -203,6 +203,4 @@ class AdvisoryTPIStats(Base): valley_bottom = Column(Integer, nullable=False, index=True) mid_slope = Column(Integer, nullable=False, index=True) upper_slope = Column(Integer, nullable=False, index=True) - raster_height = Column(Integer, nullable=False, index=True) - raster_width = Column(Integer, nullable=False, index=True) pixel_size_metres = Column(Integer, nullable=False, index=True) From 55c42a58aa2e5acea26fdb6897f4ecd93ebef233 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Mon, 29 Jul 2024 18:17:59 -0700 Subject: [PATCH 11/37] Warping only rasters --- .vscode/settings.json | 1 + api/app/auto_spatial_advisory/elevation.py | 11 +++-- .../auto_spatial_advisory/tpi_area_summary.py | 45 ------------------- api/app/utils/geospatial.py | 40 ++++++++--------- 4 files changed, 26 insertions(+), 71 deletions(-) delete mode 100644 api/app/auto_spatial_advisory/tpi_area_summary.py diff --git a/.vscode/settings.json b/.vscode/settings.json index bfe65fefd..f1e4f992f 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -83,6 +83,7 @@ "miny", "morecast", "ndarray", + "Neighbour", "numba", "osgeo", "PMTILES", diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index d9b7b0d98..2b79b4813 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -18,7 +18,7 @@ from app.db.database import get_async_read_session_scope, get_async_write_session_scope, DB_READ_STRING from app.db.models.auto_spatial_advisory import AdvisoryElevationStats from app.utils.s3 import get_client -from app.utils.geospatial import GeospatialOptions, Vector2RasterOptions, cut_raster_by_shape_id, read_raster_into_memory +from app.utils.geospatial import GeospatialOptions, WarpOptions, cut_raster_by_shape_id, read_raster_into_memory logger = logging.getLogger(__name__) @@ -203,9 +203,9 @@ async def process_tpi_by_firezone(run_parameters_id: int): hfi_result = await read_raster_into_memory( client, bucket, - 
"psu/pmtiles/hfi/actual/2024-07-18/hfi20240718.pmtiles", # TODO inform hfi vector lookup by run_parameters_id + "psu/rasters/hfi/actual/2024-09-03/snow_masked_hfi20230903.pmtiles", # TODO inform hfi vector lookup by run_parameters_id GeospatialOptions( - vector_options=Vector2RasterOptions( + warp_options=WarpOptions( source_geotransform=tpi_result.data_geotransform, source_projection=tpi_result.data_projection, source_x_size=tpi_result.data_x_size, @@ -213,6 +213,7 @@ async def process_tpi_by_firezone(run_parameters_id: int): ) ), ) + async with get_async_write_session_scope() as session: stmt = text("SELECT id, source_identifier FROM advisory_shapes;") result = await session.execute(stmt) @@ -221,9 +222,7 @@ async def process_tpi_by_firezone(run_parameters_id: int): for row in rows: cut_tpi_result = cut_raster_by_shape_id(row[0], row[1], tpi_result.data_source, raster_options) cut_hfi_result = cut_raster_by_shape_id(row[0], row[1], hfi_result.data_source, raster_options) - # transforms all pixels that are 255 to 1, this is what we get from pmtiles - hfi_4k_or_above = np.where(cut_hfi_result.data_array == 255, 1, cut_hfi_result.data_array) - hfi_masked_tpi = np.multiply(cut_tpi_result.data_array, hfi_4k_or_above) + hfi_masked_tpi = np.multiply(cut_tpi_result.data_array, cut_hfi_result) # Get unique values and their counts tpi_classes, counts = np.unique(hfi_masked_tpi, return_counts=True) tpi_class_freq_dist = dict(zip(tpi_classes, counts)) diff --git a/api/app/auto_spatial_advisory/tpi_area_summary.py b/api/app/auto_spatial_advisory/tpi_area_summary.py deleted file mode 100644 index d503dfd62..000000000 --- a/api/app/auto_spatial_advisory/tpi_area_summary.py +++ /dev/null @@ -1,45 +0,0 @@ -from datetime import date -import numpy as np -from app.auto_spatial_advisory.run_type import RunType -from app.utils.s3 import get_client -from app import config -from app.utils.geospatial import GeospatialOptions, Vector2RasterOptions, read_raster_into_memory - - -async def summarize_advisory_area_by_tpi(forecast_or_actual: RunType, for_date: date): - """ - Masks classified HFI against classified TPI and counts frequency of TPI classes with classified HFI pixels. 
- - :return: Mapping of TPI class to HFI pixel count - """ - pmtiles_filename = f'hfi{for_date.strftime("%Y%m%d")}.pmtiles' - iso_date = for_date.isoformat() - - async with get_client() as (client, bucket): - tpi_result = await read_raster_into_memory( - client, - bucket, - f'dem/tpi/{config.get("CLASSIFIED_TPI_DEM_NAME")}', - GeospatialOptions(include_geotransform=True, include_projection=True, include_x_size=True, include_y_size=True), - ) - hfi_result = await read_raster_into_memory( - client, - bucket, - f"psu/pmtiles/hfi/{forecast_or_actual.value}/{iso_date}/{pmtiles_filename}", - GeospatialOptions( - vector_options=Vector2RasterOptions( - source_geotransform=tpi_result.data_geotransform, - source_projection=tpi_result.data_projection, - source_x_size=tpi_result.data_x_size, - source_y_size=tpi_result.data_y_size, - ) - ), - ) - # transforms all pixels that are 255 to 1, this is what we get from pmtiles - hfi_4k_or_above = np.where(hfi_result.data_array == 255, 1, hfi_result.data_array) - hfi_masked_tpi = np.multiply(tpi_result.data_array, hfi_4k_or_above) - # Get unique values and their counts - tpi_classes, counts = np.unique(hfi_masked_tpi, return_counts=True) - tpi_class_freq_dist = dict(zip(tpi_classes, counts)) - - return tpi_class_freq_dist diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index f7b1c638f..a7901b285 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -11,9 +11,9 @@ @dataclass -class Vector2RasterOptions: +class WarpOptions: """ - Inputs required for burning vector geospatial data into a raster + Inputs warping raster based on attributes from a source raster. """ source_geotransform: Any @@ -28,7 +28,7 @@ class GeospatialOptions: Options supplied by caller to declare optional outputs """ - vector_options: Optional[Vector2RasterOptions] = None + warp_options: Optional[WarpOptions] = None include_geotransform: Optional[bool] = True include_projection: Optional[bool] = True include_x_size: Optional[bool] = None @@ -109,34 +109,34 @@ def get_raster_data(data_source, options: Optional[GeospatialOptions]) -> Geospa def _get_raster_data_source(key: str, s3_data, options: Optional[GeospatialOptions]): """ - Retrieves geospatial file from S3. If the options include Vector2RasterOptions, it will transform - the vector into a raster and return the raster datasource. If no Vector2RasterOptions are included, - it will assume the file is a raster and try to read it as a raster. The caller is expected to know whether - the geospatial file is a raster or vector and supply the appropriate options. + Retrieves geospatial file from S3. If the options include WarpOptions, it will warp + the raster based on those options. If no WarpOptions are included, + it simply read and return the raster data as is. 
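The /vsimem pattern this function relies on can be seen in isolation with a short sketch, with raw_bytes standing in for the blob fetched from object storage:

    from osgeo import gdal

    # raw_bytes is assumed to hold a GeoTIFF blob read from S3.
    gdal.FileFromMemBuffer("/vsimem/example.tif", raw_bytes)
    ds = gdal.Open("/vsimem/example.tif", gdal.GA_ReadOnly)
    data = ds.GetRasterBand(1).ReadAsArray()
    ds = None                           # release the dataset handle before freeing the buffer
    gdal.Unlink("/vsimem/example.tif")  # frees the in-memory file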
:param key: the object store key pointing to the desired geospatial file :param s3_data: the object store content blob - :param options: options defining the desired metadata as well as vector options if the file is a vector + :param options: options defining the desired metadata as well as warp options if desired :return: raster data source """ s3_data_mem_path = f"/vsimem/{key}" gdal.FileFromMemBuffer(s3_data_mem_path, s3_data) raster_ds = None - if options is not None and options.vector_options is not None: - vector_ds = gdal.OpenEx(s3_data_mem_path, gdal.OF_VECTOR) + if options is not None and options.warp_options is not None: + raster_ds = gdal.Open(s3_data_mem_path, gdal.GA_ReadOnly) # peel off path, then extension, then attach .tif extension filename = ((key.split("/")[-1]).split(".")[0]) + ".tif" - raster_mem_path = f"/vsimem/{filename}" - - # Create the output raster - driver = gdal.GetDriverByName("GTiff") - output_raster_ds = driver.Create(raster_mem_path, options.vector_options.source_x_size, options.vector_options.source_y_size, 1, gdal.GDT_Byte) - # Set the geotransform and projection on the output raster - output_raster_ds.SetGeoTransform(options.vector_options.source_geotransform) - output_raster_ds.SetProjection(options.vector_options.source_projection) - gdal.Rasterize(output_raster_ds, vector_ds, bands=[1]) + warped_mem_path = f"/vsimem/{filename}" + gdal.Warp( + warped_mem_path, + raster_ds, + dstSRS=options.warp_options.source_projection, + xRes=options.warp_options.source_x_size, + yRes=options.warp_options.source_y_size, + resampleAlg=gdal.GRA_NearestNeighbour, + ) + output_raster_ds = gdal.Open(warped_mem_path, gdal.GA_ReadOnly) raster_ds = output_raster_ds - gdal.Unlink(raster_mem_path) + gdal.Unlink(warped_mem_path) else: raster_ds = gdal.Open(s3_data_mem_path, gdal.GA_ReadOnly) From 41bbb07b90304b95d52d72b5edc2251a2a4a74ff Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 30 Jul 2024 14:49:54 -0700 Subject: [PATCH 12/37] Hook up rest of pipeline --- ...=> 144eb6a781dd_add_advisory_tpi_table.py} | 13 +- api/app/auto_spatial_advisory/elevation.py | 64 ++++- .../process_elevation_hfi.py | 18 +- api/app/db/crud/auto_spatial_advisory.py | 253 +++++++++--------- api/app/db/models/auto_spatial_advisory.py | 5 +- 5 files changed, 197 insertions(+), 156 deletions(-) rename api/alembic/versions/{6a635a12a71d_add_advisory_tpi_table.py => 144eb6a781dd_add_advisory_tpi_table.py} (89%) diff --git a/api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py b/api/alembic/versions/144eb6a781dd_add_advisory_tpi_table.py similarity index 89% rename from api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py rename to api/alembic/versions/144eb6a781dd_add_advisory_tpi_table.py index c5120ae11..62b230597 100644 --- a/api/alembic/versions/6a635a12a71d_add_advisory_tpi_table.py +++ b/api/alembic/versions/144eb6a781dd_add_advisory_tpi_table.py @@ -1,8 +1,8 @@ """add advisory tpi table -Revision ID: 6a635a12a71d +Revision ID: 144eb6a781dd Revises: be128a7bb4fd -Create Date: 2024-07-25 14:42:55.305830 +Create Date: 2024-07-30 14:44:42.050355 """ @@ -11,7 +11,7 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. 
-revision = "6a635a12a71d" +revision = "144eb6a781dd" down_revision = "be128a7bb4fd" branch_labels = None depends_on = None @@ -23,7 +23,6 @@ def upgrade(): "advisory_tpi_stats", sa.Column("id", sa.Integer(), nullable=False), sa.Column("advisory_shape_id", sa.Integer(), nullable=False), - sa.Column("threshold", sa.Integer(), nullable=False), sa.Column("run_parameters", sa.Integer(), nullable=False), sa.Column("valley_bottom", sa.Integer(), nullable=False), sa.Column("mid_slope", sa.Integer(), nullable=False), @@ -37,10 +36,6 @@ def upgrade(): ["run_parameters"], ["run_parameters.id"], ), - sa.ForeignKeyConstraint( - ["threshold"], - ["advisory_hfi_classification_threshold.id"], - ), sa.PrimaryKeyConstraint("id"), comment="Elevation TPI stats per fire shape", ) @@ -55,7 +50,7 @@ def upgrade(): def downgrade(): - # ### commands auto generated by Alembic ### + # ### commands auto generated by Alembic ### op.drop_index(op.f("ix_advisory_tpi_stats_valley_bottom"), table_name="advisory_tpi_stats") op.drop_index(op.f("ix_advisory_tpi_stats_upper_slope"), table_name="advisory_tpi_stats") op.drop_index(op.f("ix_advisory_tpi_stats_run_parameters"), table_name="advisory_tpi_stats") diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index 2b79b4813..9a32c8bf0 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -14,9 +14,10 @@ from app import config from app.auto_spatial_advisory.classify_hfi import classify_hfi from app.auto_spatial_advisory.run_type import RunType -from app.db.crud.auto_spatial_advisory import get_run_parameters_id, save_advisory_elevation_stats +from app.db.crud.auto_spatial_advisory import get_run_parameters_id, save_advisory_elevation_stats, save_advisory_elevation_tpi_stats from app.db.database import get_async_read_session_scope, get_async_write_session_scope, DB_READ_STRING -from app.db.models.auto_spatial_advisory import AdvisoryElevationStats +from app.db.models.auto_spatial_advisory import AdvisoryElevationStats, AdvisoryTPIStats +from app.auto_spatial_advisory.hfi_filepath import get_raster_filepath, get_raster_tif_filename from app.utils.s3 import get_client from app.utils.geospatial import GeospatialOptions, WarpOptions, cut_raster_by_shape_id, read_raster_into_memory @@ -25,6 +26,35 @@ DEM_GDAL_SOURCE = None +async def process_elevation_tpi(run_type: RunType, run_datetime: datetime, for_date: date): + """ + Create new elevation statistics records for the given parameters. + + :param hfi_s3_key: the object store key pointing to the hfi tif to intersect with tpi layer + :param run_type: The type of run to process. (is it a forecast or actual run?) + :param run_datetime: The date and time of the run to process. (when was the hfi file created?) + :param for_date: The date of the hfi to process. (when is the hfi for?) 
+ """ + logger.info("Processing elevation stats %s for run date: %s, for date: %s", run_type, run_datetime, for_date) + perf_start = perf_counter() + # Get the id from run_parameters associated with the provided run_type, for_date and for_datetime + async with get_async_read_session_scope() as session: + run_parameters_id = await get_run_parameters_id(session, run_type, run_datetime, for_date) + + stmt = select(AdvisoryTPIStats).where(AdvisoryTPIStats.run_parameters == run_parameters_id) + + exists = (await session.execute(stmt)).scalars().first() is not None + if not exists: + fire_zone_stats = await process_tpi_by_firezone(run_type, run_datetime.date(), for_date) + await store_elevation_tpi_stats(session, run_parameters_id, fire_zone_stats) + else: + logger.info("Elevation stats already computed") + + perf_end = perf_counter() + delta = perf_end - perf_start + logger.info("%f delta count before and after processing elevation stats", delta) + + async def process_elevation(source_path: str, run_type: RunType, run_datetime: datetime, for_date: date): """Create new elevation statistics records for the given parameters. @@ -180,7 +210,7 @@ def apply_threshold_mask_to_dem(threshold: int, mask_path: str, temp_dir: str): return masked_dem_path -async def process_tpi_by_firezone(run_parameters_id: int): +async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: date) -> Dict[int, Dict[int, int]]: """ Given run parameters, lookup associated snow-masked HFI and static classified TPI geospatial data. Cut out each fire zone shape from the above and intersect the TPI and HFI pixels, counting each pixel contributing to the TPI class. @@ -193,6 +223,9 @@ async def process_tpi_by_firezone(run_parameters_id: int): raster_options = GeospatialOptions(include_geotransform=True, include_projection=True, include_x_size=True, include_y_size=True) fire_zone_stats: Dict[int, Dict[int, int]] = {} + hfi_raster_filename = get_raster_tif_filename(for_date) + hfi_raster_key = get_raster_filepath(run_date, run_type, hfi_raster_filename) + async with get_client() as (client, bucket): tpi_result = await read_raster_into_memory( client, @@ -203,7 +236,7 @@ async def process_tpi_by_firezone(run_parameters_id: int): hfi_result = await read_raster_into_memory( client, bucket, - "psu/rasters/hfi/actual/2024-09-03/snow_masked_hfi20230903.pmtiles", # TODO inform hfi vector lookup by run_parameters_id + hfi_raster_key, GeospatialOptions( warp_options=WarpOptions( source_geotransform=tpi_result.data_geotransform, @@ -317,3 +350,26 @@ async def store_elevation_stats(session: AsyncSession, threshold: int, shape_id: threshold=threshold, ) await save_advisory_elevation_stats(session, advisory_elevation_stats) + + +async def store_elevation_tpi_stats(session: AsyncSession, run_parameters_id: int, fire_zone_stats: Dict[int, Dict[int, int]]): + """ + Writes elevation TPI statistics to the database. + + :param shape_id: The advisory shape id. 
+    :param run_parameters_id: The RunParameter object id associated with this run_type, for_date and run_datetime
+    :param fire_zone_stats: Dictionary keying shape id to a dictionary of classified tpi hfi pixel counts
+    """
+    advisory_tpi_stats_list = []
+    for shape_id, tpi_freq_count in fire_zone_stats.items():
+        advisory_tpi_stats = AdvisoryTPIStats(
+            advisory_shape_id=shape_id,
+            run_parameters=run_parameters_id,
+            valley_bottom=tpi_freq_count[1],
+            mid_slope=tpi_freq_count[2],
+            upper_slope=tpi_freq_count[3],
+            pixel_size_metres=2000,
+        )
+        advisory_tpi_stats_list.append(advisory_tpi_stats)
+
+    await save_advisory_elevation_tpi_stats(session, advisory_tpi_stats_list)
diff --git a/api/app/auto_spatial_advisory/process_elevation_hfi.py b/api/app/auto_spatial_advisory/process_elevation_hfi.py
index a1a800be8..c9d153446 100644
--- a/api/app/auto_spatial_advisory/process_elevation_hfi.py
+++ b/api/app/auto_spatial_advisory/process_elevation_hfi.py
@@ -1,31 +1,29 @@
-""" Code relating to processing HFI data related to elevation
-"""
+"""Code relating to processing HFI data related to elevation"""
+
 import logging
 from datetime import date, datetime
 from time import perf_counter
-from app.auto_spatial_advisory.common import get_s3_key
-from app.auto_spatial_advisory.elevation import process_elevation
+from app.auto_spatial_advisory.elevation import process_elevation_tpi
 from app.auto_spatial_advisory.run_type import RunType

 logger = logging.getLogger(__name__)


 async def process_hfi_elevation(run_type: RunType, run_date: date, run_datetime: datetime, for_date: date):
-    """ Create a new elevation based hfi analysis records for the given date.
+    """Create new elevation based HFI analysis records for the given date.

     :param run_type: The type of run to process. (is it a forecast or actual run?)
     :param run_date: The date of the run to process. (when was the hfi file created?)
     :param for_date: The date of the hfi to process. (when is the hfi for?)
""" - logger.info('Processing HFI elevation %s for run date: %s, for date: %s', run_type, run_date, for_date) + logger.info("Processing HFI elevation %s for run date: %s, for date: %s", run_type, run_date, for_date) perf_start = perf_counter() - key = get_s3_key(run_type, run_date, for_date) - logger.info(f'Key to HFI in object storage: {key}') + logger.info(f"Key to HFI in object storage: {key}") - await process_elevation(key, run_type, run_datetime, for_date) + await process_elevation_tpi(run_type, run_datetime, for_date) perf_end = perf_counter() delta = perf_end - perf_start - logger.info('%f delta count before and after processing HFI elevation', delta) + logger.info("%f delta count before and after processing HFI elevation", delta) diff --git a/api/app/db/crud/auto_spatial_advisory.py b/api/app/db/crud/auto_spatial_advisory.py index fb46a57c4..81e7f320d 100644 --- a/api/app/db/crud/auto_spatial_advisory.py +++ b/api/app/db/crud/auto_spatial_advisory.py @@ -9,23 +9,33 @@ from sqlalchemy.engine.row import Row from app.auto_spatial_advisory.run_type import RunType from app.db.models.auto_spatial_advisory import ( - AdvisoryFuelStats, Shape, ClassifiedHfi, HfiClassificationThreshold, SFMSFuelType, RunTypeEnum, - FuelType, HighHfiArea, RunParameters, AdvisoryElevationStats, ShapeType) + AdvisoryFuelStats, + Shape, + ClassifiedHfi, + HfiClassificationThreshold, + SFMSFuelType, + RunTypeEnum, + FuelType, + HighHfiArea, + RunParameters, + AdvisoryElevationStats, + AdvisoryTPIStats, + ShapeType, +) from app.db.models.hfi_calc import FireCentre logger = logging.getLogger(__name__) class HfiClassificationThresholdEnum(Enum): - """ Enum for the different HFI classification thresholds. """ - ADVISORY = 'advisory' - WARNING = 'warning' + """Enum for the different HFI classification thresholds.""" + ADVISORY = "advisory" + WARNING = "warning" -async def get_hfi_classification_threshold(session: AsyncSession, - name: HfiClassificationThresholdEnum) -> HfiClassificationThreshold: - stmt = select(HfiClassificationThreshold).where( - HfiClassificationThreshold.name == name.value) + +async def get_hfi_classification_threshold(session: AsyncSession, name: HfiClassificationThresholdEnum) -> HfiClassificationThreshold: + stmt = select(HfiClassificationThreshold).where(HfiClassificationThreshold.name == name.value) result = await session.execute(stmt) return result.scalars().first() @@ -39,7 +49,7 @@ async def save_fuel_type(session: AsyncSession, fuel_type: FuelType): async def get_fire_zone_unit_shape_type_id(session: AsyncSession): - statement = select(ShapeType).where(ShapeType.name == 'fire_zone_unit') + statement = select(ShapeType).where(ShapeType.name == "fire_zone_unit") result = await session.execute(statement) return result.scalars().first().id @@ -51,7 +61,7 @@ async def get_fire_zone_units(session: AsyncSession, fire_zone_type_id: int): async def get_combustible_area(session: AsyncSession): - """ Get the combustible area for each "shape". This is slow, and we don't expect it to run + """Get the combustible area for each "shape". This is slow, and we don't expect it to run in real time. 
This method isn't being used right now, but you can calculate the combustible area for each @@ -70,20 +80,19 @@ async def get_combustible_area(session: AsyncSession): ``` """ - logger.info('starting zone/combustible area intersection query') + logger.info("starting zone/combustible area intersection query") perf_start = perf_counter() - stmt = select(Shape.id, - Shape.source_identifier, - Shape.geom.ST_Area().label('zone_area'), - FuelType.geom.ST_Union().ST_Intersection(Shape.geom).ST_Area().label('combustible_area'))\ - .join(FuelType, FuelType.geom.ST_Intersects(Shape.geom))\ - .where(FuelType.fuel_type_id.not_in((-10000, 99, 100, 102, 103)))\ + stmt = ( + select(Shape.id, Shape.source_identifier, Shape.geom.ST_Area().label("zone_area"), FuelType.geom.ST_Union().ST_Intersection(Shape.geom).ST_Area().label("combustible_area")) + .join(FuelType, FuelType.geom.ST_Intersects(Shape.geom)) + .where(FuelType.fuel_type_id.not_in((-10000, 99, 100, 102, 103))) .group_by(Shape.id) + ) result = await session.execute(stmt) all_combustible = result.all() perf_end = perf_counter() delta = perf_end - perf_start - logger.info('%f delta count before and after hfi area + zone/area intersection query', delta) + logger.info("%f delta count before and after hfi area + zone/area intersection query", delta) return all_combustible @@ -91,7 +100,7 @@ async def get_all_hfi_thresholds(session: AsyncSession) -> List[HfiClassificatio """ Retrieve all records from advisory_hfi_classification_threshold table. """ - logger.info('retrieving HFI classification threshold info...') + logger.info("retrieving HFI classification threshold info...") stmt = select(HfiClassificationThreshold) result = await session.execute(stmt) @@ -99,9 +108,7 @@ async def get_all_hfi_thresholds(session: AsyncSession) -> List[HfiClassificatio for row in result.all(): threshold_object = row[0] - thresholds.append(HfiClassificationThreshold(id=threshold_object.id, - description=threshold_object.description, - name=threshold_object.name)) + thresholds.append(HfiClassificationThreshold(id=threshold_object.id, description=threshold_object.description, name=threshold_object.name)) return thresholds @@ -110,7 +117,7 @@ async def get_all_sfms_fuel_types(session: AsyncSession) -> List[SFMSFuelType]: """ Retrieve all records from sfms_fuel_types table """ - logger.info('retrieving SFMS fuel types info...') + logger.info("retrieving SFMS fuel types info...") stmt = select(SFMSFuelType) result = await session.execute(stmt) @@ -118,9 +125,7 @@ async def get_all_sfms_fuel_types(session: AsyncSession) -> List[SFMSFuelType]: for row in result.all(): fuel_type_object = row[0] - fuel_types.append(SFMSFuelType(fuel_type_id=fuel_type_object.fuel_type_id, - fuel_type_code=fuel_type_object.fuel_type_code, - description=fuel_type_object.description)) + fuel_types.append(SFMSFuelType(fuel_type_id=fuel_type_object.fuel_type_id, fuel_type_code=fuel_type_object.fuel_type_code, description=fuel_type_object.description)) return fuel_types @@ -144,87 +149,86 @@ async def get_precomputed_high_hfi_fuel_type_areas_for_shape(session: AsyncSessi all_results = result.all() perf_end = perf_counter() delta = perf_end - perf_start - logger.info('%f delta count before and after fuel types/high hfi/zone query', delta) + logger.info("%f delta count before and after fuel types/high hfi/zone query", delta) return all_results - -async def get_high_hfi_fuel_types_for_shape(session: AsyncSession, - run_type: RunTypeEnum, - run_datetime: datetime, - for_date: date, - shape_id: int) -> 
List[Row]: +async def get_high_hfi_fuel_types_for_shape(session: AsyncSession, run_type: RunTypeEnum, run_datetime: datetime, for_date: date, shape_id: int) -> List[Row]: """ Union of fuel types by fuel_type_id (1 multipolygon for each fuel type) Intersected with union of ClassifiedHfi for given run_type, run_datetime, and for_date for both 4K-10K and 10K+ HFI values Intersected with fire zone geom for a specific fire zone identified by ID """ - logger.info('starting fuel types/high hfi/zone intersection query for fire zone %s', str(shape_id)) + logger.info("starting fuel types/high hfi/zone intersection query for fire zone %s", str(shape_id)) perf_start = perf_counter() - stmt = select(Shape.source_identifier, FuelType.fuel_type_id, ClassifiedHfi.threshold, - func.sum(FuelType.geom.ST_Intersection(ClassifiedHfi.geom.ST_Intersection(Shape.geom)).ST_Area()).label('area'))\ - .join_from(ClassifiedHfi, Shape, ClassifiedHfi.geom.ST_Intersects(Shape.geom))\ - .join_from(ClassifiedHfi, FuelType, ClassifiedHfi.geom.ST_Intersects(FuelType.geom))\ - .where(ClassifiedHfi.run_type == run_type.value, ClassifiedHfi.for_date == for_date, - ClassifiedHfi.run_datetime == run_datetime, Shape.source_identifier == str(shape_id))\ - .group_by(Shape.source_identifier)\ - .group_by(FuelType.fuel_type_id)\ - .group_by(ClassifiedHfi.threshold)\ - .order_by(FuelType.fuel_type_id)\ + stmt = ( + select( + Shape.source_identifier, + FuelType.fuel_type_id, + ClassifiedHfi.threshold, + func.sum(FuelType.geom.ST_Intersection(ClassifiedHfi.geom.ST_Intersection(Shape.geom)).ST_Area()).label("area"), + ) + .join_from(ClassifiedHfi, Shape, ClassifiedHfi.geom.ST_Intersects(Shape.geom)) + .join_from(ClassifiedHfi, FuelType, ClassifiedHfi.geom.ST_Intersects(FuelType.geom)) + .where(ClassifiedHfi.run_type == run_type.value, ClassifiedHfi.for_date == for_date, ClassifiedHfi.run_datetime == run_datetime, Shape.source_identifier == str(shape_id)) + .group_by(Shape.source_identifier) + .group_by(FuelType.fuel_type_id) + .group_by(ClassifiedHfi.threshold) + .order_by(FuelType.fuel_type_id) .order_by(ClassifiedHfi.threshold) + ) result = await session.execute(stmt) perf_end = perf_counter() delta = perf_end - perf_start - logger.info('%f delta count before and after fuel types/high hfi/zone intersection query', delta) + logger.info("%f delta count before and after fuel types/high hfi/zone intersection query", delta) return result.all() -async def get_high_hfi_fuel_types(session: AsyncSession, - run_type: RunTypeEnum, - run_datetime: datetime, - for_date: date) -> List[Row]: +async def get_high_hfi_fuel_types(session: AsyncSession, run_type: RunTypeEnum, run_datetime: datetime, for_date: date) -> List[Row]: """ Union of fuel types by fuel_type_id (1 multipolygon for each fuel type) Intersected with union of ClassifiedHfi for given run_type, run_datetime, and for_date for both 4K-10K and 10K+ HFI values """ - logger.info('starting fuel types/high hfi/zone intersection query') + logger.info("starting fuel types/high hfi/zone intersection query") perf_start = perf_counter() - stmt = select(Shape.source_identifier, FuelType.fuel_type_id, ClassifiedHfi.threshold, - func.sum(FuelType.geom.ST_Intersection(ClassifiedHfi.geom.ST_Intersection(Shape.geom)).ST_Area()).label('area'))\ - .join_from(ClassifiedHfi, Shape, ClassifiedHfi.geom.ST_Intersects(Shape.geom))\ - .join_from(ClassifiedHfi, FuelType, ClassifiedHfi.geom.ST_Intersects(FuelType.geom))\ - .where(ClassifiedHfi.run_type == run_type.value, ClassifiedHfi.for_date == for_date, - 
ClassifiedHfi.run_datetime == run_datetime)\ - .group_by(Shape.source_identifier)\ - .group_by(FuelType.fuel_type_id)\ - .group_by(ClassifiedHfi.threshold)\ - .order_by(FuelType.fuel_type_id)\ + stmt = ( + select( + Shape.source_identifier, + FuelType.fuel_type_id, + ClassifiedHfi.threshold, + func.sum(FuelType.geom.ST_Intersection(ClassifiedHfi.geom.ST_Intersection(Shape.geom)).ST_Area()).label("area"), + ) + .join_from(ClassifiedHfi, Shape, ClassifiedHfi.geom.ST_Intersects(Shape.geom)) + .join_from(ClassifiedHfi, FuelType, ClassifiedHfi.geom.ST_Intersects(FuelType.geom)) + .where(ClassifiedHfi.run_type == run_type.value, ClassifiedHfi.for_date == for_date, ClassifiedHfi.run_datetime == run_datetime) + .group_by(Shape.source_identifier) + .group_by(FuelType.fuel_type_id) + .group_by(ClassifiedHfi.threshold) + .order_by(FuelType.fuel_type_id) .order_by(ClassifiedHfi.threshold) + ) logger.info(str(stmt)) result = await session.execute(stmt) perf_end = perf_counter() delta = perf_end - perf_start - logger.info('%f delta count before and after fuel types/high hfi/zone intersection query', delta) + logger.info("%f delta count before and after fuel types/high hfi/zone intersection query", delta) return result.all() -async def get_hfi_area(session: AsyncSession, - run_type: RunTypeEnum, - run_datetime: datetime, - for_date: date) -> List[Row]: - logger.info('gathering hfi area data') - stmt = select(Shape.id, Shape.source_identifier, Shape.combustible_area, HighHfiArea.id, - HighHfiArea.advisory_shape_id, HighHfiArea.threshold, HighHfiArea.area.label('hfi_area'))\ - .join(HighHfiArea, HighHfiArea.advisory_shape_id == Shape.id)\ - .join(RunParameters, RunParameters.id == HighHfiArea.run_parameters)\ - .where(cast(RunParameters.run_type, String) == run_type.value, - RunParameters.for_date == for_date, - RunParameters.run_datetime == run_datetime) + +async def get_hfi_area(session: AsyncSession, run_type: RunTypeEnum, run_datetime: datetime, for_date: date) -> List[Row]: + logger.info("gathering hfi area data") + stmt = ( + select(Shape.id, Shape.source_identifier, Shape.combustible_area, HighHfiArea.id, HighHfiArea.advisory_shape_id, HighHfiArea.threshold, HighHfiArea.area.label("hfi_area")) + .join(HighHfiArea, HighHfiArea.advisory_shape_id == Shape.id) + .join(RunParameters, RunParameters.id == HighHfiArea.run_parameters) + .where(cast(RunParameters.run_type, String) == run_type.value, RunParameters.for_date == for_date, RunParameters.run_datetime == run_datetime) + ) result = await session.execute(stmt) return result.all() @@ -234,29 +238,20 @@ async def get_run_datetimes(session: AsyncSession, run_type: RunTypeEnum, for_da Retrieve all distinct available run_datetimes for a given run_type and for_date, and return the run_datetimes in descending order (most recent is first) """ - stmt = select(RunParameters.run_datetime)\ - .where(RunParameters.run_type == run_type.value, RunParameters.for_date == for_date)\ - .distinct()\ - .order_by(RunParameters.run_datetime.desc()) + stmt = select(RunParameters.run_datetime).where(RunParameters.run_type == run_type.value, RunParameters.for_date == for_date).distinct().order_by(RunParameters.run_datetime.desc()) result = await session.execute(stmt) return result.all() -async def get_high_hfi_area(session: AsyncSession, - run_type: RunTypeEnum, - run_datetime: datetime, - for_date: date) -> List[Row]: - """ For each fire zone, get the area of HFI polygons in that zone that fall within the +async def get_high_hfi_area(session: AsyncSession, run_type: 
RunTypeEnum, run_datetime: datetime, for_date: date) -> List[Row]:
+    """For each fire zone, get the area of HFI polygons in that zone that fall within the
     4000 - 10000 range and the area of HFI polygons that exceed the 10000 threshold.
     """
-    stmt = select(HighHfiArea.id,
-                  HighHfiArea.advisory_shape_id,
-                  HighHfiArea.area,
-                  HighHfiArea.threshold)\
-        .join(RunParameters)\
-        .where(cast(RunParameters.run_type, String) == run_type.value,
-               RunParameters.for_date == for_date,
-               RunParameters.run_datetime == run_datetime)
+    stmt = (
+        select(HighHfiArea.id, HighHfiArea.advisory_shape_id, HighHfiArea.area, HighHfiArea.threshold)
+        .join(RunParameters)
+        .where(cast(RunParameters.run_type, String) == run_type.value, RunParameters.for_date == for_date, RunParameters.run_datetime == run_datetime)
+    )
     result = await session.execute(stmt)
     return result.all()
@@ -286,53 +281,43 @@ async def save_advisory_fuel_stats(session: AsyncSession, advisory_fuel_stats: L
     session.add_all(advisory_fuel_stats)


-async def calculate_high_hfi_areas(session: AsyncSession, run_type: RunType, run_datetime: datetime,
-                                   for_date: date) -> List[Row]:
+async def calculate_high_hfi_areas(session: AsyncSession, run_type: RunType, run_datetime: datetime, for_date: date) -> List[Row]:
     """
-    Given a 'run_parameters_id', which represents a unqiue combination of run_type, run_datetime
-    and for_date, individually sum the areas in each firezone with:
-        1. 4000 <= HFI < 10000 (aka 'advisory_area')
-        2. HFI >= 10000 (aka 'warn_area')
+    Given a 'run_parameters_id', which represents a unique combination of run_type, run_datetime
+    and for_date, individually sum the areas in each firezone with:
+    1. 4000 <= HFI < 10000 (aka 'advisory_area')
+    2. HFI >= 10000 (aka 'warn_area')
     """
-    logger.info('starting high HFI by zone intersection query')
+    logger.info("starting high HFI by zone intersection query")
     perf_start = perf_counter()
-    stmt = select(Shape.id.label('shape_id'),
-                  ClassifiedHfi.threshold.label('threshold'),
-                  func.sum(ClassifiedHfi.geom.ST_Intersection(Shape.geom).ST_Area()).label('area'))\
-        .join_from(Shape, ClassifiedHfi, ClassifiedHfi.geom.ST_Intersects(Shape.geom))\
-        .where(ClassifiedHfi.run_type == run_type.value)\
-        .where(ClassifiedHfi.run_datetime == run_datetime)\
-        .where(ClassifiedHfi.for_date == for_date)\
-        .group_by(Shape.id)\
+    stmt = (
+        select(Shape.id.label("shape_id"), ClassifiedHfi.threshold.label("threshold"), func.sum(ClassifiedHfi.geom.ST_Intersection(Shape.geom).ST_Area()).label("area"))
+        .join_from(Shape, ClassifiedHfi, ClassifiedHfi.geom.ST_Intersects(Shape.geom))
+        .where(ClassifiedHfi.run_type == run_type.value)
+        .where(ClassifiedHfi.run_datetime == run_datetime)
+        .where(ClassifiedHfi.for_date == for_date)
+        .group_by(Shape.id)
         .group_by(ClassifiedHfi.threshold)
+    )
     result = await session.execute(stmt)
     all_high_hfi = result.all()
     perf_end = perf_counter()
     delta = perf_end - perf_start
-    logger.info('%f delta count before and after calculate high HFI by zone intersection query', delta)
+    logger.info("%f delta count before and after calculate high HFI by zone intersection query", delta)

     return all_high_hfi


-async def get_run_parameters_id(session: AsyncSession,
-                                run_type: RunType,
-                                run_datetime: datetime,
-                                for_date: date) -> List[Row]:
-    stmt = select(RunParameters.id)\
-        .where(cast(RunParameters.run_type, String) == run_type.value,
-               RunParameters.run_datetime == run_datetime,
-               RunParameters.for_date == for_date)
+async def get_run_parameters_id(session: AsyncSession, run_type: RunType, run_datetime: datetime, for_date: date) -> int:
+    stmt = select(RunParameters.id).where(cast(RunParameters.run_type, String) == run_type.value, RunParameters.run_datetime == run_datetime, RunParameters.for_date == for_date)
     result = await session.execute(stmt)
     return result.scalar()


 async def save_run_parameters(session: AsyncSession, run_type: RunType, run_datetime: datetime, for_date: date):
-    logger.info(
-        f'Writing run parameters. RunType: {run_type.value}; run_datetime: {run_datetime.isoformat()}; for_date: {for_date.isoformat()}')
-    stmt = insert(RunParameters)\
-        .values(run_type=run_type.value, run_datetime=run_datetime, for_date=for_date)\
-        .on_conflict_do_nothing()
+    logger.info(f"Writing run parameters. RunType: {run_type.value}; run_datetime: {run_datetime.isoformat()}; for_date: {for_date.isoformat()}")
+    stmt = insert(RunParameters).values(run_type=run_type.value, run_datetime=run_datetime, for_date=for_date).on_conflict_do_nothing()
     await session.execute(stmt)

@@ -340,21 +325,29 @@ async def save_advisory_elevation_stats(session: AsyncSession, advisory_elevatio
     session.add(advisory_elevation_stats)


-async def get_zonal_elevation_stats(session: AsyncSession,
-                                    fire_zone_id: int,
-                                    run_type: RunType,
-                                    run_datetime: datetime,
-                                    for_date: date) -> List[Row]:
+async def save_advisory_elevation_tpi_stats(session: AsyncSession, advisory_elevation_stats: List[AdvisoryTPIStats]):
+    session.add_all(advisory_elevation_stats)
+
+
+async def get_zonal_elevation_stats(session: AsyncSession, fire_zone_id: int, run_type: RunType, run_datetime: datetime, for_date: date) -> List[Row]:
     run_parameters_id = await get_run_parameters_id(session, run_type, run_datetime, for_date)
     stmt = select(Shape.id).where(Shape.source_identifier == str(fire_zone_id))
     result = await session.execute(stmt)
     shape_id = result.scalar()

-    stmt = select(AdvisoryElevationStats.advisory_shape_id, AdvisoryElevationStats.minimum,
-                  AdvisoryElevationStats.quartile_25, AdvisoryElevationStats.median, AdvisoryElevationStats.quartile_75,
-                  AdvisoryElevationStats.maximum, AdvisoryElevationStats.threshold)\
-        .where(AdvisoryElevationStats.advisory_shape_id == shape_id, AdvisoryElevationStats.run_parameters == run_parameters_id)\
+    stmt = (
+        select(
+            AdvisoryElevationStats.advisory_shape_id,
+            AdvisoryElevationStats.minimum,
+            AdvisoryElevationStats.quartile_25,
+            AdvisoryElevationStats.median,
+            AdvisoryElevationStats.quartile_75,
+            AdvisoryElevationStats.maximum,
+            AdvisoryElevationStats.threshold,
+        )
+        .where(AdvisoryElevationStats.advisory_shape_id == shape_id, AdvisoryElevationStats.run_parameters == run_parameters_id)
         .order_by(AdvisoryElevationStats.threshold)
+    )

     return await session.execute(stmt)

@@ -378,4 +371,4 @@ async def get_provincial_rollup(session: AsyncSession, run_type: RunTypeEnum, ru
         .join(HighHfiArea, and_(HighHfiArea.advisory_shape_id == Shape.id, HighHfiArea.run_parameters == run_parameter_id), isouter=True)
     )
     result = await session.execute(stmt)
-    return result.all()
\ No newline at end of file
+    return result.all()
diff --git a/api/app/db/models/auto_spatial_advisory.py b/api/app/db/models/auto_spatial_advisory.py
index 539ca790a..e706ac388 100644
--- a/api/app/db/models/auto_spatial_advisory.py
+++ b/api/app/db/models/auto_spatial_advisory.py
@@ -60,7 +60,7 @@ class Shape(Base):
     fire_centre = Column(Integer, ForeignKey(FireCentre.id), nullable=True, index=True)


-# Explict creation of index due to issue with alembic + geoalchemy.
+# Explicit creation of index due to issue with alembic + geoalchemy.
Index("idx_advisory_shapes_geom", Shape.geom, postgresql_using="gist") @@ -188,7 +188,7 @@ class AdvisoryFuelStats(Base): class AdvisoryTPIStats(Base): """ - Summary statistics about the elevation based on a classified Topographic Position Index (TPI). + Summary statistics about the elevation based on a HFI >4k classified Topographic Position Index (TPI). Classification of the TPI are bucketed into 1 = valley bottom, 2 = mid slope, 3 = upper slope. Each record includes the raster pixel count of the above classifications as well as the raster pixel size and resolution in metres and an advisory shape the stats are related to. @@ -198,7 +198,6 @@ class AdvisoryTPIStats(Base): __table_args__ = {"comment": "Elevation TPI stats per fire shape"} id = Column(Integer, primary_key=True, index=True) advisory_shape_id = Column(Integer, ForeignKey(Shape.id), nullable=False, index=True) - threshold = Column(Integer, ForeignKey(HfiClassificationThreshold.id), nullable=False) run_parameters = Column(Integer, ForeignKey(RunParameters.id), nullable=False, index=True) valley_bottom = Column(Integer, nullable=False, index=True) mid_slope = Column(Integer, nullable=False, index=True) From 825a28664453fdd0635aad7300c8f1b5bb445d6e Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 30 Jul 2024 14:52:36 -0700 Subject: [PATCH 13/37] Use existing process call --- api/app/auto_spatial_advisory/process_elevation_hfi.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/app/auto_spatial_advisory/process_elevation_hfi.py b/api/app/auto_spatial_advisory/process_elevation_hfi.py index c9d153446..52b5abe23 100644 --- a/api/app/auto_spatial_advisory/process_elevation_hfi.py +++ b/api/app/auto_spatial_advisory/process_elevation_hfi.py @@ -20,8 +20,6 @@ async def process_hfi_elevation(run_type: RunType, run_date: date, run_datetime: logger.info("Processing HFI elevation %s for run date: %s, for date: %s", run_type, run_date, for_date) perf_start = perf_counter() - logger.info(f"Key to HFI in object storage: {key}") - await process_elevation_tpi(run_type, run_datetime, for_date) perf_end = perf_counter() From 31ed215e807ef4e7fbb5e4d483ca53b06d785cad Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 31 Jul 2024 16:29:13 -0700 Subject: [PATCH 14/37] Remove unnecessary indices --- ...py => 6910d017b626_add_advisory_tpi_table.py} | 16 ++++------------ api/app/db/models/auto_spatial_advisory.py | 8 ++++---- 2 files changed, 8 insertions(+), 16 deletions(-) rename api/alembic/versions/{144eb6a781dd_add_advisory_tpi_table.py => 6910d017b626_add_advisory_tpi_table.py} (65%) diff --git a/api/alembic/versions/144eb6a781dd_add_advisory_tpi_table.py b/api/alembic/versions/6910d017b626_add_advisory_tpi_table.py similarity index 65% rename from api/alembic/versions/144eb6a781dd_add_advisory_tpi_table.py rename to api/alembic/versions/6910d017b626_add_advisory_tpi_table.py index 62b230597..b14d7223a 100644 --- a/api/alembic/versions/144eb6a781dd_add_advisory_tpi_table.py +++ b/api/alembic/versions/6910d017b626_add_advisory_tpi_table.py @@ -1,8 +1,8 @@ """add advisory tpi table -Revision ID: 144eb6a781dd +Revision ID: 6910d017b626 Revises: be128a7bb4fd -Create Date: 2024-07-30 14:44:42.050355 +Create Date: 2024-07-31 16:27:31.642156 """ @@ -11,7 +11,7 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. 
-revision = "144eb6a781dd" +revision = "6910d017b626" down_revision = "be128a7bb4fd" branch_labels = None depends_on = None @@ -41,21 +41,13 @@ def upgrade(): ) op.create_index(op.f("ix_advisory_tpi_stats_advisory_shape_id"), "advisory_tpi_stats", ["advisory_shape_id"], unique=False) op.create_index(op.f("ix_advisory_tpi_stats_id"), "advisory_tpi_stats", ["id"], unique=False) - op.create_index(op.f("ix_advisory_tpi_stats_mid_slope"), "advisory_tpi_stats", ["mid_slope"], unique=False) - op.create_index(op.f("ix_advisory_tpi_stats_pixel_size_metres"), "advisory_tpi_stats", ["pixel_size_metres"], unique=False) op.create_index(op.f("ix_advisory_tpi_stats_run_parameters"), "advisory_tpi_stats", ["run_parameters"], unique=False) - op.create_index(op.f("ix_advisory_tpi_stats_upper_slope"), "advisory_tpi_stats", ["upper_slope"], unique=False) - op.create_index(op.f("ix_advisory_tpi_stats_valley_bottom"), "advisory_tpi_stats", ["valley_bottom"], unique=False) # ### end Alembic commands ### def downgrade(): - # ### commands auto generated by Alembic ### - op.drop_index(op.f("ix_advisory_tpi_stats_valley_bottom"), table_name="advisory_tpi_stats") - op.drop_index(op.f("ix_advisory_tpi_stats_upper_slope"), table_name="advisory_tpi_stats") + # ### commands auto generated by Alembic ### op.drop_index(op.f("ix_advisory_tpi_stats_run_parameters"), table_name="advisory_tpi_stats") - op.drop_index(op.f("ix_advisory_tpi_stats_pixel_size_metres"), table_name="advisory_tpi_stats") - op.drop_index(op.f("ix_advisory_tpi_stats_mid_slope"), table_name="advisory_tpi_stats") op.drop_index(op.f("ix_advisory_tpi_stats_id"), table_name="advisory_tpi_stats") op.drop_index(op.f("ix_advisory_tpi_stats_advisory_shape_id"), table_name="advisory_tpi_stats") op.drop_table("advisory_tpi_stats") diff --git a/api/app/db/models/auto_spatial_advisory.py b/api/app/db/models/auto_spatial_advisory.py index e706ac388..ecee5e1e6 100644 --- a/api/app/db/models/auto_spatial_advisory.py +++ b/api/app/db/models/auto_spatial_advisory.py @@ -199,7 +199,7 @@ class AdvisoryTPIStats(Base): id = Column(Integer, primary_key=True, index=True) advisory_shape_id = Column(Integer, ForeignKey(Shape.id), nullable=False, index=True) run_parameters = Column(Integer, ForeignKey(RunParameters.id), nullable=False, index=True) - valley_bottom = Column(Integer, nullable=False, index=True) - mid_slope = Column(Integer, nullable=False, index=True) - upper_slope = Column(Integer, nullable=False, index=True) - pixel_size_metres = Column(Integer, nullable=False, index=True) + valley_bottom = Column(Integer, nullable=False, index=False) + mid_slope = Column(Integer, nullable=False, index=False) + upper_slope = Column(Integer, nullable=False, index=False) + pixel_size_metres = Column(Integer, nullable=False, index=False) From 2caa4a5589c673aacf6c9a45753529bcf440872c Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 1 Aug 2024 16:43:52 -0700 Subject: [PATCH 15/37] Fix processing logic --- api/app/auto_spatial_advisory/elevation.py | 39 ++++++++++---------- api/app/utils/geospatial.py | 42 +++++++++++++++++----- 2 files changed, 53 insertions(+), 28 deletions(-) diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index 9a32c8bf0..4af83459a 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -19,7 +19,7 @@ from app.db.models.auto_spatial_advisory import AdvisoryElevationStats, AdvisoryTPIStats from app.auto_spatial_advisory.hfi_filepath import 
get_raster_filepath, get_raster_tif_filename from app.utils.s3 import get_client -from app.utils.geospatial import GeospatialOptions, WarpOptions, cut_raster_by_shape_id, read_raster_into_memory +from app.utils.geospatial import GeospatialOptions, WarpOptions, cut_raster_by_shape_id, data_array_to_raster, read_raster_into_memory logger = logging.getLogger(__name__) @@ -233,36 +233,35 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d f'dem/tpi/{config.get("CLASSIFIED_TPI_DEM_NAME")}', raster_options, ) - hfi_result = await read_raster_into_memory( - client, - bucket, - hfi_raster_key, - GeospatialOptions( - warp_options=WarpOptions( - source_geotransform=tpi_result.data_geotransform, - source_projection=tpi_result.data_projection, - source_x_size=tpi_result.data_x_size, - source_y_size=tpi_result.data_y_size, - ) - ), + output_options = GeospatialOptions( + warp_options=WarpOptions( + source_geotransform=tpi_result.data_geotransform, + source_projection=tpi_result.data_projection, + source_x_size=tpi_result.data_x_size, + source_y_size=tpi_result.data_y_size, + ) ) + hfi_result = await read_raster_into_memory(client, bucket, hfi_raster_key, output_options) + + # We only want pixels with HFI >4k as values to intersect with TPI raster + hfi_result.data_array = np.where(hfi_result.data_array >= 1, 1, 0) + hfi_masked_tpi = np.multiply(tpi_result.data_array, hfi_result.data_array) + hfi_masked_tpi_ds = data_array_to_raster(hfi_masked_tpi, hfi_raster_key, output_options) async with get_async_write_session_scope() as session: stmt = text("SELECT id, source_identifier FROM advisory_shapes;") result = await session.execute(stmt) rows = result.all() for row in rows: - cut_tpi_result = cut_raster_by_shape_id(row[0], row[1], tpi_result.data_source, raster_options) - cut_hfi_result = cut_raster_by_shape_id(row[0], row[1], hfi_result.data_source, raster_options) - hfi_masked_tpi = np.multiply(cut_tpi_result.data_array, cut_hfi_result) + cut_hfi_masked_tpi = cut_raster_by_shape_id(row[0], row[1], hfi_masked_tpi_ds, raster_options) # Get unique values and their counts - tpi_classes, counts = np.unique(hfi_masked_tpi, return_counts=True) + tpi_classes, counts = np.unique(cut_hfi_masked_tpi.data_array, return_counts=True) tpi_class_freq_dist = dict(zip(tpi_classes, counts)) - # Drop TPI class 4, this is the no data value from the TPI raster - tpi_class_freq_dist.pop(4, None) - fire_zone_stats[row[1]] = tpi_class_freq_dist + # Drop TPI class 4, this is the no data value from the TPI raster + tpi_class_freq_dist.pop(4, None) + fire_zone_stats[row[1]] = tpi_class_freq_dist return fire_zone_stats diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index a7901b285..caea66241 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -1,9 +1,11 @@ +import os from dataclasses import dataclass import logging from typing import Any, Optional from aiobotocore.client import AioBaseClient from botocore.exceptions import ClientError from osgeo import gdal +import numpy as np from app.db.database import DB_READ_STRING @@ -125,15 +127,19 @@ def _get_raster_data_source(key: str, s3_data, options: Optional[GeospatialOptio raster_ds = gdal.Open(s3_data_mem_path, gdal.GA_ReadOnly) # peel off path, then extension, then attach .tif extension filename = ((key.split("/")[-1]).split(".")[0]) + ".tif" + + x_res = options.warp_options.source_geotransform[1] + y_res = -options.warp_options.source_geotransform[5] + minx = options.warp_options.source_geotransform[0] 
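+        # (Note: GDAL geotransforms put the top-left origin at indices [0]/[3] and the pixel size at [1]/[5], with [5] negative for north-up rasters.)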
+ maxy = options.warp_options.source_geotransform[3] + maxx = minx + options.warp_options.source_geotransform[1] * options.warp_options.source_x_size + miny = maxy + options.warp_options.source_geotransform[5] * options.warp_options.source_y_size + extent = [minx, miny, maxx, maxy] + warped_mem_path = f"/vsimem/{filename}" - gdal.Warp( - warped_mem_path, - raster_ds, - dstSRS=options.warp_options.source_projection, - xRes=options.warp_options.source_x_size, - yRes=options.warp_options.source_y_size, - resampleAlg=gdal.GRA_NearestNeighbour, - ) + + # Warp to match input option parameters + gdal.Warp(warped_mem_path, raster_ds, dstSRS=options.warp_options.source_projection, outputBounds=extent, xRes=x_res, yRes=y_res, resampleAlg=gdal.GRA_NearestNeighbour) output_raster_ds = gdal.Open(warped_mem_path, gdal.GA_ReadOnly) raster_ds = output_raster_ds gdal.Unlink(warped_mem_path) @@ -181,3 +187,23 @@ def cut_raster_by_shape_id(advisory_shape_id: int, source_identifier: str, data_ warp_options = gdal.WarpOptions(format="GTiff", cutlineDSName=DB_READ_STRING, cutlineSQL=f"SELECT geom FROM advisory_shapes WHERE id={advisory_shape_id}", cropToCutline=True) output_dataset = gdal.Warp(output_path, data_source, options=warp_options) return get_raster_data(output_dataset, options) + + +def data_array_to_raster(data_array: np.ndarray, key: str, options: GeospatialOptions, bands=1): + output_driver = gdal.GetDriverByName("GTiff") + mem_path = f"/vsimem/d2r-{key}" + gdal.FileFromMemBuffer(mem_path, data_array) + target_tiff = output_driver.Create(mem_path, xsize=options.warp_options.source_x_size, ysize=options.warp_options.source_y_size, bands=bands, eType=gdal.GDT_Byte) + # Set the geotransform and projection to the same as the input. + target_tiff.SetGeoTransform(options.warp_options.source_geotransform) + target_tiff.SetProjection(options.warp_options.source_projection) + + # Write the classified data to the band. + target_band = target_tiff.GetRasterBand(1) + target_band.WriteArray(data_array) + + # Important to make sure data is flushed to disk! 
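+    # (FlushCache pushes the written band through GDAL's block cache into the /vsimem file so the re-open below sees the data.)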
+    target_tiff.FlushCache()
+    output_raster_ds = gdal.Open(mem_path, gdal.GA_ReadOnly)
+    gdal.Unlink(mem_path)
+    return output_raster_ds

From 8b51614a12bfacd7649e39cff459ff341cf2ebf9 Mon Sep 17 00:00:00 2001
From: Conor Brady
Date: Thu, 1 Aug 2024 17:34:41 -0700
Subject: [PATCH 16/37] Configure pixel size in metres

---
 api/app/auto_spatial_advisory/elevation.py | 28 ++++++++++++++++------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py
index 4af83459a..2bb88c8d2 100644
--- a/api/app/auto_spatial_advisory/elevation.py
+++ b/api/app/auto_spatial_advisory/elevation.py
@@ -1,5 +1,6 @@
 """Takes a classified HFI image and calculates basic elevation statistics associated with advisory areas per fire zone."""

+from dataclasses import dataclass
 from datetime import date, datetime
 from time import perf_counter
 import logging
@@ -210,15 +211,28 @@ def apply_threshold_mask_to_dem(threshold: int, mask_path: str, temp_dir: str):
     return masked_dem_path


-async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: date) -> Dict[int, Dict[int, int]]:
+@dataclass(frozen=True)
+class FireZoneTPIStats:
+    """
+    Captures fire zone stats of TPI pixels hitting >4K HFI threshold via
+    a dictionary, fire_zone_stats, of {source_identifier: {1: X, 2: Y, 3: Z}}, where 1 = valley bottom, 2 = mid slope, 3 = upper slope
+    and X, Y, Z are pixel counts at each of those elevation classes respectively.
+
+    Also includes the TPI raster's pixel size in metres.
+    """
+
+    fire_zone_stats: Dict[int, Dict[int, int]]
+    pixel_size_metres: int
+
+
+async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: date) -> FireZoneTPIStats:
     """
     Given run parameters, lookup associated snow-masked HFI and static classified TPI geospatial data.
     Cut out each fire zone shape from the above and intersect the TPI and HFI pixels, counting each pixel contributing to the TPI class.
     Capture all fire zone stats keyed by its source_identifier.

     :param run_parameters_id: The RunParameter object id associated with this run_type, for_date and run_datetime
-    :return: dictionary of {source_identifier: {1: X, 2: Y, 3: Z}}, where 1 = valley bottom, 2 = mid slope, 3 = upper slope
-    and X, Y, Z are pixel counts at each of those elevation classes respectively.
+    :return: fire zone TPI stats
     """
     raster_options = GeospatialOptions(include_geotransform=True, include_projection=True, include_x_size=True, include_y_size=True)
     fire_zone_stats: Dict[int, Dict[int, int]] = {}
@@ -263,7 +277,7 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d
             tpi_class_freq_dist.pop(4, None)
             fire_zone_stats[row[1]] = tpi_class_freq_dist

-    return fire_zone_stats
+    return FireZoneTPIStats(fire_zone_stats=fire_zone_stats, pixel_size_metres=tpi_result.data_geotransform[1])


 async def process_elevation_by_firezone(threshold: int, masked_dem_path: str, run_parameters_id: int):
@@ -351,7 +365,7 @@ async def store_elevation_stats(session: AsyncSession, threshold: int, shape_id:
     await save_advisory_elevation_stats(session, advisory_elevation_stats)


-async def store_elevation_tpi_stats(session: AsyncSession, run_parameters_id: int, fire_zone_stats: Dict[int, Dict[int, int]]):
+async def store_elevation_tpi_stats(session: AsyncSession, run_parameters_id: int, fire_zone_tpi_stats: FireZoneTPIStats):
     """
     Writes elevation TPI statistics to the database.
@@ -360,14 +374,14 @@ async def store_elevation_tpi_stats(session: AsyncSession, run_parameters_id: in
     :param run_parameters_id: The RunParameter object id associated with this run_type, for_date and run_datetime
-    :param fire_zone_stats: Dictionary keying shape id to a dictionary of classified tpi hfi pixel counts
+    :param fire_zone_tpi_stats: Fire zone TPI stats keying shape id to a dictionary of classified tpi hfi pixel counts
     """
     advisory_tpi_stats_list = []
-    for shape_id, tpi_freq_count in fire_zone_stats.items():
+    for shape_id, tpi_freq_count in fire_zone_tpi_stats.fire_zone_stats.items():
         advisory_tpi_stats = AdvisoryTPIStats(
             advisory_shape_id=shape_id,
             run_parameters=run_parameters_id,
             valley_bottom=tpi_freq_count[1],
             mid_slope=tpi_freq_count[2],
             upper_slope=tpi_freq_count[3],
-            pixel_size_metres=2000,
+            pixel_size_metres=fire_zone_tpi_stats.pixel_size_metres,
         )
         advisory_tpi_stats_list.append(advisory_tpi_stats)

     await save_advisory_elevation_tpi_stats(session, advisory_tpi_stats_list)

From d56bddfa899eeacac8c4d42e6deb4fcf86eea905 Mon Sep 17 00:00:00 2001
From: Conor Brady
Date: Tue, 6 Aug 2024 14:15:32 -0700
Subject: [PATCH 17/37] Add classified dem name to nats consumer env

---
 openshift/templates/nats.yaml | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/openshift/templates/nats.yaml b/openshift/templates/nats.yaml
index b95f52720..648067dbc 100644
--- a/openshift/templates/nats.yaml
+++ b/openshift/templates/nats.yaml
@@ -332,6 +332,11 @@ objects:
                     configMapKeyRef:
                       name: ${GLOBAL_NAME}
                       key: env.dem_name
+                - name: CLASSIFIED_TPI_DEM_NAME
+                  valueFrom:
+                    configMapKeyRef:
+                      name: ${GLOBAL_NAME}
+                      key: env.classified_tpi_dem_name
               ports:
                 - containerPort: 4222
                   name: client

From 7db79a5c1263d9ea1bfdcffd0744fcf5a2bf16e8 Mon Sep 17 00:00:00 2001
From: Conor Brady
Date: Tue, 6 Aug 2024 14:32:43 -0700
Subject: [PATCH 18/37] Bump nats resources

---
 openshift/templates/nats.yaml | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/openshift/templates/nats.yaml b/openshift/templates/nats.yaml
index 648067dbc..ad1357b16 100644
--- a/openshift/templates/nats.yaml
+++ b/openshift/templates/nats.yaml
@@ -259,6 +259,13 @@ objects:
         - name: ${APP_NAME}-${SUFFIX}-nats-consumer
           image: ${IMAGE_REGISTRY}/${PROJ_TOOLS}/${IMAGE_NAME}:${IMAGE_TAG}
           imagePullPolicy: "Always"
+          resources:
+            requests:
+              cpu: "250m"
+              memory: "1024Mi"
+            limits:
+              cpu: "500m"
+              memory: "2048Mi"
           command:
             [
               "poetry",

From 0f693e4d7ced6fb043271614b872a9ae4e43e1c9 Mon Sep 17 00:00:00 2001
From: Conor Brady
Date: Tue, 6 Aug 2024 14:51:44 -0700
Subject: [PATCH 19/37] Bump resources

---
 openshift/templates/nats.yaml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/openshift/templates/nats.yaml b/openshift/templates/nats.yaml
index ad1357b16..5917c6c9b 100644
--- a/openshift/templates/nats.yaml
+++ b/openshift/templates/nats.yaml
@@ -262,10 +262,10 @@ objects:
           resources:
             requests:
               cpu: "250m"
-              memory: "1024Mi"
+              memory: "2048Mi"
            limits:
              cpu: "500m"
-              memory: "2048Mi"
+              memory: "4096Mi"
          command:
            [
              "poetry",

From ce50d1a3ce653db12eedb792538929a13f83950f Mon Sep 17 00:00:00 2001
From: Conor Brady
Date: Tue, 6 Aug 2024 15:08:06 -0700
Subject: [PATCH 20/37] Bump resources

---
 openshift/templates/nats.yaml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/openshift/templates/nats.yaml b/openshift/templates/nats.yaml
index 5917c6c9b..6b11dbcf3 100644
--- a/openshift/templates/nats.yaml
+++ b/openshift/templates/nats.yaml
@@ -262,10 +262,10 @@ objects:
           resources:
            requests:
              cpu: "250m"
-              memory: "2048Mi"
+              memory: "4096Mi"
            limits:
              cpu: "500m"
-              memory: "4096Mi"
+              memory: "8192Mi"
          command:
            [
              "poetry",

From 546033db4ad23963fcb0a015ffec4112534754ab Mon Sep 17 00:00:00 2001
From: Conor Brady
Date: Tue, 6 Aug 2024 15:43:03 -0700
Subject: [PATCH 21/37] Explicitly null references

---
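Note: GDAL raster buffers live in C-level allocations that Python's garbage
collector cannot see, so this patch drops the Python references explicitly as
soon as the arrays have been combined, rather than waiting for scope exit. A
rough sketch of the pattern applied in the diff below (names as used in the
series):

    hfi_masked_tpi = np.multiply(tpi_result.data_array, hfi_result.data_array)
    tpi_result.data_array = None   # drop the ndarray reference so numpy can free it
    tpi_result.data_source = None  # drop the gdal.Dataset reference so GDAL can clean up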
api/app/auto_spatial_advisory/elevation.py | 5 +++++ api/app/utils/geospatial.py | 1 + 2 files changed, 6 insertions(+) diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index 2bb88c8d2..12ceab19d 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -261,6 +261,11 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d hfi_result.data_array = np.where(hfi_result.data_array >= 1, 1, 0) hfi_masked_tpi = np.multiply(tpi_result.data_array, hfi_result.data_array) + tpi_result.data_array = None + tpi_result.data_source = None + hfi_result.data_array = None + hfi_result.data_source = None + hfi_masked_tpi_ds = data_array_to_raster(hfi_masked_tpi, hfi_raster_key, output_options) async with get_async_write_session_scope() as session: stmt = text("SELECT id, source_identifier FROM advisory_shapes;") diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index caea66241..874193905 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -104,6 +104,7 @@ def get_raster_data(data_source, options: Optional[GeospatialOptions]) -> Geospa """ (data_geotransform, data_projection, data_x_size, data_y_size) = get_geospatial_metadata(data_source, options) data_array = read_raster_data(data_source) + data_source = None return GeospatialReadResult( data_array=data_array, data_source=data_source, data_geotransform=data_geotransform, data_projection=data_projection, data_x_size=data_x_size, data_y_size=data_y_size ) From aac2f8b0c4601e761e1353f768dcff8f26680f13 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 6 Aug 2024 15:59:50 -0700 Subject: [PATCH 22/37] Clear up more memory references --- api/app/auto_spatial_advisory/elevation.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index 12ceab19d..1a35fac44 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -262,9 +262,13 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d hfi_masked_tpi = np.multiply(tpi_result.data_array, hfi_result.data_array) tpi_result.data_array = None + del tpi_result.data_array tpi_result.data_source = None + del tpi_result.data_source hfi_result.data_array = None + del hfi_result.data_array hfi_result.data_source = None + del hfi_result.data_source hfi_masked_tpi_ds = data_array_to_raster(hfi_masked_tpi, hfi_raster_key, output_options) async with get_async_write_session_scope() as session: @@ -276,6 +280,10 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d cut_hfi_masked_tpi = cut_raster_by_shape_id(row[0], row[1], hfi_masked_tpi_ds, raster_options) # Get unique values and their counts tpi_classes, counts = np.unique(cut_hfi_masked_tpi.data_array, return_counts=True) + cut_hfi_masked_tpi.data_array = None + del cut_hfi_masked_tpi.data_array + cut_hfi_masked_tpi.data_source = None + del cut_hfi_masked_tpi.data_source tpi_class_freq_dist = dict(zip(tpi_classes, counts)) # Drop TPI class 4, this is the no data value from the TPI raster From ab72307a28f6797afb3df2d50212e6c722ae492a Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 6 Aug 2024 21:06:45 -0700 Subject: [PATCH 23/37] Optimize memory consumption --- api/app/auto_spatial_advisory/elevation.py | 99 ++++++++++------------ api/app/utils/geospatial.py | 81 +++++++++++++++++- 2 files changed, 
 121 insertions(+), 59 deletions(-)

diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py
index 1a35fac44..72c877dd3 100644
--- a/api/app/auto_spatial_advisory/elevation.py
+++ b/api/app/auto_spatial_advisory/elevation.py
@@ -20,7 +20,7 @@
 from app.db.models.auto_spatial_advisory import AdvisoryElevationStats, AdvisoryTPIStats
 from app.auto_spatial_advisory.hfi_filepath import get_raster_filepath, get_raster_tif_filename
 from app.utils.s3 import get_client
-from app.utils.geospatial import GeospatialOptions, WarpOptions, cut_raster_by_shape_id, data_array_to_raster, read_raster_into_memory
+from app.utils.geospatial import raster_mul, warp_to_match_extent

 logger = logging.getLogger(__name__)

@@ -225,72 +225,59 @@ class FireZoneTPIStats:
     pixel_size_metres: int


-async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: date) -> FireZoneTPIStats:
+async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: date):
     """
     Given run parameters, lookup associated snow-masked HFI and static classified TPI geospatial data.
     Cut out each fire zone shape from the above and intersect the TPI and HFI pixels, counting each pixel contributing to the TPI class.
     Capture all fire zone stats keyed by its source_identifier.

-    :param run_parameters_id: The RunParameter object id associated with this run_type, for_date and run_datetime
+    :param run_type: forecast or actual
+    :param run_date: date the computation ran
+    :param for_date: date the computation is for
     :return: fire zone TPI stats
     """
-    raster_options = GeospatialOptions(include_geotransform=True, include_projection=True, include_x_size=True, include_y_size=True)
-    fire_zone_stats: Dict[int, Dict[int, int]] = {}
+
+    gdal.SetConfigOption("AWS_SECRET_ACCESS_KEY", config.get("OBJECT_STORE_SECRET"))
+    gdal.SetConfigOption("AWS_ACCESS_KEY_ID", config.get("OBJECT_STORE_USER_ID"))
+    gdal.SetConfigOption("AWS_S3_ENDPOINT", config.get("OBJECT_STORE_SERVER"))
+    gdal.SetConfigOption("AWS_VIRTUAL_HOSTING", "FALSE")
+    bucket = config.get("OBJECT_STORE_BUCKET")
+    dem_file = config.get("CLASSIFIED_TPI_DEM_NAME")
+
+    key = f"/vsis3/{bucket}/dem/tpi/{dem_file}"
+    tpi_source: gdal.Dataset = gdal.Open(key, gdal.GA_ReadOnly)
+    pixel_size_metres = tpi_source.GetGeoTransform()[1]

     hfi_raster_filename = get_raster_tif_filename(for_date)
     hfi_raster_key = get_raster_filepath(run_date, run_type, hfi_raster_filename)
+    hfi_key = f"/vsis3/{bucket}/{hfi_raster_key}"
+    hfi_source: gdal.Dataset = gdal.Open(hfi_key, gdal.GA_ReadOnly)

-    async with get_client() as (client, bucket):
-        tpi_result = await read_raster_into_memory(
-            client,
-            bucket,
-            f'dem/tpi/{config.get("CLASSIFIED_TPI_DEM_NAME")}',
-            raster_options,
-        )
-        output_options = GeospatialOptions(
-            warp_options=WarpOptions(
-                source_geotransform=tpi_result.data_geotransform,
-                source_projection=tpi_result.data_projection,
-                source_x_size=tpi_result.data_x_size,
-                source_y_size=tpi_result.data_y_size,
-            )
-        )
-        hfi_result = await read_raster_into_memory(client, bucket, hfi_raster_key, output_options)
-
-        # We only want pixels with HFI >4k as values to intersect with TPI raster
-        hfi_result.data_array = np.where(hfi_result.data_array >= 1, 1, 0)
-        hfi_masked_tpi = np.multiply(tpi_result.data_array, hfi_result.data_array)
-
-        tpi_result.data_array = None
-        del tpi_result.data_array
-        tpi_result.data_source = None
-        del tpi_result.data_source
-        hfi_result.data_array = None
-        del hfi_result.data_array
-
hfi_result.data_source = None - del hfi_result.data_source - - hfi_masked_tpi_ds = data_array_to_raster(hfi_masked_tpi, hfi_raster_key, output_options) - async with get_async_write_session_scope() as session: - stmt = text("SELECT id, source_identifier FROM advisory_shapes;") - result = await session.execute(stmt) - rows = result.all() - - for row in rows: - cut_hfi_masked_tpi = cut_raster_by_shape_id(row[0], row[1], hfi_masked_tpi_ds, raster_options) - # Get unique values and their counts - tpi_classes, counts = np.unique(cut_hfi_masked_tpi.data_array, return_counts=True) - cut_hfi_masked_tpi.data_array = None - del cut_hfi_masked_tpi.data_array - cut_hfi_masked_tpi.data_source = None - del cut_hfi_masked_tpi.data_source - tpi_class_freq_dist = dict(zip(tpi_classes, counts)) - - # Drop TPI class 4, this is the no data value from the TPI raster - tpi_class_freq_dist.pop(4, None) - fire_zone_stats[row[1]] = tpi_class_freq_dist - - return FireZoneTPIStats(fire_zone_stats=fire_zone_stats, pixel_size_metres=tpi_result.data_geotransform[1]) + warped_mem_path = f"/vsimem/warp_{hfi_raster_filename}" + resized_hfi_source: gdal.Dataset = warp_to_match_extent(hfi_source, tpi_source, warped_mem_path) + hfi_masked_tpi = raster_mul(tpi_source, resized_hfi_source) + resized_hfi_source.Close() + + fire_zone_stats: Dict[int, Dict[int, int]] = {} + async with get_async_write_session_scope() as session: + stmt = text("SELECT id, source_identifier FROM advisory_shapes;") + result = await session.execute(stmt) + + for row in result: + output_path = f"/vsimem/firezone_{row[1]}.tif" + warp_options = gdal.WarpOptions(format="GTiff", cutlineDSName=DB_READ_STRING, cutlineSQL=f"SELECT geom FROM advisory_shapes WHERE id={row[0]}", cropToCutline=True) + cut_hfi_masked_tpi: gdal.Dataset = gdal.Warp(output_path, hfi_masked_tpi, options=warp_options) + # Get unique values and their counts + tpi_classes, counts = np.unique(cut_hfi_masked_tpi.GetRasterBand(1).ReadAsArray(), return_counts=True) + cut_hfi_masked_tpi.Close() + tpi_class_freq_dist = dict(zip(tpi_classes, counts)) + + # Drop TPI class 4, this is the no data value from the TPI raster + tpi_class_freq_dist.pop(4, None) + fire_zone_stats[row[1]] = tpi_class_freq_dist + + return FireZoneTPIStats(fire_zone_stats=fire_zone_stats, pixel_size_metres=pixel_size_metres) async def process_elevation_by_firezone(threshold: int, masked_dem_path: str, run_parameters_id: int): diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index 874193905..95e2ab24c 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -1,4 +1,3 @@ -import os from dataclasses import dataclass import logging from typing import Any, Optional @@ -44,7 +43,7 @@ class GeospatialReadResult: """ data_array: Any - data_source: Any + data_source: gdal.Dataset data_geotransform: Optional[Any] data_projection: Optional[Any] data_x_size: Optional[int] @@ -104,7 +103,6 @@ def get_raster_data(data_source, options: Optional[GeospatialOptions]) -> Geospa """ (data_geotransform, data_projection, data_x_size, data_y_size) = get_geospatial_metadata(data_source, options) data_array = read_raster_data(data_source) - data_source = None return GeospatialReadResult( data_array=data_array, data_source=data_source, data_geotransform=data_geotransform, data_projection=data_projection, data_x_size=data_x_size, data_y_size=data_y_size ) @@ -208,3 +206,80 @@ def data_array_to_raster(data_array: np.ndarray, key: str, options: GeospatialOp output_raster_ds = gdal.Open(mem_path, 
gdal.GA_ReadOnly) gdal.Unlink(mem_path) return output_raster_ds + + +def warp_to_match_extent(source_raster: gdal.Dataset, raster_to_match: gdal.Dataset, output_path: str) -> gdal.Dataset: + """ + Warp the source_raster to match the extent and projection of the other raster. + + :param source_raster: the raster to warp + :param raster_to_match: the reference raster to match the source against + :param output_path: output path of the resulting raster + :return: warped raster dataset + """ + source_geotransform = raster_to_match.GetGeoTransform() + x_res = source_geotransform[1] + y_res = -source_geotransform[5] + minx = source_geotransform[0] + maxy = source_geotransform[3] + maxx = minx + source_geotransform[1] * raster_to_match.RasterXSize + miny = maxy + source_geotransform[5] * raster_to_match.RasterYSize + extent = [minx, miny, maxx, maxy] + + # Warp to match input option parameters + return gdal.Warp(output_path, source_raster, dstSRS=raster_to_match.GetProjection(), outputBounds=extent, xRes=x_res, yRes=y_res, resampleAlg=gdal.GRA_NearestNeighbour) + + +def raster_mul(tpi_raster: gdal.Dataset, hfi_raster: gdal.Dataset, chunk_size=256) -> gdal.Dataset: + """ + Multiply rasters together by reading in chunks of pixels at a time to avoid loading + the rasters into memory all at once. + + :param tpi_raster: Classified TPI raster to multiply against the classified HFI raster + :param hfi_raster: Classified HFI raster to multiply against the classified TPI raster + :raises ValueError: Raised if the dimensions of the rasters don't match + :return: Multiplied raster result as a raster dataset + """ + # Get raster dimensions + x_size = tpi_raster.RasterXSize + y_size = tpi_raster.RasterYSize + + # Check if the dimensions of both rasters match + if x_size != hfi_raster.RasterXSize or y_size != hfi_raster.RasterYSize: + raise ValueError("The dimensions of the two rasters do not match.") + + # Get the geotransform and projection from the first raster + geotransform = tpi_raster.GetGeoTransform() + projection = tpi_raster.GetProjection() + + # Create the output raster + driver = gdal.GetDriverByName("MEM") + out_raster: gdal.Dataset = driver.Create("memory", x_size, y_size, 1, gdal.GDT_Byte) + + # Set the geotransform and projection + out_raster.SetGeoTransform(geotransform) + out_raster.SetProjection(projection) + + # Process in chunks + for y in range(0, y_size, chunk_size): + y_chunk_size = min(chunk_size, y_size - y) + + for x in range(0, x_size, chunk_size): + x_chunk_size = min(chunk_size, x_size - x) + + # Read chunks from both rasters + tpi_chunk = tpi_raster.GetRasterBand(1).ReadAsArray(x, y, x_chunk_size, y_chunk_size) + hfi_chunk = hfi_raster.GetRasterBand(1).ReadAsArray(x, y, x_chunk_size, y_chunk_size) + + hfi_chunk[hfi_chunk >= 1] = 1 + hfi_chunk[hfi_chunk < 1] = 0 + + # Multiply the chunks + tpi_chunk *= hfi_chunk + + # Write the result to the output raster + out_raster.GetRasterBand(1).WriteArray(tpi_chunk, x, y) + tpi_chunk = None + hfi_chunk = None + + return out_raster From c743b9895fb1217ed111cbf0bae24a4e7f9a6d7d Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 6 Aug 2024 21:09:21 -0700 Subject: [PATCH 24/37] Remove unused functions --- api/app/utils/geospatial.py | 187 ------------------------------------ 1 file changed, 187 deletions(-) diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index 95e2ab24c..47772ae6c 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -1,41 +1,12 @@ from dataclasses import dataclass import 
logging from typing import Any, Optional -from aiobotocore.client import AioBaseClient -from botocore.exceptions import ClientError from osgeo import gdal -import numpy as np -from app.db.database import DB_READ_STRING logger = logging.getLogger(__name__) -@dataclass -class WarpOptions: - """ - Inputs warping raster based on attributes from a source raster. - """ - - source_geotransform: Any - source_projection: Any - source_x_size: int - source_y_size: int - - -@dataclass -class GeospatialOptions: - """ - Options supplied by caller to declare optional outputs - """ - - warp_options: Optional[WarpOptions] = None - include_geotransform: Optional[bool] = True - include_projection: Optional[bool] = True - include_x_size: Optional[bool] = None - include_y_size: Optional[bool] = None - - @dataclass class GeospatialReadResult: """ @@ -50,164 +21,6 @@ class GeospatialReadResult: data_y_size: Optional[int] -def get_geospatial_metadata(data_source, options: Optional[GeospatialOptions]): - """ - Extracts metadata defined by the options parameter from the data_source raster - - :param data_source: input raster to extract metadata from - :param options: defines the metadata the caller is interested in extracting from the raster - :return: optional metadata of geotransform, project, x and y sizes from the raster - """ - geotransform = None - projection = None - x_size = None - y_size = None - if options is None: - return (geotransform, projection, x_size, y_size) - - if options.include_geotransform: - geotransform = data_source.GetGeoTransform() - - if options.include_projection: - projection = data_source.GetProjection() - - if options.include_x_size: - x_size = data_source.RasterXSize - - if options.include_y_size: - y_size = data_source.RasterYSize - - return (geotransform, projection, x_size, y_size) - - -def read_raster_data(data_source, band=1): - """ - Read raster data and return as array from the data source - - :param data_source: data source to read the data from - :param band: the band from the data source to read from - :return: raster data as array - """ - data_band = data_source.GetRasterBand(band) - data_array = data_band.ReadAsArray() - return data_array - - -def get_raster_data(data_source, options: Optional[GeospatialOptions]) -> GeospatialReadResult: - """ - Returns the raster data and optional metadata the caller is interested defined by the options parameter - - :param data_source: raster data source - :param options: options defining the metadata the caller is interested in - :return: raster data array and potential metadata - """ - (data_geotransform, data_projection, data_x_size, data_y_size) = get_geospatial_metadata(data_source, options) - data_array = read_raster_data(data_source) - return GeospatialReadResult( - data_array=data_array, data_source=data_source, data_geotransform=data_geotransform, data_projection=data_projection, data_x_size=data_x_size, data_y_size=data_y_size - ) - - -def _get_raster_data_source(key: str, s3_data, options: Optional[GeospatialOptions]): - """ - Retrieves geospatial file from S3. If the options include WarpOptions, it will warp - the raster based on those options. If no WarpOptions are included, - it simply read and return the raster data as is. 
- - :param key: the object store key pointing to the desired geospatial file - :param s3_data: the object store content blob - :param options: options defining the desired metadata as well as warp options if desired - :return: raster data source - """ - s3_data_mem_path = f"/vsimem/{key}" - gdal.FileFromMemBuffer(s3_data_mem_path, s3_data) - raster_ds = None - if options is not None and options.warp_options is not None: - raster_ds = gdal.Open(s3_data_mem_path, gdal.GA_ReadOnly) - # peel off path, then extension, then attach .tif extension - filename = ((key.split("/")[-1]).split(".")[0]) + ".tif" - - x_res = options.warp_options.source_geotransform[1] - y_res = -options.warp_options.source_geotransform[5] - minx = options.warp_options.source_geotransform[0] - maxy = options.warp_options.source_geotransform[3] - maxx = minx + options.warp_options.source_geotransform[1] * options.warp_options.source_x_size - miny = maxy + options.warp_options.source_geotransform[5] * options.warp_options.source_y_size - extent = [minx, miny, maxx, maxy] - - warped_mem_path = f"/vsimem/{filename}" - - # Warp to match input option parameters - gdal.Warp(warped_mem_path, raster_ds, dstSRS=options.warp_options.source_projection, outputBounds=extent, xRes=x_res, yRes=y_res, resampleAlg=gdal.GRA_NearestNeighbour) - output_raster_ds = gdal.Open(warped_mem_path, gdal.GA_ReadOnly) - raster_ds = output_raster_ds - gdal.Unlink(warped_mem_path) - else: - raster_ds = gdal.Open(s3_data_mem_path, gdal.GA_ReadOnly) - - gdal.Unlink(s3_data_mem_path) - return raster_ds - - -async def read_raster_into_memory(client: AioBaseClient, bucket: str, key: str, options: Optional[GeospatialOptions]) -> Optional[GeospatialReadResult]: - """ - Reads in the desired geospatial data as raw content bytes then returns the content as a data array and metadata - defined by the options the caller is interested in. - - :param client: object store client - :param bucket: the bucket the data is in - :param key: the key identifying the geospatial data in the bucket - :param options: options defining the desired metadata the user is interested in retrieving and vector - options if the geospatial file is a vector file - :return: raster data array and optional metadata - """ - try: - s3_source = await client.get_object(Bucket=bucket, Key=key) - s3_data = await s3_source["Body"].read() - data_source = _get_raster_data_source(key, s3_data, options) - return get_raster_data(data_source, options) - except ClientError as ex: - if ex.response["Error"]["Code"] == "NoSuchKey": - logger.info("No object found for key: %s", key) - return None - else: - raise - - -def cut_raster_by_shape_id(advisory_shape_id: int, source_identifier: str, data_source: Any, options: Optional[GeospatialOptions]) -> GeospatialReadResult: - """ - Given a raster dataset and a fire zone id, use gdal.Warp to clip out a fire zone from which we can retrieve stats. - - :param advisory_shape_id: The id of the fire zone (aka advisory_shape object) to clip with - :param source_identifier: The source identifier of the fire zone. - :param data_source: The source raster to be clipped. 
- """ - output_path = f"/vsimem/firezone_{source_identifier}.tif" - warp_options = gdal.WarpOptions(format="GTiff", cutlineDSName=DB_READ_STRING, cutlineSQL=f"SELECT geom FROM advisory_shapes WHERE id={advisory_shape_id}", cropToCutline=True) - output_dataset = gdal.Warp(output_path, data_source, options=warp_options) - return get_raster_data(output_dataset, options) - - -def data_array_to_raster(data_array: np.ndarray, key: str, options: GeospatialOptions, bands=1): - output_driver = gdal.GetDriverByName("GTiff") - mem_path = f"/vsimem/d2r-{key}" - gdal.FileFromMemBuffer(mem_path, data_array) - target_tiff = output_driver.Create(mem_path, xsize=options.warp_options.source_x_size, ysize=options.warp_options.source_y_size, bands=bands, eType=gdal.GDT_Byte) - # Set the geotransform and projection to the same as the input. - target_tiff.SetGeoTransform(options.warp_options.source_geotransform) - target_tiff.SetProjection(options.warp_options.source_projection) - - # Write the classified data to the band. - target_band = target_tiff.GetRasterBand(1) - target_band.WriteArray(data_array) - - # Important to make sure data is flushed to disk! - target_tiff.FlushCache() - output_raster_ds = gdal.Open(mem_path, gdal.GA_ReadOnly) - gdal.Unlink(mem_path) - return output_raster_ds - - def warp_to_match_extent(source_raster: gdal.Dataset, raster_to_match: gdal.Dataset, output_path: str) -> gdal.Dataset: """ Warp the source_raster to match the extent and projection of the other raster. From bf91c158238adc960ca279f74af9b3aed69594fc Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 6 Aug 2024 21:10:28 -0700 Subject: [PATCH 25/37] Remove unused dataclass --- api/app/utils/geospatial.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index 47772ae6c..19a52654e 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -7,20 +7,6 @@ logger = logging.getLogger(__name__) -@dataclass -class GeospatialReadResult: - """ - Raster data array and caller requested metadata - """ - - data_array: Any - data_source: gdal.Dataset - data_geotransform: Optional[Any] - data_projection: Optional[Any] - data_x_size: Optional[int] - data_y_size: Optional[int] - - def warp_to_match_extent(source_raster: gdal.Dataset, raster_to_match: gdal.Dataset, output_path: str) -> gdal.Dataset: """ Warp the source_raster to match the extent and projection of the other raster. 
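With patches 24 and 25 applied, geospatial.py is trimmed down to the two helpers the TPI pipeline actually uses: warp_to_match_extent and raster_mul. For readers skimming the series, a minimal usage sketch of the pair follows (the /vsimem paths are hypothetical, and both rasters are assumed to be single-band):

    from osgeo import gdal

    from app.utils.geospatial import raster_mul, warp_to_match_extent

    tpi = gdal.Open("/vsimem/tpi_classified.tif", gdal.GA_ReadOnly)  # hypothetical path
    hfi = gdal.Open("/vsimem/hfi_classified.tif", gdal.GA_ReadOnly)  # hypothetical path

    # Resample/reproject the HFI raster onto the TPI grid so the pixel-wise
    # multiply lines up cell for cell.
    hfi_on_tpi_grid = warp_to_match_extent(hfi, tpi, "/vsimem/hfi_warped.tif")

    # raster_mul thresholds the HFI band to a 0/1 mask (values >= 1 become 1)
    # and multiplies it against the TPI band chunk by chunk, returning an
    # in-memory dataset of HFI-masked TPI classes.
    masked_tpi = raster_mul(tpi, hfi_on_tpi_grid)
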
From 8e19699abc1b0694be2c61522d04ce426bce30d1 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 6 Aug 2024 21:33:31 -0700 Subject: [PATCH 26/37] Remove close calls --- api/app/auto_spatial_advisory/elevation.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index 72c877dd3..4555e7805 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -247,7 +247,6 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d key = f"/vsis3/{bucket}/dem/tpi/{dem_file}" tpi_source: gdal.Dataset = gdal.Open(key, gdal.GA_ReadOnly) pixel_size_metres = tpi_source.GetGeoTransform()[1] - print(pixel_size_metres) hfi_raster_filename = get_raster_tif_filename(for_date) hfi_raster_key = get_raster_filepath(run_date, run_type, hfi_raster_filename) @@ -257,7 +256,7 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d warped_mem_path = f"/vsimem/warp_{hfi_raster_filename}" resized_hfi_source: gdal.Dataset = warp_to_match_extent(hfi_source, tpi_source, warped_mem_path) hfi_masked_tpi = raster_mul(tpi_source, resized_hfi_source) - resized_hfi_source.Close() + resized_hfi_source = None fire_zone_stats: Dict[int, Dict[int, int]] = {} async with get_async_write_session_scope() as session: @@ -270,7 +269,7 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d cut_hfi_masked_tpi: gdal.Dataset = gdal.Warp(output_path, hfi_masked_tpi, options=warp_options) # Get unique values and their counts tpi_classes, counts = np.unique(cut_hfi_masked_tpi.GetRasterBand(1).ReadAsArray(), return_counts=True) - cut_hfi_masked_tpi.Close() + cut_hfi_masked_tpi = None tpi_class_freq_dist = dict(zip(tpi_classes, counts)) # Drop TPI class 4, this is the no data value from the TPI raster From 7fe6d06a455eb8066193527ca6a195002be29d3b Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 6 Aug 2024 22:37:18 -0700 Subject: [PATCH 27/37] Fix minor bugs --- api/app/auto_spatial_advisory/elevation.py | 14 +++++++------- api/app/db/crud/auto_spatial_advisory.py | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index 4555e7805..2d228f991 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -39,7 +39,7 @@ async def process_elevation_tpi(run_type: RunType, run_datetime: datetime, for_d logger.info("Processing elevation stats %s for run date: %s, for date: %s", run_type, run_datetime, for_date) perf_start = perf_counter() # Get the id from run_parameters associated with the provided run_type, for_date and for_datetime - async with get_async_read_session_scope() as session: + async with get_async_write_session_scope() as session: run_parameters_id = await get_run_parameters_id(session, run_type, run_datetime, for_date) stmt = select(AdvisoryTPIStats).where(AdvisoryTPIStats.run_parameters == run_parameters_id) @@ -246,7 +246,7 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d key = f"/vsis3/{bucket}/dem/tpi/{dem_file}" tpi_source: gdal.Dataset = gdal.Open(key, gdal.GA_ReadOnly) - pixel_size_metres = tpi_source.GetGeoTransform()[1] + pixel_size_metres = int(tpi_source.GetGeoTransform()[1]) hfi_raster_filename = get_raster_tif_filename(for_date) hfi_raster_key = get_raster_filepath(run_date, run_type, hfi_raster_filename) 
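A side note on the int(...) cast introduced above: GetGeoTransform() returns floats, and int() truncates toward zero, so a pixel width that drifts through reprojection to 49.999999 would silently become 49. The cast is fine while the TPI DEM is gridded at exact whole-metre resolution; a slightly more defensive variant would round instead. A sketch, using a hypothetical helper that is not part of this patch:

    from osgeo import gdal

    def pixel_size_in_metres(ds: gdal.Dataset) -> int:
        # Geotransform layout: (origin_x, pixel_width, row_rotation,
        #                       origin_y, col_rotation, pixel_height)
        pixel_width = ds.GetGeoTransform()[1]
        # round() guards against float drift such as 49.999999 -> 49.
        return round(pixel_width)
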
@@ -373,13 +373,13 @@ async def store_elevation_tpi_stats(session: AsyncSession, run_parameters_id: in :param fire_zone_stats: Dictionary keying shape id to a dictionary of classified tpi hfi pixel counts """ advisory_tpi_stats_list = [] - for shape_id, tpi_freq_count in fire_zone_tpi_stats.fire_zone_stats: + for shape_id, tpi_freq_count in fire_zone_tpi_stats.fire_zone_stats.items(): advisory_tpi_stats = AdvisoryTPIStats( - advisory_shape_id=shape_id, + advisory_shape_id=int(shape_id), run_parameters=run_parameters_id, - valley_bottom=tpi_freq_count[1], - mid_slope=tpi_freq_count[2], - upper_slope=tpi_freq_count[3], + valley_bottom=tpi_freq_count.get(1, 0), + mid_slope=tpi_freq_count.get(2, 0), + upper_slope=tpi_freq_count.get(3, 0), pixel_size_metres=fire_zone_tpi_stats.pixel_size_metres, ) advisory_tpi_stats_list.append(advisory_tpi_stats) diff --git a/api/app/db/crud/auto_spatial_advisory.py b/api/app/db/crud/auto_spatial_advisory.py index 81e7f320d..a6831261c 100644 --- a/api/app/db/crud/auto_spatial_advisory.py +++ b/api/app/db/crud/auto_spatial_advisory.py @@ -326,7 +326,7 @@ async def save_advisory_elevation_stats(session: AsyncSession, advisory_elevatio async def save_advisory_elevation_tpi_stats(session: AsyncSession, advisory_elevation_stats: List[AdvisoryTPIStats]): - session.a(advisory_elevation_stats) + session.add_all(advisory_elevation_stats) async def get_zonal_elevation_stats(session: AsyncSession, fire_zone_id: int, run_type: RunType, run_datetime: datetime, for_date: date) -> List[Row]: From d76f45986563e4d52fd66909e547135e9918ef0e Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 6 Aug 2024 23:10:57 -0700 Subject: [PATCH 28/37] Configurable memory --- .github/workflows/deployment.yml | 2 +- openshift/scripts/oc_provision_nats.sh | 2 ++ openshift/templates/nats.yaml | 11 +++++++++-- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/.github/workflows/deployment.yml b/.github/workflows/deployment.yml index 9dc919dcf..f9cdc4073 100644 --- a/.github/workflows/deployment.yml +++ b/.github/workflows/deployment.yml @@ -257,7 +257,7 @@ jobs: shell: bash run: | oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}" - PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_nats.sh ${SUFFIX} apply + PROJ_DEV="e1e498-dev" MEMORY_REQUEST=250Mi MEMORY_LIMIT=500Mi bash openshift/scripts/oc_provision_nats.sh ${SUFFIX} apply scan-dev: name: ZAP Baseline Scan Dev diff --git a/openshift/scripts/oc_provision_nats.sh b/openshift/scripts/oc_provision_nats.sh index 58db2c6be..458aa47b9 100755 --- a/openshift/scripts/oc_provision_nats.sh +++ b/openshift/scripts/oc_provision_nats.sh @@ -29,6 +29,8 @@ OC_PROCESS="oc -n ${PROJ_TARGET} process -f ${PATH_NATS} \ -p IMAGE_NAME=${APP_NAME}-api-${SUFFIX} \ -p IMAGE_TAG=${SUFFIX} \ -p POSTGRES_DATABASE=wps \ + ${MEMORY_LIMIT:+ "-p MEMORY_LIMIT=${MEMORY_LIMIT} \ + ${MEMORY_REQUEST:+ "-p MEMORY_REQUEST=${MEMORY_REQUEST} \ -p CRUNCHYDB_USER=wps-crunchydb-${SUFFIX}-pguser-wps-crunchydb-${SUFFIX} \ -p APP_NAME=${APP_NAME}" diff --git a/openshift/templates/nats.yaml b/openshift/templates/nats.yaml index 6b11dbcf3..48f0f2640 100644 --- a/openshift/templates/nats.yaml +++ b/openshift/templates/nats.yaml @@ -36,6 +36,13 @@ parameters: required: true - name: POSTGRES_DATABASE required: true + - name: MEMORY_REQUEST + required: true + value: 3800Mi + - name: MEMORY_LIMIT + required: true + value: 5000Mi + objects: - apiVersion: v1 kind: ConfigMap @@ -262,10 +269,10 @@ objects: resources: requests: cpu: "250m" - 
memory: "4096Mi" + memory: ${MEMORY_REQUEST} limits: cpu: "500m" - memory: "8192Mi" + memory: ${MEMORY_LIMIT} command: [ "poetry", From b1e8bb7cced7ae7981ec17412dbc54e986916132 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 6 Aug 2024 23:13:08 -0700 Subject: [PATCH 29/37] Clean up hfi masked tpi raster --- api/app/auto_spatial_advisory/elevation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index 2d228f991..174540cd2 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -276,6 +276,7 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d tpi_class_freq_dist.pop(4, None) fire_zone_stats[row[1]] = tpi_class_freq_dist + hfi_masked_tpi = None return FireZoneTPIStats(fire_zone_stats=fire_zone_stats, pixel_size_metres=pixel_size_metres) From 944ea96930a4f5ad982a1ac59078effce4d107be Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 6 Aug 2024 23:19:55 -0700 Subject: [PATCH 30/37] Fix script syntax --- openshift/scripts/oc_provision_nats.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openshift/scripts/oc_provision_nats.sh b/openshift/scripts/oc_provision_nats.sh index 458aa47b9..44ead304f 100755 --- a/openshift/scripts/oc_provision_nats.sh +++ b/openshift/scripts/oc_provision_nats.sh @@ -29,8 +29,8 @@ OC_PROCESS="oc -n ${PROJ_TARGET} process -f ${PATH_NATS} \ -p IMAGE_NAME=${APP_NAME}-api-${SUFFIX} \ -p IMAGE_TAG=${SUFFIX} \ -p POSTGRES_DATABASE=wps \ - ${MEMORY_LIMIT:+ "-p MEMORY_LIMIT=${MEMORY_LIMIT} \ - ${MEMORY_REQUEST:+ "-p MEMORY_REQUEST=${MEMORY_REQUEST} \ + ${MEMORY_REQUEST:+ "-p MEMORY_REQUEST=${MEMORY_REQUEST}"} \ + ${MEMORY_LIMIT:+ "-p MEMORY_LIMIT=${MEMORY_LIMIT}"} \ -p CRUNCHYDB_USER=wps-crunchydb-${SUFFIX}-pguser-wps-crunchydb-${SUFFIX} \ -p APP_NAME=${APP_NAME}" From ed06548a2a71ab77888dae7b5829a7470f2f4a9a Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 7 Aug 2024 12:21:50 -0700 Subject: [PATCH 31/37] Pull out band calls --- api/app/utils/geospatial.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py index 19a52654e..e54288235 100644 --- a/api/app/utils/geospatial.py +++ b/api/app/utils/geospatial.py @@ -59,6 +59,9 @@ def raster_mul(tpi_raster: gdal.Dataset, hfi_raster: gdal.Dataset, chunk_size=25 out_raster.SetGeoTransform(geotransform) out_raster.SetProjection(projection) + tpi_raster_band = tpi_raster.GetRasterBand(1) + hfi_raster_band = hfi_raster.GetRasterBand(1) + # Process in chunks for y in range(0, y_size, chunk_size): y_chunk_size = min(chunk_size, y_size - y) @@ -67,8 +70,8 @@ def raster_mul(tpi_raster: gdal.Dataset, hfi_raster: gdal.Dataset, chunk_size=25 x_chunk_size = min(chunk_size, x_size - x) # Read chunks from both rasters - tpi_chunk = tpi_raster.GetRasterBand(1).ReadAsArray(x, y, x_chunk_size, y_chunk_size) - hfi_chunk = hfi_raster.GetRasterBand(1).ReadAsArray(x, y, x_chunk_size, y_chunk_size) + tpi_chunk = tpi_raster_band.ReadAsArray(x, y, x_chunk_size, y_chunk_size) + hfi_chunk = hfi_raster_band.ReadAsArray(x, y, x_chunk_size, y_chunk_size) hfi_chunk[hfi_chunk >= 1] = 1 hfi_chunk[hfi_chunk < 1] = 0 From 45c827fb54f8fe721ef04d6485c5ee8e05b40841 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 7 Aug 2024 16:04:30 -0700 Subject: [PATCH 32/37] unlink and cleanup datasets --- api/app/auto_spatial_advisory/elevation.py | 4 ++++ 1 file changed, 4 insertions(+) diff 
--git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index 174540cd2..572c624a0 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -257,6 +257,9 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d resized_hfi_source: gdal.Dataset = warp_to_match_extent(hfi_source, tpi_source, warped_mem_path) hfi_masked_tpi = raster_mul(tpi_source, resized_hfi_source) resized_hfi_source = None + hfi_source = None + tpi_source = None + gdal.Unlink(warped_mem_path) fire_zone_stats: Dict[int, Dict[int, int]] = {} async with get_async_write_session_scope() as session: @@ -270,6 +273,7 @@ async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: d # Get unique values and their counts tpi_classes, counts = np.unique(cut_hfi_masked_tpi.GetRasterBand(1).ReadAsArray(), return_counts=True) cut_hfi_masked_tpi = None + gdal.Unlink(output_path) tpi_class_freq_dist = dict(zip(tpi_classes, counts)) # Drop TPI class 4, this is the no data value from the TPI raster From 0267190bc2c70193cba43840bb6d057ce7ed262e Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 7 Aug 2024 16:36:42 -0700 Subject: [PATCH 33/37] Try some more storage --- openshift/templates/nats.yaml | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/openshift/templates/nats.yaml b/openshift/templates/nats.yaml index 48f0f2640..f79b10d09 100644 --- a/openshift/templates/nats.yaml +++ b/openshift/templates/nats.yaml @@ -246,6 +246,18 @@ objects: "-c", "/nats-server -sl=ldm=/var/run/nats/nats.pid && /bin/sleep 60", ] + - apiVersion: v1 + kind: PersistentVolumeClaim + metadata: + name: ${APP_NAME}-${SUFFIX}-nats-consumer-pvc + labels: + app: ${APP_NAME}-${SUFFIX} + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 2Gi - apiVersion: apps/v1 kind: Deployment metadata: @@ -265,6 +277,10 @@ objects: containers: - name: ${APP_NAME}-${SUFFIX}-nats-consumer image: ${IMAGE_REGISTRY}/${PROJ_TOOLS}/${IMAGE_NAME}:${IMAGE_TAG} + volumes: + - name: nats-consumer-storage + persistentVolumeClaim: + claimName: ${APP_NAME}-${SUFFIX}-nats-consumer-pvc imagePullPolicy: "Always" resources: requests: @@ -273,6 +289,9 @@ objects: limits: cpu: "500m" memory: ${MEMORY_LIMIT} + volumeMounts: + - mountPath: "/" + name: nats-consumer-storage command: [ "poetry", From 2f91873aec5d6415423132c3b98126b76a1710c5 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 8 Aug 2024 09:21:43 -0700 Subject: [PATCH 34/37] Remove volume mounts --- openshift/templates/nats.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/openshift/templates/nats.yaml b/openshift/templates/nats.yaml index f79b10d09..6937aaa95 100644 --- a/openshift/templates/nats.yaml +++ b/openshift/templates/nats.yaml @@ -289,9 +289,6 @@ objects: limits: cpu: "500m" memory: ${MEMORY_LIMIT} - volumeMounts: - - mountPath: "/" - name: nats-consumer-storage command: [ "poetry", From 6aa3e4aecf582bcc6dfc1aea5c2588eb5405dc04 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 8 Aug 2024 11:05:01 -0700 Subject: [PATCH 35/37] Remove pvc --- openshift/templates/nats.yaml | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/openshift/templates/nats.yaml b/openshift/templates/nats.yaml index 6937aaa95..48f0f2640 100644 --- a/openshift/templates/nats.yaml +++ b/openshift/templates/nats.yaml @@ -246,18 +246,6 @@ objects: "-c", "/nats-server -sl=ldm=/var/run/nats/nats.pid && /bin/sleep 60", ] - - apiVersion: v1 - kind: 
PersistentVolumeClaim - metadata: - name: ${APP_NAME}-${SUFFIX}-nats-consumer-pvc - labels: - app: ${APP_NAME}-${SUFFIX} - spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 2Gi - apiVersion: apps/v1 kind: Deployment metadata: @@ -277,10 +265,6 @@ objects: containers: - name: ${APP_NAME}-${SUFFIX}-nats-consumer image: ${IMAGE_REGISTRY}/${PROJ_TOOLS}/${IMAGE_NAME}:${IMAGE_TAG} - volumes: - - name: nats-consumer-storage - persistentVolumeClaim: - claimName: ${APP_NAME}-${SUFFIX}-nats-consumer-pvc imagePullPolicy: "Always" resources: requests: From 51ce4b284ad549a3473a7b1e8f66aceb8059a5a9 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 8 Aug 2024 11:17:54 -0700 Subject: [PATCH 36/37] Bump memory limit to test --- .github/workflows/deployment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deployment.yml b/.github/workflows/deployment.yml index f9cdc4073..aefd42194 100644 --- a/.github/workflows/deployment.yml +++ b/.github/workflows/deployment.yml @@ -257,7 +257,7 @@ jobs: shell: bash run: | oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}" - PROJ_DEV="e1e498-dev" MEMORY_REQUEST=250Mi MEMORY_LIMIT=500Mi bash openshift/scripts/oc_provision_nats.sh ${SUFFIX} apply + PROJ_DEV="e1e498-dev" MEMORY_REQUEST=2000Mi MEMORY_LIMIT=5000Mi bash openshift/scripts/oc_provision_nats.sh ${SUFFIX} apply scan-dev: name: ZAP Baseline Scan Dev From 8bf6f8d9f6ebe47056545f9a23e3c6f9e0815b08 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 8 Aug 2024 11:52:17 -0700 Subject: [PATCH 37/37] Revert "Bump memory limit to test" This reverts commit 51ce4b284ad549a3473a7b1e8f66aceb8059a5a9. --- .github/workflows/deployment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deployment.yml b/.github/workflows/deployment.yml index aefd42194..f9cdc4073 100644 --- a/.github/workflows/deployment.yml +++ b/.github/workflows/deployment.yml @@ -257,7 +257,7 @@ jobs: shell: bash run: | oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}" - PROJ_DEV="e1e498-dev" MEMORY_REQUEST=2000Mi MEMORY_LIMIT=5000Mi bash openshift/scripts/oc_provision_nats.sh ${SUFFIX} apply + PROJ_DEV="e1e498-dev" MEMORY_REQUEST=250Mi MEMORY_LIMIT=500Mi bash openshift/scripts/oc_provision_nats.sh ${SUFFIX} apply scan-dev: name: ZAP Baseline Scan Dev
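The deployment-side memory experiments end where they started, with the NATS consumer back at a 250Mi request and 500Mi limit. On the application side, patches 26, 29 and 32 converge on a consistent GDAL dataset lifecycle instead: drop each dataset reference as soon as it has been consumed, and gdal.Unlink() any /vsimem paths. A condensed sketch of that pattern (the paths and the 50 m resolution are hypothetical):

    from osgeo import gdal

    warped_path = "/vsimem/warp_example.tif"  # hypothetical in-memory path
    src = gdal.Open("/vsis3/some-bucket/input.tif", gdal.GA_ReadOnly)  # hypothetical key
    warped = gdal.Warp(warped_path, src, xRes=50, yRes=50, resampleAlg=gdal.GRA_NearestNeighbour)

    # ... read whatever is needed from `warped` here ...

    # Dropping the references lets the GDAL bindings flush and close each
    # dataset; unlinking then releases the /vsimem allocation itself.
    warped = None
    src = None
    gdal.Unlink(warped_path)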