diff --git a/.github/workflows/deployment.yml b/.github/workflows/deployment.yml index 46d41dc88..eb4d00e52 100644 --- a/.github/workflows/deployment.yml +++ b/.github/workflows/deployment.yml @@ -175,6 +175,12 @@ jobs: oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}" PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_viirs_snow_cronjob.sh ${SUFFIX} apply + - name: GRASS CURING cronjob + shell: bash + run: | + oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}" + PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_grass_curing_cronjob.sh ${SUFFIX} apply + # TODO: Delete once pmtiles has run for some time # deploy-tileserv: # name: Deploy tileserv to Dev diff --git a/api/alembic/versions/2b3755392ad8_percent_grass_curing.py b/api/alembic/versions/2b3755392ad8_percent_grass_curing.py new file mode 100644 index 000000000..94244636d --- /dev/null +++ b/api/alembic/versions/2b3755392ad8_percent_grass_curing.py @@ -0,0 +1,41 @@ +"""Percent grass curing + +Revision ID: 2b3755392ad8 +Revises: 5845f568a975 +Create Date: 2024-02-01 16:45:05.914743 + +""" +from alembic import op +import sqlalchemy as sa +from app.db.models.common import TZTimeStamp + +# revision identifiers, used by Alembic. +revision = '2b3755392ad8' +down_revision = '5845f568a975' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic ### + op.create_table('percent_grass_curing', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('station_code', sa.Integer(), nullable=False), + sa.Column('percent_grass_curing', sa.Float(), nullable=False), + sa.Column('for_date', TZTimeStamp(), nullable=False), + sa.PrimaryKeyConstraint('id'), + comment='Record containing information about percent grass curing from the CFWIS.' 
+ ) + op.create_index(op.f('ix_percent_grass_curing_for_date'), 'percent_grass_curing', ['for_date'], unique=False) + op.create_index(op.f('ix_percent_grass_curing_id'), 'percent_grass_curing', ['id'], unique=False) + op.create_index(op.f('ix_percent_grass_curing_station_code'), 'percent_grass_curing', ['station_code'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic ### + op.drop_index(op.f('ix_percent_grass_curing_station_code'), table_name='percent_grass_curing') + op.drop_index(op.f('ix_percent_grass_curing_id'), table_name='percent_grass_curing') + op.drop_index(op.f('ix_percent_grass_curing_for_date'), table_name='percent_grass_curing') + op.drop_table('percent_grass_curing') + # ### end Alembic commands ### diff --git a/api/app/db/crud/grass_curing.py b/api/app/db/crud/grass_curing.py new file mode 100644 index 000000000..4be14d804 --- /dev/null +++ b/api/app/db/crud/grass_curing.py @@ -0,0 +1,28 @@ +""" CRUD operations relating to processing grass curing +""" +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.sql import func +from app.db.models.grass_curing import PercentGrassCuring + + +async def save_percent_grass_curing(session: AsyncSession, percent_grass_curing: PercentGrassCuring): + """ Add a new PercentGrassCuring record. + + :param session: A session object for asynchronous database access. + :type session: AsyncSession + :param percent_grass_curing: The record to be saved. + :type percent_grass_curing: PercentGrassCuring + """ + session.add(percent_grass_curing) + + +async def get_last_percent_grass_curing_for_date(session: AsyncSession): + """ Get the last date for which a PercentGrassCuring record exists. + + :param session: A session object for asynchronous database access. 
+ :type session: AsyncSession + """ + stmt = select(func.max(PercentGrassCuring.for_date)) + result = await session.execute(stmt) + return result.scalar() diff --git a/api/app/db/models/__init__.py b/api/app/db/models/__init__.py index 7e0721833..91cae4d5f 100644 --- a/api/app/db/models/__init__.py +++ b/api/app/db/models/__init__.py @@ -17,3 +17,4 @@ ClassifiedHfi, RunTypeEnum, ShapeTypeEnum, FuelType, HighHfiArea, RunParameters) from app.db.models.morecast_v2 import MorecastForecastRecord from app.db.models.snow import ProcessedSnow, SnowSourceEnum +from app.db.models.grass_curing import PercentGrassCuring diff --git a/api/app/db/models/grass_curing.py b/api/app/db/models/grass_curing.py new file mode 100644 index 000000000..fef9336c0 --- /dev/null +++ b/api/app/db/models/grass_curing.py @@ -0,0 +1,18 @@ +from app.db.models import Base +from sqlalchemy import (Column, Float, Integer, Sequence) +from app.db.models.common import TZTimeStamp + + +class PercentGrassCuring(Base): + """ Daily percent grass curing per weather station. 
""" + __tablename__ = 'percent_grass_curing' + __table_args__ = ( + {'comment': 'Record containing information about percent grass curing from the CFWIS.'} + ) + + id = Column(Integer, Sequence('percent_grass_curing_id_seq'), + primary_key=True, nullable=False, index=True) + station_code = Column(Integer, nullable=False, index=True) + percent_grass_curing = Column(Float, nullable=False, index=False) + for_date = Column(TZTimeStamp, nullable=False, index=True) + \ No newline at end of file diff --git a/api/app/jobs/grass_curing.py b/api/app/jobs/grass_curing.py new file mode 100644 index 000000000..d8be00502 --- /dev/null +++ b/api/app/jobs/grass_curing.py @@ -0,0 +1,156 @@ +from affine import Affine +from datetime import date +from osgeo import gdal +import asyncio +import logging +import math +import numpy as np +import os +import requests +import sys +import tempfile +import xml.etree.ElementTree as ET +from app import configure_logging +from app.db.crud.grass_curing import get_last_percent_grass_curing_for_date, save_percent_grass_curing +from app.db.database import get_async_read_session_scope, get_async_write_session_scope +from app.geospatial import WGS84 +from app.db.models.grass_curing import PercentGrassCuring +from app.rocketchat_notifications import send_rocketchat_notification +from app.stations import get_stations_asynchronously + +logger = logging.getLogger(__name__) + +GRASS_CURING_FILE_NAME_3978 = "grass_curing_epsg_3978.tif" +GRASS_CURING_FILE_NAME_4326 = "grass_curing_epsg_4326.tif" +GRASS_CURING_COVERAGE_ID = "public:pc_current" +WCS_URL = "https://cwfis.cfs.nrcan.gc.ca/geoserver/public/wcs" + + +class OwsException(Exception): + """ Raise when OWS service returns an exception report.""" + + +class GrassCuringJob(): + """ Job that downloads and processes percent grass curing data from the CWFIS. """ + + async def _get_grass_curing_wcs_raster(self, path: str): + """ Gets the current percent grass curing as a tif from the CFWIS Web Coverage Service. 
""" + session = requests.session() + param_dict= { + "service": "WCS", + "version": "2.0.0", + "request": "GetCoverage", + "format": "image/geotiff", + "coverageId": GRASS_CURING_COVERAGE_ID + } + response = session.get(WCS_URL, params = param_dict) + # Check HTTP response code for error condition + response.raise_for_status() + # GeoServer can return an exception report as xml within a 200 response. Check if we have a content type header + # of application/xml and check if an exception is present. + if "Content-Type" in response.headers and response.headers["Content-Type"] == "application/xml": + root_element = ET.fromstring(response.content) + if "ExceptionReport" in root_element.tag: + message = f"GeoServer returned an exception when attempting to download percent grass curing from URL: {response.request.url}" + logger.error(message) + raise OwsException(message) + file_path = os.path.join(path, GRASS_CURING_FILE_NAME_3978) + with open(file_path, 'wb') as file: + file.write(response.content) + + + def _reproject_to_epsg_4326(self, path: str): + """ Transforms coordinates in source_file geotiff to EPSG:4326 (WGS 84), + writes to newly created geotiff called grass_curing_epsg_4326.tif + + :param path: A path to a temporary directory containing the percent grass curing tif. + """ + destination_path = f"{path}/{GRASS_CURING_FILE_NAME_4326}" + source_path = f"{path}/{GRASS_CURING_FILE_NAME_3978}" + source_data = gdal.Open(source_path, gdal.GA_ReadOnly) + gdal.Warp(destination_path, source_data, dstSRS=WGS84) + del source_data + + + def _yield_value_for_stations(self, data_source, stations): + """ Given a list of stations, and a gdal dataset, yield the grass curing value for each station + + :param data_source: A gdal representation of a raster image of percent grass curing. + :param stations: A list of weather station objects. + :return: A tuple of a weather station code and the percent grass curing at its location. 
+ """ + raster_band = data_source.GetRasterBand(1) + data_np_array = raster_band.ReadAsArray() + data_np_array = np.array(data_np_array) + forward_transform = Affine.from_gdal(*data_source.GetGeoTransform()) + reverse_transform = ~forward_transform + for station in stations: + longitude = station.long + latitude = station.lat + px, py = reverse_transform * (longitude, latitude) + px = math.floor(px) + py = math.floor(py) + yield (station.code, data_np_array[py][px]) + + + async def _get_last_for_date(self): + """ Get the date of the most recently processed percent grass curing data.""" + async with get_async_read_session_scope() as session: + result = await get_last_percent_grass_curing_for_date(session) + return result + + + async def _process_grass_curing(self): + """ Download and process percent grass curing data. """ + async with get_async_write_session_scope() as session: + today = date.today() + logger.info(f"Starting collection of percent grass curing data from CWFIS for {today}.") + with tempfile.TemporaryDirectory() as temp_dir: + await self._get_grass_curing_wcs_raster(temp_dir) + self._reproject_to_epsg_4326(temp_dir) + + # Open the reprojected grass curing data + raster = gdal.Open(f"{temp_dir}/{GRASS_CURING_FILE_NAME_4326}") + stations = await get_stations_asynchronously() + for station, value in self._yield_value_for_stations(raster, stations): + percent_grass_curing = PercentGrassCuring(for_date=today, + percent_grass_curing=value, + station_code=station ) + await save_percent_grass_curing(session, percent_grass_curing) + logger.info(f"Finished processing percent grass curing data from CFWIS for {today}.") + + + async def _run_grass_curing(self): + """ Entry point for running the job. 
""" + last_processed = await self._get_last_for_date() + if last_processed is None or last_processed.date() < date.today(): + await self._process_grass_curing() + else: + logger.info(f"Percent grass curing processing is up to date as of {date.today()}") + + +def main(): + """ Kicks off asynchronous processing of CWFIS percent grass curing data. + """ + try: + # We don't want gdal to silently swallow errors. + gdal.UseExceptions() + logger.debug('Begin processing CWFIS grass curing data.') + + bot = GrassCuringJob() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(bot._run_grass_curing()) + + # Exit with 0 - success. + sys.exit(os.EX_OK) + except Exception as exception: + # Exit non 0 - failure. + logger.error("An error occurred while processing CWFIS grass curing data.", exc_info=exception) + rc_message = ':scream: Encountered an error while processing CWFIS grass curing data.' + send_rocketchat_notification(rc_message, exception) + sys.exit(os.EX_SOFTWARE) + +if __name__ == '__main__': + configure_logging() + main() \ No newline at end of file diff --git a/api/app/tests/jobs/test_grass_curing.py b/api/app/tests/jobs/test_grass_curing.py new file mode 100644 index 000000000..c29cb9612 --- /dev/null +++ b/api/app/tests/jobs/test_grass_curing.py @@ -0,0 +1,43 @@ +""" Unit testing for CWFIS grass curing data processing """ + +import os +import pytest +from datetime import datetime +from pytest_mock import MockerFixture +from app.jobs import grass_curing +from app.jobs.grass_curing import GrassCuringJob + + +def test_grass_curing_job_fail(mocker: MockerFixture, + monkeypatch): + """ + Test that when the bot fails, a message is sent to rocket-chat, and our exit code indicates failure (os.EX_SOFTWARE). 
+ """ + + async def mock__run_grass_curing(self): + raise OSError("Error") + + monkeypatch.setattr(GrassCuringJob, '_run_grass_curing', mock__run_grass_curing) + rocket_chat_spy = mocker.spy(grass_curing, 'send_rocketchat_notification') + + with pytest.raises(SystemExit) as excinfo: + grass_curing.main() + # Assert that we exited with an error code. + assert excinfo.value.code == os.EX_SOFTWARE + # Assert that rocket chat was called. + assert rocket_chat_spy.call_count == 1 + + +def test_grass_curing_job_exits_without_error_when_no_work_required(monkeypatch): + """ Test that grass_curing_job exits without error when no data needs to be processed. + """ + async def mock__get_last_for_date(self): + return datetime.now() + + monkeypatch.setattr(GrassCuringJob, '_get_last_for_date', mock__get_last_for_date) + + with pytest.raises(SystemExit) as excinfo: + grass_curing.main() + # Assert that we exited without an error. + assert excinfo.value.code == os.EX_OK + \ No newline at end of file diff --git a/openshift/scripts/oc_deploy_to_production.sh b/openshift/scripts/oc_deploy_to_production.sh index 19cbc746f..362b1103e 100755 --- a/openshift/scripts/oc_deploy_to_production.sh +++ b/openshift/scripts/oc_deploy_to_production.sh @@ -55,6 +55,8 @@ echo C-Haines PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_c_haines_cronjob.sh prod ${RUN_TYPE} echo VIIRS Snow PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_viirs_snow_cronjob.sh prod ${RUN_TYPE} +echo Grass Curing +PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_grass_curing_cronjob.sh prod ${RUN_TYPE} echo BC FireWeather cronjobs echo "Run forecast at 8h30 PDT and 16h30 PDT (so before and after noon)" PROJ_TARGET=${PROJ_TARGET} SCHEDULE="30 * * * *" bash $(dirname ${0})/oc_provision_wfwx_noon_forecasts_cronjob.sh prod ${RUN_TYPE} diff --git a/openshift/scripts/oc_provision_grass_curing_cronjob.sh b/openshift/scripts/oc_provision_grass_curing_cronjob.sh new file mode 100644 index 
000000000..097b5c667 --- /dev/null +++ b/openshift/scripts/oc_provision_grass_curing_cronjob.sh @@ -0,0 +1,58 @@ +#!/bin/sh -l +# +source "$(dirname ${0})/common/common" + +#% +#% OpenShift Deploy Helper +#% +#% Intended for use with a pull request-based pipeline. +#% Suffixes incl.: pr-###. +#% +#% Usage: +#% +#% ${THIS_FILE} [SUFFIX] [apply] +#% +#% Examples: +#% +#% Provide a PR number. Defaults to a dry-run. +#% ${THIS_FILE} pr-0 +#% +#% Apply when satisfied. +#% ${THIS_FILE} pr-0 apply +#% + +# Target project override for Dev or Prod deployments +# +PROJ_TARGET="${PROJ_TARGET:-${PROJ_DEV}}" + +# Specify a default schedule to run daily at 5am-ish +SCHEDULE="${SCHEDULE:-$((3 + $RANDOM % 54)) 5 * * *}" + +# Process template +OC_PROCESS="oc -n ${PROJ_TARGET} process -f ${TEMPLATE_PATH}/grass_curing.cronjob.yaml \ +-p JOB_NAME=grass-curing-${APP_NAME}-${SUFFIX} \ +-p APP_LABEL=${APP_NAME}-${SUFFIX} \ +-p NAME=${APP_NAME} \ +-p SUFFIX=${SUFFIX} \ +-p SCHEDULE=\"${SCHEDULE}\" \ +-p POSTGRES_DATABASE=${POSTGRES_DATABASE:-${APP_NAME}} \ +-p POSTGRES_USER=wps-crunchydb-${SUFFIX} \ +-p POSTGRES_WRITE_HOST=wps-crunchydb-${SUFFIX}-primary \ +-p POSTGRES_READ_HOST=wps-crunchydb-${SUFFIX}-primary \ +-p CRUNCHYDB_USER=wps-crunchydb-${SUFFIX}-pguser-wps-crunchydb-${SUFFIX} \ +${PROJ_TOOLS:+ "-p PROJ_TOOLS=${PROJ_TOOLS}"} \ +${IMAGE_REGISTRY:+ "-p IMAGE_REGISTRY=${IMAGE_REGISTRY}"}" + +# Apply template (apply or use --dry-run) +# +OC_APPLY="oc -n ${PROJ_TARGET} apply -f -" +[ "${APPLY}" ] || OC_APPLY="${OC_APPLY} --dry-run" + +# Execute commands +# +eval "${OC_PROCESS}" +eval "${OC_PROCESS} | ${OC_APPLY}" + +# Provide oc command instruction +# +display_helper "${OC_PROCESS} | ${OC_APPLY}" diff --git a/openshift/templates/grass_curing.cronjob.yaml b/openshift/templates/grass_curing.cronjob.yaml new file mode 100644 index 000000000..7d1f45170 --- /dev/null +++ b/openshift/templates/grass_curing.cronjob.yaml @@ -0,0 +1,113 @@ +kind: Template +apiVersion: template.openshift.io/v1 
+metadata: + name: ${JOB_NAME}-cronjob-template + annotations: + description: "Scheduled task to download and process percent grass curing data from the Canadian Wildland Fire Information System." + tags: "cronjob,grass curing" +labels: + app.kubernetes.io/part-of: "${NAME}" + app: ${NAME}-${SUFFIX} +parameters: + - name: NAME + description: Module name + value: wps + - name: GLOBAL_NAME + description: Name of global Module + value: wps-global + - name: SUFFIX + description: Deployment suffix, e.g. pr-### + required: true + - name: PROJ_TOOLS + value: e1e498-tools + - name: JOB_NAME + value: grass-curing + - name: IMAGE_REGISTRY + required: true + value: image-registry.openshift-image-registry.svc:5000 + - name: POSTGRES_WRITE_HOST + required: true + - name: POSTGRES_READ_HOST + required: true + - name: POSTGRES_USER + required: true + - name: POSTGRES_DATABASE + required: true + - name: CRUNCHYDB_USER + required: true + - name: SCHEDULE + required: true + - name: APP_LABEL + required: true +objects: + - kind: CronJob + apiVersion: batch/v1 + metadata: + name: ${JOB_NAME} + spec: + schedule: ${SCHEDULE} + # We use the "Replace" policy, because we never want the cronjobs to run concurrently, + # and if for whatever reason a cronjob gets stuck, we want the next run to proceed. + # If we were to use Forbid, and a cronjob gets stuck, then we'd stop gathering data until someone + # noticed. We don't want that. 
+ concurrencyPolicy: "Replace" + jobTemplate: + metadata: + labels: + cronjob: ${JOB_NAME} + app: ${APP_LABEL} + spec: + template: + spec: + containers: + - name: ${JOB_NAME} + image: ${IMAGE_REGISTRY}/${PROJ_TOOLS}/${NAME}-api-${SUFFIX}:${SUFFIX} + imagePullPolicy: "Always" + command: + ["poetry", "run", "python", "-m", "app.jobs.grass_curing"] + env: + - name: POSTGRES_READ_USER + value: ${POSTGRES_USER} + - name: POSTGRES_WRITE_USER + value: ${POSTGRES_USER} + - name: POSTGRES_PASSWORD + valueFrom: + secretKeyRef: + name: ${CRUNCHYDB_USER} + key: password + - name: POSTGRES_WRITE_HOST + value: ${POSTGRES_WRITE_HOST} + - name: POSTGRES_READ_HOST + value: ${POSTGRES_READ_HOST} + - name: POSTGRES_PORT + value: "5432" + - name: POSTGRES_DATABASE + value: ${POSTGRES_DATABASE} + - name: ROCKET_URL_POST_MESSAGE + valueFrom: + configMapKeyRef: + name: ${GLOBAL_NAME} + key: rocket.chat-url-post-message + - name: ROCKET_CHANNEL + valueFrom: + configMapKeyRef: + name: ${GLOBAL_NAME} + key: rocket.chat-channel + - name: ROCKET_USER_ID + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: rocket.chat-user-id-secret + - name: ROCKET_AUTH_TOKEN + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: rocket.chat-auth-token-secret + resources: + limits: + cpu: "1" + memory: 1024Mi + requests: + cpu: "500m" + memory: 512Mi + restartPolicy: OnFailure