
SFMS: Daily FFMC #4081

Merged: 16 commits, Nov 13, 2024
api/app/auto_spatial_advisory/sfms.py (3 changes: 2 additions, 1 deletion)
@@ -1,6 +1,7 @@
from cffdrs import bui, dc, dmc
from cffdrs import bui, dc, dmc, ffmc
from numba import vectorize

vectorized_bui = vectorize(bui)
vectorized_dc = vectorize(dc)
vectorized_dmc = vectorize(dmc)
vectorized_ffmc = vectorize(ffmc)
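
For context, vectorize here is numba's lazy vectorize: it turns a scalar cffdrs function into a NumPy ufunc that broadcasts elementwise over raster-sized arrays, compiling on first call. A minimal sketch of that pattern, using a placeholder scalar function rather than the real ffmc (whose argument order is not shown in this diff):

# Illustrative sketch only, not part of the diff.
import numpy as np
from numba import vectorize


def scalar_index(previous: float, temp: float) -> float:
    # Placeholder arithmetic standing in for a scalar FWI-style calculation.
    return 0.5 * previous + 0.1 * temp


# Lazy form: the ufunc is compiled for the argument types on first call.
vectorized_index = vectorize(scalar_index)

previous_grid = np.full((4, 4), 85.0)
temp_grid = np.full((4, 4), 20.0)

# Applies scalar_index elementwise across both grids.
result = vectorized_index(previous_grid, temp_grid)
print(result.shape)  # (4, 4)
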
api/app/jobs/sfms_calculations.py (14 changes: 7 additions, 7 deletions)
@@ -6,7 +6,7 @@

from app import configure_logging
from app.rocketchat_notifications import send_rocketchat_notification
from app.sfms.date_range_processor import BUIDateRangeProcessor
from app.sfms.daily_fwi_processor import DailyFWIProcessor
from app.sfms.raster_addresser import RasterKeyAddresser
from app.utils.s3_client import S3Client
from app.utils.time import get_utc_now
@@ -19,7 +19,7 @@


class SFMSCalcJob:
async def calculate_bui(self, start_time: datetime):
async def calculate_daily_fwi(self, start_time: datetime):
"""
Entry point for processing SFMS DMC/DC/BUI rasters. To run from a specific date manually in openshift,
see openshift/sfms-calculate/README.md
@@ -28,17 +28,17 @@

start_exec = get_utc_now()

bui_processor = BUIDateRangeProcessor(start_time, DAYS_TO_CALCULATE, RasterKeyAddresser())
daily_processor = DailyFWIProcessor(start_time, DAYS_TO_CALCULATE, RasterKeyAddresser())

async with S3Client() as s3_client:
await bui_processor.process_bui(s3_client, multi_wps_dataset_context, multi_wps_dataset_context)
await daily_processor.process(s3_client, multi_wps_dataset_context, multi_wps_dataset_context)

# calculate the execution time.
execution_time = get_utc_now() - start_exec
hours, remainder = divmod(execution_time.seconds, 3600)
minutes, seconds = divmod(remainder, 60)

logger.info(f"BUI processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds")
logger.info(f"Daily FWI processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds")


def main():
@@ -56,9 +56,9 @@
job = SFMSCalcJob()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(job.calculate_bui(start_time))
loop.run_until_complete(job.calculate_daily_fwi(start_time))
except Exception as e:
logger.error("An exception occurred while processing DMC/DC/BUI raster calculations", exc_info=e)
logger.error("An exception occurred while processing SFMS raster calculations", exc_info=e)
rc_message = ":scream: Encountered an error while processing SFMS raster data."
send_rocketchat_notification(rc_message, e)
sys.exit(os.EX_SOFTWARE)
api/app/sfms/daily_fwi_processor.py (renamed from api/app/sfms/date_range_processor.py)
@@ -8,7 +8,7 @@

from app.geospatial.wps_dataset import WPSDataset
from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc
from app.utils.geospatial import GDALResamplingMethod
from app.utils.s3 import set_s3_gdal_config
from app.utils.s3_client import S3Client
@@ -20,48 +20,49 @@
MultiDatasetContext = Callable[[List[str]], Iterator[List["WPSDataset"]]]


class BUIDateRangeProcessor:
class DailyFWIProcessor:
"""
Class for calculating/generating forecasted DMC/DC/BUI rasters for a date range
Class for calculating/generating forecasted daily FWI rasters for a date range
"""

def __init__(self, start_datetime: datetime, days: int, addresser: RasterKeyAddresser):
self.start_datetime = start_datetime
self.days = days
self.addresser = addresser

async def process_bui(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext):
async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext):
set_s3_gdal_config()

for day in range(self.days):
datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour = self._get_calculate_dates(day)
logger.info(f"Calculating DMC/DC/BUI for {datetime_to_calculate_utc.isoformat()}")

# Get and check existence of weather s3 keys
temp_key, rh_key, _, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour)
temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour)
weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, precip_key)
if not weather_keys_exist:
logging.warning(f"No weather keys found for {model_run_for_hour(self.start_datetime.hour):02} model run")
break

# get and check existence of fwi s3 keys
dc_key, dmc_key = self._get_previous_fwi_keys(day, previous_fwi_datetime)
dc_key, dmc_key, ffmc_key = self._get_previous_fwi_keys(day, previous_fwi_datetime)
fwi_keys_exist = await s3_client.all_objects_exist(dc_key, dmc_key)
if not fwi_keys_exist:
logging.warning(f"No previous DMC/DC keys found for {previous_fwi_datetime.date().isoformat()}")
break

temp_key, rh_key, precip_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, precip_key)
temp_key, rh_key, wind_speed_key, precip_key, ffmc_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, wind_speed_key, precip_key, ffmc_key)
dc_key, dmc_key = self.addresser.gdal_prefix_keys(dc_key, dmc_key)
Review comment (Collaborator): I think ffmc might need to be included here.
Reply (Author): Great catch!


with tempfile.TemporaryDirectory() as temp_dir:
with input_dataset_context([temp_key, rh_key, precip_key, dc_key, dmc_key]) as input_datasets:
with input_dataset_context([temp_key, rh_key, wind_speed_key, precip_key, dc_key, dmc_key, ffmc_key]) as input_datasets:
input_datasets = cast(List[WPSDataset], input_datasets) # Ensure correct type inference
temp_ds, rh_ds, precip_ds, dc_ds, dmc_ds = input_datasets
temp_ds, rh_ds, wind_speed_ds, precip_ds, dc_ds, dmc_ds, ffmc_ds = input_datasets

# Warp weather datasets to match fwi
warped_temp_ds = temp_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(temp_key)}", GDALResamplingMethod.BILINEAR)
warped_rh_ds = rh_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(rh_key)}", GDALResamplingMethod.BILINEAR)
warped_wind_speed_ds = wind_speed_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(wind_speed_key)}", GDALResamplingMethod.BILINEAR)
warped_precip_ds = precip_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(precip_key)}", GDALResamplingMethod.BILINEAR)

# close unneeded datasets to reduce memory usage
@@ -96,6 +97,18 @@ async def process_bui(self, s3_client: S3Client, input_dataset_context: MultiDat
dc_nodata_value,
)

# Create and store FFMC dataset
ffmc_values, ffmc_no_data_value = calculate_ffmc(ffmc_ds, warped_temp_ds, warped_rh_ds, warped_wind_speed_ds, warped_precip_ds)
new_ffmc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.FFMC)
await s3_client.persist_raster_data(
temp_dir,
new_ffmc_key,
dc_ds.as_gdal_ds().GetGeoTransform(),
dc_ds.as_gdal_ds().GetProjection(),
ffmc_values,
ffmc_no_data_value,
)

# Open new DMC and DC datasets and calculate BUI
new_bui_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.BUI)
with new_dmc_dc_context([new_dmc_path, new_dc_path]) as new_dmc_dc_datasets:
@@ -137,7 +150,10 @@ def _get_previous_fwi_keys(self, day_to_calculate: int, previous_fwi_datetime: d
if day_to_calculate == 0: # if we're running the first day of the calculation, use previously uploaded actuals
dc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.DC)
dmc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.DMC)
ffmc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.FFMC)
else: # otherwise use the last calculated key
dc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.DC)
dmc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.DMC)
return dc_key, dmc_key
ffmc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.FFMC)

return dc_key, dmc_key, ffmc_key
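
Aside on the MultiDatasetContext type this processor relies on (Callable[[List[str]], Iterator[List[WPSDataset]]]): it is satisfied by a contextmanager-style function that opens one dataset per key and closes them all when the with-block exits. A hedged sketch of such a context, assuming a WPSDataset(path) constructor and a close() method that this diff does not show; the real implementation is the multi_wps_dataset_context passed in from sfms_calculations.py:

# Illustrative sketch only, not part of the diff. WPSDataset(path) and
# WPSDataset.close() are assumptions about an API this diff does not show.
from contextlib import contextmanager
from typing import Iterator, List

from app.geospatial.wps_dataset import WPSDataset


@contextmanager
def multi_dataset_context(paths: List[str]) -> Iterator[List[WPSDataset]]:
    datasets = [WPSDataset(path) for path in paths]  # assumed constructor
    try:
        # Hand the opened datasets to the caller's with-block.
        yield datasets
    finally:
        # Always release the underlying raster resources.
        for ds in datasets:
            ds.close()  # assumed close method
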
api/app/sfms/fwi_processor.py (20 changes: 19 additions, 1 deletion)
@@ -3,7 +3,7 @@
import numpy as np

from app.geospatial.wps_dataset import WPSDataset
from app.auto_spatial_advisory.sfms import vectorized_dmc, vectorized_dc, vectorized_bui
from app.auto_spatial_advisory.sfms import vectorized_dmc, vectorized_dc, vectorized_bui, vectorized_ffmc

logger = logging.getLogger(__name__)

@@ -55,3 +55,21 @@ def calculate_bui(dmc_ds: WPSDataset, dc_ds: WPSDataset):
bui_values[nodata_mask] = nodata_value

return bui_values, nodata_value


def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPSDataset, precip_ds: WPSDataset, wind_speed_ds: WPSDataset):
previous_ffmc_array, _ = previous_ffmc_ds.replace_nodata_with(0)
temp_array, _ = temp_ds.replace_nodata_with(0)
rh_array, _ = rh_ds.replace_nodata_with(0)
precip_array, _ = precip_ds.replace_nodata_with(0)
wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0)

start = perf_counter()
ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, precip_array, wind_speed_array)
logger.info("%f seconds to calculate vectorized ffmc", perf_counter() - start)

nodata_mask, nodata_value = previous_ffmc_ds.get_nodata_mask()
if nodata_mask is not None:
ffmc_values[nodata_mask] = nodata_value

return ffmc_values, nodata_value
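
One note on the nodata handling in calculate_ffmc above: every input is zero-filled before the vectorized call, and the previous-FFMC nodata mask is re-applied to the result, so cells with no prior FFMC never end up carrying a computed value. A NumPy-only sketch of that pattern (the nodata sentinel and the array values are made up; the real code goes through WPSDataset.replace_nodata_with and get_nodata_mask):

# Illustrative sketch only, not part of the diff.
import numpy as np

nodata_value = -999.0
previous_ffmc = np.array([[85.0, nodata_value], [80.0, 90.0]])

# Zero-fill nodata cells so the vectorized calculation can run everywhere.
nodata_mask = previous_ffmc == nodata_value
filled = np.where(nodata_mask, 0.0, previous_ffmc)

ffmc_values = filled * 1.0  # stand-in for vectorized_ffmc(...)

# Restore the nodata value so masked cells are not mistaken for real FFMC.
ffmc_values[nodata_mask] = nodata_value
print(ffmc_values)
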
api/app/sfms/raster_addresser.py (2 changes: 1 addition, 1 deletion)
@@ -1,6 +1,6 @@
import os
import enum
from datetime import datetime, timezone, timedelta
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from app import config
from app.weather_models import ModelEnum
api/app/tests/jobs/test_sfms_calculations.py (7 changes: 3 additions, 4 deletions)
@@ -12,7 +12,7 @@ def test_sfms_calc_job_fail_default(monkeypatch, mocker: MockerFixture):
async def mock_job_error():
raise OSError("Error")

monkeypatch.setattr(SFMSCalcJob, "calculate_bui", mock_job_error)
monkeypatch.setattr(SFMSCalcJob, "calculate_daily_fwi", mock_job_error)

monkeypatch.setattr("sys.argv", ["sfms_calculations.py"])

@@ -27,15 +27,14 @@


def test_sfms_calc_job_cli_arg(monkeypatch, mocker: MockerFixture):
calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_bui", return_value=None)
daily_fwi_calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_daily_fwi", return_value=None)

test_datetime = "2024-10-10 5"
monkeypatch.setattr("sys.argv", ["sfms_calculations.py", test_datetime])

sfms_calculations.main()

called_args, _ = calc_spy.call_args
assert called_args[0] == datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc)
daily_fwi_calc_spy.assert_called_once_with(datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc))


@pytest.mark.anyio