Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enables bias adjusted wind direction #3149

Merged
merged 20 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/app/jobs/common_model_fetchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
delete_weather_station_model_predictions,
refresh_morecast2_materialized_view)
from app.weather_models.machine_learning import StationMachineLearning
from app.weather_models import ModelEnum, construct_interpolated_noon_prediction
from app.weather_models import SCALAR_MODEL_VALUE_KEYS, ModelEnum, construct_interpolated_noon_prediction
from app.schemas.stations import WeatherStation
from app import config, configure_logging
import app.utils.time as time_utils
Expand Down Expand Up @@ -362,7 +362,8 @@
if (prev_prediction is not None
and prev_prediction.prediction_timestamp.hour == 18
and prediction.prediction_timestamp.hour == 21):
noon_prediction = construct_interpolated_noon_prediction(prev_prediction, prediction)
noon_prediction = construct_interpolated_noon_prediction(

Check warning on line 365 in api/app/jobs/common_model_fetchers.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/common_model_fetchers.py#L365

Added line #L365 was not covered by tests
prev_prediction, prediction, SCALAR_MODEL_VALUE_KEYS)
self._process_prediction(
noon_prediction, station, model_run, machine)
self._process_prediction(
Expand Down
12 changes: 11 additions & 1 deletion api/app/tests/weather_models/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,68 @@ def get_actuals_left_outer_join_with_predictions(*args):
weather_date=datetime(2020, 10, 10, 18),
temperature=20,
temp_valid=True,
wind_direction=90,
relative_humidity=50,
rh_valid=True),
ModelRunPrediction(
ModelRunPrediction(
tmp_tgl_2=2,
rh_tgl_2=10,
apcp_sfc_0=2,
wdir_tgl_10=97,
prediction_timestamp=datetime(2020, 10, 10, 18))],
[HourlyActual(weather_date=datetime(2020, 10, 10, 19)), None],
[HourlyActual(weather_date=datetime(2020, 10, 10, 20),
temperature=25,
wind_direction=270,
temp_valid=True,
relative_humidity=70,
rh_valid=True), None],
[HourlyActual(
weather_date=datetime(2020, 10, 10, 21),
temperature=30,
temp_valid=True,
wind_direction=120,
relative_humidity=100,
rh_valid=True),
ModelRunPrediction(
tmp_tgl_2=1,
rh_tgl_2=20,
apcp_sfc_0=3,
wdir_tgl_10=101,
prediction_timestamp=datetime(2020, 10, 10, 21))],
# day 2
[HourlyActual(
weather_date=datetime(2020, 10, 11, 18),
temperature=20,
temp_valid=True,
wind_direction=121,
relative_humidity=50,
rh_valid=True),
ModelRunPrediction(
tmp_tgl_2=2,
rh_tgl_2=10,
apcp_sfc_0=2,
wdir_tgl_10=110,
prediction_timestamp=datetime(2020, 10, 11, 18))],
[HourlyActual(weather_date=datetime(2020, 10, 11, 19)), None],
[HourlyActual(weather_date=datetime(2020, 10, 11, 20),
temperature=27,
temp_valid=True,
wind_direction=98,
relative_humidity=60,
rh_valid=True), None],
[HourlyActual(
weather_date=datetime(2020, 10, 11, 21),
temperature=30,
wind_direction=118,
temp_valid=True,
relative_humidity=100,
rh_valid=True),
ModelRunPrediction(
tmp_tgl_2=1,
rh_tgl_2=20,
apcp_sfc_0=3,
wdir_tgl_10=111,
prediction_timestamp=datetime(2020, 10, 11, 21))]
]
return result
5 changes: 3 additions & 2 deletions api/app/weather_models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ def interpolate_wind_direction(prediction_a: ModelRunPrediction,


def construct_interpolated_noon_prediction(prediction_a: ModelRunPrediction,
prediction_b: ModelRunPrediction):
prediction_b: ModelRunPrediction,
model_keys):
""" Construct a noon prediction by interpolating.
"""
# create a noon prediction. (using utc hour 20, as that is solar noon in B.C.)
Expand All @@ -121,7 +122,7 @@ def construct_interpolated_noon_prediction(prediction_a: ModelRunPrediction,
timestamp_b = prediction_b.prediction_timestamp.timestamp()
noon_timestamp = noon_prediction.prediction_timestamp.timestamp()
# calculate interpolated values.
for key in SCALAR_MODEL_VALUE_KEYS:
for key in model_keys:
value_a = getattr(prediction_a, key)
value_b = getattr(prediction_b, key)
if value_a is None or value_b is None:
Expand Down
85 changes: 23 additions & 62 deletions api/app/weather_models/machine_learning.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
from app.db.models.weather_models import (PredictionModel, ModelRunPrediction)
from app.db.models.observations import HourlyActual
from app.db.crud.observations import get_actuals_left_outer_join_with_predictions
from app.weather_models.regression_model import RegressionModelsV2
from app.weather_models.sample import Samples


logger = getLogger(__name__)

# Corresponding key values on HourlyActual and SampleCollection
SAMPLE_VALUE_KEYS = ('temperature', 'relative_humidity', 'wind_speed', 'wind_direction')
SAMPLE_VALUE_KEYS = ('temperature', 'relative_humidity', 'wind_speed')
# Number of days of historical actual data to learn from when training model
MAX_DAYS_TO_LEARN = 19

Expand All @@ -38,59 +40,12 @@
"""

keys = ('temperature_wrapper', 'relative_humidity_wrapper',
'wind_speed_wrapper', 'wind_direction_wrapper')
'wind_speed_wrapper')

def __init__(self):
self.temperature_wrapper = LinearRegressionWrapper()
self.relative_humidity_wrapper = LinearRegressionWrapper()
self.wind_speed_wrapper = LinearRegressionWrapper()
self.wind_direction_wrapper = LinearRegressionWrapper()


class Samples:
""" Class for storing samples in buckets of hours.
e.g. a temperature sample consists of an x axis (predicted values) and a y axis (observed values) put
together in hour buckets.
"""

def __init__(self):
self._x = defaultdict(list)
self._y = defaultdict(list)

def hours(self):
""" Return all the hours used to bucket samples together. """
return self._x.keys()

def append_x(self, value, timestamp: datetime):
""" Append another predicted value. """
self._x[timestamp.hour].append(value)

def append_y(self, value, timestamp: datetime):
""" Append another observered values. """
self._y[timestamp.hour].append(value)

def np_x(self, hour):
""" Return numpy array of the predicted values, reshaped appropriately. """
return np.array(self._x[hour]).reshape((-1, 1))

def np_y(self, hour):
""" Return a numpy array of the observed values """
return np.array(self._y[hour])

def add_sample(self,
model_value: float,
actual_value: float,
timestamp: datetime,
model_key: str,
sample_key: str):
""" Add a sample, interpolating the model values spatially """
# Additional logging to assist with finding errors:
logger.info('adding sample for %s->%s with: model_values %s, actual_value: %s',
model_key, sample_key, model_value, actual_value)
# Add to the data we're going to learn from:
# Using two variables, the interpolated temperature value, and the hour of the day.
self.append_x(model_value, timestamp)
self.append_y(actual_value, timestamp)


class SampleCollection:
Expand All @@ -100,7 +55,6 @@
self.temperature = Samples()
self.relative_humidity = Samples()
self.wind_speed = Samples()
self.wind_direction = Samples()


class StationMachineLearning:
Expand All @@ -126,6 +80,7 @@
self.target_coordinate = target_coordinate
self.station_code = station_code
self.regression_models = defaultdict(RegressionModels)
self.regression_models_v2 = RegressionModelsV2()
self.max_learn_date = max_learn_date
# Maximum number of days to try to learn from. Experimentation has shown that
# about two weeks worth of data starts giving fairly good results compared to human forecasters.
Expand All @@ -152,12 +107,9 @@
# are None.
logger.warning('no model value for %s->%s', model_key, sample_key)

def _collect_data(self):
def _collect_data(self, start_date: datetime):
""" Collect data to use for machine learning.
"""
# Calculate the date to start learning from.
start_date = self.max_learn_date - \
timedelta(days=self.max_days_to_learn)
# Create a convenient structure to store samples in.
sample_collection = SampleCollection()

Expand All @@ -177,7 +129,8 @@
and prev_prediction.prediction_timestamp.hour == 18):
# If there's a gap in the data (like with the GLOBAL model) - then make up
# a noon prediction using interpolation, and add it as a sample.
noon_prediction = construct_interpolated_noon_prediction(prev_prediction, prediction)
noon_prediction = construct_interpolated_noon_prediction(
prev_prediction, prediction, SCALAR_MODEL_VALUE_KEYS)
self._add_sample_to_collection(
noon_prediction, prev_actual, sample_collection)

Expand All @@ -190,8 +143,12 @@
def learn(self):
""" Collect data and perform linear regression.
"""
# Calculate the date to start learning from.
start_date = self.max_learn_date - \
timedelta(days=self.max_days_to_learn)

# collect data
data = self._collect_data()
data = self._collect_data(start_date)

# iterate through the data, creating a regression model for each variable
# and each hour.
Expand All @@ -206,6 +163,12 @@
# how much sample data we actually had etc., and then not mark the model as being "good".
regression_model.good_model = True

# wdir specific using new structure for regression handling
query = get_actuals_left_outer_join_with_predictions(
self.session, self.model.id, self.station_code, start_date, self.max_learn_date)
self.regression_models_v2.collect_data(query)
self.regression_models_v2.train()

def predict_temperature(self, model_temperature: float, timestamp: datetime):
""" Predict the bias adjusted temperature for a given point in time, given a corresponding model
temperature.
Expand Down Expand Up @@ -255,9 +218,7 @@
: return: The bias-adjusted wind direction as predicted by the linear regression model.
"""
hour = timestamp.hour
if self.regression_models[hour].wind_direction_wrapper.good_model and model_wind_dir is not None:
predicted_wind_dir = self.regression_models[hour].wind_direction_wrapper.model.predict([[model_wind_dir]])[
0]
# a valid wind direction value is between 0 and 360. If the returned value is outside these bounds, correct it
return predicted_wind_dir % 360
return None
predicted_wind_dir = self.regression_models_v2._models[0].predict(hour, [[model_wind_dir]])
if predicted_wind_dir is None:
return None
return predicted_wind_dir % 360

Check warning on line 224 in api/app/weather_models/machine_learning.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/machine_learning.py#L221-L224

Added lines #L221 - L224 were not covered by tests
128 changes: 128 additions & 0 deletions api/app/weather_models/regression_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import logging
import math
from sklearn.exceptions import NotFittedError
from sklearn.linear_model import LinearRegression
from typing import Dict, List, Protocol
from collections import defaultdict
from abc import abstractmethod
from app.db.models.observations import HourlyActual
from app.db.models.weather_models import ModelRunPrediction
from app.weather_models import construct_interpolated_noon_prediction
from app.weather_models.sample import Samples

logger = logging.getLogger(__name__)

# maps weather orm model keys to actual weather orm model keys
model_2_actual_keys: Dict[str, str] = {
"wdir_tgl_10": "wind_direction"
}


class RegressionModelProto(Protocol):
_key: str
_models: defaultdict[int, LinearRegression]
_samples: Samples

@abstractmethod
def add_sample(self,
prediction: ModelRunPrediction,
actual: HourlyActual): raise NotImplementedError

@abstractmethod
def train(self): raise NotImplementedError

@abstractmethod
def predict(self, hour: int, model_wind_dir: List[List[int]]): raise NotImplementedError


class RegressionModel(RegressionModelProto):
"""
Default class to manage a regression dataset
"""

def __init__(self, model_key: str):
self._key = model_key
self._models = defaultdict(LinearRegression)
self._samples = Samples()

def train(self):
for hour in self._samples.hours():
self._models[hour].fit(self._samples.np_x(hour), self._samples.np_y(hour))
self._is_fitted = True

def get_model_at_hour(self, hour: int):
return self._models[hour]

Check warning on line 54 in api/app/weather_models/regression_model.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/regression_model.py#L54

Added line #L54 was not covered by tests

def predict(self, hour: int, model_wind_dir: List[List[int]]):
try:
prediction = self._models[hour].predict(model_wind_dir)
logger.info("Predicted wind dir for model: %s, hour: %s, prediction: %s", self._key, hour, prediction)
return prediction[0]
except NotFittedError as _:
return None

Check warning on line 62 in api/app/weather_models/regression_model.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/regression_model.py#L57-L62

Added lines #L57 - L62 were not covered by tests

def add_sample(self,
prediction: ModelRunPrediction,
actual: HourlyActual):
""" Add a sample, interpolating the model values spatially """

model_value = getattr(prediction, self._key)
actual_value = getattr(actual, model_2_actual_keys[self._key])

logger.info('adding sample for %s->%s with: model_values %s, actual_value: %s',
self._key, self._key, model_value, actual_value)

if model_value is not None:
if actual_value is None or math.isnan(actual_value):
# If for whatever reason we don't have an actual value, we skip this one.
logger.warning('no actual value for model key: %s, actual key: %s',

Check warning on line 78 in api/app/weather_models/regression_model.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/regression_model.py#L78

Added line #L78 was not covered by tests
self._key, model_2_actual_keys[self._key])
return

Check warning on line 80 in api/app/weather_models/regression_model.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/regression_model.py#L80

Added line #L80 was not covered by tests

# Add to the data we're going to learn from:
# Using two variables, the interpolated temperature value, and the hour of the day.
self._samples.append_x(model_value, actual.weather_date)
self._samples.append_y(actual_value, actual.weather_date)


class RegressionModelsV2:
""" Class for storing regression models.
TODO: migrate other models to this once wind direction is verified
"""

def __init__(self):
self._model_keys: List[str] = list(model_2_actual_keys.keys())
self._models: List[RegressionModelProto] = [
RegressionModel(model_key=self._model_keys[0])
]

def add_samples(self, prediction: ModelRunPrediction, actual: HourlyActual):
for model in self._models:
model.add_sample(prediction, actual)

def collect_data(self, query):
# We need to keep track of previous so that we can do interpolation for the global model.
prev_actual = None
prev_prediction = None
for actual, prediction in query:
if prev_actual != actual and prediction is not None:
if (prev_actual is not None
and prev_prediction is not None
and prev_actual.weather_date.hour == 20
and prediction.prediction_timestamp.hour == 21
and prev_prediction.prediction_timestamp.hour == 18):
# If there's a gap in the data (like with the GLOBAL model) - then make up
# a noon prediction using interpolation, and add it as a sample.
noon_prediction = construct_interpolated_noon_prediction(
prev_prediction, prediction, self._model_keys)
self.add_samples(noon_prediction, prev_actual)

self.add_samples(prediction, actual)
prev_prediction = prediction
prev_actual = actual

def train(self):
# iterate through the data, creating a regression model for each variable
# and each hour.
for model in self._models:
model.train()
Loading
Loading