diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fb54f9..8a14f04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### 🚀 New -* Add the `/logs/notifications/{mjd}` route to retrieve the Overwatcher notifications for a given MJD. +* Create a new router `/notifications` to create and retrieve notifications. New notifications are sent over Slack or email depending on parameters and the notification level. * Add notifications section to night log email and plain-text version. diff --git a/src/lvmapi/app.py b/src/lvmapi/app.py index ebdc760..d0e6ea4 100644 --- a/src/lvmapi/app.py +++ b/src/lvmapi/app.py @@ -23,6 +23,7 @@ kubernetes, logs, macros, + notifications, overwatcher, slack, spectrographs, @@ -49,6 +50,7 @@ app.include_router(kubernetes.router) app.include_router(actors.router) app.include_router(logs.router) +app.include_router(notifications.router) @app.get("/id") diff --git a/src/lvmapi/config.yaml b/src/lvmapi/config.yaml index 85ed8f1..c9347e2 100644 --- a/src/lvmapi/config.yaml +++ b/src/lvmapi/config.yaml @@ -6,7 +6,9 @@ influxdb: slack: token: null - channel: lvm-notifications + default_channel: lvm-notifications + level_channels: + CRITICAL: lvm-critical database: uri: postgresql://sdss_user@10.8.38.26:5432/lvmdb diff --git a/src/lvmapi/routers/logs.py b/src/lvmapi/routers/logs.py index ac72e34..4d0a0d6 100644 --- a/src/lvmapi/routers/logs.py +++ b/src/lvmapi/routers/logs.py @@ -18,6 +18,7 @@ from sdsstools import get_sjd +from lvmapi.routers.notifications import Notification from lvmapi.tasks import get_exposure_data_task from lvmapi.tools.logs import ( add_night_log_comment, @@ -29,7 +30,6 @@ get_exposures, get_night_log_data, get_night_log_mjds, - get_notifications, get_plaintext_night_log, get_spectro_mjds, ) @@ -104,18 +104,6 @@ class NightLogPostComment(BaseModel): ] = None -class Notification(BaseModel): - """An Overwatcher notification.""" - - date: Annotated[datetime, Field(description="The notification datetime.")] - message: Annotated[str, Field(description="The notification message.")] - level: Annotated[str, Field(description="The level of the notification.")] - payload: Annotated[ - dict | None, - Field(description="The payload of the notification."), - ] = None - - router = APIRouter(prefix="/logs", tags=["logs"]) @@ -174,25 +162,6 @@ async def route_get_exposures( return list(map(str, exposures)) -@router.get("/notifications/{mjd}", summary="Returns notifications for an MJD.") -async def route_get_notifications( - mjd: Annotated[ - int, - Path(description="The SJD for which to list notifications. 0 for current SJD."), - ], -) -> list[Notification]: - """Returns a list of notifications for an MJD.""" - - mjd = mjd if mjd > 0 else get_sjd("LCO") - notifications = await get_notifications(mjd) - - return [ - Notification(**notification) - for notification in notifications - if notification["message"] != "I am alive!" - ] - - @router.get("/night-logs", summary="List of night log MJDs.") async def route_get_night_logs(): """Returns a list of MJDs with night log data.""" @@ -247,6 +216,8 @@ async def route_get_night_logs_mjd( ): """Returns the night log data for an MJD.""" + from .notifications import route_get_notifications + mjd = mjd if mjd > 0 else get_sjd("LCO") data = await get_night_log_data(mjd) diff --git a/src/lvmapi/routers/notifications.py b/src/lvmapi/routers/notifications.py new file mode 100644 index 0000000..f472831 --- /dev/null +++ b/src/lvmapi/routers/notifications.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# @Author: José Sánchez-Gallego (gallegoj@uw.edu) +# @Date: 2024-11-09 +# @Filename: notifications.py +# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) + +from __future__ import annotations + +import datetime + +from typing import Annotated, Any + +from fastapi import APIRouter, HTTPException, Path +from pydantic import BaseModel, Field + +from sdsstools import get_sjd + +from lvmapi.tools.notifications import ( + NotificationLevel, + create_notification, + get_notifications, +) + + +router = APIRouter(prefix="/notifications", tags=["notifications"]) + + +class Notification(BaseModel): + """An Overwatcher notification.""" + + date: Annotated[ + datetime.datetime, + Field(description="The notification datetime."), + ] + message: Annotated[ + str, + Field(description="The notification message."), + ] + level: Annotated[ + str, + Field(description="The level of the notification."), + ] + payload: Annotated[ + dict | None, + Field(description="The payload of the notification."), + ] = None + + +class NotificationPost(BaseModel): + """Model for the ``/notifications/create`` endpoint payload.""" + + message: Annotated[ + str, + Field(description="The notification message."), + ] + level: Annotated[ + str, + Field(description="The level of the notification."), + ] = "INFO" + payload: Annotated[ + dict[str, Any], Field(description="The payload of the notification.") + ] = {} + slack_channel: Annotated[ + str | bool | None, + Field(description="The Slack channel where to send the message."), + ] = None + email_on_critical: Annotated[ + bool, Field(description="Whether to send an email if the level is CRITICAL.") + ] = True + write_to_database: Annotated[ + bool, Field(description="Whether to write the notification to the database.") + ] = True + slack_extra_params: Annotated[ + dict[str, Any], + Field(description="Extra parameters to pass to the Slack message."), + ] = {} + + +@router.get("/{mjd}", summary="Returns notifications for an MJD.") +async def route_get_notifications( + mjd: Annotated[ + int, + Path(description="The SJD for which to list notifications. 0 for current SJD."), + ], +) -> list[Notification]: + """Returns a list of notifications for an MJD.""" + + mjd = mjd if mjd > 0 else get_sjd("LCO") + notifications = await get_notifications(mjd) + + return [ + Notification(**notification) + for notification in notifications + if notification["message"] != "I am alive!" + ] + + +@router.post("/create", summary="Create a new notification.") +async def route_post_create_notification(notification: NotificationPost): + """Creates a new notification, optionally emitting Slack and email messages.""" + + try: + await create_notification( + notification.message, + level=NotificationLevel(notification.level.upper()), + payload=notification.payload, + slack_channel=notification.slack_channel, + email_on_critical=notification.email_on_critical, + write_to_database=notification.write_to_database, + slack_extra_params=notification.slack_extra_params, + ) + except Exception as ee: + raise HTTPException(500, detail=str(ee)) diff --git a/src/lvmapi/routers/slack.py b/src/lvmapi/routers/slack.py index 474947d..a3fa352 100644 --- a/src/lvmapi/routers/slack.py +++ b/src/lvmapi/routers/slack.py @@ -17,7 +17,7 @@ from lvmapi import config -default_channel = config["slack.channel"] +default_channel = config["slack.default_channel"] default_user: str = "LVM" diff --git a/src/lvmapi/tools/general.py b/src/lvmapi/tools/general.py index 8e95aba..1aa9cc5 100644 --- a/src/lvmapi/tools/general.py +++ b/src/lvmapi/tools/general.py @@ -11,6 +11,13 @@ import functools from datetime import datetime, timedelta +from typing import Any + +import psycopg +import psycopg.sql + +from lvmapi import config + __all__ = ["timed_cache"] @@ -49,3 +56,70 @@ def _wrapped(*args, **kwargs): return _wrapped return _wrapper + + +def get_db_connect(): + """Returns a connection to the database.""" + + uri = config["database.uri"] + + return psycopg.AsyncConnection.connect(uri) + + +async def insert_to_database( + table_name: str, + data: list[dict[str, Any]], + columns: list[str] | None = None, +): + """Inserts data into the database. + + Parameters + ---------- + table_name + The table in the database where to insert the data. Can be in the format + ``schema.table_name``. + data + The data to ingest, as a list of dictionaries in which each dictionary + is a mapping of column name in ``table`` to the value to ingest. + columns + A list of table columns. If not passed, the column names are inferred from + the first element in the data. In this case you must ensure that all the + elements in the data contain entries for all the columns (use :obj:`None` + to fill missing data). + + """ + + if len(data) == 0: + return + + columns = columns or list(data[0].keys()) + + schema: str | None + table: psycopg.sql.Identifier + if "." in table_name: + schema, table_name = table_name.split(".") + table = psycopg.sql.Identifier(schema, table_name) + else: + table = psycopg.sql.Identifier(table_name) + + columns_sql = [psycopg.sql.Identifier(col) for col in columns] + + column_placeholders = ("{}, " * len(columns))[0:-2] + values_placeholders = ("%s, " * len(columns))[0:-2] + + query = psycopg.sql.SQL( + "INSERT INTO {} (" + + column_placeholders + + ") VALUES (" + + values_placeholders + + ");" + ).format( + table, + *columns_sql, + ) + + async with await get_db_connect() as aconn: + async with aconn.cursor() as acursor: + for row in data: + values = [row.get(col, None) for col in columns] + await acursor.execute(query, values) diff --git a/src/lvmapi/tools/logs.py b/src/lvmapi/tools/logs.py index ea3843d..d10835f 100644 --- a/src/lvmapi/tools/logs.py +++ b/src/lvmapi/tools/logs.py @@ -24,13 +24,13 @@ import psycopg from astropy.time import Time from jinja2 import Environment, FileSystemLoader -from psycopg.rows import dict_row from psycopg.sql import SQL, Identifier from pydantic import BaseModel from sdsstools import get_sjd, run_in_executor from lvmapi import config +from lvmapi.tools.notifications import get_notifications from lvmapi.tools.rabbitmq import CluClient from lvmapi.tools.slack import post_message @@ -724,51 +724,3 @@ async def email_night_log( await post_message( f"The night log for MJD {sjd} can be found <{lvmweb_url}|here>." ) - - -async def fill_notifications_mjd(): - """Fills MJD field for records in the notifications table. - - This is a temporary function to fill the MJD field in the notifications table and - only needed due to a bug in the Overwatcher which was not filling the MJD field. - - """ - - uri = config["database.uri"] - table = Identifier(*config["database.tables.notification"].split(".")) - - # Start by getting a list of records with null MJD. - query = SQL("SELECT pk, date FROM {table} WHERE mjd IS NULL").format(table=table) - async with await psycopg.AsyncConnection.connect(uri) as aconn: - async with aconn.cursor() as acursor: - await acursor.execute(query) - records = await acursor.fetchall() - - for pk, date in records: - sjd = get_sjd("LCO", date=date) - query = SQL("UPDATE {table} SET mjd = %s WHERE pk = %s").format(table=table) - async with await psycopg.AsyncConnection.connect(uri) as aconn: - async with aconn.cursor() as acursor: - await acursor.execute(query, (sjd, pk)) - await aconn.commit() - - -async def get_notifications(sjd: int | None = None): - """Returns the notifications for an SJD.""" - - sjd = sjd or get_sjd("LCO") - - uri = config["database.uri"] - table = Identifier(*config["database.tables.notification"].split(".")) - - query = SQL(""" - SELECT * FROM {table} - WHERE mjd = %s AND message != %s ORDER BY date ASC - """) - - async with await psycopg.AsyncConnection.connect(uri) as aconn: - async with aconn.cursor(row_factory=dict_row) as acursor: - await acursor.execute(query.format(table=table), (sjd, "I am alive!")) - notifications = await acursor.fetchall() - - return notifications diff --git a/src/lvmapi/tools/notifications.py b/src/lvmapi/tools/notifications.py new file mode 100644 index 0000000..eda04b8 --- /dev/null +++ b/src/lvmapi/tools/notifications.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# @Author: José Sánchez-Gallego (gallegoj@uw.edu) +# @Date: 2024-11-09 +# @Filename: notifications.py +# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) + +from __future__ import annotations + +import datetime +import enum +import json + +from typing import Any, cast + +import psycopg +from psycopg.rows import dict_row +from psycopg.sql import SQL, Identifier + +from sdsstools import get_sjd + +from lvmapi import config +from lvmapi.tools.general import insert_to_database +from lvmapi.tools.slack import post_message + + +__all__ = ["get_notifications"] + + +class NotificationLevel(enum.Enum): + """Allowed notification levels.""" + + DEBUG = "DEBUG" + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + CRITICAL = "CRITICAL" + + +async def fill_notifications_mjd(): + """Fills MJD field for records in the notifications table. + + This is a temporary function to fill the MJD field in the notifications table and + only needed due to a bug in the Overwatcher which was not filling the MJD field. + + """ + + uri = config["database.uri"] + table = Identifier(*config["database.tables.notification"].split(".")) + + # Start by getting a list of records with null MJD. + query = SQL("SELECT pk, date FROM {table} WHERE mjd IS NULL").format(table=table) + async with await psycopg.AsyncConnection.connect(uri) as aconn: + async with aconn.cursor() as acursor: + await acursor.execute(query) + records = await acursor.fetchall() + + for pk, date in records: + sjd = get_sjd("LCO", date=date) + query = SQL("UPDATE {table} SET mjd = %s WHERE pk = %s").format(table=table) + async with await psycopg.AsyncConnection.connect(uri) as aconn: + async with aconn.cursor() as acursor: + await acursor.execute(query, (sjd, pk)) + await aconn.commit() + + +async def get_notifications(sjd: int | None = None): + """Returns the notifications for an SJD.""" + + sjd = sjd or get_sjd("LCO") + + uri = config["database.uri"] + table = Identifier(*config["database.tables.notification"].split(".")) + + query = SQL(""" + SELECT * FROM {table} + WHERE mjd = %s AND message != %s ORDER BY date ASC + """) + + async with await psycopg.AsyncConnection.connect(uri) as aconn: + async with aconn.cursor(row_factory=dict_row) as acursor: + await acursor.execute(query.format(table=table), (sjd, "I am alive!")) + notifications = await acursor.fetchall() + + return notifications + + +async def create_notification( + message: str, + level: NotificationLevel | str = NotificationLevel.INFO, + payload: dict[str, Any] = {}, + slack_channel: str | bool | None = None, + email_on_critical: bool = True, + write_to_database: bool = True, + slack_extra_params: dict[str, Any] = {}, +): + """Creates a new notification. + + Parameters + ---------- + message + The message of the notification. Can be formatted in Markdown. + level + The level of the notification. + payload + A dictionary with additional information to be stored with the notification. + This data is not send over email or Slack, only stored in the database. + slack_channel + The Slack channel where to send the notification. If not provided, the default + channel is used. Can be set to false to disable sending the Slack notification. + email_on_critical + Whether to send an email if the notification level is ``CRITICAL``. + write_to_database + Whether to write the notification to the database. + slack_extra_params + A dictionary of extra parameters to pass to ``post_message``. + + """ + + date = datetime.datetime.now() + payload_str = json.dumps(payload) + level = NotificationLevel(level) + + table = config["database.tables.notification"] + + slack = slack_channel is not False + email = email_on_critical and level == NotificationLevel.CRITICAL + + if write_to_database: + await insert_to_database( + table, + [ + { + "date": date, + "mjd": get_sjd("LCO", date=date), + "message": message, + "level": level.value.lower(), + "payload": payload_str, + "slack": slack, + "email": email, + } + ], + columns=["date", "mjd", "message", "level", "payload", "slack", "email"], + ) + + if slack_channel is not False: + default_channel: str + if slack_channel is None or slack_channel is True: + default_channel = cast(str, config["slack.default_channel"]) + else: + default_channel = slack_channel + + # We send the message to the default channel plus any other channel that + # matches the level of the notification. + channels: set[str] = {default_channel} + + level_channels = cast(dict[str, str], config["slack.level_channels"]) + if level.value in level_channels: + channels.add(level_channels[level.value]) + + # Send Slack message(s) + for channel in channels: + mentions = ( + ["@channel"] + if level == NotificationLevel.CRITICAL + or level == NotificationLevel.ERROR + else [] + ) + await post_message( + message, + channel=channel, + mentions=mentions, + **slack_extra_params, + ) diff --git a/src/lvmapi/tools/slack.py b/src/lvmapi/tools/slack.py index eec85e4..a0bcd38 100644 --- a/src/lvmapi/tools/slack.py +++ b/src/lvmapi/tools/slack.py @@ -90,7 +90,7 @@ async def post_message( if text is not None and blocks is not None: raise ValueError("Cannot specify both text and blocks.") - channel = channel or config["slack.channel"] + channel = channel or config["slack.default_channel"] assert channel is not None client = get_api_client()