Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bias adjusted precip #3202

Merged
merged 11 commits into from
Nov 1, 2023
68 changes: 68 additions & 0 deletions api/alembic/versions/f2e027a47a3f_bias_adjusted_precip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""bias adjusted precip

Revision ID: f2e027a47a3f
Revises: 5b745fe0bd7a
Create Date: 2023-10-30 11:34:21.603046

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = 'f2e027a47a3f'
down_revision = '5b745fe0bd7a'
branch_labels = None
depends_on = None


def upgrade():
op.add_column('weather_station_model_predictions', sa.Column('bias_adjusted_precip_24h', sa.Float(), nullable=True))
# Drop morecast_2_materialized view and recreate with the bias_adjusted_precip_24h field
op.execute("DROP MATERIALIZED VIEW morecast_2_materialized_view;")
op.execute("""
CREATE MATERIALIZED VIEW morecast_2_materialized_view AS
SELECT weather_station_model_predictions.prediction_timestamp, prediction_models.abbreviation, weather_station_model_predictions.station_code,
weather_station_model_predictions.rh_tgl_2, weather_station_model_predictions.tmp_tgl_2, weather_station_model_predictions.bias_adjusted_temperature,
weather_station_model_predictions.bias_adjusted_rh, weather_station_model_predictions.precip_24h, weather_station_model_predictions.wdir_tgl_10,
weather_station_model_predictions.wind_tgl_10, weather_station_model_predictions.bias_adjusted_wind_speed, weather_station_model_predictions.bias_adjusted_wdir,
weather_station_model_predictions.bias_adjusted_precip_24h, weather_station_model_predictions.update_date,
weather_station_model_predictions.prediction_model_run_timestamp_id
FROM weather_station_model_predictions
JOIN prediction_model_run_timestamps
ON weather_station_model_predictions.prediction_model_run_timestamp_id = prediction_model_run_timestamps.id JOIN prediction_models
ON prediction_model_run_timestamps.prediction_model_id = prediction_models.id
JOIN (
SELECT max(weather_station_model_predictions.prediction_timestamp) AS latest_prediction, weather_station_model_predictions.station_code AS station_code,
date(weather_station_model_predictions.prediction_timestamp) AS unique_day
FROM weather_station_model_predictions
WHERE date_part('hour', weather_station_model_predictions.prediction_timestamp) = 20
GROUP BY weather_station_model_predictions.station_code, date(weather_station_model_predictions.prediction_timestamp)
) AS latest
ON weather_station_model_predictions.prediction_timestamp = latest.latest_prediction AND weather_station_model_predictions.station_code = latest.station_code
ORDER BY weather_station_model_predictions.update_date DESC;""")


def downgrade():
# Drop morecast_2_materialized view before dropping the column in the table in order to avoid dependency issues
op.execute("DROP MATERIALIZED VIEW morecast_2_materialized_view;")
op.drop_column('weather_station_model_predictions', 'bias_adjusted_precip_24h')
op.execute("""
CREATE MATERIALIZED VIEW morecast_2_materialized_view AS
SELECT weather_station_model_predictions.prediction_timestamp, prediction_models.abbreviation, weather_station_model_predictions.station_code,
weather_station_model_predictions.rh_tgl_2, weather_station_model_predictions.tmp_tgl_2, weather_station_model_predictions.bias_adjusted_temperature,
weather_station_model_predictions.bias_adjusted_rh, weather_station_model_predictions.precip_24h, weather_station_model_predictions.wdir_tgl_10,
weather_station_model_predictions.wind_tgl_10, weather_station_model_predictions.bias_adjusted_wind_speed, weather_station_model_predictions.bias_adjusted_wdir,
weather_station_model_predictions.update_date
FROM weather_station_model_predictions
JOIN prediction_model_run_timestamps
ON weather_station_model_predictions.prediction_model_run_timestamp_id = prediction_model_run_timestamps.id JOIN prediction_models
ON prediction_model_run_timestamps.prediction_model_id = prediction_models.id
JOIN (
SELECT max(weather_station_model_predictions.prediction_timestamp) AS latest_prediction, weather_station_model_predictions.station_code AS station_code,
date(weather_station_model_predictions.prediction_timestamp) AS unique_day
FROM weather_station_model_predictions
WHERE date_part('hour', weather_station_model_predictions.prediction_timestamp) = 20
GROUP BY weather_station_model_predictions.station_code, date(weather_station_model_predictions.prediction_timestamp)
) AS latest
ON weather_station_model_predictions.prediction_timestamp = latest.latest_prediction AND weather_station_model_predictions.station_code = latest.station_code
ORDER BY weather_station_model_predictions.update_date DESC;""")
76 changes: 74 additions & 2 deletions api/app/db/crud/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
"""
import datetime
from typing import List
from sqlalchemy import and_, select
from sqlalchemy import and_, select, text
from sqlalchemy.sql import func
from sqlalchemy.orm import Session
from app.db.models.weather_models import ModelRunPrediction, PredictionModelRunTimestamp
from app.db.models.weather_models import (ModelRunPrediction, PredictionModel, PredictionModelRunTimestamp,
WeatherStationModelPrediction)
from app.db.models.observations import HourlyActual


Expand Down Expand Up @@ -69,3 +70,74 @@ def get_accumulated_precipitation(session: Session, station_code: int, start_dat
if result is None:
return 0
return result


def get_accumulated_precip_by_24h_interval(session: Session, station_code: int, start_datetime: datetime, end_datetime: datetime):
""" Get the accumulated precip for 24 hour intervals for a given station code within the specified time interval.
:param session: The ORM/database session.
:param station_code: The numeric code identifying the weather station of interest.
:param start_datetime: The earliest date and time of interest.
:param end_datetime: The latest date and time of interest.

Note: I couldn't construct this query in SQLAlchemy, hence the need for the 'text' based query.

generate_series(\'{}\', \'{}\', '24 hours'::interval)

This gives us a one column table of dates separated by 24 hours between the start and end dates. For example, if start and end dates are 2023-10-31 20:00:00 to 2023-11-03 20:00:00 we would have a table like:

day
2023-10-31 20:00:00
2023-11-01 20:00:00
2023-11-02 20:00:00
2023-11-03 20:00:00

We then join the HourlyActuals table so that we can sum hourly precip in a 24 hour period. The join is based on the weather_date field in the HourlyActuals table being in a 24 hour range using this odd looking syntax:

weather_date <@ tstzrange(day, day + '24 hours', '(]')

Using 2023-10-31 20:00:00 as an example, rows with the following dates would match. The (] syntax means the lower bound is excluded but the upper bound is included.

2023-10-31 21:00:00
2023-10-31 22:00:00
2023-10-31 23:00:00
2023-11-01 00:00:00
2023-11-01 01:00:00
....
2023-11-01 19:00:00
2023-11-01 20:00:00
"""
stmt = """
SELECT day, station_code, sum(precipitation) actual_precip_24h
FROM
generate_series(\'{}\', \'{}\', '24 hours'::interval) day
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with this syntax, but is this creating a series of days based on 24 hour intervals, then selecting each weather date based on it's hour being less than the subset of the series, then summing up the precip based on the hourly actuals that fit in that window? (There's probably a better way of expressing this in English)...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have/need an index on hourly_actuals.weather_date for this query?

Copy link
Collaborator Author

@dgboss dgboss Oct 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, you have the gist of it. I'm glad you asked this question, I think I found an off-by-one day logic error.

I'll add to the comment with a description of what this is doing.

generate_series(\'{}\', \'{}\', '24 hours'::interval)
This gives us a one column table of dates separated by 24 hours between the start and end dates. For example, if start and end dates are 2023-10-31 20:00:00 to 2023-11-03 20:00:00 we would have a table like:

day
2023-10-31 20:00:00
2023-11-01 20:00:00
2023-11-02 20:00:00
2023-11-03 20:00:00

We then join the HourlyActuals table so that we can sum hourly precip in a 24 hour period. The join is based on the weather_date field in the HourlyActuals table being in a 24 hour range using this odd looking syntax:

weather_date <@ tstzrange(day, day + '24 hours', '(]')

Using 2023-10-31 20:00:00 as an example, rows with the following dates would match. The (] syntax means the lower bound is excluded but the upper bound is included.

2023-10-31 21:00:00
2023-10-31 22:00:00
2023-10-31 23:00:00
2023-11-01 00:00:00
2023-11-01 01:00:00
....
2023-11-01 19:00:00
2023-11-01 20:00:00

My off by one error is that right now I am attributing this accumulated precipitation to October 31, but it should actually be for Nov 1 (we need the preceding 24 hour precip).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An index on weather_date is a great idea!

LEFT JOIN
hourly_actuals
ON
weather_date <@ tstzrange(day - INTERVAL '24 hours', day, '(]')
WHERE
station_code = {}
GROUP BY
day, station_code;
""".format(start_datetime, end_datetime, station_code)
result = session.execute(text(stmt))
return result.all()


def get_predicted_daily_precip(session: Session, model: PredictionModel, station_code: int, start_datetime: datetime, end_datetime: datetime):
""" Gets rows from WeatherStationModelPrediction for the given model and station within the
specified time interval at 20:00:00 UTC each day.
:param session: The ORM/database session
:param model: The numeric weather prediction model
:param station_code: The code identifying the weather station.
:param start_datetime: The earliest date and time of interest.
:param end_datetime: The latest date and time of interest.
"""
result = session.query(WeatherStationModelPrediction)\
.join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id)\
.filter(PredictionModelRunTimestamp.prediction_model_id == model.id)\
.filter(WeatherStationModelPrediction.station_code == station_code)\
.filter(WeatherStationModelPrediction.prediction_timestamp >= start_datetime)\
.filter(WeatherStationModelPrediction.prediction_timestamp < end_datetime)\
.filter(func.date_part('hour', WeatherStationModelPrediction.prediction_timestamp) == 20)\
.order_by(WeatherStationModelPrediction.prediction_timestamp)
return result.all()
1 change: 1 addition & 0 deletions api/app/db/crud/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def get_latest_station_prediction_mat_view(session: Session,
MoreCast2MaterializedView.bias_adjusted_wind_speed,
MoreCast2MaterializedView.bias_adjusted_wdir,
MoreCast2MaterializedView.precip_24h,
MoreCast2MaterializedView.bias_adjusted_precip_24h,
MoreCast2MaterializedView.wdir_tgl_10,
MoreCast2MaterializedView.wind_tgl_10,
MoreCast2MaterializedView.update_date).\
Expand Down
5 changes: 4 additions & 1 deletion api/app/db/models/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,13 @@ class WeatherStationModelPrediction(Base):
default=time_utils.get_utc_now(), index=True)
# Precipitation predicted for the previous 24 hour period.
precip_24h = Column(Float, nullable=True)
# Precipitation predicted for the previous 24 hour period adjusted for bias.
bias_adjusted_precip_24h = Column(Float, nullable=True)

def __str__(self):
return ('{self.station_code} {self.prediction_timestamp} {self.tmp_tgl_2} {self.bias_adjusted_temperature} '
'{self.rh_tgl_2} {self.bias_adjusted_rh} {self.wdir_tgl_10} {self.bias_adjusted_wdir} {self.wind_tgl_10} '
'{self.bias_adjusted_wind_speed} {self.apcp_sfc_0} {self.delta_precip}').format(self=self)
'{self.bias_adjusted_wind_speed} {self.apcp_sfc_0} {self.delta_precip} {self.bias_adjusted_precip_24h}').format(self=self)


class MoreCast2MaterializedView(Base):
Expand All @@ -267,6 +269,7 @@ class MoreCast2MaterializedView(Base):
primary_key=True, nullable=False, index=True)
abbreviation = Column(String, nullable=False)
apcp_sfc_0 = Column(Float, nullable=False)
bias_adjusted_precip_24h = Column(Float, nullable=False)
bias_adjusted_rh = Column(Float, nullable=False)
bias_adjusted_temperature = Column(Float, nullable=False)
bias_adjusted_wind_speed = Column(Float, nullable=False)
Expand Down
7 changes: 6 additions & 1 deletion api/app/jobs/common_model_fetchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@
station_prediction.apcp_sfc_0 = 0.0
else:
station_prediction.apcp_sfc_0 = prediction.apcp_sfc_0
# Calculate the delta_precipitation based on station's previous prediction_timestamp
# Calculate the delta_precipitation and 24 hour precip based on station's previous prediction_timestamp
# for the same model run
self.session.flush()
station_prediction.precip_24h = self._calculate_past_24_hour_precip(
Expand Down Expand Up @@ -280,6 +280,11 @@
station_prediction.wdir_tgl_10,
station_prediction.prediction_timestamp
)
# Predict the 24 hour precipitation
station_prediction.bias_adjusted_precip_24h = machine.predict_precipitation(

Check warning on line 284 in api/app/jobs/common_model_fetchers.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/common_model_fetchers.py#L284

Added line #L284 was not covered by tests
station_prediction.precip_24h,
station_prediction.prediction_timestamp
)

# Update the update time (this might be an update)
station_prediction.update_date = time_utils.get_utc_now()
Expand Down
4 changes: 2 additions & 2 deletions api/app/tests/fixtures/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def guess_filename(self, url: str, params: dict):
if params:
filename = filename + '_'
for key, value in params.items():
if type(value) == str:
if type(value) == str: # noqa: E721
value = value.replace(' ', '_')
filename = '{}_{}_{}'.format(filename, key, value)
filename = filename[:200] + '.json'
Expand All @@ -110,7 +110,7 @@ def lookup_fixture_filename(self,
lookup = nested_defaultdict(lookup)
fixture = lookup[url][verb][str(params)][str(data)]

if type(fixture) is not str:
if type(fixture) is not str: # noqa: E721
if config.get('AUTO_MAKE_FIXTURES'):
logger.warning('inserting fixture into lookup')
lookup[url][verb][str(params)][str(data)] = self.guess_filename(url, params)
Expand Down
41 changes: 40 additions & 1 deletion api/app/tests/weather_models/crud.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
""" Some crud responses used to mock our calls to app.db.crud
"""
from datetime import datetime
from app.db.models.weather_models import ModelRunPrediction
from app.db.models.weather_models import ModelRunPrediction, WeatherStationModelPrediction
from app.db.models.observations import HourlyActual


class MockActualPrecip:
day: datetime
actual_precip_24h: float

def __init__(self, day, actual_precip_24h):
self.day=day
self.actual_precip_24h=actual_precip_24h


def get_actuals_left_outer_join_with_predictions(*args):
""" Fixed response as replacement for app.db.crud.observations.get_actuals_left_outer_join_with_predictions
"""
Expand Down Expand Up @@ -89,3 +98,33 @@ def get_actuals_left_outer_join_with_predictions(*args):
prediction_timestamp=datetime(2020, 10, 11, 21))]
]
return result

def get_accumulated_precip_by_24h_interval(*args):
""" Fixed response as replacement for app.db.crud.observations.get_accumulated_precip_by_24h_interval
"""
return [
MockActualPrecip(
day=datetime(2023,10,10,20,0,0),
actual_precip_24h=3

),
MockActualPrecip(
day=datetime(2023,10,11,20,0,0),
actual_precip_24h=3
)
]


def get_predicted_daily_precip(*args):
return [
WeatherStationModelPrediction(
bias_adjusted_precip_24h=None,
precip_24h=3,
prediction_timestamp=datetime(2023,10,10,20,0,0)
),
WeatherStationModelPrediction(
bias_adjusted_precip_24h=None,
precip_24h=3,
prediction_timestamp=datetime(2023,10,11,20,0,0)
)
]
4 changes: 0 additions & 4 deletions api/app/tests/weather_models/test_env_canada_gdps.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import pytest
import requests
from sqlalchemy.orm import Session
from shapely import wkt
from app.jobs import env_canada
from app.jobs import common_model_fetchers
import app.utils.time as time_utils
Expand Down Expand Up @@ -48,9 +47,6 @@ def mock_get_actuals_left_outer_join_with_predictions(monkeypatch):
@pytest.fixture()
def mock_database(monkeypatch):
""" Mocked out database queries """
geom = ("POLYGON ((-120.525 50.77500000000001, -120.375 50.77500000000001,-120.375 50.62500000000001,"
" -120.525 50.62500000000001, -120.525 50.77500000000001))")
shape = wkt.loads(geom)
gdps_url = ('https://dd.weather.gc.ca/model_gem_global/15km/grib2/lat_lon/00/000/'
'CMC_glb_TMP_TGL_2_latlon.15x.15_2020052100_P000.grib2')
gdps_processed_model_run = ProcessedModelRunUrl(url=gdps_url)
Expand Down
5 changes: 0 additions & 5 deletions api/app/tests/weather_models/test_env_canada_hrdps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from typing import Optional
import pytest
import requests
import shapely
import datetime
from sqlalchemy.orm import Session
from pytest_mock import MockerFixture
Expand All @@ -29,10 +28,6 @@
@pytest.fixture()
def mock_database(monkeypatch):
""" Mocked out database queries """
geom = ("POLYGON ((-120.525 50.77500000000001, -120.375 50.77500000000001,-120.375 50.62500000000001,"
" -120.525 50.62500000000001, -120.525 50.77500000000001))")
shape = shapely.wkt.loads(geom)

hrdps_url = 'https://dd.weather.gc.ca/model_hrdps/continental/2.5km/' \
'00/001/20200521T00Z_MSC_HRDPS_RH_AGL-2m_RLatLon0.0225_PT001H.grib2'
hrdps_processed_model_run = ProcessedModelRunUrl(url=hrdps_url)
Expand Down
5 changes: 0 additions & 5 deletions api/app/tests/weather_models/test_env_canada_rdps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import pytest
import requests
from typing import Optional
from shapely import wkt
from sqlalchemy.orm import Session
import app.utils.time as time_utils
import app.weather_models.process_grib
Expand All @@ -24,10 +23,6 @@
@pytest.fixture()
def mock_database(monkeypatch):
""" Mocked out database queries """
geom = ("POLYGON ((-120.525 50.77500000000001, -120.375 50.77500000000001,-120.375 50.62500000000001,"
" -120.525 50.62500000000001, -120.525 50.77500000000001))")
shape = wkt.loads(geom)

rdps_url = ('https://dd.weather.gc.ca/model_gem_regional/10km/grib2/00/034/'
'CMC_reg_RH_TGL_2_ps10km_2020052100_P034.grib2')
rdps_processed_model_run = ProcessedModelRunUrl(url=rdps_url)
Expand Down
Loading
Loading