-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Percent grass curing from CWFIS (#3385)
Co-authored-by: Conor Brady <con.brad@gmail.com>
- Loading branch information
Showing
10 changed files
with
466 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
"""Percent grass curing | ||
Revision ID: 2b3755392ad8 | ||
Revises: 5845f568a975 | ||
Create Date: 2024-02-01 16:45:05.914743 | ||
""" | ||
from alembic import op | ||
import sqlalchemy as sa | ||
from app.db.models.common import TZTimeStamp | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = '2b3755392ad8' | ||
down_revision = '5845f568a975' | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade(): | ||
# ### commands auto generated by Alembic ### | ||
op.create_table('percent_grass_curing', | ||
sa.Column('id', sa.Integer(), nullable=False), | ||
sa.Column('station_code', sa.Integer(), nullable=False), | ||
sa.Column('percent_grass_curing', sa.Float(), nullable=False), | ||
sa.Column('for_date', TZTimeStamp(), nullable=False), | ||
sa.PrimaryKeyConstraint('id'), | ||
comment='Record containing information about percent grass curing from the CFWIS.' | ||
) | ||
op.create_index(op.f('ix_percent_grass_curing_for_date'), 'percent_grass_curing', ['for_date'], unique=False) | ||
op.create_index(op.f('ix_percent_grass_curing_id'), 'percent_grass_curing', ['id'], unique=False) | ||
op.create_index(op.f('ix_percent_grass_curing_station_code'), 'percent_grass_curing', ['station_code'], unique=False) | ||
# ### end Alembic commands ### | ||
|
||
|
||
def downgrade(): | ||
# ### commands auto generated by Alembic ### | ||
op.drop_index(op.f('ix_percent_grass_curing_station_code'), table_name='percent_grass_curing') | ||
op.drop_index(op.f('ix_percent_grass_curing_id'), table_name='percent_grass_curing') | ||
op.drop_index(op.f('ix_percent_grass_curing_for_date'), table_name='percent_grass_curing') | ||
op.drop_table('percent_grass_curing') | ||
# ### end Alembic commands ### |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
""" CRUD operations relating to processing grass curing | ||
""" | ||
from sqlalchemy import select | ||
from sqlalchemy.ext.asyncio import AsyncSession | ||
from sqlalchemy.sql import func | ||
from app.db.models.grass_curing import PercentGrassCuring | ||
|
||
|
||
async def save_percent_grass_curing(session: AsyncSession, percent_grass_curing: PercentGrassCuring): | ||
""" Add a new PercentGrassCuring record. | ||
:param session: A session object for asynchronous database access. | ||
:type session: AsyncSession | ||
:param percent_grass_curing: The record to be saved. | ||
:type percent_grass_curing: PercentGrassCuring | ||
""" | ||
session.add(percent_grass_curing) | ||
|
||
|
||
async def get_last_percent_grass_curing_for_date(session: AsyncSession): | ||
""" Get the last date for which a PercentGrassCuring record exists. | ||
:param session: A session object for asynchronous database access. | ||
:type session: AsyncSession | ||
""" | ||
stmt = select(func.max(PercentGrassCuring.for_date)) | ||
result = await session.execute(stmt) | ||
return result.scalar() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from app.db.models import Base | ||
from sqlalchemy import (Column, Float, Integer, Sequence) | ||
from app.db.models.common import TZTimeStamp | ||
|
||
|
||
class PercentGrassCuring(Base): | ||
""" Daily percent grass curing per weather station. """ | ||
__tablename__ = 'percent_grass_curing' | ||
__table_args__ = ( | ||
{'comment': 'Record containing information about percent grass curing from the CFWIS.'} | ||
) | ||
|
||
id = Column(Integer, Sequence('percent_grass_curing_id_seq'), | ||
primary_key=True, nullable=False, index=True) | ||
station_code = Column(Integer, nullable=False, index=True) | ||
percent_grass_curing = Column(Float, nullable=False, index=False) | ||
for_date = Column(TZTimeStamp, nullable=False, index=True) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
from affine import Affine | ||
from datetime import date | ||
from osgeo import gdal | ||
import asyncio | ||
import logging | ||
import math | ||
import numpy as np | ||
import os | ||
import requests | ||
import sys | ||
import tempfile | ||
import xml.etree.ElementTree as ET | ||
from app import configure_logging | ||
from app.db.crud.grass_curing import get_last_percent_grass_curing_for_date, save_percent_grass_curing | ||
from app.db.database import get_async_read_session_scope, get_async_write_session_scope | ||
from app.geospatial import WGS84 | ||
from app.db.models.grass_curing import PercentGrassCuring | ||
from app.rocketchat_notifications import send_rocketchat_notification | ||
from app.stations import get_stations_asynchronously | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
GRASS_CURING_FILE_NAME_3978 = "grass_curing_epsg_3978.tif" | ||
GRASS_CURING_FILE_NAME_4326 = "grass_curing_epsg_4326.tif" | ||
GRASS_CURING_COVERAGE_ID = "public:pc_current" | ||
WCS_URL = "https://cwfis.cfs.nrcan.gc.ca/geoserver/public/wcs" | ||
|
||
|
||
class OwsException(Exception): | ||
""" Raise when OWS service returns an exception report.""" | ||
|
||
|
||
class GrassCuringJob(): | ||
""" Job that downloads and processes percent grass curing data from the CWFIS. """ | ||
|
||
async def _get_grass_curing_wcs_raster(self, path: str): | ||
""" Gets the current percent grass curing as a tif from the CFWIS Web Coverage Service. """ | ||
session = requests.session() | ||
param_dict= { | ||
"service": "WCS", | ||
"version": "2.0.0", | ||
"request": "GetCoverage", | ||
"format": "image/geotiff", | ||
"coverageId": GRASS_CURING_COVERAGE_ID | ||
} | ||
response = session.get(WCS_URL, params = param_dict) | ||
# Check HTTP response code for error condition | ||
response.raise_for_status() | ||
# GeoServer can return an exception report as xml within a 200 response. Check if we have a content type header | ||
# of application/xml and check if an exception is present. | ||
if "Content-Type" in response.headers and response.headers["Content-Type"] == "application/xml": | ||
root_element = ET.fromstring(response.content) | ||
if "ExceptionReport" in root_element.tag: | ||
message = f"GeoServer returned an exception when attempting to download percent grass curing from URL: {response.request.url}" | ||
logger.error(message) | ||
raise OwsException(message) | ||
file_path = os.path.join(path, GRASS_CURING_FILE_NAME_3978) | ||
with open(file_path, 'wb') as file: | ||
file.write(response.content) | ||
|
||
|
||
def _reproject_to_epsg_4326(self, path: str): | ||
""" Transforms coordinates in source_file geotiff to EPSG:3005 (BC Albers), | ||
writes to newly created geotiff called <new_filename>.tif | ||
:param path: A path to a temporary directory containing the percent grass curing tif. | ||
""" | ||
destination_path = f"{path}/{GRASS_CURING_FILE_NAME_4326}" | ||
source_path = f"{path}/{GRASS_CURING_FILE_NAME_3978}" | ||
source_data = gdal.Open(source_path, gdal.GA_ReadOnly) | ||
gdal.Warp(destination_path, source_data, dstSRS=WGS84) | ||
del source_data | ||
|
||
|
||
def _yield_value_for_stations(self, data_source, stations): | ||
""" Given a list of stations, and a gdal dataset, yield the grass curing value for each station | ||
:param data_source: A gdal representation of a raster image of percent grass curing. | ||
:param stations: A list of weather station objects. | ||
:return: A tuple of a weather station code and the percent grass curing at its location. | ||
""" | ||
raster_band = data_source.GetRasterBand(1) | ||
data_np_array = raster_band.ReadAsArray() | ||
data_np_array = np.array(data_np_array) | ||
forward_transform = Affine.from_gdal(*data_source.GetGeoTransform()) | ||
reverse_transform = ~forward_transform | ||
for station in stations: | ||
longitude = station.long | ||
latitude = station.lat | ||
px, py = reverse_transform * (longitude, latitude) | ||
px = math.floor(px) | ||
py = math.floor(py) | ||
yield (station.code, data_np_array[py][px]) | ||
|
||
|
||
async def _get_last_for_date(self): | ||
""" Get the date of the most recently processed percent grass curing data.""" | ||
async with get_async_read_session_scope() as session: | ||
result = await get_last_percent_grass_curing_for_date(session) | ||
return result | ||
|
||
|
||
async def _process_grass_curing(self): | ||
""" Download and process percent grass curing data. """ | ||
async with get_async_write_session_scope() as session: | ||
today = date.today() | ||
logger.info(f"Starting collection of percent grass curing data from CWFIS for {today}.") | ||
with tempfile.TemporaryDirectory() as temp_dir: | ||
await self._get_grass_curing_wcs_raster(temp_dir) | ||
self._reproject_to_epsg_4326(temp_dir) | ||
|
||
# Open the reprojected grass curing data | ||
raster = gdal.Open(f"{temp_dir}/{GRASS_CURING_FILE_NAME_4326}") | ||
stations = await get_stations_asynchronously() | ||
for station, value in self._yield_value_for_stations(raster, stations): | ||
percent_grass_curing = PercentGrassCuring(for_date=today, | ||
percent_grass_curing=value, | ||
station_code=station ) | ||
await save_percent_grass_curing(session, percent_grass_curing) | ||
logger.info(f"Finished processing percent grass curing data from CFWIS for {today}.") | ||
|
||
|
||
async def _run_grass_curing(self): | ||
""" Entry point for running the job. """ | ||
last_processed = await self._get_last_for_date() | ||
if last_processed is None or last_processed.date() < date.today(): | ||
await self._process_grass_curing() | ||
else: | ||
logger.info(f"Percent grass curing processing is up to date as of {date.today()}") | ||
|
||
|
||
def main(): | ||
""" Kicks off asynchronous processing of VIIRS snow coverage data. | ||
""" | ||
try: | ||
# We don't want gdal to silently swallow errors. | ||
gdal.UseExceptions() | ||
logger.debug('Begin processing VIIRS snow coverage data.') | ||
|
||
bot = GrassCuringJob() | ||
loop = asyncio.new_event_loop() | ||
asyncio.set_event_loop(loop) | ||
loop.run_until_complete(bot._run_grass_curing()) | ||
|
||
# Exit with 0 - success. | ||
sys.exit(os.EX_OK) | ||
except Exception as exception: | ||
# Exit non 0 - failure. | ||
logger.error("An error occurred while processing CWFIS grass curing data.", exc_info=exception) | ||
rc_message = ':scream: Encountered an error while processing CWFIS grass curing data.' | ||
send_rocketchat_notification(rc_message, exception) | ||
sys.exit(os.EX_SOFTWARE) | ||
|
||
if __name__ == '__main__': | ||
configure_logging() | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
""" Unit testing for CWFIS grass curing data processing """ | ||
|
||
import os | ||
import pytest | ||
from datetime import datetime | ||
from pytest_mock import MockerFixture | ||
from app.jobs import grass_curing | ||
from app.jobs.grass_curing import GrassCuringJob | ||
|
||
|
||
def test_grass_curing_job_fail(mocker: MockerFixture, | ||
monkeypatch): | ||
""" | ||
Test that when the bot fails, a message is sent to rocket-chat, and our exit code is 1. | ||
""" | ||
|
||
async def mock__run_grass_curing(self): | ||
raise OSError("Error") | ||
|
||
monkeypatch.setattr(GrassCuringJob, '_run_grass_curing', mock__run_grass_curing) | ||
rocket_chat_spy = mocker.spy(grass_curing, 'send_rocketchat_notification') | ||
|
||
with pytest.raises(SystemExit) as excinfo: | ||
grass_curing.main() | ||
# Assert that we exited with an error code. | ||
assert excinfo.value.code == os.EX_SOFTWARE | ||
# Assert that rocket chat was called. | ||
assert rocket_chat_spy.call_count == 1 | ||
|
||
|
||
def test_grass_curing_job_exits_without_error_when_no_work_required(monkeypatch): | ||
""" Test that grass_curing_job exits without error when no data needs to be processed. | ||
""" | ||
async def mock__get_last_for_date(self): | ||
return datetime.now() | ||
|
||
monkeypatch.setattr(GrassCuringJob, '_get_last_for_date', mock__get_last_for_date) | ||
|
||
with pytest.raises(SystemExit) as excinfo: | ||
grass_curing.main() | ||
# Assert that we exited with an error code. | ||
assert excinfo.value.code == os.EX_OK | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
#!/bin/sh -l | ||
# | ||
source "$(dirname ${0})/common/common" | ||
|
||
#% | ||
#% OpenShift Deploy Helper | ||
#% | ||
#% Intended for use with a pull request-based pipeline. | ||
#% Suffixes incl.: pr-###. | ||
#% | ||
#% Usage: | ||
#% | ||
#% ${THIS_FILE} [SUFFIX] [apply] | ||
#% | ||
#% Examples: | ||
#% | ||
#% Provide a PR number. Defaults to a dry-run. | ||
#% ${THIS_FILE} pr-0 | ||
#% | ||
#% Apply when satisfied. | ||
#% ${THIS_FILE} pr-0 apply | ||
#% | ||
|
||
# Target project override for Dev or Prod deployments | ||
# | ||
PROJ_TARGET="${PROJ_TARGET:-${PROJ_DEV}}" | ||
|
||
# Specify a default schedule to run daily at 5am-ish | ||
SCHEDULE="${SCHEDULE:-$((3 + $RANDOM % 54)) 5 * * *}" | ||
|
||
# Process template | ||
OC_PROCESS="oc -n ${PROJ_TARGET} process -f ${TEMPLATE_PATH}/grass_curing.cronjob.yaml \ | ||
-p JOB_NAME=grass-curing-${APP_NAME}-${SUFFIX} \ | ||
-p APP_LABEL=${APP_NAME}-${SUFFIX} \ | ||
-p NAME=${APP_NAME} \ | ||
-p SUFFIX=${SUFFIX} \ | ||
-p SCHEDULE=\"${SCHEDULE}\" \ | ||
-p POSTGRES_DATABASE=${POSTGRES_DATABASE:-${APP_NAME}} \ | ||
-p POSTGRES_USER=wps-crunchydb-${SUFFIX} \ | ||
-p POSTGRES_WRITE_HOST=wps-crunchydb-${SUFFIX}-primary \ | ||
-p POSTGRES_READ_HOST=wps-crunchydb-${SUFFIX}-primary \ | ||
-p CRUNCHYDB_USER=wps-crunchydb-${SUFFIX}-pguser-wps-crunchydb-${SUFFIX} \ | ||
${PROJ_TOOLS:+ "-p PROJ_TOOLS=${PROJ_TOOLS}"} \ | ||
${IMAGE_REGISTRY:+ "-p IMAGE_REGISTRY=${IMAGE_REGISTRY}"}" | ||
|
||
# Apply template (apply or use --dry-run) | ||
# | ||
OC_APPLY="oc -n ${PROJ_TARGET} apply -f -" | ||
[ "${APPLY}" ] || OC_APPLY="${OC_APPLY} --dry-run" | ||
|
||
# Execute commands | ||
# | ||
eval "${OC_PROCESS}" | ||
eval "${OC_PROCESS} | ${OC_APPLY}" | ||
|
||
# Provide oc command instruction | ||
# | ||
display_helper "${OC_PROCESS} | ${OC_APPLY}" |
Oops, something went wrong.