Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SFMS: Daily FFMC #4081

Merged
merged 16 commits into from
Nov 13, 2024
3 changes: 2 additions & 1 deletion api/app/auto_spatial_advisory/sfms.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from cffdrs import bui, dc, dmc
from cffdrs import bui, dc, dmc, ffmc
from numba import vectorize

vectorized_bui = vectorize(bui)
vectorized_dc = vectorize(dc)
vectorized_dmc = vectorize(dmc)
vectorized_ffmc = vectorize(ffmc)
25 changes: 24 additions & 1 deletion api/app/jobs/sfms_calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from app import configure_logging
from app.rocketchat_notifications import send_rocketchat_notification
from app.sfms.daily_ffmc_processor import DailyFFMCProcessor
from app.sfms.date_range_processor import BUIDateRangeProcessor
from app.sfms.raster_addresser import RasterKeyAddresser
from app.utils.s3_client import S3Client
Expand Down Expand Up @@ -40,6 +41,27 @@

logger.info(f"BUI processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds")

async def calculate_daily_ffmc(self, start_time: datetime):
"""
Entry point for processing SFMS daily FFMC rasters. To run from a specific date manually in openshift,
see openshift/sfms-calculate/README.md
"""
logger.info("Begin daily FFMC raster calculations -- calculates 2 days forward by default")

Check warning on line 49 in api/app/jobs/sfms_calculations.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/sfms_calculations.py#L49

Added line #L49 was not covered by tests

start_exec = get_utc_now()

Check warning on line 51 in api/app/jobs/sfms_calculations.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/sfms_calculations.py#L51

Added line #L51 was not covered by tests

daily_ffmc_processor = DailyFFMCProcessor(start_time, RasterKeyAddresser())

Check warning on line 53 in api/app/jobs/sfms_calculations.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/sfms_calculations.py#L53

Added line #L53 was not covered by tests

async with S3Client() as s3_client:
await daily_ffmc_processor.process_daily_ffmc(s3_client, multi_wps_dataset_context)

Check warning on line 56 in api/app/jobs/sfms_calculations.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/sfms_calculations.py#L55-L56

Added lines #L55 - L56 were not covered by tests

# calculate the execution time.
execution_time = get_utc_now() - start_exec
hours, remainder = divmod(execution_time.seconds, 3600)
minutes, seconds = divmod(remainder, 60)

Check warning on line 61 in api/app/jobs/sfms_calculations.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/sfms_calculations.py#L59-L61

Added lines #L59 - L61 were not covered by tests

logger.info(f"Daily FFMC processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds")

Check warning on line 63 in api/app/jobs/sfms_calculations.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/sfms_calculations.py#L63

Added line #L63 was not covered by tests


def main():
if len(sys.argv) > 1:
Expand All @@ -57,8 +79,9 @@
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(job.calculate_bui(start_time))
loop.run_until_complete(job.calculate_daily_ffmc(start_time))
except Exception as e:
logger.error("An exception occurred while processing DMC/DC/BUI raster calculations", exc_info=e)
logger.error("An exception occurred while processing SFMS raster calculations", exc_info=e)
rc_message = ":scream: Encountered an error while processing SFMS raster data."
send_rocketchat_notification(rc_message, e)
sys.exit(os.EX_SOFTWARE)
Expand Down
69 changes: 69 additions & 0 deletions api/app/sfms/daily_ffmc_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import os
import logging
import tempfile
from typing import Callable, List, Iterator, cast
from datetime import datetime, timedelta
from app.auto_spatial_advisory.run_type import RunType
from app.geospatial.wps_dataset import WPSDataset
from app.sfms.fwi_processor import calculate_ffmc
from app.sfms.raster_addresser import RasterKeyAddresser, WeatherParameter
from app.utils.geospatial import GDALResamplingMethod
from app.utils.s3_client import S3Client

logger = logging.getLogger(__name__)

DAILY_FFMC_DAYS = 2

MultiDatasetContext = Callable[[List[str]], Iterator[List["WPSDataset"]]]


class DailyFFMCProcessor:
"""
Class for calculating/generating forecasted daily FFMC rasters for a date range
"""

def __init__(self, start_datetime: datetime, addresser: RasterKeyAddresser):
self.start_datetime = start_datetime
self.addresser = addresser

async def process_daily_ffmc(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext):
for day in range(DAILY_FFMC_DAYS):
current_day = self.start_datetime + timedelta(days=day)
yesterday = current_day - timedelta(days=1)
yesterday_ffmc_key = self.addresser.get_daily_ffmc(yesterday, RunType.ACTUAL)
if await s3_client.all_objects_exist(yesterday_ffmc_key) == False:
yesterday_ffmc_key = self.addresser.get_calculated_daily_ffmc(yesterday)

if await s3_client.all_objects_exist(yesterday_ffmc_key) == False:
logging.warning(f"No ffmc objects found for key: {yesterday_ffmc_key}")
return

temp_forecast_key = self.addresser.get_daily_model_data_key(current_day, RunType.FORECAST, WeatherParameter.TEMP)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we want to use weather_model data we need to keep track of a prediction_hour because we get model data for so many different hours, but maybe there's a better way to do it

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opted to use all the daily SFMS model data since that's where we get the previous FFMC raster. Not sure if that's correct though. I guess that conflicts with our BUI calculations but the seed FFMC raster will either way.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, I could be wrong but I think the intention was to step forward in these calculations with weather model data (RDPS), since the weather data coming from SFMS is the coarsely interpolated forecasts from BC weather stations

rh_forecast_key = self.addresser.get_daily_model_data_key(current_day, RunType.FORECAST, WeatherParameter.RH)
precip_forecast_key = self.addresser.get_daily_model_data_key(current_day, RunType.FORECAST, WeatherParameter.PRECIP)
wind_speed_forecast_key = self.addresser.get_daily_model_data_key(current_day, RunType.FORECAST, WeatherParameter.WIND_SPEED)

with tempfile.TemporaryDirectory() as temp_dir:
with input_dataset_context([yesterday_ffmc_key, temp_forecast_key, rh_forecast_key, precip_forecast_key, wind_speed_forecast_key]) as input_datasets:
input_datasets = cast(List[WPSDataset], input_datasets) # Ensure correct type inference
yesterday_ffmc_ds, temp_forecast_ds, rh_forecast_ds, precip_forecast_ds, wind_speed_forecast_ds = input_datasets

# Warp weather datasets to match ffmc
warped_temp_ds = temp_forecast_ds.warp_to_match(yesterday_ffmc_ds, f"{temp_dir}/{os.path.basename(temp_forecast_key)}", GDALResamplingMethod.BILINEAR)
warped_rh_ds = rh_forecast_ds.warp_to_match(yesterday_ffmc_ds, f"{temp_dir}/{os.path.basename(rh_forecast_key)}", GDALResamplingMethod.BILINEAR)
warped_precip_ds = precip_forecast_ds.warp_to_match(yesterday_ffmc_ds, f"{temp_dir}/{os.path.basename(precip_forecast_key)}", GDALResamplingMethod.BILINEAR)
warped_wind_speed_ds = wind_speed_forecast_ds.warp_to_match(
yesterday_ffmc_ds, f"{temp_dir}/{os.path.basename(wind_speed_forecast_key)}", GDALResamplingMethod.BILINEAR
)

temp_forecast_ds.close()
rh_forecast_ds.close()
precip_forecast_ds.close()
wind_speed_forecast_ds.close()

ffmc_values, ffmc_no_data_value = calculate_ffmc(yesterday_ffmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, warped_wind_speed_ds)

today_ffmc_key = self.addresser.get_calculated_daily_ffmc(current_day)
s3_client.persist_raster_data(
temp_dir, today_ffmc_key, yesterday_ffmc_ds.as_gdal_ds().GetGeoTransform(), yesterday_ffmc_ds.as_gdal_ds().GetProjection(), ffmc_values, ffmc_no_data_value
)
20 changes: 19 additions & 1 deletion api/app/sfms/fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np

from app.geospatial.wps_dataset import WPSDataset
from app.auto_spatial_advisory.sfms import vectorized_dmc, vectorized_dc, vectorized_bui
from app.auto_spatial_advisory.sfms import vectorized_dmc, vectorized_dc, vectorized_bui, vectorized_ffmc

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -55,3 +55,21 @@ def calculate_bui(dmc_ds: WPSDataset, dc_ds: WPSDataset):
bui_values[nodata_mask] = nodata_value

return bui_values, nodata_value


def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPSDataset, precip_ds: WPSDataset, wind_speed_ds: WPSDataset):
previous_ffmc_array, _ = previous_ffmc_ds.replace_nodata_with(0)
temp_array, _ = temp_ds.replace_nodata_with(0)
rh_array, _ = rh_ds.replace_nodata_with(0)
precip_array, _ = precip_ds.replace_nodata_with(0)
wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0)

start = perf_counter()
ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, precip_array, wind_speed_array)
logger.info("%f seconds to calculate vectorized ffmc", perf_counter() - start)

nodata_mask, nodata_value = previous_ffmc_ds.get_nodata_mask()
if nodata_mask is not None:
ffmc_values[nodata_mask] = nodata_value

return ffmc_values, nodata_value
32 changes: 30 additions & 2 deletions api/app/sfms/raster_addresser.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import os
import enum
from datetime import datetime, timezone, timedelta
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from app import config
from app.db.models.auto_spatial_advisory import RunTypeEnum
from app.weather_models import ModelEnum
from app.weather_models.rdps_filename_marshaller import compose_computed_precip_rdps_key, compose_rdps_key

Expand All @@ -11,6 +12,11 @@ class WeatherParameter(enum.Enum):
TEMP = "temp"
RH = "rh"
WIND_SPEED = "wind_speed"
PRECIP = "precipitation"

@classmethod
def non_precip(cls):
return [cls.TEMP, cls.RH, cls.WIND_SPEED]


class FWIParameter(enum.Enum):
Expand Down Expand Up @@ -69,6 +75,18 @@ def get_model_data_key(self, start_time_utc: datetime, prediction_hour: int, wea
weather_model_date_prefix = f"{self.weather_model_prefix}/{start_time_utc.date().isoformat()}/"
return os.path.join(weather_model_date_prefix, compose_rdps_key(start_time_utc, start_time_utc.hour, prediction_hour, weather_param.value))

def get_daily_model_data_key(self, timestamp: datetime, run_type: RunTypeEnum, weather_param: WeatherParameter):
"""
Generates the model data key that points to the associated raster artifact in the object store.
The model is always assumed to be RDPS.

:param timestamp: UTC date time when the model run started
:param prediction_hour: the prediction hour offset from the start time
"""
assert_all_utc(timestamp)
iso_date = timestamp.date().isoformat()
return f"sfms/uploads/{run_type.value}/{iso_date}/{weather_param.value}{iso_date.replace('-', '')}.tif"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this meant to get RDPS weather model data? get_model_data_key does that, but it's stored in a different place in our s3 bucket

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


def get_calculated_precip_key(self, datetime_to_calculate_utc: datetime):
"""
Generates the calculated precip key that points to the associated raster artifact in the object store.
Expand All @@ -81,6 +99,16 @@ def get_calculated_precip_key(self, datetime_to_calculate_utc: datetime):
calculated_weather_prefix = f"{self.weather_model_prefix}/{datetime_to_calculate_utc.date().isoformat()}/"
return os.path.join(calculated_weather_prefix, compose_computed_precip_rdps_key(datetime_to_calculate_utc))

def get_daily_ffmc(self, timestamp: datetime, run_type: RunTypeEnum):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was using get_uploaded_index_key for this part, maybe makes sense to split them out though, whatever you think works best

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_uploaded_index_key is based on an enum that contains only the abbreviated FWI names, whereas SFMS gives us the full FWI name for FFMC, so I opted for a specific function.

assert_all_utc(timestamp)
iso_date = timestamp.date().isoformat()
return f"sfms/uploads/{run_type.value}/{iso_date}/fine_fuel_moisture_code{iso_date.replace('-', '')}.tif"

def get_calculated_daily_ffmc(self, timestamp: datetime):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was using the generic get_calculated_index_key for this

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert_all_utc(timestamp)
iso_date = timestamp.date().isoformat()
return f"sfms/uploads/calculated/{iso_date}/fine_fuel_moisture_code{iso_date.replace('-', '')}.tif"

def get_weather_data_keys(self, start_time_utc: datetime, datetime_to_calculate_utc: datetime, prediction_hour: int):
"""
Generates all model data keys that point to their associated raster artifacts in the object store.
Expand All @@ -91,7 +119,7 @@ def get_weather_data_keys(self, start_time_utc: datetime, datetime_to_calculate_
:return: temp, rh, wind speed and precip model data key
"""
assert_all_utc(start_time_utc, datetime_to_calculate_utc)
non_precip_keys = tuple([self.get_model_data_key(start_time_utc, prediction_hour, param) for param in WeatherParameter])
non_precip_keys = tuple([self.get_model_data_key(start_time_utc, prediction_hour, param) for param in WeatherParameter.non_precip()])
precip_key = self.get_calculated_precip_key(datetime_to_calculate_utc)
all_weather_data_keys = non_precip_keys + (precip_key,)

Expand Down
15 changes: 10 additions & 5 deletions api/app/tests/jobs/test_sfms_calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
from app.jobs.sfms_calculations import SFMSCalcJob


def test_sfms_calc_job_fail_default(monkeypatch, mocker: MockerFixture):
@pytest.mark.parametrize(
"raster_function",
[("calculate_bui"), ("calculate_daily_ffmc")],
)
def test_sfms_calc_job_fail_default(raster_function, monkeypatch, mocker: MockerFixture):
async def mock_job_error():
raise OSError("Error")

monkeypatch.setattr(SFMSCalcJob, "calculate_bui", mock_job_error)
monkeypatch.setattr(SFMSCalcJob, raster_function, mock_job_error)

monkeypatch.setattr("sys.argv", ["sfms_calculations.py"])

Expand All @@ -27,15 +31,16 @@ async def mock_job_error():


def test_sfms_calc_job_cli_arg(monkeypatch, mocker: MockerFixture):
calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_bui", return_value=None)
bui_calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_bui", return_value=None)
daily_ffmc_calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_daily_ffmc", return_value=None)

test_datetime = "2024-10-10 5"
monkeypatch.setattr("sys.argv", ["sfms_calculations.py", test_datetime])

sfms_calculations.main()

called_args, _ = calc_spy.call_args
assert called_args[0] == datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc)
bui_calc_spy.assert_called_once_with(datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc))
daily_ffmc_calc_spy.assert_called_once_with(datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc))


@pytest.mark.anyio
Expand Down
Loading
Loading