Partitioned table follow up (#4118)
- Use Python to URL-encode the Postgres password for the connection string
- Apply the encoding fix to `partition_and_archive.sh`
- Replace the materialized view lookup with a partitioned table lookup
conbrad authored Nov 21, 2024 · 1 parent 6743a6d · commit bb74983
Showing 6 changed files with 339 additions and 501 deletions.
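The first bullet, URL-encoding the Postgres password, guards against passwords containing URI-reserved characters breaking the connection string. A minimal Python sketch of the idea (hypothetical password value; note that urllib.parse.quote treats "/" as safe by default, so safe="" is the stricter choice for password fields, while the scripts below call quote() with its default):

import urllib.parse

raw_password = "p@ss:w/rd#1"  # hypothetical password with URI-reserved characters
# safe="" percent-encodes every reserved character, including "/".
encoded = urllib.parse.quote(raw_password, safe="")
dsn = f"postgresql://wps_user:{encoded}@db-host:5432/wps"
print(dsn)  # postgresql://wps_user:p%40ss%3Aw%2Frd%231@db-host:5432/wps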
388 changes: 146 additions & 242 deletions api/app/db/crud/weather_models.py

Large diffs are not rendered by default.
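This collapsed crud diff is where, per the commit message, get_latest_station_prediction_mat_view gives way to a get_latest_station_prediction that reads the partitioned table directly. A purely illustrative sketch of what such a lookup can look like (the query shape and ordering are assumptions, not the repository's actual implementation):

from datetime import datetime
from typing import List
from sqlalchemy import desc
from sqlalchemy.orm import Session
from app.db.models.weather_models import WeatherStationModelPrediction

def get_latest_station_prediction(session: Session, station_codes: List[int], day_start: datetime, day_end: datetime):
    # Filtering on prediction_timestamp (the partition key in a time-partitioned
    # layout) lets postgres prune partitions instead of consulting a
    # materialized view that must be refreshed.
    return (
        session.query(WeatherStationModelPrediction)
        .filter(WeatherStationModelPrediction.station_code.in_(station_codes))
        .filter(WeatherStationModelPrediction.prediction_timestamp.between(day_start, day_end))
        .order_by(desc(WeatherStationModelPrediction.update_date))
        .all()
    )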

256 changes: 93 additions & 163 deletions api/app/jobs/common_model_fetchers.py

Large diffs are not rendered by default.

155 changes: 74 additions & 81 deletions api/app/weather_models/fetch/predictions.py
@@ -1,22 +1,24 @@
""" Code for fetching data for API.
"""
"""Code for fetching data for API."""

from itertools import groupby
import logging
from typing import List
import datetime
from datetime import time
from time import perf_counter
from collections import defaultdict
import pytz
from sqlalchemy.orm import Session
import app.db.database
from app.schemas.morecast_v2 import WeatherIndeterminate
from app.schemas.weather_models import (WeatherStationModelPredictionValues, WeatherModelPredictionValues, WeatherModelRun,
ModelRunPredictions,
WeatherStationModelRunsPredictions)
from app.schemas.weather_models import WeatherStationModelPredictionValues, WeatherModelPredictionValues, WeatherModelRun, ModelRunPredictions, WeatherStationModelRunsPredictions
from app.db.models.weather_models import WeatherStationModelPrediction
from app.db.crud.weather_models import (get_latest_station_model_prediction_per_day, get_station_model_predictions,
get_station_model_prediction_from_previous_model_run, get_latest_station_prediction_mat_view)
from app.db.crud.weather_models import (
get_latest_station_model_prediction_per_day,
get_station_model_predictions,
get_station_model_prediction_from_previous_model_run,
get_latest_station_prediction,
)
import app.stations
from app.utils.time import get_days_from_range
from app.weather_models import ModelEnum
@@ -25,34 +27,26 @@
 
 
 class MatchingStationNotFoundException(Exception):
-    """ Exception raised when station cannot be found. """
+    """Exception raised when station cannot be found."""
 
 
 def _fetch_delta_precip_for_prev_model_run(
-        session: Session,
-        model: ModelEnum,
-        prediction: WeatherStationModelPrediction,
-        prev_station_predictions: dict,
-        prediction_model_run_timestamp: datetime.datetime):
+    session: Session, model: ModelEnum, prediction: WeatherStationModelPrediction, prev_station_predictions: dict, prediction_model_run_timestamp: datetime.datetime
+):
     # Look if we can find the previous value in memory
     if prediction.prediction_timestamp in prev_station_predictions[prediction.station_code]:
         prev_station_prediction = prev_station_predictions[prediction.station_code]
-        return prev_station_prediction[prediction.prediction_timestamp]['prediction'].delta_precipitation
+        return prev_station_prediction[prediction.prediction_timestamp]["prediction"].delta_precipitation
     # Uh oh - couldn't find it - let's go look in the database.
     # This should only happen in extreme edge cases!
-    prev_prediction = get_station_model_prediction_from_previous_model_run(
-        session, prediction.station_code, model, prediction.prediction_timestamp,
-        prediction_model_run_timestamp)
+    prev_prediction = get_station_model_prediction_from_previous_model_run(session, prediction.station_code, model, prediction.prediction_timestamp, prediction_model_run_timestamp)
     if prev_prediction:
         return prev_prediction.delta_precip
     return None
 
 
-async def fetch_model_run_predictions_by_station_code(
-        model: ModelEnum,
-        station_codes: List[int],
-        time_of_interest: datetime) -> List[WeatherStationModelRunsPredictions]:
-    """ Fetch model predictions from database based on list of station codes, for a specified datetime.
+async def fetch_model_run_predictions_by_station_code(model: ModelEnum, station_codes: List[int], time_of_interest: datetime) -> List[WeatherStationModelRunsPredictions]:
+    """Fetch model predictions from database based on list of station codes, for a specified datetime.
     Predictions are grouped by station and model run.
     """
     # We're interested in the 5 days prior to and 10 days following the time_of_interest.
@@ -62,40 +56,34 @@ async def fetch_model_run_predictions_by_station_code
 
 
 async def fetch_model_run_predictions_by_station_code_and_date_range(
-        model: ModelEnum,
-        station_codes: List[int],
-        start_time: datetime.datetime,
-        end_time: datetime.datetime) -> List[WeatherStationModelRunsPredictions]:
-    """ Fetch model predictions from database based on list of station codes and date range.
+    model: ModelEnum, station_codes: List[int], start_time: datetime.datetime, end_time: datetime.datetime
+) -> List[WeatherStationModelRunsPredictions]:
+    """Fetch model predictions from database based on list of station codes and date range.
     Predictions are grouped by station and model run.
     """
     # send the query (ordered by prediction date.)
     with app.db.database.get_read_session_scope() as session:
-        historic_predictions = get_station_model_predictions(
-            session, station_codes, model, start_time, end_time)
+        historic_predictions = get_station_model_predictions(session, station_codes, model, start_time, end_time)
 
         return await marshall_predictions(session, model, station_codes, historic_predictions)
 
 
-async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_range(model: ModelEnum,
-                                                                                   station_codes: List[int],
-                                                                                   start_time: datetime.datetime,
-                                                                                   end_time: datetime.datetime) -> List[WeatherStationModelRunsPredictions]:
+async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_range(
+    model: ModelEnum, station_codes: List[int], start_time: datetime.datetime, end_time: datetime.datetime
+) -> List[WeatherStationModelRunsPredictions]:
     results = []
     days = get_days_from_range(start_time, end_time)
     stations = {station.code: station for station in await app.stations.get_stations_by_codes(station_codes)}
 
     with app.db.database.get_read_session_scope() as session:
-
         for day in days:
             day_results = []
             vancouver_tz = pytz.timezone("America/Vancouver")
 
             day_start = vancouver_tz.localize(datetime.datetime.combine(day, time.min))
             day_end = vancouver_tz.localize(datetime.datetime.combine(day, time.max))
 
-            daily_result = get_latest_station_model_prediction_per_day(
-                session, station_codes, model, day_start, day_end)
+            daily_result = get_latest_station_model_prediction_per_day(session, station_codes, model, day_start, day_end)
             for id, timestamp, model_abbrev, station_code, rh, temp, bias_adjusted_temp, bias_adjusted_rh, precip_24hours, wind_dir, wind_speed, update_date in daily_result:
                 day_results.append(
                     WeatherStationModelPredictionValues(
@@ -110,8 +98,9 @@ async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_range
                         wind_speed=wind_speed,
                         wind_direction=wind_dir,
                         datetime=timestamp,
-                        update_date=update_date
-                    ))
+                        update_date=update_date,
+                    )
+                )
             # sort the list by station_code
             day_results.sort(key=lambda x: x.station.code)
 
@@ -124,10 +113,10 @@ async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_range
     return results
 
 
-async def fetch_latest_model_run_predictions_by_station_code_and_date_range(session: Session,
-                                                                            station_codes: List[int],
-                                                                            start_time: datetime.datetime,
-                                                                            end_time: datetime.datetime) -> List[WeatherIndeterminate]:
+async def fetch_latest_model_run_predictions_by_station_code_and_date_range(
+    session: Session, station_codes: List[int], start_time: datetime.datetime, end_time: datetime.datetime
+) -> List[WeatherIndeterminate]:
+    cffdrs_start = perf_counter()
     results: List[WeatherIndeterminate] = []
     days = get_days_from_range(start_time, end_time)
     stations = {station.code: station for station in await app.stations.get_stations_by_codes(station_codes)}
@@ -138,10 +127,22 @@ async def fetch_latest_model_run_predictions_by_station_code_and_date_range
         day_start = vancouver_tz.localize(datetime.datetime.combine(day, time.min))
         day_end = vancouver_tz.localize(datetime.datetime.combine(day, time.max))
 
-        daily_result = get_latest_station_prediction_mat_view(
-            session, active_station_codes, day_start, day_end)
-        for timestamp, model_abbrev, station_code, rh, temp, bias_adjusted_temp, bias_adjusted_rh, bias_adjusted_wind_speed, bias_adjusted_wdir, precip_24hours, bias_adjusted_precip_24h, wind_dir, wind_speed, update_date in daily_result:
-
+        daily_result = get_latest_station_prediction(session, active_station_codes, day_start, day_end)
+        for (
+            timestamp,
+            model_abbrev,
+            station_code,
+            rh,
+            temp,
+            bias_adjusted_temp,
+            bias_adjusted_rh,
+            bias_adjusted_wind_speed,
+            bias_adjusted_wdir,
+            precip_24hours,
+            bias_adjusted_precip_24h,
+            wind_dir,
+            wind_speed,
+        ) in daily_result:
             # Create two WeatherIndeterminates, one for model predictions and one for bias corrected predictions
             results.append(
                 WeatherIndeterminate(
@@ -153,21 +154,29 @@ async def fetch_latest_model_run_predictions_by_station_code_and_date_range
                     relative_humidity=rh,
                     precipitation=precip_24hours,
                     wind_direction=wind_dir,
-                    wind_speed=wind_speed
-                ))
+                    wind_speed=wind_speed,
+                )
+            )
             results.append(
                 WeatherIndeterminate(
                     station_code=station_code,
                     station_name=stations[station_code].name,
-                    determinate=f'{model_abbrev}_BIAS',
+                    determinate=f"{model_abbrev}_BIAS",
                     utc_timestamp=timestamp,
                     temperature=bias_adjusted_temp,
                     relative_humidity=bias_adjusted_rh,
                     precipitation=bias_adjusted_precip_24h,
                     wind_speed=bias_adjusted_wind_speed,
-                    wind_direction=bias_adjusted_wdir
-                ))
-    return post_process_fetched_predictions(results)
+                    wind_direction=bias_adjusted_wdir,
+                )
+            )
+    post_processed_results = post_process_fetched_predictions(results)
+    cffdrs_end = perf_counter()
+    delta = cffdrs_end - cffdrs_start
+    # Any delta below 100 milliseconds is just noise in the logs.
+    if delta > 0.1:
+        logger.info("%f delta count before and after latest prediction model query", delta)
+    return post_processed_results
 
 
 def post_process_fetched_predictions(weather_indeterminates: List[WeatherIndeterminate]):
@@ -193,35 +202,26 @@ async def marshall_predictions(session: Session, model: ModelEnum, station_codes
             # day, so we need to look at the accumulated precip from the previous model run to calculate the
             # delta_precip
             precip_value = None
-            if prediction.prediction_timestamp == prediction_model_run_timestamp.prediction_run_timestamp and \
-                    prediction.prediction_timestamp.hour > 0:
-                precip_value = _fetch_delta_precip_for_prev_model_run(
-                    session,
-                    model,
-                    prediction,
-                    station_predictions,
-                    prediction_model_run_timestamp.prediction_run_timestamp)
+            if prediction.prediction_timestamp == prediction_model_run_timestamp.prediction_run_timestamp and prediction.prediction_timestamp.hour > 0:
+                precip_value = _fetch_delta_precip_for_prev_model_run(session, model, prediction, station_predictions, prediction_model_run_timestamp.prediction_run_timestamp)
             # This condition catches situations where we are not at hour 000 of the model run, or where it is
            # hour 000 but there was nothing returned from _fetch_delta_precip_for_prev_model_run()
             if precip_value is None:
                 precip_value = prediction.delta_precip
             station_predictions[prediction.station_code][prediction.prediction_timestamp] = {
-                'model_run': WeatherModelRun(
-                    datetime=prediction_model_run_timestamp.prediction_run_timestamp,
-                    name=prediction_model.name,
-                    abbreviation=model,
-                    projection=prediction_model.projection
+                "model_run": WeatherModelRun(
+                    datetime=prediction_model_run_timestamp.prediction_run_timestamp, name=prediction_model.name, abbreviation=model, projection=prediction_model.projection
                 ),
-                'prediction': WeatherModelPredictionValues(
+                "prediction": WeatherModelPredictionValues(
                     temperature=prediction.tmp_tgl_2,
                     bias_adjusted_temperature=prediction.bias_adjusted_temperature,
                     relative_humidity=prediction.rh_tgl_2,
                     bias_adjusted_relative_humidity=prediction.bias_adjusted_rh,
                     delta_precipitation=precip_value,
                     wind_speed=prediction.wind_tgl_10,
                     wind_direction=prediction.wdir_tgl_10,
-                    datetime=prediction.prediction_timestamp
-                )
+                    datetime=prediction.prediction_timestamp,
+                ),
             }
 
     # Re-structure the data, grouping data by station and model run.
@@ -231,19 +231,12 @@ async def marshall_predictions(session: Session, model: ModelEnum, station_codes
     for station_code, predictions in station_predictions.items():
         model_run_dict = {}
         for prediction in predictions.values():
-
-            if prediction['model_run'].datetime in model_run_dict:
-                model_run_predictions = model_run_dict[prediction['model_run'].datetime]
+            if prediction["model_run"].datetime in model_run_dict:
+                model_run_predictions = model_run_dict[prediction["model_run"].datetime]
             else:
-                model_run_predictions = ModelRunPredictions(
-                    model_run=prediction['model_run'],
-                    values=[]
-                )
-                model_run_dict[prediction['model_run'].datetime] = model_run_predictions
-            model_run_predictions.values.append(prediction['prediction'])
+                model_run_predictions = ModelRunPredictions(model_run=prediction["model_run"], values=[])
+                model_run_dict[prediction["model_run"].datetime] = model_run_predictions
+            model_run_predictions.values.append(prediction["prediction"])
 
-        response.append(WeatherStationModelRunsPredictions(
-            station=stations[station_code],
-            model_runs=list(model_run_dict.values())
-        ))
+        response.append(WeatherStationModelRunsPredictions(station=stations[station_code], model_runs=list(model_run_dict.values())))
     return response
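Both fetch paths above bound each day in America/Vancouver local time before querying. A standalone sketch of that day-window computation, using the same pytz pattern as the module:

import datetime
from datetime import time
import pytz

vancouver_tz = pytz.timezone("America/Vancouver")
day = datetime.date(2024, 11, 21)
# time.min/time.max give 00:00:00 and 23:59:59.999999; localize() stamps them
# with the correct PST/PDT offset for that calendar day.
day_start = vancouver_tz.localize(datetime.datetime.combine(day, time.min))
day_end = vancouver_tz.localize(datetime.datetime.combine(day, time.max))
print(day_start.isoformat())  # 2024-11-21T00:00:00-08:00
print(day_end.isoformat())    # 2024-11-21T23:59:59.999999-08:00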
7 changes: 4 additions & 3 deletions openshift/pgslice/docker/Dockerfile
@@ -1,6 +1,8 @@
-FROM ankane/pgslice:v0.6.1
+FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04
 
-RUN apk update && apk add unzip bash
+RUN apt-get update && \
+    apt-get install -y build-essential libpq-dev postgresql-client-16 ruby-full && \
+    gem install pgslice
 
 # Download the Amazon CLI installer.
 ADD "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" /tmp/awscliv2.zip
@@ -10,6 +12,5 @@ USER root
 RUN unzip /tmp/awscliv2.zip -d /tmp/ &&\
     /tmp/aws/install
 
-
 COPY fill_partition_data.sh .
 COPY partition_and_archive.sh .
11 changes: 6 additions & 5 deletions openshift/pgslice/docker/fill_partition_data.sh
@@ -49,12 +49,13 @@ then
     exit 1
 fi
 
-export PGSLICE_URL="postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}"
+ENCODED_PASS=$(python3 -c "import urllib.parse; print(urllib.parse.quote('${PG_PASSWORD}'))")
+PGSLICE_URL=postgresql://${PG_USER}:${ENCODED_PASS}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}
 # Fill the partitions with data from the original table
-pgslice fill $TABLE
+pgslice fill $TABLE --url $PGSLICE_URL
 # Analyze for query planner
-pgslice analyze $TABLE
+pgslice analyze $TABLE --url $PGSLICE_URL
 # Swap the intermediate table with the original table
-pgslice swap $TABLE
+pgslice swap $TABLE --url $PGSLICE_URL
 # Fill the rest (rows inserted between the first fill and the swap)
-pgslice fill $TABLE --swapped
+pgslice fill $TABLE --swapped --url $PGSLICE_URL
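Every pgslice invocation now receives the connection explicitly via --url rather than relying on an exported PGSLICE_URL. The same fill/analyze/swap/fill sequence can be sketched from Python (a hypothetical wrapper; assumes pgslice is on PATH and the script's environment variables are set):

import os
import subprocess
import urllib.parse

def run_pgslice(*args: str) -> None:
    # check=True makes a failed subcommand raise instead of silently continuing.
    subprocess.run(["pgslice", *args], check=True)

encoded_pass = urllib.parse.quote(os.environ["PG_PASSWORD"])
url = f"postgresql://{os.environ['PG_USER']}:{encoded_pass}@{os.environ['PG_HOSTNAME']}:{os.environ['PG_PORT']}/{os.environ['PG_DATABASE']}"
table = os.environ["TABLE"]
for step in (["fill", table], ["analyze", table], ["swap", table], ["fill", table, "--swapped"]):
    run_pgslice(*step, "--url", url)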
23 changes: 16 additions & 7 deletions openshift/pgslice/docker/partition_and_archive.sh
@@ -49,17 +49,26 @@ then
     exit 1
 fi
 
-export PGSLICE_URL = "postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}"
+ENCODED_PASS=$(python3 -c "import urllib.parse; print(urllib.parse.quote('${PG_PASSWORD}'))")
+PGSLICE_URL=postgresql://${PG_USER}:${ENCODED_PASS}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}
 # Add partitions to the intermediate table (assumes it already exists)
-pgslice add_partitions $TABLE --intermediate --future 1
+pgslice add_partitions $TABLE --intermediate --future 1 --url $PGSLICE_URL
 # Fill the partitions with data from the original table
-pgslice fill $TABLE
+pgslice fill $TABLE --url $PGSLICE_URL
 # Analyze for query planner
-pgslice analyze $TABLE
+pgslice analyze $TABLE --url $PGSLICE_URL
 # Swap the intermediate table with the original table
-pgslice swap $TABLE
+pgslice swap $TABLE --url $PGSLICE_URL
 # Fill the rest (rows inserted between the first fill and the swap)
-pgslice fill $TABLE --swapped
+pgslice fill $TABLE --swapped --url $PGSLICE_URL
 
+
 # Dump any retired tables to S3 and drop
-pg_dump -c -Fc -t ${TABLE}_retired $PGSLICE_URL | gzip | AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY}" AWS_SECRET_ACCESS_KEY="${AWS_SECRET_KEY}" aws --endpoint="https://${AWS_HOSTNAME}" s3 cp - "s3://${AWS_BUCKET}/retired/${TABLE}_retired.dump.gz"
+# borrowing a lot from https://github.com/BCDevOps/backup-container
+_timestamp=`date +\%Y-\%m-\%d_%H-%M-%S`
+_datestamp=`date +\%Y/\%m`
+_target_filename="${PG_HOSTNAME}_${TABLE}_retired_${_timestamp}.sql.gz"
+_target_folder="${PG_HOSTNAME}_${PG_DATABASE}/${_datestamp}"
+
+pg_dump -c -Fc -t ${TABLE}_retired $PGSLICE_URL | gzip | AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY AWS_SECRET_ACCESS_KEY=$AWS_SECRET_KEY aws --endpoint="https://${AWS_HOSTNAME}" s3 cp - "s3://${AWS_BUCKET}/retired/${_target_folder}/${_target_filename}"
+psql -c "DROP TABLE ${TABLE}_retired" $PGSLICE_URL
