Skip to content

Commit

Permalink
Merge branch 'main' into task/update-backup-image
Browse files Browse the repository at this point in the history
  • Loading branch information
dgboss authored Nov 18, 2024
2 parents efb19f2 + 7d0ab02 commit a52b702
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 77 deletions.
5 changes: 4 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@
"type": "python",
"request": "launch",
"module": "app.jobs.sfms_calculations",
"console": "integratedTerminal"
"console": "integratedTerminal",
"args": [
"2024-10-23 12"
]
},
{
"name": "Chrome",
Expand Down
4 changes: 3 additions & 1 deletion api/app/auto_spatial_advisory/sfms.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from cffdrs import bui, dc, dmc, ffmc
from cffdrs import bui, dc, dmc, ffmc, fwi, isi
from numba import vectorize

vectorized_bui = vectorize(bui)
vectorized_dc = vectorize(dc)
vectorized_dmc = vectorize(dmc)
vectorized_ffmc = vectorize(ffmc)
vectorized_isi = vectorize(isi)
vectorized_fwi = vectorize(fwi)
7 changes: 3 additions & 4 deletions api/app/jobs/sfms_calculations.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import asyncio
from datetime import datetime, timezone
import logging
import os
import sys
from datetime import datetime, timezone

from app import configure_logging
from app.geospatial.wps_dataset import multi_wps_dataset_context
from app.rocketchat_notifications import send_rocketchat_notification
from app.sfms.daily_fwi_processor import DailyFWIProcessor
from app.sfms.raster_addresser import RasterKeyAddresser
from app.utils.s3_client import S3Client
from app.utils.time import get_utc_now
from app.geospatial.wps_dataset import multi_wps_dataset_context


logger = logging.getLogger(__name__)

Expand All @@ -31,7 +30,7 @@ async def calculate_daily_fwi(self, start_time: datetime):
daily_processor = DailyFWIProcessor(start_time, DAYS_TO_CALCULATE, RasterKeyAddresser())

async with S3Client() as s3_client:
await daily_processor.process(s3_client, multi_wps_dataset_context, multi_wps_dataset_context)
await daily_processor.process(s3_client, multi_wps_dataset_context, multi_wps_dataset_context, multi_wps_dataset_context)

# calculate the execution time.
execution_time = get_utc_now() - start_exec
Expand Down
31 changes: 24 additions & 7 deletions api/app/sfms/daily_fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import os
import tempfile
from datetime import datetime, timedelta
from typing import Callable, Tuple, List, Iterator, cast
from typing import Callable, Iterator, List, Tuple, cast

import numpy as np

from app.geospatial.wps_dataset import WPSDataset
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_isi
from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc
from app.utils.geospatial import GDALResamplingMethod
from app.utils.s3 import set_s3_gdal_config
from app.utils.s3_client import S3Client
Expand All @@ -30,25 +30,25 @@ def __init__(self, start_datetime: datetime, days: int, addresser: RasterKeyAddr
self.days = days
self.addresser = addresser

async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext):
async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext, new_ffmc_context: MultiDatasetContext):
set_s3_gdal_config()

for day in range(self.days):
datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour = self._get_calculate_dates(day)
logger.info(f"Calculating DMC/DC/BUI for {datetime_to_calculate_utc.isoformat()}")
logger.info(f"Calculating daily FWI rasters for {datetime_to_calculate_utc.isoformat()}")

# Get and check existence of weather s3 keys
temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour)
weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, precip_key)
if not weather_keys_exist:
logging.warning(f"No weather keys found for {model_run_for_hour(self.start_datetime.hour):02} model run")
logging.warning(f"Missing weather keys for {model_run_for_hour(self.start_datetime.hour):02} model run")
break

# get and check existence of fwi s3 keys
dc_key, dmc_key, ffmc_key = self._get_previous_fwi_keys(day, previous_fwi_datetime)
fwi_keys_exist = await s3_client.all_objects_exist(dc_key, dmc_key)
if not fwi_keys_exist:
logging.warning(f"No previous DMC/DC keys found for {previous_fwi_datetime.date().isoformat()}")
logging.warning(f"No previous DMC/DC/FFMC keys found for {previous_fwi_datetime.date().isoformat()}")
break

temp_key, rh_key, wind_speed_key, precip_key, ffmc_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, wind_speed_key, precip_key, ffmc_key)
Expand Down Expand Up @@ -100,7 +100,7 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset
# Create and store FFMC dataset
ffmc_values, ffmc_no_data_value = calculate_ffmc(ffmc_ds, warped_temp_ds, warped_rh_ds, warped_wind_speed_ds, warped_precip_ds)
new_ffmc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.FFMC)
await s3_client.persist_raster_data(
new_ffmc_path = await s3_client.persist_raster_data(
temp_dir,
new_ffmc_key,
dc_ds.as_gdal_ds().GetGeoTransform(),
Expand All @@ -126,6 +126,23 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset
nodata,
)

# Open new FFMC dataset and calculate ISI
new_isi_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.ISI)
with new_ffmc_context([new_ffmc_path]) as new_ffmc_dataset_context:
new_ffmc_ds = cast(List[WPSDataset], new_ffmc_dataset_context)[0] # Ensure correct type inference

isi_values, isi_nodata = calculate_isi(new_ffmc_ds, warped_wind_speed_ds)

# Store the new ISI dataset
await s3_client.persist_raster_data(
temp_dir,
new_isi_key,
new_ffmc_ds.as_gdal_ds().GetGeoTransform(),
new_ffmc_ds.as_gdal_ds().GetProjection(),
isi_values,
isi_nodata,
)

def _get_calculate_dates(self, day: int):
"""
Calculate the UTC date and times based on the provided day offset.
Expand Down
20 changes: 18 additions & 2 deletions api/app/sfms/fwi_processor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from time import perf_counter
import logging
from time import perf_counter

import numpy as np

from app.auto_spatial_advisory.sfms import vectorized_bui, vectorized_dc, vectorized_dmc, vectorized_ffmc, vectorized_isi
from app.geospatial.wps_dataset import WPSDataset
from app.auto_spatial_advisory.sfms import vectorized_dmc, vectorized_dc, vectorized_bui, vectorized_ffmc

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -73,3 +74,18 @@ def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPS
ffmc_values[nodata_mask] = nodata_value

return ffmc_values, nodata_value


def calculate_isi(ffmc_ds: WPSDataset, wind_speed_ds: WPSDataset):
    """
    Calculate the Initial Spread Index (ISI) raster from FFMC and wind speed rasters.

    :param ffmc_ds: dataset of FFMC values for the day being calculated
    :param wind_speed_ds: dataset of wind speed values (assumed km/h per CFFDRS — confirm with caller)
    :return: tuple of (ISI values array, nodata value taken from the FFMC dataset)
    """
    # Zero-fill nodata pixels so the vectorized calculation never sees nodata sentinels.
    ffmc_array, _ = ffmc_ds.replace_nodata_with(0)
    wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0)

    start = perf_counter()
    isi_values = vectorized_isi(ffmc_array, wind_speed_array, False)
    # Fixed: previous message incorrectly said "ffmc"; this times the ISI calculation.
    logger.info("%f seconds to calculate vectorized isi", perf_counter() - start)

    # Re-apply the FFMC dataset's nodata mask so masked pixels stay masked in the output.
    nodata_mask, nodata_value = ffmc_ds.get_nodata_mask()
    if nodata_mask is not None:
        isi_values[nodata_mask] = nodata_value

    return isi_values, nodata_value
46 changes: 32 additions & 14 deletions api/app/tests/sfms/test_daily_fwi_processor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from contextlib import ExitStack, contextmanager
from datetime import datetime, timedelta, timezone
from typing import List
from unittest.mock import AsyncMock

import pytest
from datetime import datetime, timezone, timedelta
from pytest_mock import MockerFixture

from app.geospatial.wps_dataset import WPSDataset
from app.sfms import daily_fwi_processor
from app.sfms.daily_fwi_processor import DailyFWIProcessor
Expand Down Expand Up @@ -38,11 +40,11 @@ def mock_input_dataset_context(_: List[str]):
return input_datasets, mock_input_dataset_context


def create_mock_new_dmc_dc_context():
new_datasets = create_mock_wps_datasets(2)
def create_mock_new_ds_context(number_of_datasets: int):
new_datasets = create_mock_wps_datasets(number_of_datasets)

@contextmanager
def mock_new_dmc_dc_datasets_context(_: List[str]):
def mock_new_datasets_context(_: List[str]):
try:
# Enter each dataset's context and yield the list of instances
with ExitStack() as stack:
Expand All @@ -52,7 +54,7 @@ def mock_new_dmc_dc_datasets_context(_: List[str]):
for ds in new_datasets:
ds.close()

return new_datasets, mock_new_dmc_dc_datasets_context
return new_datasets, mock_new_datasets_context


@pytest.mark.anyio
Expand All @@ -74,9 +76,13 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
precip_ds_spy = mocker.spy(mock_precip_ds, "warp_to_match")

# mock new dmc and dc datasets
new_datasets, mock_new_dmc_dc_datasets_context = create_mock_new_dmc_dc_context()
new_datasets, mock_new_dmc_dc_datasets_context = create_mock_new_ds_context(2)
mock_new_dmc_ds, mock_new_dc_ds = new_datasets

# mock new ffmc dataset
new_datasets, mock_new_ffmc_datasets_context = create_mock_new_ds_context(1)
mock_new_ffmc_ds = new_datasets[0]

# mock gdal open
mocker.patch("osgeo.gdal.Open", return_value=create_mock_gdal_dataset())

Expand All @@ -85,14 +91,15 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
calculate_dc_spy = mocker.spy(daily_fwi_processor, "calculate_dc")
calculate_bui_spy = mocker.spy(daily_fwi_processor, "calculate_bui")
calculate_ffmc_spy = mocker.spy(daily_fwi_processor, "calculate_ffmc")
calculate_isi_spy = mocker.spy(daily_fwi_processor, "calculate_isi")

async with S3Client() as mock_s3_client:
# mock s3 client
mock_all_objects_exist = AsyncMock(return_value=True)
mocker.patch.object(mock_s3_client, "all_objects_exist", new=mock_all_objects_exist)
persist_raster_spy = mocker.patch.object(mock_s3_client, "persist_raster_data", return_value="test_key.tif")

await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context)
await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_datasets_context)

# Verify weather model keys and actual keys are checked for both days
assert mock_all_objects_exist.call_count == 4
Expand Down Expand Up @@ -134,6 +141,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DC),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.FFMC),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.BUI),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.ISI),
# second day, previous days' dc and dmc are looked up first
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DC),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DMC),
Expand All @@ -142,6 +150,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.DC),
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.FFMC),
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.BUI),
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.ISI),
]

# Verify weather inputs are warped to match dmc raster
Expand All @@ -166,19 +175,19 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
]

for dmc_calls in calculate_dmc_spy.call_args_list:
dmc_ds = dmc_calls[0][0]
dmc_ds = dmc_calls.args[0]
assert dmc_ds == mock_dmc_ds
wps_datasets = dmc_calls[0][1:4] # Extract dataset arguments
assert all(isinstance(ds, WPSDataset) for ds in wps_datasets)

for dc_calls in calculate_dc_spy.call_args_list:
dc_ds = dc_calls[0][0]
dc_ds = dc_calls.args[0]
assert dc_ds == mock_dc_ds
wps_datasets = dc_calls[0][1:4] # Extract dataset arguments
assert all(isinstance(ds, WPSDataset) for ds in wps_datasets)

for ffmc_calls in calculate_ffmc_spy.call_args_list:
ffmc_ds = ffmc_calls[0][0]
ffmc_ds = ffmc_calls.args[0]
assert ffmc_ds == mock_ffmc_ds
wps_datasets = ffmc_calls[0][1:4] # Extract dataset arguments
assert all(isinstance(ds, WPSDataset) for ds in wps_datasets)
Expand All @@ -188,8 +197,14 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
mocker.call(mock_new_dmc_ds, mock_new_dc_ds),
]

# 4 each day, new dmc, dc and bui rasters
assert persist_raster_spy.call_count == 8
for isi_calls in calculate_isi_spy.call_args_list:
ffmc_ds = isi_calls.args[0]
assert ffmc_ds == mock_new_ffmc_ds
wps_datasets = isi_calls.args # Extract dataset arguments
assert all(isinstance(ds, WPSDataset) for ds in wps_datasets)

# 5 each day, new dmc, dc, ffmc, bui, and isi rasters
assert persist_raster_spy.call_count == 10


@pytest.mark.parametrize(
Expand All @@ -208,19 +223,22 @@ async def test_no_weather_keys_exist(side_effect_1: bool, side_effect_2: bool, m

_, mock_input_dataset_context = create_mock_input_dataset_context()

_, mock_new_dmc_dc_datasets_context = create_mock_new_dmc_dc_context()
_, mock_new_dmc_dc_datasets_context = create_mock_new_ds_context(2)
_, mock_new_ffmc_dataset_context = create_mock_new_ds_context(1)

# calculation spies
calculate_dmc_spy = mocker.spy(daily_fwi_processor, "calculate_dmc")
calculate_dc_spy = mocker.spy(daily_fwi_processor, "calculate_dc")
calculate_bui_spy = mocker.spy(daily_fwi_processor, "calculate_bui")
calculate_ffmc_spy = mocker.spy(daily_fwi_processor, "calculate_ffmc")
calculate_isi_spy = mocker.spy(daily_fwi_processor, "calculate_isi")

fwi_processor = DailyFWIProcessor(TEST_DATETIME, 1, RasterKeyAddresser())

await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context)
await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_dataset_context)

calculate_dmc_spy.assert_not_called()
calculate_dc_spy.assert_not_called()
calculate_bui_spy.assert_not_called()
calculate_ffmc_spy.assert_not_called()
calculate_isi_spy.assert_not_called()
Loading

0 comments on commit a52b702

Please sign in to comment.