diff --git a/api/app/db/crud/weather_models.py b/api/app/db/crud/weather_models.py
index 91db61877..a46540581 100644
--- a/api/app/db/crud/weather_models.py
+++ b/api/app/db/crud/weather_models.py
@@ -1,5 +1,5 @@
-""" CRUD operations for management of weather model data.
-"""
+"""CRUD operations for management of weather model data."""
+
 import logging
 import datetime
 from typing import List, Union
@@ -13,7 +13,6 @@
     PredictionModelRunTimestamp,
     ModelRunPrediction,
     WeatherStationModelPrediction,
-    MoreCast2MaterializedView,
     SavedModelRunForSFMSUrl,
     ModelRunForSFMS,
 )
@@ -22,142 +21,112 @@
 logger = logging.getLogger(__name__)
 
 
-def get_prediction_run(session: Session, prediction_model_id: int,
-                       prediction_run_timestamp: datetime.datetime) -> PredictionModelRunTimestamp:
-    """ load the model run from the database (.e.g. for 2020 07 07 12h00). """
-    logger.info('get prediction run for %s', prediction_run_timestamp)
-    return session.query(PredictionModelRunTimestamp).\
-        filter(PredictionModelRunTimestamp.prediction_model_id == prediction_model_id).\
-        filter(PredictionModelRunTimestamp.prediction_run_timestamp ==
-               prediction_run_timestamp).first()
+def get_prediction_run(session: Session, prediction_model_id: int, prediction_run_timestamp: datetime.datetime) -> PredictionModelRunTimestamp:
+    """Load the model run from the database (e.g. for 2020-07-07 12h00)."""
+    logger.info("get prediction run for %s", prediction_run_timestamp)
+    return (
+        session.query(PredictionModelRunTimestamp)
+        .filter(PredictionModelRunTimestamp.prediction_model_id == prediction_model_id)
+        .filter(PredictionModelRunTimestamp.prediction_run_timestamp == prediction_run_timestamp)
+        .first()
+    )
 
 
-def create_prediction_run(
-        session: Session,
-        prediction_model_id: int,
-        prediction_run_timestamp: datetime.datetime,
-        complete: bool,
-        interpolated: bool) -> PredictionModelRunTimestamp:
-    """ Create a model prediction run for a particular model.
-    """
+def create_prediction_run(session: Session, prediction_model_id: int, prediction_run_timestamp: datetime.datetime, complete: bool, interpolated: bool) -> PredictionModelRunTimestamp:
+    """Create a model prediction run for a particular model."""
     prediction_run = PredictionModelRunTimestamp(
-        prediction_model_id=prediction_model_id,
-        prediction_run_timestamp=prediction_run_timestamp,
-        complete=complete,
-        interpolated=interpolated)
+        prediction_model_id=prediction_model_id, prediction_run_timestamp=prediction_run_timestamp, complete=complete, interpolated=interpolated
+    )
     session.add(prediction_run)
     session.commit()
     return prediction_run
 
 
 def update_prediction_run(session: Session, prediction_run: PredictionModelRunTimestamp):
-    """ Update a PredictionModelRunTimestamp record """
+    """Update a PredictionModelRunTimestamp record."""
     session.add(prediction_run)
     session.commit()
 
 
-def get_or_create_prediction_run(session, prediction_model: PredictionModel,
-                                 prediction_run_timestamp: datetime.datetime) -> PredictionModelRunTimestamp:
-    """ Get a model prediction run for a particular model, creating one if it doesn't already exist.
-    """
-    prediction_run = get_prediction_run(
-        session, prediction_model.id, prediction_run_timestamp)
+def get_or_create_prediction_run(session, prediction_model: PredictionModel, prediction_run_timestamp: datetime.datetime) -> PredictionModelRunTimestamp:
+    """Get a model prediction run for a particular model, creating one if it doesn't already exist."""
+    prediction_run = get_prediction_run(session, prediction_model.id, prediction_run_timestamp)
     if not prediction_run:
-        logger.info('Creating prediction run %s for %s',
-                    prediction_model.abbreviation, prediction_run_timestamp)
-        prediction_run = create_prediction_run(
-            session, prediction_model.id, prediction_run_timestamp, False, False)
+        logger.info("Creating prediction run %s for %s", prediction_model.abbreviation, prediction_run_timestamp)
+        prediction_run = create_prediction_run(session, prediction_model.id, prediction_run_timestamp, False, False)
     return prediction_run
 
 
-def get_model_run_predictions_for_station(session: Session, station_code: int,
-                                          prediction_run: PredictionModelRunTimestamp) -> List:
-    """ Get all the predictions for a provided model run """
+def get_model_run_predictions_for_station(session: Session, station_code: int, prediction_run: PredictionModelRunTimestamp) -> List:
+    """Get all the predictions for a provided model run."""
     logger.info("Getting model predictions for grid %s", prediction_run)
-    return session.query(ModelRunPrediction)\
-        .filter(ModelRunPrediction.prediction_model_run_timestamp_id ==
-                prediction_run.id)\
-        .filter(ModelRunPrediction.station_code == station_code)\
+    return (
+        session.query(ModelRunPrediction)
+        .filter(ModelRunPrediction.prediction_model_run_timestamp_id == prediction_run.id)
+        .filter(ModelRunPrediction.station_code == station_code)
         .order_by(ModelRunPrediction.prediction_timestamp)
+    )
 
 
 def delete_weather_station_model_predictions(session: Session, older_than: datetime):
-    """ Delete any weather model prediction older than a certain date.
-    """
-    logger.info('Deleting weather station model prediction data older than %s...', older_than)
-    session.query(WeatherStationModelPrediction)\
-        .filter(WeatherStationModelPrediction.prediction_timestamp < older_than)\
-        .delete()
-
+    """Delete any weather model prediction older than a certain date."""
+    logger.info("Deleting weather station model prediction data older than %s...", older_than)
+    session.query(WeatherStationModelPrediction).filter(WeatherStationModelPrediction.prediction_timestamp < older_than).delete()
+
 
 def delete_model_run_predictions(session: Session, older_than: datetime):
-    """ Delete any model run prediction older than a certain date.
- """ - logger.info('Deleting model_run_prediction data older than %s...', older_than) - session.query(ModelRunPrediction)\ - .filter(ModelRunPrediction.prediction_timestamp < older_than)\ - .delete() + """Delete any model run prediction older than a certain date.""" + logger.info("Deleting model_run_prediction data older than %s...", older_than) + session.query(ModelRunPrediction).filter(ModelRunPrediction.prediction_timestamp < older_than).delete() def get_station_model_predictions_order_by_prediction_timestamp( - session: Session, - station_codes: List, - model: ModelEnum, - start_date: datetime.datetime, - end_date: datetime.datetime) -> List[ - Union[WeatherStationModelPrediction, PredictionModel]]: - """ Fetch model predictions for given stations within given time range ordered by station code + session: Session, station_codes: List, model: ModelEnum, start_date: datetime.datetime, end_date: datetime.datetime +) -> List[Union[WeatherStationModelPrediction, PredictionModel]]: + """Fetch model predictions for given stations within given time range ordered by station code and prediction timestamp. This is useful if you're interested in seeing all the different predictions regardles of model run. """ - query = session.query(WeatherStationModelPrediction, PredictionModel).\ - join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == - WeatherStationModelPrediction.prediction_model_run_timestamp_id).\ - join(PredictionModel, PredictionModel.id == - PredictionModelRunTimestamp.prediction_model_id).\ - filter(WeatherStationModelPrediction.station_code.in_(station_codes)).\ - filter(WeatherStationModelPrediction.prediction_timestamp >= start_date).\ - filter(WeatherStationModelPrediction.prediction_timestamp <= end_date).\ - filter(PredictionModel.abbreviation == model).\ - order_by(WeatherStationModelPrediction.station_code).\ - order_by(WeatherStationModelPrediction.prediction_timestamp) + query = ( + session.query(WeatherStationModelPrediction, PredictionModel) + .join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id) + .join(PredictionModel, PredictionModel.id == PredictionModelRunTimestamp.prediction_model_id) + .filter(WeatherStationModelPrediction.station_code.in_(station_codes)) + .filter(WeatherStationModelPrediction.prediction_timestamp >= start_date) + .filter(WeatherStationModelPrediction.prediction_timestamp <= end_date) + .filter(PredictionModel.abbreviation == model) + .order_by(WeatherStationModelPrediction.station_code) + .order_by(WeatherStationModelPrediction.prediction_timestamp) + ) return query def get_station_model_predictions( - session: Session, - station_codes: List, - model: str, - start_date: datetime.datetime, - end_date: datetime.datetime) -> List[ - Union[WeatherStationModelPrediction, PredictionModelRunTimestamp, PredictionModel]]: - """ Fetches the model predictions that were most recently issued before the prediction_timestamp. + session: Session, station_codes: List, model: str, start_date: datetime.datetime, end_date: datetime.datetime +) -> List[Union[WeatherStationModelPrediction, PredictionModelRunTimestamp, PredictionModel]]: + """Fetches the model predictions that were most recently issued before the prediction_timestamp. Used to compare the most recent model predictions against forecasts and actuals for the same weather date and weather station. 
Only fetches WeatherStationModelPredictions for prediction_timestamps in the date range of start_date - end_date (inclusive). """ - query = session.query(WeatherStationModelPrediction, PredictionModelRunTimestamp, PredictionModel).\ - filter(WeatherStationModelPrediction.station_code.in_(station_codes)).\ - filter(WeatherStationModelPrediction.prediction_timestamp >= start_date).\ - filter(WeatherStationModelPrediction.prediction_timestamp <= end_date).\ - filter(PredictionModelRunTimestamp.id == - WeatherStationModelPrediction.prediction_model_run_timestamp_id).\ - filter(PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id, - PredictionModel.abbreviation == model).\ - order_by(WeatherStationModelPrediction.station_code).\ - order_by(WeatherStationModelPrediction.prediction_timestamp).\ - order_by(PredictionModelRunTimestamp.prediction_run_timestamp.asc()) + query = ( + session.query(WeatherStationModelPrediction, PredictionModelRunTimestamp, PredictionModel) + .filter(WeatherStationModelPrediction.station_code.in_(station_codes)) + .filter(WeatherStationModelPrediction.prediction_timestamp >= start_date) + .filter(WeatherStationModelPrediction.prediction_timestamp <= end_date) + .filter(PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id) + .filter(PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id, PredictionModel.abbreviation == model) + .order_by(WeatherStationModelPrediction.station_code) + .order_by(WeatherStationModelPrediction.prediction_timestamp) + .order_by(PredictionModelRunTimestamp.prediction_run_timestamp.asc()) + ) return query -def get_latest_station_model_prediction_per_day(session: Session, - station_codes: List[int], - model: str, - day_start: datetime.datetime, - day_end: datetime.datetime): +def get_latest_station_model_prediction_per_day(session: Session, station_codes: List[int], model: str, day_start: datetime.datetime, day_end: datetime.datetime): """ All weather station model predictions for: - a given day @@ -173,167 +142,110 @@ def get_latest_station_model_prediction_per_day(session: Session, """ subquery = ( session.query( - func.max(WeatherStationModelPrediction.prediction_timestamp).label('latest_prediction'), + func.max(WeatherStationModelPrediction.prediction_timestamp).label("latest_prediction"), WeatherStationModelPrediction.station_code, - func.date(WeatherStationModelPrediction.prediction_timestamp).label('unique_day') + func.date(WeatherStationModelPrediction.prediction_timestamp).label("unique_day"), ) .filter( WeatherStationModelPrediction.station_code.in_(station_codes), WeatherStationModelPrediction.prediction_timestamp >= day_start, WeatherStationModelPrediction.prediction_timestamp <= day_end, - func.date_part('hour', WeatherStationModelPrediction.prediction_timestamp) == 20 + func.date_part("hour", WeatherStationModelPrediction.prediction_timestamp) == 20, ) - .group_by( - WeatherStationModelPrediction.station_code, - func.date(WeatherStationModelPrediction.prediction_timestamp).label('unique_day') - ) - .subquery('latest') + .group_by(WeatherStationModelPrediction.station_code, func.date(WeatherStationModelPrediction.prediction_timestamp).label("unique_day")) + .subquery("latest") ) - result = session.query( - WeatherStationModelPrediction.id, - WeatherStationModelPrediction.prediction_timestamp, - PredictionModel.abbreviation, - WeatherStationModelPrediction.station_code, - WeatherStationModelPrediction.rh_tgl_2, - WeatherStationModelPrediction.tmp_tgl_2, - 
WeatherStationModelPrediction.bias_adjusted_temperature, - WeatherStationModelPrediction.bias_adjusted_rh, - WeatherStationModelPrediction.apcp_sfc_0, - WeatherStationModelPrediction.wdir_tgl_10, - WeatherStationModelPrediction.wind_tgl_10, - WeatherStationModelPrediction.update_date)\ - .join(PredictionModelRunTimestamp, WeatherStationModelPrediction.prediction_model_run_timestamp_id == PredictionModelRunTimestamp.id)\ - .join(PredictionModel, PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id)\ - .join(subquery, and_( - WeatherStationModelPrediction.prediction_timestamp == subquery.c.latest_prediction, - WeatherStationModelPrediction.station_code == subquery.c.station_code))\ - .filter(PredictionModel.abbreviation == model)\ + result = ( + session.query( + WeatherStationModelPrediction.id, + WeatherStationModelPrediction.prediction_timestamp, + PredictionModel.abbreviation, + WeatherStationModelPrediction.station_code, + WeatherStationModelPrediction.rh_tgl_2, + WeatherStationModelPrediction.tmp_tgl_2, + WeatherStationModelPrediction.bias_adjusted_temperature, + WeatherStationModelPrediction.bias_adjusted_rh, + WeatherStationModelPrediction.apcp_sfc_0, + WeatherStationModelPrediction.wdir_tgl_10, + WeatherStationModelPrediction.wind_tgl_10, + WeatherStationModelPrediction.update_date, + ) + .join(PredictionModelRunTimestamp, WeatherStationModelPrediction.prediction_model_run_timestamp_id == PredictionModelRunTimestamp.id) + .join(PredictionModel, PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id) + .join(subquery, and_(WeatherStationModelPrediction.prediction_timestamp == subquery.c.latest_prediction, WeatherStationModelPrediction.station_code == subquery.c.station_code)) + .filter(PredictionModel.abbreviation == model) .order_by(WeatherStationModelPrediction.update_date.desc()) + ) return result -def get_latest_station_prediction_mat_view(session: Session, - station_codes: List[int], - day_start: datetime.datetime, - day_end: datetime.datetime): +def get_latest_station_prediction(session: Session, station_codes: List[int], day_start: datetime.datetime, day_end: datetime.datetime): logger.info("Getting data from materialized view.") - result = session.query(MoreCast2MaterializedView.prediction_timestamp, - MoreCast2MaterializedView.abbreviation, - MoreCast2MaterializedView.station_code, - MoreCast2MaterializedView.rh_tgl_2, - MoreCast2MaterializedView.tmp_tgl_2, - MoreCast2MaterializedView.bias_adjusted_temperature, - MoreCast2MaterializedView.bias_adjusted_rh, - MoreCast2MaterializedView.bias_adjusted_wind_speed, - MoreCast2MaterializedView.bias_adjusted_wdir, - MoreCast2MaterializedView.precip_24h, - MoreCast2MaterializedView.bias_adjusted_precip_24h, - MoreCast2MaterializedView.wdir_tgl_10, - MoreCast2MaterializedView.wind_tgl_10, - MoreCast2MaterializedView.update_date).\ - filter(MoreCast2MaterializedView.station_code.in_(station_codes), - MoreCast2MaterializedView.prediction_timestamp >= day_start, - MoreCast2MaterializedView.prediction_timestamp <= day_end) - return result - - -def get_latest_station_prediction_per_day(session: Session, - station_codes: List[int], - day_start: datetime.datetime, - day_end: datetime.datetime): - """ - All weather station model predictions for: - - a given day - - each station in the given list - ordered by update_timestamp - - This is done by joining the predictions on their runs, - that are filtered by the day and the 20:00UTC predictions. 
- - In turn prediction runs are filtered via a join - on runs that are for the selected model. - """ - subquery = ( + result = ( session.query( - func.max(WeatherStationModelPrediction.prediction_timestamp).label('latest_prediction'), + WeatherStationModelPrediction.prediction_timestamp, + PredictionModel.abbreviation, WeatherStationModelPrediction.station_code, - func.date(WeatherStationModelPrediction.prediction_timestamp).label('unique_day') + WeatherStationModelPrediction.rh_tgl_2, + WeatherStationModelPrediction.tmp_tgl_2, + WeatherStationModelPrediction.bias_adjusted_temperature, + WeatherStationModelPrediction.bias_adjusted_rh, + WeatherStationModelPrediction.bias_adjusted_wind_speed, + WeatherStationModelPrediction.bias_adjusted_wdir, + WeatherStationModelPrediction.precip_24h, + WeatherStationModelPrediction.bias_adjusted_precip_24h, + WeatherStationModelPrediction.wdir_tgl_10, + WeatherStationModelPrediction.wind_tgl_10, ) + .join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id) + .join(PredictionModel, PredictionModel.id == PredictionModelRunTimestamp.prediction_model_id) .filter( WeatherStationModelPrediction.station_code.in_(station_codes), WeatherStationModelPrediction.prediction_timestamp >= day_start, WeatherStationModelPrediction.prediction_timestamp <= day_end, - func.date_part('hour', WeatherStationModelPrediction.prediction_timestamp) == 20 ) - .group_by( - WeatherStationModelPrediction.station_code, - func.date(WeatherStationModelPrediction.prediction_timestamp).label('unique_day') - ) - .subquery('latest') ) - - result = session.query( - WeatherStationModelPrediction.prediction_timestamp, - PredictionModel.abbreviation, - WeatherStationModelPrediction.station_code, - WeatherStationModelPrediction.rh_tgl_2, - WeatherStationModelPrediction.tmp_tgl_2, - WeatherStationModelPrediction.bias_adjusted_temperature, - WeatherStationModelPrediction.bias_adjusted_rh, - WeatherStationModelPrediction.apcp_sfc_0, - WeatherStationModelPrediction.wdir_tgl_10, - WeatherStationModelPrediction.wind_tgl_10, - WeatherStationModelPrediction.update_date)\ - .join(PredictionModelRunTimestamp, WeatherStationModelPrediction.prediction_model_run_timestamp_id == PredictionModelRunTimestamp.id)\ - .join(PredictionModel, PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id)\ - .join(subquery, and_( - WeatherStationModelPrediction.prediction_timestamp == subquery.c.latest_prediction, - WeatherStationModelPrediction.station_code == subquery.c.station_code))\ - .order_by(WeatherStationModelPrediction.update_date.desc()) return result def get_station_model_prediction_from_previous_model_run( - session: Session, - station_code: int, - model: ModelEnum, - prediction_timestamp: datetime.datetime, - prediction_model_run_timestamp: datetime.datetime) -> List[WeatherStationModelPrediction]: - """ Fetches the one model prediction for the specified station_code, model, and prediction_timestamp + session: Session, station_code: int, model: ModelEnum, prediction_timestamp: datetime.datetime, prediction_model_run_timestamp: datetime.datetime +) -> List[WeatherStationModelPrediction]: + """Fetches the one model prediction for the specified station_code, model, and prediction_timestamp from the prediction model run immediately previous to the given prediction_model_run_timestamp. 
""" # create a lower_bound for time range so that we're not querying timestamps all the way back to the # beginning of time lower_bound = prediction_model_run_timestamp - datetime.timedelta(days=1) - response = session.query(WeatherStationModelPrediction).\ - join(PredictionModelRunTimestamp, - PredictionModelRunTimestamp.id == - WeatherStationModelPrediction.prediction_model_run_timestamp_id).\ - join(PredictionModel, PredictionModel.id == - PredictionModelRunTimestamp.prediction_model_id).\ - filter(WeatherStationModelPrediction.station_code == station_code).\ - filter(WeatherStationModelPrediction.prediction_timestamp == prediction_timestamp).\ - filter(PredictionModel.abbreviation == model).\ - filter(PredictionModelRunTimestamp.prediction_run_timestamp < prediction_model_run_timestamp).\ - filter(PredictionModelRunTimestamp.prediction_run_timestamp > lower_bound).\ - order_by(PredictionModelRunTimestamp.prediction_run_timestamp.desc()).\ - limit(1).first() + response = ( + session.query(WeatherStationModelPrediction) + .join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id) + .join(PredictionModel, PredictionModel.id == PredictionModelRunTimestamp.prediction_model_id) + .filter(WeatherStationModelPrediction.station_code == station_code) + .filter(WeatherStationModelPrediction.prediction_timestamp == prediction_timestamp) + .filter(PredictionModel.abbreviation == model) + .filter(PredictionModelRunTimestamp.prediction_run_timestamp < prediction_model_run_timestamp) + .filter(PredictionModelRunTimestamp.prediction_run_timestamp > lower_bound) + .order_by(PredictionModelRunTimestamp.prediction_run_timestamp.desc()) + .limit(1) + .first() + ) return response def get_processed_file_count(session: Session, urls: List[str]) -> int: - """ Return the number of matching urls """ + """Return the number of matching urls""" return session.query(ProcessedModelRunUrl).filter(ProcessedModelRunUrl.url.in_(urls)).count() def get_processed_file_record(session: Session, url: str) -> ProcessedModelRunUrl: - """ Get record corresponding to a processed file. """ - processed_file = session.query(ProcessedModelRunUrl).\ - filter(ProcessedModelRunUrl.url == url).first() + """Get record corresponding to a processed file.""" + processed_file = session.query(ProcessedModelRunUrl).filter(ProcessedModelRunUrl.url == url).first() return processed_file + def get_saved_model_run_for_sfms(session: Session, url: str) -> SavedModelRunForSFMSUrl: """Get record corresponding to a processed model run url for sfms""" return session.query(SavedModelRunForSFMSUrl).filter(SavedModelRunForSFMSUrl.url == url).first() @@ -346,6 +258,7 @@ def create_saved_model_run_for_sfms_url(session: Session, url: str, key: str): session.add(saved_model_run_for_sfms_url) session.commit() + def get_rdps_sfms_urls_for_deletion(session: Session, threshold: datetime): """Gets all records older than the provided threshold.""" return session.query(SavedModelRunForSFMSUrl).filter(SavedModelRunForSFMSUrl.create_date < threshold).all() @@ -362,9 +275,7 @@ def create_model_run_for_sfms(session: Session, model: ModelEnum, model_run_date date and model has been stored in S3. 
""" prediction_model = get_prediction_model_by_model_enum(session, model) - model_run_timestamp = datetime.datetime( - year=model_run_date.year, month=model_run_date.month, day=model_run_date.day, hour=model_run_hour, tzinfo=datetime.timezone.utc - ) + model_run_timestamp = datetime.datetime(year=model_run_date.year, month=model_run_date.month, day=model_run_date.day, hour=model_run_hour, tzinfo=datetime.timezone.utc) model_run_for_sfms = ModelRunForSFMS( prediction_model_id=prediction_model.id, model_run_timestamp=model_run_timestamp, @@ -380,42 +291,35 @@ def get_prediction_model_by_model_enum(session: Session, model_enum: ModelEnum): return session.query(PredictionModel).filter(PredictionModel.abbreviation == model_enum.value).first() -def get_prediction_model(session: Session, - model_enum: ModelEnum, - projection: ProjectionEnum) -> PredictionModel: - """ Get the prediction model corresponding to a particular abbreviation and projection. """ - return session.query(PredictionModel).\ - filter(PredictionModel.abbreviation == model_enum.value).\ - filter(PredictionModel.projection == projection.value).first() +def get_prediction_model(session: Session, model_enum: ModelEnum, projection: ProjectionEnum) -> PredictionModel: + """Get the prediction model corresponding to a particular abbreviation and projection.""" + return session.query(PredictionModel).filter(PredictionModel.abbreviation == model_enum.value).filter(PredictionModel.projection == projection.value).first() -def get_prediction_model_run_timestamp_records( - session: Session, model_type: ModelEnum, complete: bool = True, interpolated: bool = True): - """ Get prediction model run timestamps (filter on complete and interpolated if provided.) """ - query = session.query(PredictionModelRunTimestamp, PredictionModel) \ - .join(PredictionModelRunTimestamp, - PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id)\ +def get_prediction_model_run_timestamp_records(session: Session, model_type: ModelEnum, complete: bool = True, interpolated: bool = True): + """Get prediction model run timestamps (filter on complete and interpolated if provided.)""" + query = ( + session.query(PredictionModelRunTimestamp, PredictionModel) + .join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id) .filter(PredictionModel.abbreviation == model_type.value) + ) if interpolated is not None: - query = query.filter( - PredictionModelRunTimestamp.interpolated == interpolated) + query = query.filter(PredictionModelRunTimestamp.interpolated == interpolated) if complete is not None: query = query.filter(PredictionModelRunTimestamp.complete == complete) query = query.order_by(PredictionModelRunTimestamp.prediction_run_timestamp) return query -def get_weather_station_model_prediction(session: Session, - station_code: int, - prediction_model_run_timestamp_id: int, - prediction_timestamp: datetime) -> WeatherStationModelPrediction: - """ Get the model prediction for a weather station given a model run and a timestamp. 
""" - return session.query(WeatherStationModelPrediction).\ - filter(WeatherStationModelPrediction.station_code == station_code).\ - filter(WeatherStationModelPrediction.prediction_model_run_timestamp_id == - prediction_model_run_timestamp_id).\ - filter(WeatherStationModelPrediction.prediction_timestamp == - prediction_timestamp).first() +def get_weather_station_model_prediction(session: Session, station_code: int, prediction_model_run_timestamp_id: int, prediction_timestamp: datetime) -> WeatherStationModelPrediction: + """Get the model prediction for a weather station given a model run and a timestamp.""" + return ( + session.query(WeatherStationModelPrediction) + .filter(WeatherStationModelPrediction.station_code == station_code) + .filter(WeatherStationModelPrediction.prediction_model_run_timestamp_id == prediction_model_run_timestamp_id) + .filter(WeatherStationModelPrediction.prediction_timestamp == prediction_timestamp) + .first() + ) def refresh_morecast2_materialized_view(session: Session): diff --git a/api/app/jobs/common_model_fetchers.py b/api/app/jobs/common_model_fetchers.py index 8141949e8..ff36ce90c 100644 --- a/api/app/jobs/common_model_fetchers.py +++ b/api/app/jobs/common_model_fetchers.py @@ -14,7 +14,6 @@ get_weather_station_model_prediction, delete_weather_station_model_predictions, delete_model_run_predictions, - refresh_morecast2_materialized_view, ) from app.weather_models.machine_learning import StationMachineLearning from app.weather_models import ModelEnum @@ -24,8 +23,7 @@ import app.utils.time as time_utils from app.utils.redis import create_redis from app.stations import get_stations_synchronously -from app.db.models.weather_models import (ProcessedModelRunUrl, PredictionModelRunTimestamp, - WeatherStationModelPrediction, ModelRunPrediction) +from app.db.models.weather_models import ProcessedModelRunUrl, PredictionModelRunTimestamp, WeatherStationModelPrediction, ModelRunPrediction import app.db.database from app.db.crud.observations import get_accumulated_precipitation @@ -40,11 +38,11 @@ class UnhandledPredictionModelType(Exception): - """ Exception raised when an unknown model type is encountered. """ + """Exception raised when an unknown model type is encountered.""" class CompletedWithSomeExceptions(Exception): - """ Exception raised when processing completed, but there were some non critical exceptions """ + """Exception raised when processing completed, but there were some non critical exceptions""" def download(url: str, path: str, config_cache_var: str, model_name: str, config_cache_expiry_var=None) -> str: @@ -55,7 +53,7 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config is a security concern. TODO: Would be nice to make this an async """ - if model_name == 'GFS': + if model_name == "GFS": original_filename = os.path.split(url)[-1] # NOTE: This is a very not-ideal way to interpolate the filename. # The original_filename that we get from the url is too long and must be condensed. @@ -63,7 +61,7 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config # As long as NOAA's API remains unchanged, we'll have all the info we need (run datetimes, # projections, etc.) in the first 81 characters of original_filename. # An alternative would be to build out a regex to look for - filename = original_filename[:81].replace('.', '') + filename = original_filename[:81].replace(".", "") else: # Infer filename from url. 
         filename = os.path.split(url)[-1]
@@ -74,7 +72,7 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config
     # saves having to re-download the file all the time.
     # It also save a lot of bandwidth in our dev environment, where we have multiple workers downloading
     # the same files over and over.
-    if config.get(config_cache_var) == 'True':
+    if config.get(config_cache_var) == "True":
         cache = create_redis()
         try:
             cached_object = cache.get(url)
@@ -85,13 +83,13 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config
             cached_object = None
             cache = None
     if cached_object:
-        logger.info('Cache hit %s', url)
+        logger.info("Cache hit %s", url)
         # Store the cached object in a file
-        with open(target, 'wb') as file_object:
+        with open(target, "wb") as file_object:
             # Write the file.
             file_object.write(cached_object)
     else:
-        logger.info('Downloading %s', url)
+        logger.info("Downloading %s", url)
         # It's important to have a timeout on the get, otherwise the call may get stuck for an indefinite
         # amount of time - there is no default value for timeout. During testing, it was observed that
         # downloads usually complete in less than a second.
@@ -99,17 +97,17 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config
         # If the response is 200/OK.
         if response.status_code == 200:
             # Store the response.
-            with open(target, 'wb') as file_object:
+            with open(target, "wb") as file_object:
                 # Write the file.
                 file_object.write(response.content)
             # Cache the response
             if cache:
-                with open(target, 'rb') as file_object:
+                with open(target, "rb") as file_object:
                     # Cache for 6 hours (21600 seconds)
                     cache.set(url, file_object.read(), ex=config.get(config_cache_expiry_var, 21600))
         elif response.status_code == 404:
             # We expect this to happen frequently - just log for info.
-            logger.info('404 error for %s', url)
+            logger.info("404 error for %s", url)
             target = None
         else:
             # Raise an exception
@@ -119,40 +117,34 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config
 
 
 def get_closest_index(coordinate: List, points: List):
-    """ Get the index of the point closest to the coordinate """
+    """Get the index of the point closest to the coordinate."""
     # https://pyproj4.github.io/pyproj/stable/api/geod.html
     # Use GRS80 ellipsoid (it's what NAD83 uses)
     geod = Geod(ellps="GRS80")
     # Calculate the distance each point is from the coordinate.
-    _, _, distances = geod.inv([coordinate[0] for _ in range(4)],
-                               [coordinate[1] for _ in range(4)],
-                               [x[0] for x in points],
-                               [x[1] for x in points])
+    _, _, distances = geod.inv([coordinate[0] for _ in range(4)], [coordinate[1] for _ in range(4)], [x[0] for x in points], [x[1] for x in points])
     # Return the index of the point with the shortest distance.
     return numpy.argmin(distances)
 
 
 def flag_file_as_processed(url: str, session: Session):
-    """ Flag the file as processed in the database """
+    """Flag the file as processed in the database."""
     processed_file = get_processed_file_record(session, url)
     if processed_file:
-        logger.info('re-procesed %s', url)
+        logger.info("re-processed %s", url)
     else:
-        logger.info('file processed %s', url)
-        processed_file = ProcessedModelRunUrl(
-            url=url,
-            create_date=time_utils.get_utc_now())
+        logger.info("file processed %s", url)
+        processed_file = ProcessedModelRunUrl(url=url, create_date=time_utils.get_utc_now())
     processed_file.update_date = time_utils.get_utc_now()
     session.add(processed_file)
     session.commit()
 
 
 def check_if_model_run_complete(session: Session, urls):
-    """ Check if a particular model run is complete """
+    """Check if a particular model run is complete."""
     actual_count = get_processed_file_count(session, urls)
     expected_count = len(urls)
-    logger.info('we have processed %s/%s files',
-                actual_count, expected_count)
+    logger.info("we have processed %s/%s files", actual_count, expected_count)
     return actual_count == expected_count and actual_count > 0
 
@@ -170,7 +162,7 @@ def apply_data_retention_policy():
 
 
 def accumulate_nam_precipitation(nam_cumulative_precip: float, prediction: ModelRunPrediction, model_run_hour: int):
-    """ Calculate overall cumulative precip and cumulative precip for the current prediction. """
+    """Calculate overall cumulative precip and cumulative precip for the current prediction."""
     # 00 and 12 hour model runs accumulate precipitation in 12 hour intervals, 06 and 18 hour accumulate in
     # 3 hour intervals
     nam_accumulation_interval = 3 if model_run_hour == 6 or model_run_hour == 18 else 12
@@ -184,117 +176,78 @@ def accumulate_nam_precipitation(nam_cumulative_precip: float, prediction: Model
 
 
 class ModelValueProcessor:
-    """ Iterate through model runs that have completed, and calculate the interpolated weather predictions.
-    """
+    """Iterate through model runs that have completed, and calculate the interpolated weather predictions."""
 
     def __init__(self, session):
-        """ Prepare variables we're going to use throughout """
+        """Prepare variables we're going to use throughout."""
         self.session = session
         self.stations = get_stations_synchronously()
         self.station_count = len(self.stations)
 
     def _process_model_run(self, model_run: PredictionModelRunTimestamp, model_type: ModelEnum):
-        """ Interpolate predictions in the provided model run for all stations. """
-        logger.info('Interpolating values for model run: %s', model_run)
+        """Interpolate predictions in the provided model run for all stations."""
+        logger.info("Interpolating values for model run: %s", model_run)
         # Iterate through stations.
         for index, station in enumerate(self.stations):
-            logger.info('Interpolating model run %s (%s/%s) for %s:%s',
-                        model_run.id,
-                        index, self.station_count,
-                        station.code, station.name)
+            logger.info("Interpolating model run %s (%s/%s) for %s:%s", model_run.id, index, self.station_count, station.code, station.name)
            # Process this model run for station.
            self._process_model_run_for_station(model_run, station, model_type)
         # Commit all the weather station model predictions (it's fast if we line them all up and commit
         # them in one go.)
-        logger.info('commit to database...')
+        logger.info("commit to database...")
         self.session.commit()
-        logger.info('done commit.')
+        logger.info("done commit.")
 
     def _add_interpolated_bias_adjustments_to_prediction(self, station_prediction: WeatherStationModelPrediction, machine: StationMachineLearning):
         # We need to interpolate prediction for 2000 using predictions for 1800 and 2100
         # Predict the temperature
-        temp_at_1800 = machine.predict_temperature(station_prediction.tmp_tgl_2,
-                                                   station_prediction.prediction_timestamp.replace(hour=18))
-        temp_at_2100 = machine.predict_temperature(station_prediction.tmp_tgl_2,
-                                                   station_prediction.prediction_timestamp.replace(hour=21))
-        station_prediction.bias_adjusted_temperature = interpolate_between_two_points(18, 21, temp_at_1800,
-                                                                                      temp_at_2100, 20)
+        temp_at_1800 = machine.predict_temperature(station_prediction.tmp_tgl_2, station_prediction.prediction_timestamp.replace(hour=18))
+        temp_at_2100 = machine.predict_temperature(station_prediction.tmp_tgl_2, station_prediction.prediction_timestamp.replace(hour=21))
+        station_prediction.bias_adjusted_temperature = interpolate_between_two_points(18, 21, temp_at_1800, temp_at_2100, 20)
         # Predict the rh
-        rh_at_1800 = machine.predict_rh(station_prediction.rh_tgl_2,
-                                        station_prediction.prediction_timestamp.replace(hour=18))
-        rh_at_2100 = machine.predict_rh(station_prediction.rh_tgl_2,
-                                        station_prediction.prediction_timestamp.replace(hour=21))
+        rh_at_1800 = machine.predict_rh(station_prediction.rh_tgl_2, station_prediction.prediction_timestamp.replace(hour=18))
+        rh_at_2100 = machine.predict_rh(station_prediction.rh_tgl_2, station_prediction.prediction_timestamp.replace(hour=21))
         station_prediction.bias_adjusted_rh = interpolate_between_two_points(18, 21, rh_at_1800, rh_at_2100, 20)
         # Predict the wind speed
-        wind_speed_at_1800 = machine.predict_wind_speed(station_prediction.wind_tgl_10,
-                                                        station_prediction.prediction_timestamp.replace(hour=18))
-        wind_speed_at_2100 = machine.predict_wind_speed(station_prediction.wind_tgl_10,
-                                                        station_prediction.prediction_timestamp.replace(hour=21))
-        station_prediction.bias_adjusted_wind_speed = interpolate_between_two_points(18, 21, wind_speed_at_1800,
-                                                                                     wind_speed_at_2100, 20)
-
+        wind_speed_at_1800 = machine.predict_wind_speed(station_prediction.wind_tgl_10, station_prediction.prediction_timestamp.replace(hour=18))
+        wind_speed_at_2100 = machine.predict_wind_speed(station_prediction.wind_tgl_10, station_prediction.prediction_timestamp.replace(hour=21))
+        station_prediction.bias_adjusted_wind_speed = interpolate_between_two_points(18, 21, wind_speed_at_1800, wind_speed_at_2100, 20)
+
         # Predict the wind direction
-        wind_direction_at_1800 = station_prediction.bias_adjusted_wdir = machine.predict_wind_direction(
-            station_prediction.wind_tgl_10,
-            station_prediction.wdir_tgl_10,
-            station_prediction.prediction_timestamp.replace(hour=18)
-        )
-        wind_direction_at_2100 = station_prediction.bias_adjusted_wdir = machine.predict_wind_direction(
-            station_prediction.wind_tgl_10,
-            station_prediction.wdir_tgl_10,
-            station_prediction.prediction_timestamp.replace(hour=21)
-        )
-        station_prediction.bias_adjusted_wdir = interpolate_between_two_points(18, 21, wind_direction_at_1800,
-                                                                               wind_direction_at_2100, 20)
+        wind_direction_at_1800 = machine.predict_wind_direction(
+            station_prediction.wind_tgl_10, station_prediction.wdir_tgl_10, station_prediction.prediction_timestamp.replace(hour=18)
+        )
+        wind_direction_at_2100 = machine.predict_wind_direction(
+            station_prediction.wind_tgl_10, station_prediction.wdir_tgl_10, station_prediction.prediction_timestamp.replace(hour=21)
+        )
+        station_prediction.bias_adjusted_wdir = interpolate_between_two_points(18, 21, wind_direction_at_1800, wind_direction_at_2100, 20)
 
         # Predict the 24h precipitation. No interpolation necessary due to the underlying model training.
-        station_prediction.bias_adjusted_precip_24h = machine.predict_precipitation(
-            station_prediction.precip_24h,
-            station_prediction.prediction_timestamp
-        )
-
-    def _add_bias_adjustments_to_prediction(self, station_prediction: WeatherStationModelPrediction,
-                                            machine: StationMachineLearning):
+        station_prediction.bias_adjusted_precip_24h = machine.predict_precipitation(station_prediction.precip_24h, station_prediction.prediction_timestamp)
+
+    def _add_bias_adjustments_to_prediction(self, station_prediction: WeatherStationModelPrediction, machine: StationMachineLearning):
         # Predict the temperature
-        station_prediction.bias_adjusted_temperature = machine.predict_temperature(
-            station_prediction.tmp_tgl_2,
-            station_prediction.prediction_timestamp)
-
+        station_prediction.bias_adjusted_temperature = machine.predict_temperature(station_prediction.tmp_tgl_2, station_prediction.prediction_timestamp)
+
         # Predict the rh
-        station_prediction.bias_adjusted_rh = machine.predict_rh(station_prediction.rh_tgl_2,
-                                                                 station_prediction.prediction_timestamp)
-
+        station_prediction.bias_adjusted_rh = machine.predict_rh(station_prediction.rh_tgl_2, station_prediction.prediction_timestamp)
+
         # Predict the wind speed
-        station_prediction.bias_adjusted_wind_speed = machine.predict_wind_speed(
-            station_prediction.wind_tgl_10,
-            station_prediction.prediction_timestamp
-        )
+        station_prediction.bias_adjusted_wind_speed = machine.predict_wind_speed(station_prediction.wind_tgl_10, station_prediction.prediction_timestamp)
 
         # Predict the wind direction
-        station_prediction.bias_adjusted_wdir = machine.predict_wind_direction(
-            station_prediction.wind_tgl_10,
-            station_prediction.wdir_tgl_10,
-            station_prediction.prediction_timestamp
-        )
+        station_prediction.bias_adjusted_wdir = machine.predict_wind_direction(station_prediction.wind_tgl_10, station_prediction.wdir_tgl_10, station_prediction.prediction_timestamp)
 
         # Predict the 24h precipitation
-        station_prediction.bias_adjusted_precip_24h = machine.predict_precipitation(
-            station_prediction.precip_24h,
-            station_prediction.prediction_timestamp
-        )
+        station_prediction.bias_adjusted_precip_24h = machine.predict_precipitation(station_prediction.precip_24h, station_prediction.prediction_timestamp)
 
-    def _process_prediction(self,
-                            prediction: ModelRunPrediction,
-                            station: WeatherStation,
-                            model_run: PredictionModelRunTimestamp,
-                            machine: StationMachineLearning,
-                            prediction_is_interpolated: bool):
-        """ Create a WeatherStationModelPrediction from the ModelRunPrediction data.
-        """
+    def _process_prediction(
+        self, prediction: ModelRunPrediction, station: WeatherStation, model_run: PredictionModelRunTimestamp, machine: StationMachineLearning, prediction_is_interpolated: bool
+    ):
+        """Create a WeatherStationModelPrediction from the ModelRunPrediction data."""
         # If there's already a prediction, we want to update it
-        station_prediction = get_weather_station_model_prediction(
-            self.session, station.code, model_run.id, prediction.prediction_timestamp)
+        station_prediction = get_weather_station_model_prediction(self.session, station.code, model_run.id, prediction.prediction_timestamp)
         if station_prediction is None:
             station_prediction = WeatherStationModelPrediction()
         # Populate the weather station prediction object.
@@ -306,7 +259,7 @@ def _process_prediction(self,
         # NOTE: Not sure why this value would ever be None. This could happen if for whatever reason, the
         # tmp_tgl_2 layer failed to download and process, while other layers did.
         if prediction.tmp_tgl_2 is None:
-            logger.warning('tmp_tgl_2 is None for ModelRunPrediction.id == %s', prediction.id)
+            logger.warning("tmp_tgl_2 is None for ModelRunPrediction.id == %s", prediction.id)
         else:
             station_prediction.tmp_tgl_2 = prediction.tmp_tgl_2
 
@@ -315,7 +268,7 @@ def _process_prediction(self,
         # rh_tgl_2 layer failed to download and process, while other layers did.
         if prediction.rh_tgl_2 is None:
             # This is unexpected, so we log it.
-            logger.warning('rh_tgl_2 is None for ModelRunPrediction.id == %s', prediction.id)
+            logger.warning("rh_tgl_2 is None for ModelRunPrediction.id == %s", prediction.id)
             station_prediction.rh_tgl_2 = None
         else:
             station_prediction.rh_tgl_2 = prediction.rh_tgl_2
@@ -328,10 +281,8 @@ def _process_prediction(self,
         # Calculate the delta_precipitation and 24 hour precip based on station's previous prediction_timestamp
         # for the same model run
         self.session.flush()
-        station_prediction.precip_24h = self._calculate_past_24_hour_precip(
-            station, model_run, prediction, station_prediction)
-        station_prediction.delta_precip = self._calculate_delta_precip(
-            station, model_run, prediction, station_prediction)
+        station_prediction.precip_24h = self._calculate_past_24_hour_precip(station, model_run, prediction, station_prediction)
+        station_prediction.delta_precip = self._calculate_delta_precip(station, model_run, prediction, station_prediction)
 
         # Get the closest wind speed
         if prediction.wind_tgl_10 is not None:
@@ -344,7 +295,7 @@ def _process_prediction(self,
             # Dealing with a numerical weather model that only has predictions at 3 hour intervals,
             # so no 20:00 UTC prediction available in the trained linear regression
             self._add_interpolated_bias_adjustments_to_prediction(station_prediction, machine)
-
+
         else:
             # No interpolation required
             self._add_bias_adjustments_to_prediction(station_prediction, machine)
@@ -354,15 +305,15 @@ def _process_prediction(self,
         # Add this prediction to the session (we'll commit it later.)
         self.session.add(station_prediction)
 
-    def _calculate_past_24_hour_precip(self, station: WeatherStation, model_run: PredictionModelRunTimestamp,
-                                       prediction: ModelRunPrediction, station_prediction: WeatherStationModelPrediction):
-        """ Calculate the predicted precipitation over the previous 24 hours within the specified model run.
+    def _calculate_past_24_hour_precip(
+        self, station: WeatherStation, model_run: PredictionModelRunTimestamp, prediction: ModelRunPrediction, station_prediction: WeatherStationModelPrediction
+    ):
+        """Calculate the predicted precipitation over the previous 24 hours within the specified model run.
         If the model run does not contain a prediction timestamp for 24 hours prior to the current prediction,
-        return the predicted precipitation from the previous run of the same model for the same time frame.
-        """
+        return the predicted precipitation from the previous run of the same model for the same time frame."""
         start_prediction_timestamp = prediction.prediction_timestamp - timedelta(days=1)
         # Check if a prediction exists for this model run 24 hours in the past
-        previous_prediction_from_same_model_run = get_weather_station_model_prediction(self.session, station.code,
-                                                                                       model_run.id, start_prediction_timestamp)
+        previous_prediction_from_same_model_run = get_weather_station_model_prediction(self.session, station.code, model_run.id, start_prediction_timestamp)
         # If a prediction from 24 hours ago from the same model run exists, return the difference in cumulative precipitation
         # between now and then as our total for the past 24 hours. We can end up with very very small negative numbers due
         # to floating point math, so return absolute value to avoid displaying -0.0.
@@ -373,24 +324,23 @@ def _calculate_past_24_hour_precip(self, station: WeatherStation, model_run: Pre
         # We use actual precipitation from our API hourly_actuals table to make up the missing hours.
         prediction_timestamp = station_prediction.prediction_timestamp
         # Create new datetime with time of 00:00 hours as the end time.
-        end_prediction_timestamp = datetime(year=prediction_timestamp.year,
-                                            month=prediction_timestamp.month,
-                                            day=prediction_timestamp.day,
-                                            tzinfo=timezone.utc)
-        actual_precip = get_accumulated_precipitation(
-            self.session, station.code, start_prediction_timestamp, end_prediction_timestamp)
+        end_prediction_timestamp = datetime(year=prediction_timestamp.year, month=prediction_timestamp.month, day=prediction_timestamp.day, tzinfo=timezone.utc)
+        actual_precip = get_accumulated_precipitation(self.session, station.code, start_prediction_timestamp, end_prediction_timestamp)
         return actual_precip + station_prediction.apcp_sfc_0
 
     def _calculate_delta_precip(self, station, model_run, prediction, station_prediction):
-        """ Calculate the station_prediction's delta_precip based on the previous precip
+        """Calculate the station_prediction's delta_precip based on the previous precip
         prediction for the station
         """
-        results = self.session.query(WeatherStationModelPrediction).\
-            filter(WeatherStationModelPrediction.station_code == station.code).\
-            filter(WeatherStationModelPrediction.prediction_model_run_timestamp_id == model_run.id).\
-            filter(WeatherStationModelPrediction.prediction_timestamp < prediction.prediction_timestamp).\
-            order_by(WeatherStationModelPrediction.prediction_timestamp.desc()).\
-            limit(1).first()
+        results = (
+            self.session.query(WeatherStationModelPrediction)
+            .filter(WeatherStationModelPrediction.station_code == station.code)
+            .filter(WeatherStationModelPrediction.prediction_model_run_timestamp_id == model_run.id)
+            .filter(WeatherStationModelPrediction.prediction_timestamp < prediction.prediction_timestamp)
+            .order_by(WeatherStationModelPrediction.prediction_timestamp.desc())
+            .limit(1)
+            .first()
+        )
         # If there exists a previous prediction for the station from the same model run
         if results is not None:
             return station_prediction.apcp_sfc_0 - results.apcp_sfc_0
@@ -399,23 +349,15 @@ def _calculate_delta_precip(self, station, model_run, prediction, station_predic
         # model type). In this case, delta_precip will be equal to the apcp
         return station_prediction.apcp_sfc_0
 
-    def _process_model_run_for_station(self,
-                                       model_run: PredictionModelRunTimestamp,
-                                       station: WeatherStation,
-                                       model_type: ModelEnum):
-        """ Process the model run for the provided station.
-        """
+    def _process_model_run_for_station(self, model_run: PredictionModelRunTimestamp, station: WeatherStation, model_type: ModelEnum):
+        """Process the model run for the provided station."""
         # Extract the coordinate.
         coordinate = [station.long, station.lat]
         # Lookup the grid our weather station is in.
-        logger.info("Getting grid for coordinate %s and model %s",
-                    coordinate, model_run.prediction_model)
+        logger.info("Getting grid for coordinate %s and model %s", coordinate, model_run.prediction_model)
         machine = StationMachineLearning(
-            session=self.session,
-            model=model_run.prediction_model,
-            target_coordinate=coordinate,
-            station_code=station.code,
-            max_learn_date=model_run.prediction_run_timestamp)
+            session=self.session, model=model_run.prediction_model, target_coordinate=coordinate, station_code=station.code, max_learn_date=model_run.prediction_run_timestamp
+        )
         machine.learn()
 
         # Get all the predictions associated to this particular model run.
@@ -428,41 +370,29 @@ def _process_model_run_for_station(self,
         for prediction in query:
             # NAM model requires manual calculation of cumulative precip
             if model_type == ModelEnum.NAM:
-                nam_cumulative_precip, prediction.apcp_sfc_0 = accumulate_nam_precipitation(
-                    nam_cumulative_precip, prediction, model_run.prediction_run_timestamp.hour)
-            if (prev_prediction is not None
-                    and prev_prediction.prediction_timestamp.hour == 18
-                    and prediction.prediction_timestamp.hour == 21):
+                nam_cumulative_precip, prediction.apcp_sfc_0 = accumulate_nam_precipitation(nam_cumulative_precip, prediction, model_run.prediction_run_timestamp.hour)
+            if prev_prediction is not None and prev_prediction.prediction_timestamp.hour == 18 and prediction.prediction_timestamp.hour == 21:
                 noon_prediction = construct_interpolated_noon_prediction(prev_prediction, prediction, SCALAR_MODEL_VALUE_KEYS_FOR_INTERPOLATION)
-                self._process_prediction(
-                    noon_prediction, station, model_run, machine, True)
-                self._process_prediction(
-                    prediction, station, model_run, machine, False)
+                self._process_prediction(noon_prediction, station, model_run, machine, True)
+                self._process_prediction(prediction, station, model_run, machine, False)
             prev_prediction = prediction
 
     def _mark_model_run_interpolated(self, model_run: PredictionModelRunTimestamp):
-        """ Having completely processed a model run, we can mark it has having been interpolated.
-        """
+        """Having completely processed a model run, we can mark it as having been interpolated."""
         model_run.interpolated = True
-        logger.info('marking %s as interpolated', model_run)
+        logger.info("marking %s as interpolated", model_run)
         self.session.add(model_run)
         self.session.commit()
 
     def process(self, model_type: ModelEnum):
-        """ Entry point to start processing model runs that have not yet had their predictions interpolated
-        """
+        """Entry point to start processing model runs that have not yet had their predictions interpolated."""
         # Get model runs that are complete (fully downloaded), but not yet interpolated.
-        query = get_prediction_model_run_timestamp_records(
-            self.session, complete=True, interpolated=False, model_type=model_type)
-        model_processed = False
+        query = get_prediction_model_run_timestamp_records(self.session, complete=True, interpolated=False, model_type=model_type)
         for model_run, model in query:
-            model_processed = True
-            logger.info('model %s', model)
-            logger.info('model_run %s', model_run)
+            logger.info("model %s", model)
+            logger.info("model_run %s", model_run)
             # Process the model run.
self._process_model_run(model_run, model_type) # Mark the model run as interpolated. self._mark_model_run_interpolated(model_run) - if model_processed: - refresh_morecast2_materialized_view(self.session) diff --git a/api/app/weather_models/fetch/predictions.py b/api/app/weather_models/fetch/predictions.py index 1426ed85a..9a6e648e2 100644 --- a/api/app/weather_models/fetch/predictions.py +++ b/api/app/weather_models/fetch/predictions.py @@ -1,22 +1,24 @@ -""" Code for fetching data for API. -""" +"""Code for fetching data for API.""" from itertools import groupby import logging from typing import List import datetime from datetime import time +from time import perf_counter from collections import defaultdict import pytz from sqlalchemy.orm import Session import app.db.database from app.schemas.morecast_v2 import WeatherIndeterminate -from app.schemas.weather_models import (WeatherStationModelPredictionValues, WeatherModelPredictionValues, WeatherModelRun, - ModelRunPredictions, - WeatherStationModelRunsPredictions) +from app.schemas.weather_models import WeatherStationModelPredictionValues, WeatherModelPredictionValues, WeatherModelRun, ModelRunPredictions, WeatherStationModelRunsPredictions from app.db.models.weather_models import WeatherStationModelPrediction -from app.db.crud.weather_models import (get_latest_station_model_prediction_per_day, get_station_model_predictions, - get_station_model_prediction_from_previous_model_run, get_latest_station_prediction_mat_view) +from app.db.crud.weather_models import ( + get_latest_station_model_prediction_per_day, + get_station_model_predictions, + get_station_model_prediction_from_previous_model_run, + get_latest_station_prediction, +) import app.stations from app.utils.time import get_days_from_range from app.weather_models import ModelEnum @@ -25,34 +27,26 @@ class MatchingStationNotFoundException(Exception): - """ Exception raised when station cannot be found. """ + """Exception raised when station cannot be found.""" def _fetch_delta_precip_for_prev_model_run( - session: Session, - model: ModelEnum, - prediction: WeatherStationModelPrediction, - prev_station_predictions: dict, - prediction_model_run_timestamp: datetime.datetime): + session: Session, model: ModelEnum, prediction: WeatherStationModelPrediction, prev_station_predictions: dict, prediction_model_run_timestamp: datetime.datetime +): # Look if we can find the previous value in memory if prediction.prediction_timestamp in prev_station_predictions[prediction.station_code]: prev_station_prediction = prev_station_predictions[prediction.station_code] - return prev_station_prediction[prediction.prediction_timestamp]['prediction'].delta_precipitation + return prev_station_prediction[prediction.prediction_timestamp]["prediction"].delta_precipitation # Uh oh - couldn't find it - let's go look in the database. # This should only happen in extreme edge cases! 
- prev_prediction = get_station_model_prediction_from_previous_model_run( - session, prediction.station_code, model, prediction.prediction_timestamp, - prediction_model_run_timestamp) + prev_prediction = get_station_model_prediction_from_previous_model_run(session, prediction.station_code, model, prediction.prediction_timestamp, prediction_model_run_timestamp) if prev_prediction: return prev_prediction.delta_precip return None -async def fetch_model_run_predictions_by_station_code( - model: ModelEnum, - station_codes: List[int], - time_of_interest: datetime) -> List[WeatherStationModelRunsPredictions]: - """ Fetch model predictions from database based on list of station codes, for a specified datetime. +async def fetch_model_run_predictions_by_station_code(model: ModelEnum, station_codes: List[int], time_of_interest: datetime) -> List[WeatherStationModelRunsPredictions]: + """Fetch model predictions from database based on list of station codes, for a specified datetime. Predictions are grouped by station and model run. """ # We're interested in the 5 days prior to and 10 days following the time_of_interest. @@ -62,31 +56,26 @@ async def fetch_model_run_predictions_by_station_code( async def fetch_model_run_predictions_by_station_code_and_date_range( - model: ModelEnum, - station_codes: List[int], - start_time: datetime.datetime, - end_time: datetime.datetime) -> List[WeatherStationModelRunsPredictions]: - """ Fetch model predictions from database based on list of station codes and date range. + model: ModelEnum, station_codes: List[int], start_time: datetime.datetime, end_time: datetime.datetime +) -> List[WeatherStationModelRunsPredictions]: + """Fetch model predictions from database based on list of station codes and date range. Predictions are grouped by station and model run. """ # send the query (ordered by prediction date.) 
     with app.db.database.get_read_session_scope() as session:
-        historic_predictions = get_station_model_predictions(
-            session, station_codes, model, start_time, end_time)
+        historic_predictions = get_station_model_predictions(session, station_codes, model, start_time, end_time)
 
         return await marshall_predictions(session, model, station_codes, historic_predictions)
 
 
-async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_range(model: ModelEnum,
-                                                                                   station_codes: List[int],
-                                                                                   start_time: datetime.datetime,
-                                                                                   end_time: datetime.datetime) -> List[WeatherStationModelRunsPredictions]:
+async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_range(
+    model: ModelEnum, station_codes: List[int], start_time: datetime.datetime, end_time: datetime.datetime
+) -> List[WeatherStationModelRunsPredictions]:
     results = []
     days = get_days_from_range(start_time, end_time)
     stations = {station.code: station for station in await app.stations.get_stations_by_codes(station_codes)}
 
     with app.db.database.get_read_session_scope() as session:
-
         for day in days:
             day_results = []
             vancouver_tz = pytz.timezone("America/Vancouver")
@@ -94,8 +83,7 @@ async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_rang
             day_start = vancouver_tz.localize(datetime.datetime.combine(day, time.min))
             day_end = vancouver_tz.localize(datetime.datetime.combine(day, time.max))
 
-            daily_result = get_latest_station_model_prediction_per_day(
-                session, station_codes, model, day_start, day_end)
+            daily_result = get_latest_station_model_prediction_per_day(session, station_codes, model, day_start, day_end)
             for id, timestamp, model_abbrev, station_code, rh, temp, bias_adjusted_temp, bias_adjusted_rh, precip_24hours, wind_dir, wind_speed, update_date in daily_result:
                 day_results.append(
                     WeatherStationModelPredictionValues(
@@ -110,8 +98,9 @@ async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_rang
                         wind_speed=wind_speed,
                         wind_direction=wind_dir,
                         datetime=timestamp,
-                        update_date=update_date
-                    ))
+                        update_date=update_date,
+                    )
+                )
 
             # sort the list by station_code
             day_results.sort(key=lambda x: x.station.code)
@@ -124,10 +113,10 @@ async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_rang
     return results
 
 
-async def fetch_latest_model_run_predictions_by_station_code_and_date_range(session: Session,
-                                                                            station_codes: List[int],
-                                                                            start_time: datetime.datetime,
-                                                                            end_time: datetime.datetime) -> List[WeatherIndeterminate]:
+async def fetch_latest_model_run_predictions_by_station_code_and_date_range(
+    session: Session, station_codes: List[int], start_time: datetime.datetime, end_time: datetime.datetime
+) -> List[WeatherIndeterminate]:
+    fetch_start = perf_counter()
     results: List[WeatherIndeterminate] = []
     days = get_days_from_range(start_time, end_time)
     stations = {station.code: station for station in await app.stations.get_stations_by_codes(station_codes)}
@@ -138,10 +127,22 @@ async def fetch_latest_model_run_predictions_by_station_code_and_date_range(sess
             day_start = vancouver_tz.localize(datetime.datetime.combine(day, time.min))
             day_end = vancouver_tz.localize(datetime.datetime.combine(day, time.max))
 
-            daily_result = get_latest_station_prediction_mat_view(
-                session, active_station_codes, day_start, day_end)
-            for timestamp, model_abbrev, station_code, rh, temp, bias_adjusted_temp, bias_adjusted_rh, bias_adjusted_wind_speed, bias_adjusted_wdir, precip_24hours, bias_adjusted_precip_24h, wind_dir, wind_speed, update_date in daily_result:
-
+            daily_result = get_latest_station_prediction(session, active_station_codes, day_start, day_end)
+            for (
+                timestamp,
+                model_abbrev,
+                station_code,
+                rh,
+                temp,
+                bias_adjusted_temp,
+                bias_adjusted_rh,
+                bias_adjusted_wind_speed,
+                bias_adjusted_wdir,
+                precip_24hours,
+                bias_adjusted_precip_24h,
+                wind_dir,
+                wind_speed,
+            ) in daily_result:
                 # Create two WeatherIndeterminates, one for model predictions and one for bias corrected predictions
                 results.append(
                     WeatherIndeterminate(
@@ -153,21 +154,29 @@ async def fetch_latest_model_run_predictions_by_station_code_and_date_range(sess
                         relative_humidity=rh,
                         precipitation=precip_24hours,
                         wind_direction=wind_dir,
-                        wind_speed=wind_speed
-                    ))
+                        wind_speed=wind_speed,
+                    )
+                )
                 results.append(
                     WeatherIndeterminate(
                         station_code=station_code,
                         station_name=stations[station_code].name,
-                        determinate=f'{model_abbrev}_BIAS',
+                        determinate=f"{model_abbrev}_BIAS",
                         utc_timestamp=timestamp,
                         temperature=bias_adjusted_temp,
                         relative_humidity=bias_adjusted_rh,
                         precipitation=bias_adjusted_precip_24h,
                         wind_speed=bias_adjusted_wind_speed,
-                        wind_direction=bias_adjusted_wdir
-                    ))
-    return post_process_fetched_predictions(results)
+                        wind_direction=bias_adjusted_wdir,
+                    )
+                )
+    post_processed_results = post_process_fetched_predictions(results)
+    fetch_end = perf_counter()
+    delta = fetch_end - fetch_start
+    # Any delta below 100 milliseconds is just noise in the logs.
+    if delta > 0.1:
+        logger.info("Fetched and post-processed latest station predictions in %f seconds", delta)
+    return post_processed_results
 
 
 def post_process_fetched_predictions(weather_indeterminates: List[WeatherIndeterminate]):
@@ -193,26 +202,17 @@ async def marshall_predictions(session: Session, model: ModelEnum, station_codes
             # day, so we need to look at the accumulated precip from the previous model run to calculate the
             # delta_precip
             precip_value = None
-            if prediction.prediction_timestamp == prediction_model_run_timestamp.prediction_run_timestamp and \
-                    prediction.prediction_timestamp.hour > 0:
-                precip_value = _fetch_delta_precip_for_prev_model_run(
-                    session,
-                    model,
-                    prediction,
-                    station_predictions,
-                    prediction_model_run_timestamp.prediction_run_timestamp)
+            if prediction.prediction_timestamp == prediction_model_run_timestamp.prediction_run_timestamp and prediction.prediction_timestamp.hour > 0:
+                precip_value = _fetch_delta_precip_for_prev_model_run(session, model, prediction, station_predictions, prediction_model_run_timestamp.prediction_run_timestamp)
             # This condition catches situations where we are not at hour 000 of the model run, or where it is
             # hour 000 but there was nothing returned from _fetch_delta_precip_for_prev_model_run()
             if precip_value is None:
                 precip_value = prediction.delta_precip
             station_predictions[prediction.station_code][prediction.prediction_timestamp] = {
-                'model_run': WeatherModelRun(
-                    datetime=prediction_model_run_timestamp.prediction_run_timestamp,
-                    name=prediction_model.name,
-                    abbreviation=model,
-                    projection=prediction_model.projection
+                "model_run": WeatherModelRun(
+                    datetime=prediction_model_run_timestamp.prediction_run_timestamp, name=prediction_model.name, abbreviation=model, projection=prediction_model.projection
                 ),
-                'prediction': WeatherModelPredictionValues(
+                "prediction": WeatherModelPredictionValues(
                     temperature=prediction.tmp_tgl_2,
                     bias_adjusted_temperature=prediction.bias_adjusted_temperature,
                     relative_humidity=prediction.rh_tgl_2,
@@ -220,8 +220,8 @@ async def marshall_predictions(session: Session, model: ModelEnum, station_codes
                     delta_precipitation=precip_value,
                     wind_speed=prediction.wind_tgl_10,
                     wind_direction=prediction.wdir_tgl_10,
-                    datetime=prediction.prediction_timestamp
-                )
+                    datetime=prediction.prediction_timestamp,
+                ),
             }
 
     # Re-structure the data, grouping data by station and model run.
@@ -231,19 +231,12 @@ async def marshall_predictions(session: Session, model: ModelEnum, station_codes
     for station_code, predictions in station_predictions.items():
         model_run_dict = {}
         for prediction in predictions.values():
-
-            if prediction['model_run'].datetime in model_run_dict:
-                model_run_predictions = model_run_dict[prediction['model_run'].datetime]
+            if prediction["model_run"].datetime in model_run_dict:
+                model_run_predictions = model_run_dict[prediction["model_run"].datetime]
             else:
-                model_run_predictions = ModelRunPredictions(
-                    model_run=prediction['model_run'],
-                    values=[]
-                )
-                model_run_dict[prediction['model_run'].datetime] = model_run_predictions
-            model_run_predictions.values.append(prediction['prediction'])
+                model_run_predictions = ModelRunPredictions(model_run=prediction["model_run"], values=[])
+                model_run_dict[prediction["model_run"].datetime] = model_run_predictions
+            model_run_predictions.values.append(prediction["prediction"])
 
-        response.append(WeatherStationModelRunsPredictions(
-            station=stations[station_code],
-            model_runs=list(model_run_dict.values())
-        ))
+        response.append(WeatherStationModelRunsPredictions(station=stations[station_code], model_runs=list(model_run_dict.values())))
     return response
diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile
index d45936432..3638c085e 100644
--- a/openshift/pgslice/docker/Dockerfile
+++ b/openshift/pgslice/docker/Dockerfile
@@ -1,6 +1,8 @@
-FROM ankane/pgslice:v0.6.1
+FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04
 
-RUN apk update && apk add unzip bash
+RUN apt-get update && \
+    apt-get install -y build-essential libpq-dev postgresql-client-16 python3 ruby-full unzip && \
+    gem install pgslice
 
 # Download the Amazon CLI installer.
ADD "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" /tmp/awscliv2.zip
@@ -10,6 +12,5 @@ USER root
 RUN unzip /tmp/awscliv2.zip -d /tmp/ &&\
     /tmp/aws/install
 
-
 COPY fill_partition_data.sh .
 COPY partition_and_archive.sh .
\ No newline at end of file
diff --git a/openshift/pgslice/docker/fill_partition_data.sh b/openshift/pgslice/docker/fill_partition_data.sh
index a7556a61b..71fb1bd87 100755
--- a/openshift/pgslice/docker/fill_partition_data.sh
+++ b/openshift/pgslice/docker/fill_partition_data.sh
@@ -49,12 +49,13 @@
 then
     exit 1
 fi
-export PGSLICE_URL="postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}"
+ENCODED_PASS=$(python3 -c "import urllib.parse; print(urllib.parse.quote('${PG_PASSWORD}', safe=''))")
+PGSLICE_URL=postgresql://${PG_USER}:${ENCODED_PASS}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}
 # Fill the partitions with data from the original table
-pgslice fill $TABLE
+pgslice fill $TABLE --url $PGSLICE_URL
 # Analyze for query planner
-pgslice analyze $TABLE
+pgslice analyze $TABLE --url $PGSLICE_URL
 # Swap the intermediate table with the original table
-pgslice swap $TABLE
+pgslice swap $TABLE --url $PGSLICE_URL
 # Fill the rest (rows inserted between the first fill and the swap)
-pgslice fill $TABLE --swapped
\ No newline at end of file
+pgslice fill $TABLE --swapped --url $PGSLICE_URL
\ No newline at end of file
diff --git a/openshift/pgslice/docker/partition_and_archive.sh b/openshift/pgslice/docker/partition_and_archive.sh
index bd1821980..b34e3bed7 100755
--- a/openshift/pgslice/docker/partition_and_archive.sh
+++ b/openshift/pgslice/docker/partition_and_archive.sh
@@ -49,17 +49,26 @@
 then
     exit 1
 fi
-export PGSLICE_URL = "postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}"
+ENCODED_PASS=$(python3 -c "import urllib.parse; print(urllib.parse.quote('${PG_PASSWORD}', safe=''))")
+PGSLICE_URL=postgresql://${PG_USER}:${ENCODED_PASS}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}
 # Add partitions to the intermediate table (assumes it already exists)
-pgslice add_partitions $TABLE --intermediate --future 1
+pgslice add_partitions $TABLE --intermediate --future 1 --url $PGSLICE_URL
 # Fill the partitions with data from the original table
-pgslice fill $TABLE
+pgslice fill $TABLE --url $PGSLICE_URL
 # Analyze for query planner
-pgslice analyze $TABLE
+pgslice analyze $TABLE --url $PGSLICE_URL
 # Swap the intermediate table with the original table
-pgslice swap $TABLE
+pgslice swap $TABLE --url $PGSLICE_URL
 # Fill the rest (rows inserted between the first fill and the swap)
-pgslice fill $TABLE --swapped
+pgslice fill $TABLE --swapped --url $PGSLICE_URL
+
+
 # Dump any retired tables to S3 and drop
-pg_dump -c -Fc -t ${TABLE}_retired $PGSLICE_URL | gzip | AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY}" AWS_SECRET_ACCESS_KEY="${AWS_SECRET_KEY}" aws --endpoint="https://${AWS_HOSTNAME}" s3 cp - "s3://${AWS_BUCKET}/retired/${TABLE}_retired.dump.gz"
+# borrowing a lot from https://github.com/BCDevOps/backup-container
+_timestamp=$(date +%Y-%m-%d_%H-%M-%S)
+_datestamp=$(date +%Y/%m)
+_target_filename="${PG_HOSTNAME}_${TABLE}_retired_${_timestamp}.dump.gz"
+_target_folder="${PG_HOSTNAME}_${PG_DATABASE}/${_datestamp}"
+
+pg_dump -c -Fc -t ${TABLE}_retired $PGSLICE_URL | gzip | AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY AWS_SECRET_ACCESS_KEY=$AWS_SECRET_KEY aws --endpoint="https://${AWS_HOSTNAME}" s3 cp - "s3://${AWS_BUCKET}/retired/${_target_folder}/${_target_filename}"
 psql -c "DROP TABLE ${TABLE}_retired" $PGSLICE_URL
\ No newline at end of file
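
The `perf_counter` bookkeeping added to `fetch_latest_model_run_predictions_by_station_code_and_date_range` (start a timer, compute the delta, log only when it exceeds the 100 ms noise floor) is a pattern worth reusing. Below is a minimal sketch of the same idea as a context manager; the helper name `log_if_slow` and its default threshold are illustrative, not part of this codebase:

```python
import logging
from contextlib import contextmanager
from time import perf_counter

logger = logging.getLogger(__name__)


@contextmanager
def log_if_slow(label: str, threshold_seconds: float = 0.1):
    """Log elapsed wall-clock time, but only when it exceeds the threshold,
    so sub-100 ms calls don't add noise to the logs."""
    start = perf_counter()
    try:
        yield
    finally:
        delta = perf_counter() - start
        if delta > threshold_seconds:
            logger.info("%s took %f seconds", label, delta)


# Hypothetical usage mirroring the change above:
# with log_if_slow("latest station prediction fetch"):
#     results = fetch_and_post_process()
```

`perf_counter` is the right clock here because it is monotonic, so system clock adjustments cannot skew short measurements the way `time.time` can.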
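Both pgslice scripts percent-encode the password with `urllib.parse.quote` before splicing it into the connection URL, so reserved characters such as `@` or `:` in the password cannot corrupt the URL structure. A small Python sketch of the same idea (the helper and its parameters are illustrative, not code from this repository):

```python
from urllib.parse import quote


def build_pg_url(user: str, password: str, host: str, port: int, database: str) -> str:
    """Build a libpq-style URL, percent-encoding the password so reserved
    characters ('@', ':', '/', ...) cannot break the URL structure."""
    return f"postgresql://{user}:{quote(password, safe='')}@{host}:{port}/{database}"


# A password containing reserved characters survives intact:
print(build_pg_url("wps", "p@ss/word", "localhost", 5432, "wps"))
# -> postgresql://wps:p%40ss%2Fword@localhost:5432/wps
```

Note the `safe=''` argument: `quote` defaults to `safe='/'`, which would leave a literal `/` in the password unescaped, which is why the scripts pass it explicitly.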