Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hourly FFMC #4153

Merged
merged 13 commits into from
Dec 5, 2024
2 changes: 1 addition & 1 deletion api/app/jobs/rdps_sfms.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@


DAYS_TO_RETAIN = 7
MAX_MODEL_RUN_HOUR = 37
MAX_MODEL_RUN_HOUR = 45
GRIB_LAYERS = {"temp": "TMP_TGL_2", "rh": "RH_TGL_2", "precip": "APCP_SFC_0", "wind_speed": "WIND_TGL_10"}


Expand Down
42 changes: 38 additions & 4 deletions api/app/jobs/sfms_calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

from app import configure_logging
from app.geospatial.wps_dataset import multi_wps_dataset_context
from app.jobs.rdps_sfms import MAX_MODEL_RUN_HOUR
from app.rocketchat_notifications import send_rocketchat_notification
from app.sfms.daily_fwi_processor import DailyFWIProcessor
from app.sfms.hourly_ffmc_processor import HourlyFFMCProcessor
from app.sfms.raster_addresser import RasterKeyAddresser
from app.utils.s3_client import S3Client
from app.utils.time import get_utc_now
Expand All @@ -18,12 +20,44 @@


class SFMSCalcJob:
async def calculate_fwi_rasters(self, start_time: datetime) -> None:
    """
    Entry point for processing SFMS daily FWI rasters and hFFMC rasters. To run from a specific date manually in openshift,
    see openshift/sfms-calculate/README.md

    :param start_time: The RDPS model run time to use for processing.
    """
    # Daily FWI indices are calculated first; hFFMC processing follows for the same start time.
    await self.calculate_daily_fwi(start_time)
    await self.calculate_hffmc(start_time)

async def calculate_hffmc(self, start_time: datetime) -> None:
    """
    Entry point for calculating hourly FFMC rasters. Uses a 04:00 or 16:00 PST (12:00 or 24:00 UTC) hFFMC raster from SFMS as a base input.

    :param start_time: The date time to use for processing. Calculations will begin at the most recent RDPS model run (00Z or 12Z).
    """
    logger.info("Begin hFFMC raster calculations.")

    started_at = get_utc_now()
    processor = HourlyFFMCProcessor(start_time, RasterKeyAddresser())

    async with S3Client() as s3_client:
        await processor.process(s3_client, multi_wps_dataset_context, MAX_MODEL_RUN_HOUR)

    # Break the elapsed wall-clock time down into hours/minutes/seconds for the log line.
    elapsed = get_utc_now() - started_at
    total_seconds = elapsed.seconds
    hours = total_seconds // 3600
    minutes = (total_seconds % 3600) // 60
    seconds = total_seconds % 60

    logger.info(f"hFFMC raster processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds")

async def calculate_daily_fwi(self, start_time: datetime):
"""
Entry point for processing SFMS daily FWI rasters.
"""
logger.info(f"Begin BUI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward")
logger.info(f"Begin FWI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward")

Check warning on line 60 in api/app/jobs/sfms_calculations.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/sfms_calculations.py#L60

Added line #L60 was not covered by tests

start_exec = get_utc_now()

Expand Down Expand Up @@ -55,7 +89,7 @@
job = SFMSCalcJob()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(job.calculate_daily_fwi(start_time))
loop.run_until_complete(job.calculate_fwi_rasters(start_time))
except Exception as e:
logger.error("An exception occurred while processing SFMS raster calculations", exc_info=e)
rc_message = ":scream: Encountered an error while processing SFMS raster data."
Expand Down
5 changes: 3 additions & 2 deletions api/app/sfms/daily_fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async def process(

# Get and check existence of weather s3 keys
temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour)
weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, precip_key)
weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, wind_speed_key, precip_key)
if not weather_keys_exist:
logging.warning(f"Missing weather keys for {model_run_for_hour(self.start_datetime.hour):02} model run")
break
Expand Down Expand Up @@ -76,6 +76,7 @@ async def process(
precip_ds.close()
rh_ds.close()
temp_ds.close()
wind_speed_ds.close()
# Create latitude and month arrays needed for calculations
latitude_array = dmc_ds.generate_latitude_array()
month_array = np.full(latitude_array.shape, datetime_to_calculate_utc.month)
Expand Down Expand Up @@ -105,7 +106,7 @@ async def process(
)

# Create and store FFMC dataset
ffmc_values, ffmc_no_data_value = calculate_ffmc(ffmc_ds, warped_temp_ds, warped_rh_ds, warped_wind_speed_ds, warped_precip_ds)
ffmc_values, ffmc_no_data_value = calculate_ffmc(ffmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, warped_wind_speed_ds)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ouch, good find, I wonder if there's a test we could add to catch this.

Copy link
Collaborator Author

@dgboss dgboss Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add some sort of name or id property to WPSDataset objects and use this to assert that we're getting parameters in the right order?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think that'd be worth it since we're passing so many WPSDataset's around

new_ffmc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.FFMC)
new_ffmc_path = await s3_client.persist_raster_data(
temp_dir,
Expand Down
2 changes: 1 addition & 1 deletion api/app/sfms/fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPS
wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0)

start = perf_counter()
ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, precip_array, wind_speed_array)
ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, wind_speed_array, precip_array)
dgboss marked this conversation as resolved.
Show resolved Hide resolved
logger.info("%f seconds to calculate vectorized ffmc", perf_counter() - start)

nodata_mask, nodata_value = previous_ffmc_ds.get_nodata_mask()
Expand Down
95 changes: 95 additions & 0 deletions api/app/sfms/hourly_ffmc_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import logging
import os
import tempfile
from datetime import datetime, timedelta
from osgeo import gdal
from typing import List, cast

from app.weather_models.rdps_filename_marshaller import model_run_for_hour

from app.geospatial.wps_dataset import WPSDataset
from app.jobs.rdps_sfms import MAX_MODEL_RUN_HOUR
from app.sfms.daily_fwi_processor import MultiDatasetContext
from app.sfms.fwi_processor import calculate_ffmc
from app.sfms.raster_addresser import RasterKeyAddresser
from app.utils.geospatial import GDALResamplingMethod
from app.utils.s3 import set_s3_gdal_config
from app.utils.s3_client import S3Client


logger = logging.getLogger(__name__)


class HourlyFFMCProcessor:
    """
    Class for calculating/generating forecasted hourly FFMC rasters.

    Starting from the most recent SFMS-produced hFFMC raster (an 04 or 16 PDT hour raster),
    each forecast hour's FFMC is calculated from the previous hour's FFMC plus RDPS weather
    inputs (temp, rh, wind speed and computed precip), then persisted to S3.
    """

    def __init__(self, start_datetime: datetime, addresser: RasterKeyAddresser):
        # start_datetime: the date/time to process; the containing RDPS model run (00Z or 12Z) is derived from it.
        self.start_datetime = start_datetime
        self.addresser = addresser

    async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, hours_to_process: int = MAX_MODEL_RUN_HOUR):
        """
        Calculate and persist hourly FFMC rasters for up to hours_to_process forecast hours.

        :param s3_client: Client used to check for input keys and persist output rasters.
        :param input_dataset_context: Context manager yielding WPSDatasets for a list of gdal-prefixed keys.
        :param hours_to_process: Number of RDPS forecast hours to process (exclusive upper bound).
        """
        set_s3_gdal_config()

        # 1 - Determine starting hFFMC (4am or 4pm) from SFMS and get key, confirm exists, if not, exit
        # 2 - Determine what would be last key of run and check if exists, if exists, exit
        # 3 - Get all weather variable keys and check if last one exists, if not, exit
        # 4 - Use seed hFFMC plus:
        #     - rh, temp and wind speed from RDPS model run hour n = 000
        #     - computed precip at n = 0Z or 12Z
        # 5 - Use newly calculated hFFMC to calculate next hFFMC using:
        #     - rh, temp and wind speed from RDPS model run hour n + 1
        #     - computed precip at n + 1
        # hFFMC files from SFMS use PST datetimes

        # Determine most recent RDPS model run
        rdps_model_run_hour = model_run_for_hour(self.start_datetime.hour)
        rdps_model_run_start = datetime(
            year=self.start_datetime.year, month=self.start_datetime.month, day=self.start_datetime.day, hour=rdps_model_run_hour, tzinfo=self.start_datetime.tzinfo
        )

        # Determine key to the initial/seed hFFMC from SFMS and check if it exists. Initial hffmc will be a 04 or 16 hour hffmc from SFMS.
        hffmc_key = self.addresser.get_uploaded_hffmc_key(rdps_model_run_start)
        hffmc_key_exists = await s3_client.all_objects_exist(hffmc_key)
        if not hffmc_key_exists:
            logger.warning(f"Missing initial hFFMC raster from SFMS for date {self.start_datetime}. Missing key is {hffmc_key}.")
            return

        for hour in range(hours_to_process):
            with tempfile.TemporaryDirectory() as temp_dir:
                # Get and check existence of weather s3 keys
                temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys_hffmc(rdps_model_run_start, hour)
                weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, wind_speed_key, precip_key)
                if not weather_keys_exist:
                    # Fix: use the module-level logger (not the root logger via logging.warning)
                    # for consistency with the rest of this module.
                    logger.warning(f"Missing weather keys for model run: {rdps_model_run_start}")
                    break

                # Prefix our S3 keys for access via gdal
                temp_key, rh_key, wind_speed_key, precip_key, hffmc_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, wind_speed_key, precip_key, hffmc_key)
                with input_dataset_context([temp_key, rh_key, wind_speed_key, precip_key, hffmc_key]) as input_datasets:
                    input_datasets = cast(List[WPSDataset], input_datasets)  # Ensure correct type inference
                    temp_ds, rh_ds, wind_speed_ds, precip_ds, hffmc_ds = input_datasets
                    # Warp weather datasets to match hffmc
                    warped_temp_ds = temp_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(temp_key)}", GDALResamplingMethod.BILINEAR)
                    warped_rh_ds = rh_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(rh_key)}", GDALResamplingMethod.BILINEAR)
                    warped_wind_speed_ds = wind_speed_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(wind_speed_key)}", GDALResamplingMethod.BILINEAR)
                    warped_precip_ds = precip_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(precip_key)}", GDALResamplingMethod.BILINEAR)

                    # Create and store new hFFMC dataset
                    hffmc_values, hffmc_no_data_value = calculate_ffmc(hffmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, warped_wind_speed_ds)
                    new_hffmc_datetime = rdps_model_run_start + timedelta(hours=hour)
                    # The freshly calculated raster becomes the seed hFFMC for the next forecast hour.
                    hffmc_key = self.addresser.get_calculated_hffmc_index_key(new_hffmc_datetime)
                    geotransform = hffmc_ds.as_gdal_ds().GetGeoTransform()
                    projection = hffmc_ds.as_gdal_ds().GetProjection()
                    hffmc_ds.close()
                    await s3_client.persist_raster_data(
                        temp_dir,
                        hffmc_key,
                        geotransform,
                        projection,
                        hffmc_values,
                        hffmc_no_data_value,
                    )
                    # Clear gdal virtual file system cache of S3 metadata in order to allow newly uploaded hffmc rasters to be opened immediately.
                    gdal.VSICurlClearCache()
65 changes: 62 additions & 3 deletions api/app/sfms/raster_addresser.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import enum
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from app import config
from app.utils.time import convert_utc_to_pdt
from app.weather_models import ModelEnum
from app.weather_models.rdps_filename_marshaller import compose_computed_precip_rdps_key, compose_rdps_key
from app.weather_models.rdps_filename_marshaller import compose_computed_precip_rdps_key, compose_rdps_key, compose_rdps_key_hffmc


class WeatherParameter(enum.Enum):
Expand Down Expand Up @@ -36,15 +37,16 @@ class RasterKeyAddresser:
def __init__(self):
    # S3 key prefixes for SFMS raster artifacts and RDPS weather model data.
    self.sfms_calculated_prefix = "sfms/calculated"
    # Bucket prefix for gdal's /vsis3/ virtual file system access.
    self.s3_prefix = f"/vsis3/{config.get('OBJECT_STORE_BUCKET')}"
    # NOTE(review): "smfs" looks like a typo of "sfms" — renaming would touch every reader
    # of this attribute, so it is left as-is here; confirm and rename in a dedicated change.
    self.smfs_hourly_upload_prefix = "sfms/uploads/hourlies"
    self.sfms_upload_prefix = "sfms/uploads/actual"
    self.weather_model_prefix = f"weather_models/{ModelEnum.RDPS.lower()}"

def get_uploaded_index_key(self, datetime_utc: datetime, fwi_param: FWIParameter):
    """
    Return the S3 key of an SFMS-uploaded FWI raster for the given UTC datetime.

    :param datetime_utc: A UTC datetime identifying the raster's date.
    :param fwi_param: The FWI parameter the raster holds.
    :return: An S3 key under the SFMS upload prefix.
    """
    assert_all_utc(datetime_utc)
    iso_date = datetime_utc.date().isoformat()
    compact_date = iso_date.replace("-", "")
    return f"{self.sfms_upload_prefix}/{iso_date}/{fwi_param.value}{compact_date}.tif"


def get_calculated_index_key(self, datetime_utc: datetime, fwi_param: FWIParameter):
"""
Generates the calculated fire weather index key that points to the associated raster artifact in the object store.
Expand Down Expand Up @@ -105,3 +107,60 @@ def gdal_prefix_keys(self, *keys):
:return: A tuple of all strings provided, prefixed with vsis3/{bucket}
"""
return tuple(f"{self.s3_prefix}/{key}" for key in keys)

def get_uploaded_hffmc_key(self, datetime_utc: datetime):
    """
    Given the start time of an RDPS model run, return a key to the most recent hFFMC raster which will be
    equivalent to RDPS model run start time minus one hour in PDT. Note that the hFFMC rasters are stored according
    to PDT times. hFFMC keys will end with 04 or 16 for their hour.

    :param datetime_utc: The RDPS model run start date and time.
    :return: A key to the most recent hFFMC raster.
    """
    assert_all_utc(datetime_utc)

    # Convert utc into pdt and subtract one hour to get hFFMC source raster time. sfms only produces hFFMC from Apr - Oct which is always PDT
    datetime_pdt = convert_utc_to_pdt(datetime_utc) - timedelta(hours=1)
    iso_date = datetime_pdt.date().isoformat()
    # NOTE(review): "smfs_hourly_upload_prefix" looks like a typo of "sfms" (set in __init__) — confirm before renaming.
    return f"{self.smfs_hourly_upload_prefix}/{iso_date}/fine_fuel_moisture_code{iso_date.replace('-', '')}{datetime_pdt.hour:02d}.tif"

def get_weather_data_keys_hffmc(self, rdps_model_run_start: datetime, offset_hour: int):
    """
    Gets temp, rh, wind speed and calculated accumulated precip for the specified RDPS model run start date and hour.

    :param rdps_model_run_start: The RDPS model run start date and time.
    :param offset_hour: The hour offset from the RDPS model run start hour.
    :return: Keys to rasters in S3 storage for temp, rh, wind speed and calculated precip rasters.
    """
    assert_all_utc(rdps_model_run_start)
    # One key per WeatherParameter member, in the enum's declaration order; precip is appended last.
    non_precip_keys = tuple(self.get_model_data_key_hffmc(rdps_model_run_start, offset_hour, param) for param in WeatherParameter)
    datetime_to_calculate_utc = rdps_model_run_start + timedelta(hours=offset_hour)
    precip_key = self.get_calculated_precip_key(datetime_to_calculate_utc)
    all_weather_data_keys = non_precip_keys + (precip_key,)
    return all_weather_data_keys

def get_model_data_key_hffmc(self, rdps_model_run_start: datetime, offset_hour: int, weather_param: WeatherParameter):
    """
    Gets a S3 key for the weather parameter of interest for the specified RDPS model run start date and time at the provided offset.

    :param rdps_model_run_start: The RDPS model run start date and time.
    :param offset_hour: The hour offset from the RDPS model run start hour.
    :param weather_param: The weather parameter of interest (temp, rh, or wind speed).
    :return: A key to a raster in S3 storage.
    """
    assert_all_utc(rdps_model_run_start)
    date_prefix = f"{self.weather_model_prefix}/{rdps_model_run_start.date().isoformat()}/"
    filename = compose_rdps_key_hffmc(rdps_model_run_start, offset_hour, weather_param.value)
    return os.path.join(date_prefix, filename)

def get_calculated_hffmc_index_key(self, datetime_utc: datetime):
    """
    Given a UTC datetime return a calculated key based on PDT time as hFFMC rasters are named according to PDT.

    :param datetime_utc: A UTC datetime.
    :return: An S3 key for hFFMC using PDT time.
    """
    assert_all_utc(datetime_utc)
    pdt_time = convert_utc_to_pdt(datetime_utc)
    date_part = pdt_time.date().isoformat()
    compact_date = date_part.replace("-", "")
    return f"{self.sfms_calculated_prefix}/hourlies/{date_part}/fine_fuel_moisture_code{compact_date}{pdt_time.hour:02d}.tif"
39 changes: 39 additions & 0 deletions api/app/tests/dataset_common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from contextlib import ExitStack, contextmanager
import numpy as np
from osgeo import osr, gdal
from typing import List
import uuid
from app.geospatial.wps_dataset import WPSDataset

Expand Down Expand Up @@ -47,3 +49,40 @@ def create_mock_gdal_dataset():
def create_mock_wps_dataset():
mock_ds = create_mock_gdal_dataset()
return WPSDataset(ds=mock_ds, ds_path=None)

def create_mock_wps_datasets(num: int) -> List[WPSDataset]:
    """Create `num` independent mock WPSDataset instances for use in tests."""
    return [create_mock_wps_dataset() for _ in range(num)]


def create_mock_input_dataset_context(num: int):
    """
    Build `num` mock WPSDatasets plus a context manager that yields them.

    The context manager mimics the production multi-dataset context: it accepts a list of
    keys (ignored here), enters each dataset's context, and yields the entered instances.

    :param num: Number of mock datasets to create.
    :return: Tuple of (datasets, context manager) so tests can inspect the datasets directly.
    """
    input_datasets = create_mock_wps_datasets(num)

    @contextmanager
    def mock_input_dataset_context(_: List[str]):
        stack = ExitStack()
        try:
            entered = [stack.enter_context(ds) for ds in input_datasets]
            yield entered
        finally:
            # Unwind any entered contexts, then close all datasets to guarantee cleanup.
            stack.close()
            for ds in input_datasets:
                ds.close()

    return input_datasets, mock_input_dataset_context


def create_mock_new_ds_context(number_of_datasets: int):
    """
    Build `number_of_datasets` mock WPSDatasets plus a context manager that yields them.

    Mirrors create_mock_input_dataset_context; kept separate so tests can supply distinct
    dataset lists for "input" and "newly created" roles.

    :param number_of_datasets: Number of mock datasets to create.
    :return: Tuple of (datasets, context manager) so tests can inspect the datasets directly.
    """
    new_datasets = create_mock_wps_datasets(number_of_datasets)

    @contextmanager
    def mock_new_datasets_context(_: List[str]):
        try:
            # Enter each dataset's context and yield the list of instances
            with ExitStack() as stack:
                yield [stack.enter_context(ds) for ds in new_datasets]
        finally:
            # Close all datasets to ensure cleanup
            for ds in new_datasets:
                ds.close()

    return new_datasets, mock_new_datasets_context
2 changes: 2 additions & 0 deletions api/app/tests/jobs/test_sfms_calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ async def mock_job_error():

def test_sfms_calc_job_cli_arg(monkeypatch, mocker: MockerFixture):
    """Running main() with a datetime CLI arg invokes both daily FWI and hFFMC calculations with that datetime as UTC."""
    daily_fwi_calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_daily_fwi", return_value=None)
    hffmc_calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_hffmc", return_value=None)

    # Simulate `python sfms_calculations.py "2024-10-10 5"`.
    test_datetime = "2024-10-10 5"
    monkeypatch.setattr("sys.argv", ["sfms_calculations.py", test_datetime])

    sfms_calculations.main()

    # Both calculations should receive the parsed CLI datetime, made timezone-aware as UTC.
    daily_fwi_calc_spy.assert_called_once_with(datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc))
    hffmc_calc_spy.assert_called_once_with(datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc))


@pytest.mark.anyio
Expand Down
Loading
Loading