Skip to content

Commit

Permalink
Percent grass curing from CWFIS (bcgov#3385)
Browse files Browse the repository at this point in the history
Co-authored-by: Conor Brady <con.brad@gmail.com>
  • Loading branch information
dgboss and conbrad authored Feb 7, 2024
1 parent 084cd7a commit 2bb2ee6
Show file tree
Hide file tree
Showing 10 changed files with 466 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ jobs:
oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_viirs_snow_cronjob.sh ${SUFFIX} apply
- name: GRASS CURING cronjob
shell: bash
run: |
oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_grass_curing_cronjob.sh ${SUFFIX} apply
# TODO: Delete once pmtiles has run for some time
# deploy-tileserv:
# name: Deploy tileserv to Dev
Expand Down
41 changes: 41 additions & 0 deletions api/alembic/versions/2b3755392ad8_percent_grass_curing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Percent grass curing
Revision ID: 2b3755392ad8
Revises: 5845f568a975
Create Date: 2024-02-01 16:45:05.914743
"""
from alembic import op
import sqlalchemy as sa
from app.db.models.common import TZTimeStamp

# revision identifiers, used by Alembic.
revision = '2b3755392ad8'
down_revision = '5845f568a975'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic ###
op.create_table('percent_grass_curing',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('station_code', sa.Integer(), nullable=False),
sa.Column('percent_grass_curing', sa.Float(), nullable=False),
sa.Column('for_date', TZTimeStamp(), nullable=False),
sa.PrimaryKeyConstraint('id'),
comment='Record containing information about percent grass curing from the CFWIS.'
)
op.create_index(op.f('ix_percent_grass_curing_for_date'), 'percent_grass_curing', ['for_date'], unique=False)
op.create_index(op.f('ix_percent_grass_curing_id'), 'percent_grass_curing', ['id'], unique=False)
op.create_index(op.f('ix_percent_grass_curing_station_code'), 'percent_grass_curing', ['station_code'], unique=False)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic ###
op.drop_index(op.f('ix_percent_grass_curing_station_code'), table_name='percent_grass_curing')
op.drop_index(op.f('ix_percent_grass_curing_id'), table_name='percent_grass_curing')
op.drop_index(op.f('ix_percent_grass_curing_for_date'), table_name='percent_grass_curing')
op.drop_table('percent_grass_curing')
# ### end Alembic commands ###
28 changes: 28 additions & 0 deletions api/app/db/crud/grass_curing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
""" CRUD operations relating to processing grass curing
"""
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import func
from app.db.models.grass_curing import PercentGrassCuring


async def save_percent_grass_curing(session: AsyncSession, percent_grass_curing: PercentGrassCuring):
""" Add a new PercentGrassCuring record.
:param session: A session object for asynchronous database access.
:type session: AsyncSession
:param percent_grass_curing: The record to be saved.
:type percent_grass_curing: PercentGrassCuring
"""
session.add(percent_grass_curing)


async def get_last_percent_grass_curing_for_date(session: AsyncSession):
""" Get the last date for which a PercentGrassCuring record exists.
:param session: A session object for asynchronous database access.
:type session: AsyncSession
"""
stmt = select(func.max(PercentGrassCuring.for_date))
result = await session.execute(stmt)
return result.scalar()
1 change: 1 addition & 0 deletions api/app/db/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
ClassifiedHfi, RunTypeEnum, ShapeTypeEnum, FuelType, HighHfiArea, RunParameters)
from app.db.models.morecast_v2 import MorecastForecastRecord
from app.db.models.snow import ProcessedSnow, SnowSourceEnum
from app.db.models.grass_curing import PercentGrassCuring
18 changes: 18 additions & 0 deletions api/app/db/models/grass_curing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from app.db.models import Base
from sqlalchemy import (Column, Float, Integer, Sequence)
from app.db.models.common import TZTimeStamp


class PercentGrassCuring(Base):
""" Daily percent grass curing per weather station. """
__tablename__ = 'percent_grass_curing'
__table_args__ = (
{'comment': 'Record containing information about percent grass curing from the CFWIS.'}
)

id = Column(Integer, Sequence('percent_grass_curing_id_seq'),
primary_key=True, nullable=False, index=True)
station_code = Column(Integer, nullable=False, index=True)
percent_grass_curing = Column(Float, nullable=False, index=False)
for_date = Column(TZTimeStamp, nullable=False, index=True)

156 changes: 156 additions & 0 deletions api/app/jobs/grass_curing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from affine import Affine
from datetime import date
from osgeo import gdal
import asyncio
import logging
import math
import numpy as np
import os
import requests
import sys
import tempfile
import xml.etree.ElementTree as ET
from app import configure_logging
from app.db.crud.grass_curing import get_last_percent_grass_curing_for_date, save_percent_grass_curing
from app.db.database import get_async_read_session_scope, get_async_write_session_scope
from app.geospatial import WGS84
from app.db.models.grass_curing import PercentGrassCuring
from app.rocketchat_notifications import send_rocketchat_notification
from app.stations import get_stations_asynchronously

logger = logging.getLogger(__name__)

GRASS_CURING_FILE_NAME_3978 = "grass_curing_epsg_3978.tif"
GRASS_CURING_FILE_NAME_4326 = "grass_curing_epsg_4326.tif"
GRASS_CURING_COVERAGE_ID = "public:pc_current"
WCS_URL = "https://cwfis.cfs.nrcan.gc.ca/geoserver/public/wcs"


class OwsException(Exception):
""" Raise when OWS service returns an exception report."""


class GrassCuringJob():
""" Job that downloads and processes percent grass curing data from the CWFIS. """

async def _get_grass_curing_wcs_raster(self, path: str):
""" Gets the current percent grass curing as a tif from the CFWIS Web Coverage Service. """
session = requests.session()
param_dict= {
"service": "WCS",
"version": "2.0.0",
"request": "GetCoverage",
"format": "image/geotiff",
"coverageId": GRASS_CURING_COVERAGE_ID
}
response = session.get(WCS_URL, params = param_dict)
# Check HTTP response code for error condition
response.raise_for_status()
# GeoServer can return an exception report as xml within a 200 response. Check if we have a content type header
# of application/xml and check if an exception is present.
if "Content-Type" in response.headers and response.headers["Content-Type"] == "application/xml":
root_element = ET.fromstring(response.content)
if "ExceptionReport" in root_element.tag:
message = f"GeoServer returned an exception when attempting to download percent grass curing from URL: {response.request.url}"
logger.error(message)
raise OwsException(message)
file_path = os.path.join(path, GRASS_CURING_FILE_NAME_3978)
with open(file_path, 'wb') as file:
file.write(response.content)


def _reproject_to_epsg_4326(self, path: str):
""" Transforms coordinates in source_file geotiff to EPSG:3005 (BC Albers),
writes to newly created geotiff called <new_filename>.tif
:param path: A path to a temporary directory containing the percent grass curing tif.
"""
destination_path = f"{path}/{GRASS_CURING_FILE_NAME_4326}"
source_path = f"{path}/{GRASS_CURING_FILE_NAME_3978}"
source_data = gdal.Open(source_path, gdal.GA_ReadOnly)
gdal.Warp(destination_path, source_data, dstSRS=WGS84)
del source_data


def _yield_value_for_stations(self, data_source, stations):
""" Given a list of stations, and a gdal dataset, yield the grass curing value for each station
:param data_source: A gdal representation of a raster image of percent grass curing.
:param stations: A list of weather station objects.
:return: A tuple of a weather station code and the percent grass curing at its location.
"""
raster_band = data_source.GetRasterBand(1)
data_np_array = raster_band.ReadAsArray()
data_np_array = np.array(data_np_array)
forward_transform = Affine.from_gdal(*data_source.GetGeoTransform())
reverse_transform = ~forward_transform
for station in stations:
longitude = station.long
latitude = station.lat
px, py = reverse_transform * (longitude, latitude)
px = math.floor(px)
py = math.floor(py)
yield (station.code, data_np_array[py][px])


async def _get_last_for_date(self):
""" Get the date of the most recently processed percent grass curing data."""
async with get_async_read_session_scope() as session:
result = await get_last_percent_grass_curing_for_date(session)
return result


async def _process_grass_curing(self):
""" Download and process percent grass curing data. """
async with get_async_write_session_scope() as session:
today = date.today()
logger.info(f"Starting collection of percent grass curing data from CWFIS for {today}.")
with tempfile.TemporaryDirectory() as temp_dir:
await self._get_grass_curing_wcs_raster(temp_dir)
self._reproject_to_epsg_4326(temp_dir)

# Open the reprojected grass curing data
raster = gdal.Open(f"{temp_dir}/{GRASS_CURING_FILE_NAME_4326}")
stations = await get_stations_asynchronously()
for station, value in self._yield_value_for_stations(raster, stations):
percent_grass_curing = PercentGrassCuring(for_date=today,
percent_grass_curing=value,
station_code=station )
await save_percent_grass_curing(session, percent_grass_curing)
logger.info(f"Finished processing percent grass curing data from CFWIS for {today}.")


async def _run_grass_curing(self):
""" Entry point for running the job. """
last_processed = await self._get_last_for_date()
if last_processed is None or last_processed.date() < date.today():
await self._process_grass_curing()
else:
logger.info(f"Percent grass curing processing is up to date as of {date.today()}")


def main():
""" Kicks off asynchronous processing of VIIRS snow coverage data.
"""
try:
# We don't want gdal to silently swallow errors.
gdal.UseExceptions()
logger.debug('Begin processing VIIRS snow coverage data.')

bot = GrassCuringJob()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(bot._run_grass_curing())

# Exit with 0 - success.
sys.exit(os.EX_OK)
except Exception as exception:
# Exit non 0 - failure.
logger.error("An error occurred while processing CWFIS grass curing data.", exc_info=exception)
rc_message = ':scream: Encountered an error while processing CWFIS grass curing data.'
send_rocketchat_notification(rc_message, exception)
sys.exit(os.EX_SOFTWARE)

if __name__ == '__main__':
configure_logging()
main()
43 changes: 43 additions & 0 deletions api/app/tests/jobs/test_grass_curing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
""" Unit testing for CWFIS grass curing data processing """

import os
import pytest
from datetime import datetime
from pytest_mock import MockerFixture
from app.jobs import grass_curing
from app.jobs.grass_curing import GrassCuringJob


def test_grass_curing_job_fail(mocker: MockerFixture,
monkeypatch):
"""
Test that when the bot fails, a message is sent to rocket-chat, and our exit code is 1.
"""

async def mock__run_grass_curing(self):
raise OSError("Error")

monkeypatch.setattr(GrassCuringJob, '_run_grass_curing', mock__run_grass_curing)
rocket_chat_spy = mocker.spy(grass_curing, 'send_rocketchat_notification')

with pytest.raises(SystemExit) as excinfo:
grass_curing.main()
# Assert that we exited with an error code.
assert excinfo.value.code == os.EX_SOFTWARE
# Assert that rocket chat was called.
assert rocket_chat_spy.call_count == 1


def test_grass_curing_job_exits_without_error_when_no_work_required(monkeypatch):
""" Test that grass_curing_job exits without error when no data needs to be processed.
"""
async def mock__get_last_for_date(self):
return datetime.now()

monkeypatch.setattr(GrassCuringJob, '_get_last_for_date', mock__get_last_for_date)

with pytest.raises(SystemExit) as excinfo:
grass_curing.main()
# Assert that we exited with an error code.
assert excinfo.value.code == os.EX_OK

2 changes: 2 additions & 0 deletions openshift/scripts/oc_deploy_to_production.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ echo C-Haines
PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_c_haines_cronjob.sh prod ${RUN_TYPE}
echo VIIRS Snow
PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_viirs_snow_cronjob.sh prod ${RUN_TYPE}
echo Grass Curing
PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_grass_curing_cronjob.sh prod ${RUN_TYPE}
echo BC FireWeather cronjobs
echo "Run forecast at 8h30 PDT and 16h30 PDT (so before and after noon)"
PROJ_TARGET=${PROJ_TARGET} SCHEDULE="30 * * * *" bash $(dirname ${0})/oc_provision_wfwx_noon_forecasts_cronjob.sh prod ${RUN_TYPE}
Expand Down
58 changes: 58 additions & 0 deletions openshift/scripts/oc_provision_grass_curing_cronjob.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/bin/sh -l
#
source "$(dirname ${0})/common/common"

#%
#% OpenShift Deploy Helper
#%
#% Intended for use with a pull request-based pipeline.
#% Suffixes incl.: pr-###.
#%
#% Usage:
#%
#% ${THIS_FILE} [SUFFIX] [apply]
#%
#% Examples:
#%
#% Provide a PR number. Defaults to a dry-run.
#% ${THIS_FILE} pr-0
#%
#% Apply when satisfied.
#% ${THIS_FILE} pr-0 apply
#%

# Target project override for Dev or Prod deployments
#
PROJ_TARGET="${PROJ_TARGET:-${PROJ_DEV}}"

# Specify a default schedule to run daily at 5am-ish
SCHEDULE="${SCHEDULE:-$((3 + $RANDOM % 54)) 5 * * *}"

# Process template
OC_PROCESS="oc -n ${PROJ_TARGET} process -f ${TEMPLATE_PATH}/grass_curing.cronjob.yaml \
-p JOB_NAME=grass-curing-${APP_NAME}-${SUFFIX} \
-p APP_LABEL=${APP_NAME}-${SUFFIX} \
-p NAME=${APP_NAME} \
-p SUFFIX=${SUFFIX} \
-p SCHEDULE=\"${SCHEDULE}\" \
-p POSTGRES_DATABASE=${POSTGRES_DATABASE:-${APP_NAME}} \
-p POSTGRES_USER=wps-crunchydb-${SUFFIX} \
-p POSTGRES_WRITE_HOST=wps-crunchydb-${SUFFIX}-primary \
-p POSTGRES_READ_HOST=wps-crunchydb-${SUFFIX}-primary \
-p CRUNCHYDB_USER=wps-crunchydb-${SUFFIX}-pguser-wps-crunchydb-${SUFFIX} \
${PROJ_TOOLS:+ "-p PROJ_TOOLS=${PROJ_TOOLS}"} \
${IMAGE_REGISTRY:+ "-p IMAGE_REGISTRY=${IMAGE_REGISTRY}"}"

# Apply template (apply or use --dry-run)
#
OC_APPLY="oc -n ${PROJ_TARGET} apply -f -"
[ "${APPLY}" ] || OC_APPLY="${OC_APPLY} --dry-run"

# Execute commands
#
eval "${OC_PROCESS}"
eval "${OC_PROCESS} | ${OC_APPLY}"

# Provide oc command instruction
#
display_helper "${OC_PROCESS} | ${OC_APPLY}"
Loading

0 comments on commit 2bb2ee6

Please sign in to comment.