-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Hourly FFMC #4153
Hourly FFMC #4153
Changes from 8 commits
7628b12
5239ae2
7454cbf
bcd5ec0
c1b74d7
e2b9ad1
0e7bda3
bdf469a
51059b1
c348767
c0d3524
dda121e
972be14
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
import logging | ||
import os | ||
import tempfile | ||
from datetime import datetime, timedelta | ||
from osgeo import gdal | ||
from typing import List, cast | ||
|
||
from app.weather_models.rdps_filename_marshaller import model_run_for_hour | ||
|
||
from app.geospatial.wps_dataset import WPSDataset | ||
from app.jobs.rdps_sfms import MAX_MODEL_RUN_HOUR | ||
from app.sfms.daily_fwi_processor import MultiDatasetContext | ||
from app.sfms.fwi_processor import calculate_ffmc | ||
from app.sfms.raster_addresser import RasterKeyAddresser | ||
from app.utils.geospatial import GDALResamplingMethod | ||
from app.utils.s3 import set_s3_gdal_config | ||
from app.utils.s3_client import S3Client | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class HourlyFFMCProcessor:
    """
    Calculates forecasted hourly FFMC (hFFMC) rasters and persists them to S3.

    General process:
      1. A cron job kicks off the job; the current UTC time is used as the start time.
      2. An HourlyFFMCProcessor is created with the start time and processing begins.
      3. The job start time determines the most recent RDPS model run start time (date and 00z or 12z).
      4. The RDPS model run start time determines the most recent uploaded hFFMC key to use as the
         seed, which is always one hour before the RDPS start time (04 or 16 PDT).
      5. hFFMC is calculated from model run hour 0 through to 47 and saved to S3; the most recently
         calculated hFFMC is used as input to the next hour's hFFMC calculation.
      6. hFFMC rasters are saved to S3 with PDT-based keys.
    """

    def __init__(self, start_datetime: datetime, addresser: RasterKeyAddresser):
        """
        :param start_datetime: Job start time (expected UTC) used to select the RDPS model run.
        :param addresser: Resolves S3 keys for weather inputs and hFFMC inputs/outputs.
        """
        self.start_datetime = start_datetime
        self.addresser = addresser

    async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, hours_to_process: int = MAX_MODEL_RUN_HOUR):
        """
        Compute and persist hourly FFMC rasters for the most recent RDPS model run.

        Stops early (with a warning) if the seed hFFMC raster or any hour's weather
        inputs are missing from S3.

        :param s3_client: Client used to check for and persist S3 objects.
        :param input_dataset_context: Context manager yielding WPSDatasets for the given keys.
        :param hours_to_process: Number of model run hours to process, defaults to MAX_MODEL_RUN_HOUR.
        """
        set_s3_gdal_config()

        # Determine most recent RDPS model run (00z or 12z) relative to the start time.
        rdps_model_run_hour = model_run_for_hour(self.start_datetime.hour)
        rdps_model_run_start = datetime(
            year=self.start_datetime.year, month=self.start_datetime.month, day=self.start_datetime.day, hour=rdps_model_run_hour, tzinfo=self.start_datetime.tzinfo
        )

        # Determine key to the initial/seed hFFMC from SFMS and check if it exists.
        # The initial hFFMC will be a 04 or 16 hour hFFMC from SFMS.
        hffmc_key = self.addresser.get_uploaded_hffmc_key(rdps_model_run_start)
        hffmc_key_exists = await s3_client.all_objects_exist(hffmc_key)
        if not hffmc_key_exists:
            logger.warning(f"Missing initial hFFMC raster from SFMS for date {self.start_datetime}. Missing key is {hffmc_key}.")
            return

        for hour in range(hours_to_process):
            with tempfile.TemporaryDirectory() as temp_dir:
                # Get and check existence of weather s3 keys for this model run hour.
                temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys_hffmc(rdps_model_run_start, hour)
                weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, wind_speed_key, precip_key)
                if not weather_keys_exist:
                    # Use the module-level logger (not the root logger) so messages honour
                    # this module's logging configuration, consistent with the warning above.
                    logger.warning(f"Missing weather keys for model run: {rdps_model_run_start}")
                    break

                # Prefix our S3 keys for access via gdal
                temp_key, rh_key, wind_speed_key, precip_key, hffmc_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, wind_speed_key, precip_key, hffmc_key)
                with input_dataset_context([temp_key, rh_key, wind_speed_key, precip_key, hffmc_key]) as input_datasets:
                    input_datasets = cast(List[WPSDataset], input_datasets)  # Ensure correct type inference
                    temp_ds, rh_ds, wind_speed_ds, precip_ds, hffmc_ds = input_datasets

                    # Warp weather datasets to match the hFFMC raster's grid.
                    warped_temp_ds = temp_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(temp_key)}", GDALResamplingMethod.BILINEAR)
                    warped_rh_ds = rh_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(rh_key)}", GDALResamplingMethod.BILINEAR)
                    warped_wind_speed_ds = wind_speed_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(wind_speed_key)}", GDALResamplingMethod.BILINEAR)
                    warped_precip_ds = precip_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(precip_key)}", GDALResamplingMethod.BILINEAR)

                    # Create and store new hFFMC dataset. The calculated key assigned to
                    # hffmc_key here becomes the seed input for the next hour's iteration.
                    hffmc_values, hffmc_no_data_value = calculate_ffmc(hffmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, warped_wind_speed_ds)
                    new_hffmc_datetime = rdps_model_run_start + timedelta(hours=hour)
                    hffmc_key = self.addresser.get_calculated_hffmc_index_key(new_hffmc_datetime)
                    geotransform = hffmc_ds.as_gdal_ds().GetGeoTransform()
                    projection = hffmc_ds.as_gdal_ds().GetProjection()
                    hffmc_ds.close()
                    await s3_client.persist_raster_data(
                        temp_dir,
                        hffmc_key,
                        geotransform,
                        projection,
                        hffmc_values,
                        hffmc_no_data_value,
                    )
                    # Clear gdal virtual file system cache of S3 metadata in order to allow
                    # newly uploaded hffmc rasters to be opened immediately.
                    gdal.VSICurlClearCache()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ouch, good find, I wonder if there's a test we could add to catch this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could add some sort of name or id property to
WPSDataset
objects and use this to assert that we're getting parameters in the right order?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think that'd be worth it since we're passing so many
WPSDataset
's around