Skip to content

Commit

Permalink
SFMS: Daily FWI (#4126)
Browse files Browse the repository at this point in the history
Co-authored-by: Conor Brady <con.brad@gmail.com>
  • Loading branch information
dgboss and conbrad authored Nov 21, 2024
1 parent 29d509c commit 8d58b59
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 14 deletions.
4 changes: 2 additions & 2 deletions api/app/jobs/sfms_calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
class SFMSCalcJob:
async def calculate_daily_fwi(self, start_time: datetime):
"""
Entry point for processing SFMS DMC/DC/BUI rasters. To run from a specific date manually in openshift,
Entry point for processing SFMS daily FWI rasters. To run from a specific date manually in openshift,
see openshift/sfms-calculate/README.md
"""
logger.info(f"Begin BUI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward")
Expand All @@ -30,7 +30,7 @@ async def calculate_daily_fwi(self, start_time: datetime):
daily_processor = DailyFWIProcessor(start_time, DAYS_TO_CALCULATE, RasterKeyAddresser())

async with S3Client() as s3_client:
await daily_processor.process(s3_client, multi_wps_dataset_context, multi_wps_dataset_context, multi_wps_dataset_context)
await daily_processor.process(s3_client, multi_wps_dataset_context, multi_wps_dataset_context, multi_wps_dataset_context, multi_wps_dataset_context)

# calculate the execution time.
execution_time = get_utc_now() - start_exec
Expand Down
31 changes: 25 additions & 6 deletions api/app/sfms/daily_fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import numpy as np

from app.geospatial.wps_dataset import WPSDataset
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_isi
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_fwi, calculate_isi
from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser
from app.utils.geospatial import GDALResamplingMethod
from app.utils.s3 import set_s3_gdal_config
Expand All @@ -30,7 +30,14 @@ def __init__(self, start_datetime: datetime, days: int, addresser: RasterKeyAddr
self.days = days
self.addresser = addresser

async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext, new_ffmc_context: MultiDatasetContext):
async def process(
self,
s3_client: S3Client,
input_dataset_context: MultiDatasetContext,
new_dmc_dc_context: MultiDatasetContext,
new_ffmc_context: MultiDatasetContext,
new_isi_bui_context: MultiDatasetContext,
):
set_s3_gdal_config()

for day in range(self.days):
Expand Down Expand Up @@ -114,16 +121,16 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset
with new_dmc_dc_context([new_dmc_path, new_dc_path]) as new_dmc_dc_datasets:
new_ds = cast(List[WPSDataset], new_dmc_dc_datasets) # Ensure correct type inference
new_dmc_ds, new_dc_ds = new_ds
bui_values, nodata = calculate_bui(new_dmc_ds, new_dc_ds)
bui_values, bui_nodata = calculate_bui(new_dmc_ds, new_dc_ds)

# Store the new BUI dataset
await s3_client.persist_raster_data(
new_bui_path = await s3_client.persist_raster_data(
temp_dir,
new_bui_key,
dmc_ds.as_gdal_ds().GetGeoTransform(),
dmc_ds.as_gdal_ds().GetProjection(),
bui_values,
nodata,
bui_nodata,
)

# Open new FFMC dataset and calculate ISI
Expand All @@ -134,7 +141,7 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset
isi_values, isi_nodata = calculate_isi(new_ffmc_ds, warped_wind_speed_ds)

# Store the new ISI dataset
await s3_client.persist_raster_data(
new_isi_path = await s3_client.persist_raster_data(
temp_dir,
new_isi_key,
new_ffmc_ds.as_gdal_ds().GetGeoTransform(),
Expand All @@ -143,6 +150,18 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset
isi_nodata,
)

# Open new ISI and BUI datasets to calculate FWI
new_fwi_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.FWI)
with new_isi_bui_context([new_isi_path, new_bui_path]) as new_isi_bui_datasets:
new_ds = cast(List[WPSDataset], new_isi_bui_datasets) # Ensure correct type inference
new_isi_ds, new_bui_ds = new_ds

fwi_values, fwi_nodata = calculate_fwi(new_isi_ds, new_bui_ds)

await s3_client.persist_raster_data(
temp_dir, new_fwi_key, new_isi_ds.as_gdal_ds().GetGeoTransform(), new_isi_ds.as_gdal_ds().GetProjection(), fwi_values, fwi_nodata
)

def _get_calculate_dates(self, day: int):
"""
Calculate the UTC date and times based on the provided day offset.
Expand Down
17 changes: 16 additions & 1 deletion api/app/sfms/fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import numpy as np

from app.auto_spatial_advisory.sfms import vectorized_bui, vectorized_dc, vectorized_dmc, vectorized_ffmc, vectorized_isi
from app.auto_spatial_advisory.sfms import vectorized_bui, vectorized_dc, vectorized_dmc, vectorized_ffmc, vectorized_fwi, vectorized_isi
from app.geospatial.wps_dataset import WPSDataset

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -76,6 +76,21 @@ def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPS
return ffmc_values, nodata_value


def calculate_fwi(isi_ds: WPSDataset, bui_ds: WPSDataset):
isi_array, _ = isi_ds.replace_nodata_with(0)
bui_array, _ = bui_ds.replace_nodata_with(0)

start = perf_counter()
fwi_values = vectorized_fwi(isi_array, bui_array)
logger.info("%f seconds to calculate vectorized fwi", perf_counter() - start)

nodata_mask, nodata_value = isi_ds.get_nodata_mask()
if nodata_mask is not None:
fwi_values[nodata_mask] = nodata_value

return fwi_values, nodata_value


def calculate_isi(ffmc_ds: WPSDataset, wind_speed_ds: WPSDataset):
ffmc_array, _ = ffmc_ds.replace_nodata_with(0)
wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0)
Expand Down
22 changes: 19 additions & 3 deletions api/app/tests/sfms/test_daily_fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
new_datasets, mock_new_ffmc_datasets_context = create_mock_new_ds_context(1)
mock_new_ffmc_ds = new_datasets[0]

# mock the isi and bui datasets
new_datasets, mock_new_isi_bui_datasets_context = create_mock_new_ds_context(2)
mock_new_isi_ds, mock_new_bui_ds = new_datasets

# mock gdal open
mocker.patch("osgeo.gdal.Open", return_value=create_mock_gdal_dataset())

Expand All @@ -92,14 +96,15 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
calculate_bui_spy = mocker.spy(daily_fwi_processor, "calculate_bui")
calculate_ffmc_spy = mocker.spy(daily_fwi_processor, "calculate_ffmc")
calculate_isi_spy = mocker.spy(daily_fwi_processor, "calculate_isi")
calculate_fwi_spy = mocker.spy(daily_fwi_processor, "calculate_fwi")

async with S3Client() as mock_s3_client:
# mock s3 client
mock_all_objects_exist = AsyncMock(return_value=True)
mocker.patch.object(mock_s3_client, "all_objects_exist", new=mock_all_objects_exist)
persist_raster_spy = mocker.patch.object(mock_s3_client, "persist_raster_data", return_value="test_key.tif")

await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_datasets_context)
await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_datasets_context, mock_new_isi_bui_datasets_context)

# Verify weather model keys and actual keys are checked for both days
assert mock_all_objects_exist.call_count == 4
Expand Down Expand Up @@ -142,6 +147,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.FFMC),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.BUI),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.ISI),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.FWI),
# second day, previous days' dc and dmc are looked up first
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DC),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DMC),
Expand All @@ -151,6 +157,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.FFMC),
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.BUI),
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.ISI),
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.FWI),
]

# Verify weather inputs are warped to match dmc raster
Expand Down Expand Up @@ -192,6 +199,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
wps_datasets = ffmc_calls[0][1:4] # Extract dataset arguments
assert all(isinstance(ds, WPSDataset) for ds in wps_datasets)


assert calculate_bui_spy.call_args_list == [
mocker.call(mock_new_dmc_ds, mock_new_dc_ds),
mocker.call(mock_new_dmc_ds, mock_new_dc_ds),
Expand All @@ -203,8 +211,13 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
wps_datasets = isi_calls.args # Extract dataset arguments
assert all(isinstance(ds, WPSDataset) for ds in wps_datasets)

assert calculate_fwi_spy.call_args_list == [
mocker.call(mock_new_isi_ds, mock_new_bui_ds),
mocker.call(mock_new_isi_ds, mock_new_bui_ds),
]

# 5 each day, new dmc, dc, ffmc, bui, and isi rasters
assert persist_raster_spy.call_count == 10
assert persist_raster_spy.call_count == 12


@pytest.mark.parametrize(
Expand All @@ -225,20 +238,23 @@ async def test_no_weather_keys_exist(side_effect_1: bool, side_effect_2: bool, m

_, mock_new_dmc_dc_datasets_context = create_mock_new_ds_context(2)
_, mock_new_ffmc_dataset_context = create_mock_new_ds_context(1)
_, mock_new_isi_bui_datasets_context = create_mock_new_ds_context(2)

# calculation spies
calculate_dmc_spy = mocker.spy(daily_fwi_processor, "calculate_dmc")
calculate_dc_spy = mocker.spy(daily_fwi_processor, "calculate_dc")
calculate_bui_spy = mocker.spy(daily_fwi_processor, "calculate_bui")
calculate_ffmc_spy = mocker.spy(daily_fwi_processor, "calculate_ffmc")
calculate_isi_spy = mocker.spy(daily_fwi_processor, "calculate_isi")
calculate_fwi_spy = mocker.spy(daily_fwi_processor, "calculate_fwi")

fwi_processor = DailyFWIProcessor(TEST_DATETIME, 1, RasterKeyAddresser())

await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_dataset_context)
await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_dataset_context, mock_new_isi_bui_datasets_context)

calculate_dmc_spy.assert_not_called()
calculate_dc_spy.assert_not_called()
calculate_bui_spy.assert_not_called()
calculate_ffmc_spy.assert_not_called()
calculate_isi_spy.assert_not_called()
calculate_fwi_spy.assert_not_called()
50 changes: 48 additions & 2 deletions api/app/tests/sfms/test_fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@

import numpy as np
import pytest
from cffdrs import bui, dc, dmc, ffmc, isi
from cffdrs import bui, dc, dmc, ffmc, fwi, isi
from osgeo import osr

from app.geospatial.wps_dataset import WPSDataset
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_isi
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_fwi, calculate_isi

FWI_ARRAY = np.array([[12, 20], [-999, -999]])
WEATHER_ARRAY = np.array([[12, 20], [0, 0]])


@dataclass
class InputDatasets:
bui: WPSDataset
dc: WPSDataset
dmc: WPSDataset
ffmc: WPSDataset
isi: WPSDataset
temp: WPSDataset
rh: WPSDataset
precip: WPSDataset
Expand All @@ -31,9 +33,11 @@ def input_datasets():
transform = (-2, 1, 0, 2, 0, -1)

return InputDatasets(
bui=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999),
dc=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999),
dmc=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999),
ffmc=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999),
isi=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999),
temp=WPSDataset.from_array(WEATHER_ARRAY, transform, srs.ExportToWkt()),
rh=WPSDataset.from_array(WEATHER_ARRAY, transform, srs.ExportToWkt()),
precip=WPSDataset.from_array(WEATHER_ARRAY, transform, srs.ExportToWkt()),
Expand Down Expand Up @@ -152,6 +156,20 @@ def test_calculate_bui_values(input_datasets):
assert math.isclose(static_bui, bui_values[0, 0], abs_tol=0.01)


def test_calculate_isi_masked_correctly(input_datasets):
ffmc_ds = input_datasets.ffmc
windspeed_ds = input_datasets.wind_speed

isi_values, nodata_value = calculate_isi(ffmc_ds, windspeed_ds)

# validate output shape and nodata masking
assert isi_values.shape == (2, 2)
assert isi_values[1, 0] == nodata_value
assert isi_values[1, 1] == nodata_value
assert isi_values[0, 0] != nodata_value
assert isi_values[0, 1] != nodata_value


def test_calculate_isi_values(input_datasets):
ffmc_ds = input_datasets.ffmc
wind_speed_ds = input_datasets.wind_speed
Expand Down Expand Up @@ -197,3 +215,31 @@ def test_calculate_ffmc_masked_correctly(input_datasets):
assert daily_ffmc_values[1, 1] == nodata_value
assert daily_ffmc_values[0, 0] != nodata_value
assert daily_ffmc_values[0, 1] != nodata_value


def test_calculate_fwi_masked_correctly(input_datasets):
isi_ds = input_datasets.isi
bui_ds = input_datasets.bui

fwi_values, nodata_value = calculate_fwi(isi_ds, bui_ds)

# validate output shape and nodata masking
assert fwi_values.shape == (2, 2)
assert fwi_values[1, 0] == nodata_value
assert fwi_values[1, 1] == nodata_value
assert fwi_values[0, 0] != nodata_value
assert fwi_values[0, 1] != nodata_value


def test_calculate_fwi_values(input_datasets):
isi_ds = input_datasets.isi
bui_ds = input_datasets.bui

isi_sample = FWI_ARRAY[0, 0]
bui_sample = FWI_ARRAY[0, 0]

fwi_values, _ = calculate_fwi(isi_ds, bui_ds)

static_fwi = fwi(isi_sample, bui_sample)

assert math.isclose(static_fwi, fwi_values[0, 0], abs_tol=0.01)

0 comments on commit 8d58b59

Please sign in to comment.