From 9b51ed33e8fb8d39fff659e7d242ec0fc311b3d2 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Mon, 29 Jul 2024 14:18:59 -0700 Subject: [PATCH 1/4] Store snow masked hfi rasters --- .vscode/settings.json | 5 +- api/app/auto_spatial_advisory/hfi_filepath.py | 49 ++++++++++ api/app/auto_spatial_advisory/hfi_pmtiles.py | 24 ----- api/app/auto_spatial_advisory/process_hfi.py | 97 +++++++++++-------- api/app/utils/polygonize.py | 31 +++--- 5 files changed, 122 insertions(+), 84 deletions(-) create mode 100644 api/app/auto_spatial_advisory/hfi_filepath.py delete mode 100644 api/app/auto_spatial_advisory/hfi_pmtiles.py diff --git a/.vscode/settings.json b/.vscode/settings.json index ac35de154..00bdbdc0c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -66,19 +66,22 @@ "GDPS", "GEOGCS", "geospatial", + "geotiff", "grib", "gribs", "hourlies", "HRDPS", + "luxon", "maxx", "maxy", "miny", - "luxon", "ndarray", "numba", "osgeo", "PMTILES", "polygonize", + "Polygonized", + "polygonizing", "PRECIP", "PRIMEM", "PROJCS", diff --git a/api/app/auto_spatial_advisory/hfi_filepath.py b/api/app/auto_spatial_advisory/hfi_filepath.py new file mode 100644 index 000000000..9986bf23d --- /dev/null +++ b/api/app/auto_spatial_advisory/hfi_filepath.py @@ -0,0 +1,49 @@ +import os +from app.auto_spatial_advisory.run_type import RunType +from datetime import date + + +def get_pmtiles_filepath(run_date: date, run_type: RunType, filename: str) -> str: + """ + Get the file path for both reading and writing the pmtiles from/to the object store. + Example: {bucket}/psu/pmtiles/hfi/actual/[issue/run_date]/hfi[for_date].pmtiles + + + :param run_date: The date of the run to process. (when was the hfi file created?) + :param run_type: forecast or actual + :param filename: hfi[for_date].pmtiles -> hfi20230821.pmtiles + :return: s3 bucket key for pmtiles file + """ + pmtiles_filepath = os.path.join("psu", "pmtiles", "hfi", run_type.value, run_date.strftime("%Y-%m-%d"), filename) + + return pmtiles_filepath + + +def get_pmtiles_filename(for_date: date): + return f'hfi{for_date.strftime("%Y%m%d")}.pmtiles' + + +def get_raster_filepath(run_date: date, run_type: RunType, filename: str) -> str: + """ + Get the file path for both reading and writing the tif raster from/to the object store. + Example: {bucket}/psu/rasters/hfi/actual/[issue/run_date]/hfi[for_date].tif + + + :param run_date: The date of the run to process. (when was the hfi file created?) + :param run_type: forecast or actual + :param filename: hfi[for_date].tif -> hfi20230821.tif + :return: s3 bucket key for raster file + """ + raster_filepath = os.path.join("psu", "rasters", "hfi", run_type.value, run_date.strftime("%Y-%m-%d"), filename) + + return raster_filepath + + +def get_raster_tif_filename(for_date: date) -> str: + """ + Returns the object store filename for a raster tif based on a given for_date. + + :param for_date: the date the hfi tif is forecasted for + :return: filename string + """ + return f'snow_masked_hfi{for_date.strftime("%Y%m%d")}.tif' diff --git a/api/app/auto_spatial_advisory/hfi_pmtiles.py b/api/app/auto_spatial_advisory/hfi_pmtiles.py deleted file mode 100644 index e80377dea..000000000 --- a/api/app/auto_spatial_advisory/hfi_pmtiles.py +++ /dev/null @@ -1,24 +0,0 @@ -import os -from app.auto_spatial_advisory.run_type import RunType -from datetime import date - - - -def get_pmtiles_filepath(run_date: date, run_type: RunType, filename: str) -> str: - """ - Get the file path for both reading and writing the pmtiles from/to the object store. - Example: {bucket}/sfms/upload/actual/[issue/run_date]/hfi[for_date].pmtiles - - - :param run_date: The date of the run to process. (when was the hfi file created?) - :type run_date: date - :param run_type: forecast or actual - :type run_type: RunType - :param filename: hfi[for_date].pmtiles -> hfi20230821.pmtiles - :type filename: str - :return: s3 bucket key for pmtiles file - :rtype: str - """ - pmtiles_filepath = os.path.join('psu', 'pmtiles', 'hfi', run_type.value, run_date.strftime('%Y-%m-%d'), filename) - - return pmtiles_filepath diff --git a/api/app/auto_spatial_advisory/process_hfi.py b/api/app/auto_spatial_advisory/process_hfi.py index 6ac501f7e..6d7b709c5 100644 --- a/api/app/auto_spatial_advisory/process_hfi.py +++ b/api/app/auto_spatial_advisory/process_hfi.py @@ -18,7 +18,7 @@ from app.auto_spatial_advisory.run_type import RunType from app.auto_spatial_advisory.snow import apply_snow_mask from app.geospatial import NAD83_BC_ALBERS -from app.auto_spatial_advisory.hfi_pmtiles import get_pmtiles_filepath +from app.auto_spatial_advisory.hfi_filepath import get_pmtiles_filepath, get_raster_filepath, get_raster_tif_filename from app.utils.polygonize import polygonize_in_memory from app.utils.pmtiles import tippecanoe_wrapper, write_geojson from app.utils.s3 import get_client @@ -98,27 +98,38 @@ async def process_hfi(run_type: RunType, run_date: date, run_datetime: datetime, hfi_key = get_s3_key(run_type, run_date, for_date) logger.info(f"Key to HFI in object storage: {hfi_key}") - with tempfile.TemporaryDirectory() as temp_dir: - temp_filename = os.path.join(temp_dir, "classified.tif") - classify_hfi(hfi_key, temp_filename) - # If something has gone wrong with the collection of snow coverage data and it has not been collected - # within the past 7 days, don't apply an old snow mask, work with the classified hfi data as is - if last_processed_snow is None or last_processed_snow[0].for_date + datetime.timedelta(days=7) < datetime.now(): - logger.info("No recently processed snow data found. Proceeding with non-masked hfi data.") - working_hfi_path = temp_filename - else: - working_hfi_path = await apply_snow_mask(temp_filename, last_processed_snow[0], temp_dir) - # Create a snow coverage mask from previously downloaded snow data. - with polygonize_in_memory(working_hfi_path, "hfi", "hfi") as layer: - # We need a geojson file to pass to tippecanoe - temp_geojson = write_geojson(layer, temp_dir) - - pmtiles_filename = f'hfi{for_date.strftime("%Y%m%d")}.pmtiles' - temp_pmtiles_filepath = os.path.join(temp_dir, pmtiles_filename) - logger.info(f"Writing pmtiles -- {pmtiles_filename}") - tippecanoe_wrapper(temp_geojson, temp_pmtiles_filepath, min_zoom=HFI_PMTILES_MIN_ZOOM, max_zoom=HFI_PMTILES_MAX_ZOOM) - - async with get_client() as (client, bucket): + async with get_client() as (client, bucket): + with tempfile.TemporaryDirectory() as temp_dir: + temp_filename = os.path.join(temp_dir, "classified.tif") + classify_hfi(hfi_key, temp_filename) + # If something has gone wrong with the collection of snow coverage data and it has not been collected + # within the past 7 days, don't apply an old snow mask, work with the classified hfi data as is + if last_processed_snow is None or last_processed_snow[0].for_date + datetime.timedelta(days=7) < datetime.now(): + logger.info("No recently processed snow data found. Proceeding with non-masked hfi data.") + working_hfi_path = temp_filename + else: + # Create a snow coverage mask from previously downloaded snow data. + working_hfi_path = await apply_snow_mask(temp_filename, last_processed_snow[0], temp_dir) + + raster_filename = get_raster_tif_filename(for_date) + raster_key = get_raster_filepath(run_date, run_type, raster_filename) + logger.info(f"Uploading file {raster_filename} to {raster_key}") + await client.put_object( + Bucket=bucket, + Key=raster_key, + ACL=HFI_PMTILES_PERMISSIONS, # We need these to be accessible to everyone + Body=open(working_hfi_path, "rb"), + ) + logger.info("Done uploading %s", raster_key) + with polygonize_in_memory(working_hfi_path, "hfi", "hfi") as layer: + # We need a geojson file to pass to tippecanoe + temp_geojson = write_geojson(layer, temp_dir) + + pmtiles_filename = f'hfi{for_date.strftime("%Y%m%d")}.pmtiles' + temp_pmtiles_filepath = os.path.join(temp_dir, pmtiles_filename) + logger.info(f"Writing pmtiles -- {pmtiles_filename}") + tippecanoe_wrapper(temp_geojson, temp_pmtiles_filepath, min_zoom=HFI_PMTILES_MIN_ZOOM, max_zoom=HFI_PMTILES_MAX_ZOOM) + key = get_pmtiles_filepath(run_date, run_type, pmtiles_filename) logger.info(f"Uploading file {pmtiles_filename} to {key}") @@ -128,27 +139,27 @@ async def process_hfi(run_type: RunType, run_date: date, run_datetime: datetime, ACL=HFI_PMTILES_PERMISSIONS, # We need these to be accessible to everyone Body=open(temp_pmtiles_filepath, "rb"), ) - logger.info("Done uploading file") - - spatial_reference: osr.SpatialReference = layer.GetSpatialRef() - target_srs = osr.SpatialReference() - target_srs.ImportFromEPSG(NAD83_BC_ALBERS) - target_srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER) - coordinate_transform = osr.CoordinateTransformation(spatial_reference, target_srs) - - async with get_async_write_session_scope() as session: - advisory = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.ADVISORY) - warning = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.WARNING) - - logger.info("Writing HFI advisory zones to API database...") - for i in range(layer.GetFeatureCount()): - # https://gdal.org/api/python/osgeo.ogr.html#osgeo.ogr.Feature - feature: ogr.Feature = layer.GetFeature(i) - obj = create_model_object(feature, advisory, warning, coordinate_transform, run_type, run_datetime, for_date) - await save_hfi(session, obj) - - # Store the unique combination of run type, run datetime and for date in the run_parameters table - await save_run_parameters(session, run_type, run_datetime, for_date) + logger.info("Done uploading %s", key) + + spatial_reference: osr.SpatialReference = layer.GetSpatialRef() + target_srs = osr.SpatialReference() + target_srs.ImportFromEPSG(NAD83_BC_ALBERS) + target_srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER) + coordinate_transform = osr.CoordinateTransformation(spatial_reference, target_srs) + + async with get_async_write_session_scope() as session: + advisory = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.ADVISORY) + warning = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.WARNING) + + logger.info("Writing HFI advisory zones to API database...") + for i in range(layer.GetFeatureCount()): + # https://gdal.org/api/python/osgeo.ogr.html#osgeo.ogr.Feature + feature: ogr.Feature = layer.GetFeature(i) + obj = create_model_object(feature, advisory, warning, coordinate_transform, run_type, run_datetime, for_date) + await save_hfi(session, obj) + + # Store the unique combination of run type, run datetime and for date in the run_parameters table + await save_run_parameters(session, run_type, run_datetime, for_date) perf_end = perf_counter() delta = perf_end - perf_start diff --git a/api/app/utils/polygonize.py b/api/app/utils/polygonize.py index c3e796e9f..c00a8d52b 100644 --- a/api/app/utils/polygonize.py +++ b/api/app/utils/polygonize.py @@ -1,4 +1,5 @@ -""" Code for polygonizing a geotiff file. """ +"""Code for polygonizing a geotiff file.""" + import logging from contextlib import contextmanager from osgeo import gdal, ogr, osr @@ -9,13 +10,13 @@ def _create_in_memory_band(data: np.ndarray, cols, rows, projection, geotransform): - """ Create an in memory data band to represent a single raster layer. + """Create an in memory data band to represent a single raster layer. See https://gdal.org/user/raster_data_model.html#raster-band for a complete description of what a raster band is. """ - mem_driver = gdal.GetDriverByName('MEM') + mem_driver = gdal.GetDriverByName("MEM") - dataset = mem_driver.Create('memory', cols, rows, 1, gdal.GDT_Byte) + dataset = mem_driver.Create("memory", cols, rows, 1, gdal.GDT_Byte) dataset.SetProjection(projection) dataset.SetGeoTransform(geotransform) band = dataset.GetRasterBand(1) @@ -26,7 +27,7 @@ def _create_in_memory_band(data: np.ndarray, cols, rows, projection, geotransfor @contextmanager def polygonize_in_memory(geotiff_filename, layer, field) -> ogr.Layer: - """ Given some tiff file, return a polygonized version of it, in memory, as an ogr layer. """ + """Given some tiff file, return a polygonized version of it, in memory, as an ogr layer.""" source: gdal.Dataset = gdal.Open(geotiff_filename, gdal.GA_ReadOnly) source_band = source.GetRasterBand(1) @@ -37,9 +38,7 @@ def polygonize_in_memory(geotiff_filename, layer, field) -> ogr.Layer: # generate mask data mask_data = np.where(source_data == nodata_value, False, True) - mask_ds, mask_band = _create_in_memory_band( - mask_data, source_band.XSize, source_band.YSize, source.GetProjection(), - source.GetGeoTransform()) + mask_ds, mask_band = _create_in_memory_band(mask_data, source_band.XSize, source_band.YSize, source.GetProjection(), source.GetGeoTransform()) # Create a memory OGR datasource to put results in. # https://gdal.org/drivers/vector/memory.html#vector-memory @@ -71,15 +70,15 @@ def polygonize_geotiff_to_shapefile(raster_source_filename, vector_dest_filename .shp, and inserts polygonized contents of source file into destination file. """ - if raster_source_filename[-3:] != '.tif': - return f'{raster_source_filename} is an invalid file format for raster source' - if vector_dest_filename[-3:] != '.shp': - vector_dest_filename += '.shp' + if raster_source_filename[-3:] != ".tif": + return f"{raster_source_filename} is an invalid file format for raster source" + if vector_dest_filename[-3:] != ".shp": + vector_dest_filename += ".shp" source_data = gdal.Open(raster_source_filename, gdal.GA_ReadOnly) source_band = source_data.GetRasterBand(1) - value = ogr.FieldDefn('Band 1', ogr.OFTInteger) - logger.info('%s raster count %s', raster_source_filename, source_data.RasterCount) + value = ogr.FieldDefn("Band 1", ogr.OFTInteger) + logger.info("%s raster count %s", raster_source_filename, source_data.RasterCount) driver = ogr.GetDriverByName("ESRI Shapefile") destination = driver.CreateDataSource(vector_dest_filename) @@ -88,7 +87,7 @@ def polygonize_geotiff_to_shapefile(raster_source_filename, vector_dest_filename dest_layer = destination.CreateLayer(vector_dest_filename, geom_type=ogr.wkbPolygon, srs=dest_srs) dest_layer.CreateField(value) # 'Band 1' is the field name on the layer for Fuel Type ID - dest_field = dest_layer.GetLayerDefn().GetFieldIndex('Band 1') + dest_field = dest_layer.GetLayerDefn().GetFieldIndex("Band 1") gdal.Polygonize(source_band, None, dest_layer, dest_field, []) - return f'Polygonized {raster_source_filename} to {vector_dest_filename}' + return f"Polygonized {raster_source_filename} to {vector_dest_filename}" From 10744b518d35ca2b903622016caad24a01aad294 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Mon, 29 Jul 2024 14:31:01 -0700 Subject: [PATCH 2/4] Naming --- api/app/auto_spatial_advisory/process_hfi.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/app/auto_spatial_advisory/process_hfi.py b/api/app/auto_spatial_advisory/process_hfi.py index 6d7b709c5..0fcdda2cc 100644 --- a/api/app/auto_spatial_advisory/process_hfi.py +++ b/api/app/auto_spatial_advisory/process_hfi.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) -HFI_PMTILES_PERMISSIONS = "public-read" +HFI_GEOSPATIAL_PERMISSIONS = "public-read" HFI_PMTILES_MIN_ZOOM = 4 HFI_PMTILES_MAX_ZOOM = 11 @@ -117,7 +117,7 @@ async def process_hfi(run_type: RunType, run_date: date, run_datetime: datetime, await client.put_object( Bucket=bucket, Key=raster_key, - ACL=HFI_PMTILES_PERMISSIONS, # We need these to be accessible to everyone + ACL=HFI_GEOSPATIAL_PERMISSIONS, # We need these to be accessible to everyone Body=open(working_hfi_path, "rb"), ) logger.info("Done uploading %s", raster_key) @@ -136,7 +136,7 @@ async def process_hfi(run_type: RunType, run_date: date, run_datetime: datetime, await client.put_object( Bucket=bucket, Key=key, - ACL=HFI_PMTILES_PERMISSIONS, # We need these to be accessible to everyone + ACL=HFI_GEOSPATIAL_PERMISSIONS, # We need these to be accessible to everyone Body=open(temp_pmtiles_filepath, "rb"), ) logger.info("Done uploading %s", key) From 5afcfd2a62efbcd299e4b82d9598f24bb1d4f536 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Mon, 29 Jul 2024 14:32:23 -0700 Subject: [PATCH 3/4] doc string --- api/app/auto_spatial_advisory/hfi_filepath.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/api/app/auto_spatial_advisory/hfi_filepath.py b/api/app/auto_spatial_advisory/hfi_filepath.py index 9986bf23d..a13e2c8dc 100644 --- a/api/app/auto_spatial_advisory/hfi_filepath.py +++ b/api/app/auto_spatial_advisory/hfi_filepath.py @@ -20,6 +20,12 @@ def get_pmtiles_filepath(run_date: date, run_type: RunType, filename: str) -> st def get_pmtiles_filename(for_date: date): + """ + Returns the object store filename for a pmtiles file based on a given for_date. + + :param for_date: the date the hfi pmtiles is forecasted for + :return: filename string + """ return f'hfi{for_date.strftime("%Y%m%d")}.pmtiles' From 878183efddef8c1a4757bfa4d324207a9acfacd3 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Mon, 29 Jul 2024 14:45:08 -0700 Subject: [PATCH 4/4] use function --- api/app/auto_spatial_advisory/process_hfi.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/app/auto_spatial_advisory/process_hfi.py b/api/app/auto_spatial_advisory/process_hfi.py index 0fcdda2cc..edfa8788b 100644 --- a/api/app/auto_spatial_advisory/process_hfi.py +++ b/api/app/auto_spatial_advisory/process_hfi.py @@ -18,7 +18,7 @@ from app.auto_spatial_advisory.run_type import RunType from app.auto_spatial_advisory.snow import apply_snow_mask from app.geospatial import NAD83_BC_ALBERS -from app.auto_spatial_advisory.hfi_filepath import get_pmtiles_filepath, get_raster_filepath, get_raster_tif_filename +from app.auto_spatial_advisory.hfi_filepath import get_pmtiles_filename, get_pmtiles_filepath, get_raster_filepath, get_raster_tif_filename from app.utils.polygonize import polygonize_in_memory from app.utils.pmtiles import tippecanoe_wrapper, write_geojson from app.utils.s3 import get_client @@ -125,7 +125,7 @@ async def process_hfi(run_type: RunType, run_date: date, run_datetime: datetime, # We need a geojson file to pass to tippecanoe temp_geojson = write_geojson(layer, temp_dir) - pmtiles_filename = f'hfi{for_date.strftime("%Y%m%d")}.pmtiles' + pmtiles_filename = get_pmtiles_filename(for_date) temp_pmtiles_filepath = os.path.join(temp_dir, pmtiles_filename) logger.info(f"Writing pmtiles -- {pmtiles_filename}") tippecanoe_wrapper(temp_geojson, temp_pmtiles_filepath, min_zoom=HFI_PMTILES_MIN_ZOOM, max_zoom=HFI_PMTILES_MAX_ZOOM)