From 8d58b596552f0c66dc42acca52b0caa021eabfa5 Mon Sep 17 00:00:00 2001 From: dgboss Date: Thu, 21 Nov 2024 09:45:45 -0800 Subject: [PATCH] SFMS: Daily FWI (#4126) Co-authored-by: Conor Brady --- api/app/jobs/sfms_calculations.py | 4 +- api/app/sfms/daily_fwi_processor.py | 31 +++++++++--- api/app/sfms/fwi_processor.py | 17 ++++++- .../tests/sfms/test_daily_fwi_processor.py | 22 ++++++-- api/app/tests/sfms/test_fwi_processor.py | 50 ++++++++++++++++++- 5 files changed, 110 insertions(+), 14 deletions(-) diff --git a/api/app/jobs/sfms_calculations.py b/api/app/jobs/sfms_calculations.py index 91be7cb21..2deb590e7 100644 --- a/api/app/jobs/sfms_calculations.py +++ b/api/app/jobs/sfms_calculations.py @@ -20,7 +20,7 @@ class SFMSCalcJob: async def calculate_daily_fwi(self, start_time: datetime): """ - Entry point for processing SFMS DMC/DC/BUI rasters. To run from a specific date manually in openshift, + Entry point for processing SFMS daily FWI rasters. To run from a specific date manually in openshift, see openshift/sfms-calculate/README.md """ logger.info(f"Begin BUI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward") @@ -30,7 +30,7 @@ async def calculate_daily_fwi(self, start_time: datetime): daily_processor = DailyFWIProcessor(start_time, DAYS_TO_CALCULATE, RasterKeyAddresser()) async with S3Client() as s3_client: - await daily_processor.process(s3_client, multi_wps_dataset_context, multi_wps_dataset_context, multi_wps_dataset_context) + await daily_processor.process(s3_client, multi_wps_dataset_context, multi_wps_dataset_context, multi_wps_dataset_context, multi_wps_dataset_context) # calculate the execution time. execution_time = get_utc_now() - start_exec diff --git a/api/app/sfms/daily_fwi_processor.py b/api/app/sfms/daily_fwi_processor.py index c2d602424..b958ff677 100644 --- a/api/app/sfms/daily_fwi_processor.py +++ b/api/app/sfms/daily_fwi_processor.py @@ -7,7 +7,7 @@ import numpy as np from app.geospatial.wps_dataset import WPSDataset -from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_isi +from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_fwi, calculate_isi from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser from app.utils.geospatial import GDALResamplingMethod from app.utils.s3 import set_s3_gdal_config @@ -30,7 +30,14 @@ def __init__(self, start_datetime: datetime, days: int, addresser: RasterKeyAddr self.days = days self.addresser = addresser - async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext, new_ffmc_context: MultiDatasetContext): + async def process( + self, + s3_client: S3Client, + input_dataset_context: MultiDatasetContext, + new_dmc_dc_context: MultiDatasetContext, + new_ffmc_context: MultiDatasetContext, + new_isi_bui_context: MultiDatasetContext, + ): set_s3_gdal_config() for day in range(self.days): @@ -114,16 +121,16 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset with new_dmc_dc_context([new_dmc_path, new_dc_path]) as new_dmc_dc_datasets: new_ds = cast(List[WPSDataset], new_dmc_dc_datasets) # Ensure correct type inference new_dmc_ds, new_dc_ds = new_ds - bui_values, nodata = calculate_bui(new_dmc_ds, new_dc_ds) + bui_values, bui_nodata = calculate_bui(new_dmc_ds, new_dc_ds) # Store the new BUI dataset - await s3_client.persist_raster_data( + new_bui_path = await s3_client.persist_raster_data( temp_dir, new_bui_key, dmc_ds.as_gdal_ds().GetGeoTransform(), dmc_ds.as_gdal_ds().GetProjection(), bui_values, - nodata, + bui_nodata, ) # Open new FFMC dataset and calculate ISI @@ -134,7 +141,7 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset isi_values, isi_nodata = calculate_isi(new_ffmc_ds, warped_wind_speed_ds) # Store the new ISI dataset - await s3_client.persist_raster_data( + new_isi_path = await s3_client.persist_raster_data( temp_dir, new_isi_key, new_ffmc_ds.as_gdal_ds().GetGeoTransform(), @@ -143,6 +150,18 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset isi_nodata, ) + # Open new ISI and BUI datasets to calculate FWI + new_fwi_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.FWI) + with new_isi_bui_context([new_isi_path, new_bui_path]) as new_isi_bui_datasets: + new_ds = cast(List[WPSDataset], new_isi_bui_datasets) # Ensure correct type inference + new_isi_ds, new_bui_ds = new_ds + + fwi_values, fwi_nodata = calculate_fwi(new_isi_ds, new_bui_ds) + + await s3_client.persist_raster_data( + temp_dir, new_fwi_key, new_isi_ds.as_gdal_ds().GetGeoTransform(), new_isi_ds.as_gdal_ds().GetProjection(), fwi_values, fwi_nodata + ) + def _get_calculate_dates(self, day: int): """ Calculate the UTC date and times based on the provided day offset. diff --git a/api/app/sfms/fwi_processor.py b/api/app/sfms/fwi_processor.py index de54ccd58..2f2ca2ead 100644 --- a/api/app/sfms/fwi_processor.py +++ b/api/app/sfms/fwi_processor.py @@ -3,7 +3,7 @@ import numpy as np -from app.auto_spatial_advisory.sfms import vectorized_bui, vectorized_dc, vectorized_dmc, vectorized_ffmc, vectorized_isi +from app.auto_spatial_advisory.sfms import vectorized_bui, vectorized_dc, vectorized_dmc, vectorized_ffmc, vectorized_fwi, vectorized_isi from app.geospatial.wps_dataset import WPSDataset logger = logging.getLogger(__name__) @@ -76,6 +76,21 @@ def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPS return ffmc_values, nodata_value +def calculate_fwi(isi_ds: WPSDataset, bui_ds: WPSDataset): + isi_array, _ = isi_ds.replace_nodata_with(0) + bui_array, _ = bui_ds.replace_nodata_with(0) + + start = perf_counter() + fwi_values = vectorized_fwi(isi_array, bui_array) + logger.info("%f seconds to calculate vectorized fwi", perf_counter() - start) + + nodata_mask, nodata_value = isi_ds.get_nodata_mask() + if nodata_mask is not None: + fwi_values[nodata_mask] = nodata_value + + return fwi_values, nodata_value + + def calculate_isi(ffmc_ds: WPSDataset, wind_speed_ds: WPSDataset): ffmc_array, _ = ffmc_ds.replace_nodata_with(0) wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0) diff --git a/api/app/tests/sfms/test_daily_fwi_processor.py b/api/app/tests/sfms/test_daily_fwi_processor.py index d815d2f5f..0f0bf8dc1 100644 --- a/api/app/tests/sfms/test_daily_fwi_processor.py +++ b/api/app/tests/sfms/test_daily_fwi_processor.py @@ -83,6 +83,10 @@ async def test_daily_fwi_processor(mocker: MockerFixture): new_datasets, mock_new_ffmc_datasets_context = create_mock_new_ds_context(1) mock_new_ffmc_ds = new_datasets[0] + # mock the isi and bui datasets + new_datasets, mock_new_isi_bui_datasets_context = create_mock_new_ds_context(2) + mock_new_isi_ds, mock_new_bui_ds = new_datasets + # mock gdal open mocker.patch("osgeo.gdal.Open", return_value=create_mock_gdal_dataset()) @@ -92,6 +96,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture): calculate_bui_spy = mocker.spy(daily_fwi_processor, "calculate_bui") calculate_ffmc_spy = mocker.spy(daily_fwi_processor, "calculate_ffmc") calculate_isi_spy = mocker.spy(daily_fwi_processor, "calculate_isi") + calculate_fwi_spy = mocker.spy(daily_fwi_processor, "calculate_fwi") async with S3Client() as mock_s3_client: # mock s3 client @@ -99,7 +104,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture): mocker.patch.object(mock_s3_client, "all_objects_exist", new=mock_all_objects_exist) persist_raster_spy = mocker.patch.object(mock_s3_client, "persist_raster_data", return_value="test_key.tif") - await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_datasets_context) + await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_datasets_context, mock_new_isi_bui_datasets_context) # Verify weather model keys and actual keys are checked for both days assert mock_all_objects_exist.call_count == 4 @@ -142,6 +147,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture): mocker.call(EXPECTED_FIRST_DAY, FWIParameter.FFMC), mocker.call(EXPECTED_FIRST_DAY, FWIParameter.BUI), mocker.call(EXPECTED_FIRST_DAY, FWIParameter.ISI), + mocker.call(EXPECTED_FIRST_DAY, FWIParameter.FWI), # second day, previous days' dc and dmc are looked up first mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DC), mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DMC), @@ -151,6 +157,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture): mocker.call(EXPECTED_SECOND_DAY, FWIParameter.FFMC), mocker.call(EXPECTED_SECOND_DAY, FWIParameter.BUI), mocker.call(EXPECTED_SECOND_DAY, FWIParameter.ISI), + mocker.call(EXPECTED_SECOND_DAY, FWIParameter.FWI), ] # Verify weather inputs are warped to match dmc raster @@ -192,6 +199,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture): wps_datasets = ffmc_calls[0][1:4] # Extract dataset arguments assert all(isinstance(ds, WPSDataset) for ds in wps_datasets) + assert calculate_bui_spy.call_args_list == [ mocker.call(mock_new_dmc_ds, mock_new_dc_ds), mocker.call(mock_new_dmc_ds, mock_new_dc_ds), @@ -203,8 +211,13 @@ async def test_daily_fwi_processor(mocker: MockerFixture): wps_datasets = isi_calls.args # Extract dataset arguments assert all(isinstance(ds, WPSDataset) for ds in wps_datasets) + assert calculate_fwi_spy.call_args_list == [ + mocker.call(mock_new_isi_ds, mock_new_bui_ds), + mocker.call(mock_new_isi_ds, mock_new_bui_ds), + ] + # 5 each day, new dmc, dc, ffmc, bui, and isi rasters - assert persist_raster_spy.call_count == 10 + assert persist_raster_spy.call_count == 12 @pytest.mark.parametrize( @@ -225,6 +238,7 @@ async def test_no_weather_keys_exist(side_effect_1: bool, side_effect_2: bool, m _, mock_new_dmc_dc_datasets_context = create_mock_new_ds_context(2) _, mock_new_ffmc_dataset_context = create_mock_new_ds_context(1) + _, mock_new_isi_bui_datasets_context = create_mock_new_ds_context(2) # calculation spies calculate_dmc_spy = mocker.spy(daily_fwi_processor, "calculate_dmc") @@ -232,13 +246,15 @@ async def test_no_weather_keys_exist(side_effect_1: bool, side_effect_2: bool, m calculate_bui_spy = mocker.spy(daily_fwi_processor, "calculate_bui") calculate_ffmc_spy = mocker.spy(daily_fwi_processor, "calculate_ffmc") calculate_isi_spy = mocker.spy(daily_fwi_processor, "calculate_isi") + calculate_fwi_spy = mocker.spy(daily_fwi_processor, "calculate_fwi") fwi_processor = DailyFWIProcessor(TEST_DATETIME, 1, RasterKeyAddresser()) - await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_dataset_context) + await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_dataset_context, mock_new_isi_bui_datasets_context) calculate_dmc_spy.assert_not_called() calculate_dc_spy.assert_not_called() calculate_bui_spy.assert_not_called() calculate_ffmc_spy.assert_not_called() calculate_isi_spy.assert_not_called() + calculate_fwi_spy.assert_not_called() diff --git a/api/app/tests/sfms/test_fwi_processor.py b/api/app/tests/sfms/test_fwi_processor.py index cf8255110..ea95639f5 100644 --- a/api/app/tests/sfms/test_fwi_processor.py +++ b/api/app/tests/sfms/test_fwi_processor.py @@ -3,11 +3,11 @@ import numpy as np import pytest -from cffdrs import bui, dc, dmc, ffmc, isi +from cffdrs import bui, dc, dmc, ffmc, fwi, isi from osgeo import osr from app.geospatial.wps_dataset import WPSDataset -from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_isi +from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_fwi, calculate_isi FWI_ARRAY = np.array([[12, 20], [-999, -999]]) WEATHER_ARRAY = np.array([[12, 20], [0, 0]]) @@ -15,9 +15,11 @@ @dataclass class InputDatasets: + bui: WPSDataset dc: WPSDataset dmc: WPSDataset ffmc: WPSDataset + isi: WPSDataset temp: WPSDataset rh: WPSDataset precip: WPSDataset @@ -31,9 +33,11 @@ def input_datasets(): transform = (-2, 1, 0, 2, 0, -1) return InputDatasets( + bui=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999), dc=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999), dmc=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999), ffmc=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999), + isi=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999), temp=WPSDataset.from_array(WEATHER_ARRAY, transform, srs.ExportToWkt()), rh=WPSDataset.from_array(WEATHER_ARRAY, transform, srs.ExportToWkt()), precip=WPSDataset.from_array(WEATHER_ARRAY, transform, srs.ExportToWkt()), @@ -152,6 +156,20 @@ def test_calculate_bui_values(input_datasets): assert math.isclose(static_bui, bui_values[0, 0], abs_tol=0.01) +def test_calculate_isi_masked_correctly(input_datasets): + ffmc_ds = input_datasets.ffmc + windspeed_ds = input_datasets.wind_speed + + isi_values, nodata_value = calculate_isi(ffmc_ds, windspeed_ds) + + # validate output shape and nodata masking + assert isi_values.shape == (2, 2) + assert isi_values[1, 0] == nodata_value + assert isi_values[1, 1] == nodata_value + assert isi_values[0, 0] != nodata_value + assert isi_values[0, 1] != nodata_value + + def test_calculate_isi_values(input_datasets): ffmc_ds = input_datasets.ffmc wind_speed_ds = input_datasets.wind_speed @@ -197,3 +215,31 @@ def test_calculate_ffmc_masked_correctly(input_datasets): assert daily_ffmc_values[1, 1] == nodata_value assert daily_ffmc_values[0, 0] != nodata_value assert daily_ffmc_values[0, 1] != nodata_value + + +def test_calculate_fwi_masked_correctly(input_datasets): + isi_ds = input_datasets.isi + bui_ds = input_datasets.bui + + fwi_values, nodata_value = calculate_fwi(isi_ds, bui_ds) + + # validate output shape and nodata masking + assert fwi_values.shape == (2, 2) + assert fwi_values[1, 0] == nodata_value + assert fwi_values[1, 1] == nodata_value + assert fwi_values[0, 0] != nodata_value + assert fwi_values[0, 1] != nodata_value + + +def test_calculate_fwi_values(input_datasets): + isi_ds = input_datasets.isi + bui_ds = input_datasets.bui + + isi_sample = FWI_ARRAY[0, 0] + bui_sample = FWI_ARRAY[0, 0] + + fwi_values, _ = calculate_fwi(isi_ds, bui_ds) + + static_fwi = fwi(isi_sample, bui_sample) + + assert math.isclose(static_fwi, fwi_values[0, 0], abs_tol=0.01)