Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Percent grass curing from CWFIS #3385

Merged
merged 14 commits into from
Feb 7, 2024
6 changes: 6 additions & 0 deletions .github/workflows/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ jobs:
oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_viirs_snow_cronjob.sh ${SUFFIX} apply
- name: GRASS CURING cronjob
shell: bash
run: |
oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_grass_curing_cronjob.sh ${SUFFIX} apply
# TODO: Delete once pmtiles has run for some time
# deploy-tileserv:
# name: Deploy tileserv to Dev
Expand Down
41 changes: 41 additions & 0 deletions api/alembic/versions/2b3755392ad8_percent_grass_curing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Percent grass curing

Revision ID: 2b3755392ad8
Revises: 5845f568a975
Create Date: 2024-02-01 16:45:05.914743

"""
from alembic import op
import sqlalchemy as sa
from app.db.models.common import TZTimeStamp

# revision identifiers, used by Alembic.
revision = '2b3755392ad8'
down_revision = '5845f568a975'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic ###
op.create_table('percent_grass_curing',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('station_code', sa.Integer(), nullable=False),
sa.Column('percent_grass_curing', sa.Float(), nullable=False),
sa.Column('for_date', TZTimeStamp(), nullable=False),
sa.PrimaryKeyConstraint('id'),
comment='Record containing information about percent grass curing from the CFWIS.'
)
op.create_index(op.f('ix_percent_grass_curing_for_date'), 'percent_grass_curing', ['for_date'], unique=False)
op.create_index(op.f('ix_percent_grass_curing_id'), 'percent_grass_curing', ['id'], unique=False)
op.create_index(op.f('ix_percent_grass_curing_station_code'), 'percent_grass_curing', ['station_code'], unique=False)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic ###
op.drop_index(op.f('ix_percent_grass_curing_station_code'), table_name='percent_grass_curing')
op.drop_index(op.f('ix_percent_grass_curing_id'), table_name='percent_grass_curing')
op.drop_index(op.f('ix_percent_grass_curing_for_date'), table_name='percent_grass_curing')
op.drop_table('percent_grass_curing')
# ### end Alembic commands ###
28 changes: 28 additions & 0 deletions api/app/db/crud/grass_curing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
""" CRUD operations relating to processing snow coverage
dgboss marked this conversation as resolved.
Show resolved Hide resolved
"""
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import func
from app.db.models.grass_curing import PercentGrassCuring


async def save_percent_grass_curing(session: AsyncSession, percent_grass_curing: PercentGrassCuring):
""" Add a new PercentGrassCuring record.

:param session: A session object for asynchronous database access.
:type session: AsyncSession
:param percent_grass_curing: The record to be saved.
:type percent_grass_curing: PercentGrassCuring
"""
session.add(percent_grass_curing)


async def get_last_percent_grass_curing_for_date(session: AsyncSession):
""" Get the last date for which a PercentGrassCuring record exists.

:param session: A session object for asynchronous database access.
:type session: AsyncSession
"""
stmt = select(func.max(PercentGrassCuring.for_date))
result = await session.execute(stmt)
return result.scalar()
1 change: 1 addition & 0 deletions api/app/db/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
ClassifiedHfi, RunTypeEnum, ShapeTypeEnum, FuelType, HighHfiArea, RunParameters)
from app.db.models.morecast_v2 import MorecastForecastRecord
from app.db.models.snow import ProcessedSnow, SnowSourceEnum
from app.db.models.grass_curing import PercentGrassCuring
18 changes: 18 additions & 0 deletions api/app/db/models/grass_curing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from app.db.models import Base
from sqlalchemy import (Column, Float, Integer, Sequence)
from app.db.models.common import TZTimeStamp


class PercentGrassCuring(Base):
""" Daily percent grass curing per weather station. """
__tablename__ = 'percent_grass_curing'
__table_args__ = (
{'comment': 'Record containing information about percent grass curing from the CFWIS.'}
)

id = Column(Integer, Sequence('percent_grass_curing_id_seq'),
primary_key=True, nullable=False, index=True)
station_code = Column(Integer, nullable=False, index=True)
percent_grass_curing = Column(Float, nullable=False, index=False)
for_date = Column(TZTimeStamp, nullable=False, index=True)

156 changes: 156 additions & 0 deletions api/app/jobs/grass_curing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from affine import Affine
from datetime import date
from osgeo import gdal
import asyncio
import logging
import math
import numpy as np
import os
import requests
import sys
import tempfile
import xml.etree.ElementTree as ET
from app import configure_logging
from app.db.crud.grass_curing import get_last_percent_grass_curing_for_date, save_percent_grass_curing
from app.db.database import get_async_read_session_scope, get_async_write_session_scope
from app.geospatial import WGS84
from app.db.models.grass_curing import PercentGrassCuring
from app.rocketchat_notifications import send_rocketchat_notification
from app.stations import get_stations_asynchronously

logger = logging.getLogger(__name__)

GRASS_CURING_FILE_NAME_3978 = "grass_curing_epsg_3978.tif"
GRASS_CURING_FILE_NAME_4326 = "grass_curing_epsg_4326.tif"
GRASS_CURING_COVERAGE_ID = "public:pc_current"
WCS_URL = "https://cwfis.cfs.nrcan.gc.ca/geoserver/public/wcs"


class OwsException(Exception):
""" Raise when OWS service returns an exception report."""


class GrassCuringJob():
""" Job that downloads and processes percent grass curing data from the CWFIS. """

async def _get_grass_curing_wcs_raster(self, path: str):
""" Gets the current percent grass curing as a tif from the CFWIS Web Coverage Service. """
session = requests.session()
param_dict= {

Check warning on line 39 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L38-L39

Added lines #L38 - L39 were not covered by tests
"service": "WCS",
"version": "2.0.0",
"request": "GetCoverage",
"format": "image/geotiff",
"coverageId": GRASS_CURING_COVERAGE_ID
}
response = session.get(WCS_URL, params = param_dict)

Check warning on line 46 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L46

Added line #L46 was not covered by tests
# Check HTTP response code for error condition
response.raise_for_status()

Check warning on line 48 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L48

Added line #L48 was not covered by tests
# GeoServer can return an exception report as xml within a 200 response. Check if we have a content type header
# of application/xml and check if an exception is present.
if "Content-Type" in response.headers and response.headers["Content-Type"] == "application/xml":
root_element = ET.fromstring(response.content)
if "ExceptionReport" in root_element.tag:
message = f"GeoServer returned an exception when attempting to download percent grass curing from URL: {response.request.url}"
logger.error(message)
raise OwsException(message)
file_path = os.path.join(path, GRASS_CURING_FILE_NAME_3978)
with open(file_path, 'wb') as file:
file.write(response.content)

Check warning on line 59 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L51-L59

Added lines #L51 - L59 were not covered by tests


def _reproject_to_epsg_4326(self, path: str):
""" Transforms coordinates in source_file geotiff to EPSG:3005 (BC Albers),
writes to newly created geotiff called <new_filename>.tif

:param path: A path to a temporary directory containing the percent grass curing tif.
"""
destination_path = f"{path}/{GRASS_CURING_FILE_NAME_4326}"
source_path = f"{path}/{GRASS_CURING_FILE_NAME_3978}"
source_data = gdal.Open(source_path, gdal.GA_ReadOnly)
gdal.Warp(destination_path, source_data, dstSRS=WGS84)
del source_data

Check warning on line 72 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L68-L72

Added lines #L68 - L72 were not covered by tests


def _yield_value_for_stations(self, data_source, stations):
""" Given a list of stations, and a gdal dataset, yield the grass curing value for each station

:param data_source: A gdal representation of a raster image of percent grass curing.
:param stations: A list of weather station objects.
:return: A tuple of a weather station code and the percent grass curing at its location.
"""
raster_band = data_source.GetRasterBand(1)
data_np_array = raster_band.ReadAsArray()
data_np_array = np.array(data_np_array)
forward_transform = Affine.from_gdal(*data_source.GetGeoTransform())
reverse_transform = ~forward_transform
for station in stations:
longitude = station.long
latitude = station.lat
px, py = reverse_transform * (longitude, latitude)
px = math.floor(px)
py = math.floor(py)
yield (station.code, data_np_array[py][px])

Check warning on line 93 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L82-L93

Added lines #L82 - L93 were not covered by tests


async def _get_last_for_date(self):
""" Get the date of the most recently processed percent grass curing data."""
async with get_async_read_session_scope() as session:
result = await get_last_percent_grass_curing_for_date(session)
return result

Check warning on line 100 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L98-L100

Added lines #L98 - L100 were not covered by tests


async def _process_grass_curing(self):
""" Download and process percent grass curing data. """
async with get_async_write_session_scope() as session:
today = date.today()
logger.info(f"Starting collection of percent grass curing data from CWFIS for {today}.")
with tempfile.TemporaryDirectory() as temp_dir:
await self._get_grass_curing_wcs_raster(temp_dir)
self._reproject_to_epsg_4326(temp_dir)

Check warning on line 110 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L105-L110

Added lines #L105 - L110 were not covered by tests

# Open the reprojected grass curing data
raster = gdal.Open(f"{temp_dir}/{GRASS_CURING_FILE_NAME_4326}")
stations = await get_stations_asynchronously()
for station, value in self._yield_value_for_stations(raster, stations):
percent_grass_curing = PercentGrassCuring(for_date=today,

Check warning on line 116 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L113-L116

Added lines #L113 - L116 were not covered by tests
percent_grass_curing=value,
station_code=station )
await save_percent_grass_curing(session, percent_grass_curing)
logger.info(f"Finished processing percent grass curing data from CFWIS for {today}.")

Check warning on line 120 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L119-L120

Added lines #L119 - L120 were not covered by tests


async def _run_grass_curing(self):
""" Entry point for running the job. """
last_processed = await self._get_last_for_date()
if last_processed is None or last_processed.date() < date.today():
await self._process_grass_curing()

Check warning on line 127 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L127

Added line #L127 was not covered by tests
else:
logger.info(f"Percent grass curing processing is up to date as of {date.today()}")


def main():
""" Kicks off asynchronous processing of VIIRS snow coverage data.
"""
try:
# We don't want gdal to silently swallow errors.
gdal.UseExceptions()
logger.debug('Begin processing VIIRS snow coverage data.')

bot = GrassCuringJob()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(bot._run_grass_curing())

# Exit with 0 - success.
sys.exit(os.EX_OK)
except Exception as exception:
# Exit non 0 - failure.
logger.error("An error occurred while processing VIIRS snow coverage data.", exc_info=exception)
dgboss marked this conversation as resolved.
Show resolved Hide resolved
rc_message = ':scream: Encountered an error while processing VIIRS snow data.'
dgboss marked this conversation as resolved.
Show resolved Hide resolved
send_rocketchat_notification(rc_message, exception)
sys.exit(os.EX_SOFTWARE)

if __name__ == '__main__':
configure_logging()
main()

Check warning on line 156 in api/app/jobs/grass_curing.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/grass_curing.py#L155-L156

Added lines #L155 - L156 were not covered by tests
43 changes: 43 additions & 0 deletions api/app/tests/jobs/test_grass_curing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
""" Unit testing for VIIRS snow data processing """
dgboss marked this conversation as resolved.
Show resolved Hide resolved

import os
import pytest
from datetime import datetime
from pytest_mock import MockerFixture
from app.jobs import grass_curing
from app.jobs.grass_curing import GrassCuringJob


def test_grass_curing_job_fail(mocker: MockerFixture,
monkeypatch):
"""
Test that when the bot fails, a message is sent to rocket-chat, and our exit code is 1.
"""

async def mock__run_grass_curing(self):
raise OSError("Error")

monkeypatch.setattr(GrassCuringJob, '_run_grass_curing', mock__run_grass_curing)
rocket_chat_spy = mocker.spy(grass_curing, 'send_rocketchat_notification')

with pytest.raises(SystemExit) as excinfo:
grass_curing.main()
# Assert that we exited with an error code.
assert excinfo.value.code == os.EX_SOFTWARE
# Assert that rocket chat was called.
assert rocket_chat_spy.call_count == 1


def test_grass_curing_job_exits_without_error_when_no_work_required(monkeypatch):
""" Test that grass_curing_job exits without error when no data needs to be processed.
"""
async def mock__get_last_for_date(self):
return datetime.now()

monkeypatch.setattr(GrassCuringJob, '_get_last_for_date', mock__get_last_for_date)

with pytest.raises(SystemExit) as excinfo:
grass_curing.main()
# Assert that we exited with an error code.
assert excinfo.value.code == os.EX_OK

2 changes: 2 additions & 0 deletions openshift/scripts/oc_deploy_to_production.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ echo C-Haines
PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_c_haines_cronjob.sh prod ${RUN_TYPE}
echo VIIRS Snow
PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_viirs_snow_cronjob.sh prod ${RUN_TYPE}
echo Grass Curing
PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_grass_curing_cronjob.sh prod ${RUN_TYPE}
echo BC FireWeather cronjobs
echo "Run forecast at 8h30 PDT and 16h30 PDT (so before and after noon)"
PROJ_TARGET=${PROJ_TARGET} SCHEDULE="30 * * * *" bash $(dirname ${0})/oc_provision_wfwx_noon_forecasts_cronjob.sh prod ${RUN_TYPE}
Expand Down
58 changes: 58 additions & 0 deletions openshift/scripts/oc_provision_grass_curing_cronjob.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/bin/sh -l
#
source "$(dirname ${0})/common/common"

#%
#% OpenShift Deploy Helper
#%
#% Intended for use with a pull request-based pipeline.
#% Suffixes incl.: pr-###.
#%
#% Usage:
#%
#% ${THIS_FILE} [SUFFIX] [apply]
#%
#% Examples:
#%
#% Provide a PR number. Defaults to a dry-run.
#% ${THIS_FILE} pr-0
#%
#% Apply when satisfied.
#% ${THIS_FILE} pr-0 apply
#%

# Target project override for Dev or Prod deployments
#
PROJ_TARGET="${PROJ_TARGET:-${PROJ_DEV}}"

# Specify a default schedule to run daily at 5am-ish
SCHEDULE="${SCHEDULE:-$((3 + $RANDOM % 54)) 5 * * *}"

# Process template
OC_PROCESS="oc -n ${PROJ_TARGET} process -f ${TEMPLATE_PATH}/grass_curing.cronjob.yaml \
-p JOB_NAME=grass-curing-${APP_NAME}-${SUFFIX} \
-p APP_LABEL=${APP_NAME}-${SUFFIX} \
-p NAME=${APP_NAME} \
-p SUFFIX=${SUFFIX} \
-p SCHEDULE=\"${SCHEDULE}\" \
-p POSTGRES_DATABASE=${POSTGRES_DATABASE:-${APP_NAME}} \
-p POSTGRES_USER=wps-crunchydb-${SUFFIX} \
-p POSTGRES_WRITE_HOST=wps-crunchydb-${SUFFIX}-primary \
-p POSTGRES_READ_HOST=wps-crunchydb-${SUFFIX}-primary \
-p CRUNCHYDB_USER=wps-crunchydb-${SUFFIX}-pguser-wps-crunchydb-${SUFFIX} \
${PROJ_TOOLS:+ "-p PROJ_TOOLS=${PROJ_TOOLS}"} \
${IMAGE_REGISTRY:+ "-p IMAGE_REGISTRY=${IMAGE_REGISTRY}"}"

# Apply template (apply or use --dry-run)
#
OC_APPLY="oc -n ${PROJ_TARGET} apply -f -"
[ "${APPLY}" ] || OC_APPLY="${OC_APPLY} --dry-run"

# Execute commands
#
eval "${OC_PROCESS}"
eval "${OC_PROCESS} | ${OC_APPLY}"

# Provide oc command instruction
#
display_helper "${OC_PROCESS} | ${OC_APPLY}"
Loading
Loading