Skip to content

Commit

Permalink
Add /notifications endpoint and /notifications/create
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox committed Nov 9, 2024
1 parent 640729c commit 4811ad1
Show file tree
Hide file tree
Showing 10 changed files with 376 additions and 85 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

### 🚀 New

* Add the `/logs/notifications/{mjd}` route to retrieve the Overwatcher notifications for a given MJD.
* Create a new router `/notifications` to create and retrieve notifications. New notifications are sent over Slack or email depending on parameters and the notification level.
* Add notifications section to night log email and plain-text version.


Expand Down
2 changes: 2 additions & 0 deletions src/lvmapi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
kubernetes,
logs,
macros,
notifications,
overwatcher,
slack,
spectrographs,
Expand All @@ -49,6 +50,7 @@
app.include_router(kubernetes.router)
app.include_router(actors.router)
app.include_router(logs.router)
app.include_router(notifications.router)


@app.get("/id")
Expand Down
4 changes: 3 additions & 1 deletion src/lvmapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ influxdb:

slack:
token: null
channel: lvm-notifications
default_channel: lvm-notifications
level_channels:
CRITICAL: lvm-critical

database:
uri: postgresql://sdss_user@10.8.38.26:5432/lvmdb
Expand Down
35 changes: 3 additions & 32 deletions src/lvmapi/routers/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from sdsstools import get_sjd

from lvmapi.routers.notifications import Notification
from lvmapi.tasks import get_exposure_data_task
from lvmapi.tools.logs import (
add_night_log_comment,
Expand All @@ -29,7 +30,6 @@
get_exposures,
get_night_log_data,
get_night_log_mjds,
get_notifications,
get_plaintext_night_log,
get_spectro_mjds,
)
Expand Down Expand Up @@ -104,18 +104,6 @@ class NightLogPostComment(BaseModel):
] = None


class Notification(BaseModel):
"""An Overwatcher notification."""

date: Annotated[datetime, Field(description="The notification datetime.")]
message: Annotated[str, Field(description="The notification message.")]
level: Annotated[str, Field(description="The level of the notification.")]
payload: Annotated[
dict | None,
Field(description="The payload of the notification."),
] = None


router = APIRouter(prefix="/logs", tags=["logs"])


Expand Down Expand Up @@ -174,25 +162,6 @@ async def route_get_exposures(
return list(map(str, exposures))


@router.get("/notifications/{mjd}", summary="Returns notifications for an MJD.")
async def route_get_notifications(
mjd: Annotated[
int,
Path(description="The SJD for which to list notifications. 0 for current SJD."),
],
) -> list[Notification]:
"""Returns a list of notifications for an MJD."""

mjd = mjd if mjd > 0 else get_sjd("LCO")
notifications = await get_notifications(mjd)

return [
Notification(**notification)
for notification in notifications
if notification["message"] != "I am alive!"
]


@router.get("/night-logs", summary="List of night log MJDs.")
async def route_get_night_logs():
"""Returns a list of MJDs with night log data."""
Expand Down Expand Up @@ -247,6 +216,8 @@ async def route_get_night_logs_mjd(
):
"""Returns the night log data for an MJD."""

from .notifications import route_get_notifications

mjd = mjd if mjd > 0 else get_sjd("LCO")
data = await get_night_log_data(mjd)

Expand Down
115 changes: 115 additions & 0 deletions src/lvmapi/routers/notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2024-11-09
# @Filename: notifications.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import datetime

from typing import Annotated, Any

from fastapi import APIRouter, HTTPException, Path
from pydantic import BaseModel, Field

from sdsstools import get_sjd

from lvmapi.tools.notifications import (
NotificationLevel,
create_notification,
get_notifications,
)


router = APIRouter(prefix="/notifications", tags=["notifications"])


class Notification(BaseModel):
"""An Overwatcher notification."""

date: Annotated[
datetime.datetime,
Field(description="The notification datetime."),
]
message: Annotated[
str,
Field(description="The notification message."),
]
level: Annotated[
str,
Field(description="The level of the notification."),
]
payload: Annotated[
dict | None,
Field(description="The payload of the notification."),
] = None


class NotificationPost(BaseModel):
"""Model for the ``/notifications/create`` endpoint payload."""

message: Annotated[
str,
Field(description="The notification message."),
]
level: Annotated[
str,
Field(description="The level of the notification."),
] = "INFO"
payload: Annotated[
dict[str, Any], Field(description="The payload of the notification.")
] = {}
slack_channel: Annotated[
str | bool | None,
Field(description="The Slack channel where to send the message."),
] = None
email_on_critical: Annotated[
bool, Field(description="Whether to send an email if the level is CRITICAL.")
] = True
write_to_database: Annotated[
bool, Field(description="Whether to write the notification to the database.")
] = True
slack_extra_params: Annotated[
dict[str, Any],
Field(description="Extra parameters to pass to the Slack message."),
] = {}


@router.get("/{mjd}", summary="Returns notifications for an MJD.")
async def route_get_notifications(
mjd: Annotated[
int,
Path(description="The SJD for which to list notifications. 0 for current SJD."),
],
) -> list[Notification]:
"""Returns a list of notifications for an MJD."""

mjd = mjd if mjd > 0 else get_sjd("LCO")
notifications = await get_notifications(mjd)

return [
Notification(**notification)
for notification in notifications
if notification["message"] != "I am alive!"
]


@router.post("/create", summary="Create a new notification.")
async def route_post_create_notification(notification: NotificationPost):
"""Creates a new notification, optionally emitting Slack and email messages."""

try:
await create_notification(
notification.message,
level=NotificationLevel(notification.level.upper()),
payload=notification.payload,
slack_channel=notification.slack_channel,
email_on_critical=notification.email_on_critical,
write_to_database=notification.write_to_database,
slack_extra_params=notification.slack_extra_params,
)
except Exception as ee:
raise HTTPException(500, detail=str(ee))
2 changes: 1 addition & 1 deletion src/lvmapi/routers/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from lvmapi import config


default_channel = config["slack.channel"]
default_channel = config["slack.default_channel"]
default_user: str = "LVM"


Expand Down
74 changes: 74 additions & 0 deletions src/lvmapi/tools/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
import functools
from datetime import datetime, timedelta

from typing import Any

import psycopg
import psycopg.sql

from lvmapi import config


__all__ = ["timed_cache"]

Expand Down Expand Up @@ -49,3 +56,70 @@ def _wrapped(*args, **kwargs):
return _wrapped

return _wrapper


def get_db_connect():
"""Returns a connection to the database."""

uri = config["database.uri"]

return psycopg.AsyncConnection.connect(uri)


async def insert_to_database(
table_name: str,
data: list[dict[str, Any]],
columns: list[str] | None = None,
):
"""Inserts data into the database.
Parameters
----------
table_name
The table in the database where to insert the data. Can be in the format
``schema.table_name``.
data
The data to ingest, as a list of dictionaries in which each dictionary
is a mapping of column name in ``table`` to the value to ingest.
columns
A list of table columns. If not passed, the column names are inferred from
the first element in the data. In this case you must ensure that all the
elements in the data contain entries for all the columns (use :obj:`None`
to fill missing data).
"""

if len(data) == 0:
return

columns = columns or list(data[0].keys())

schema: str | None
table: psycopg.sql.Identifier
if "." in table_name:
schema, table_name = table_name.split(".")
table = psycopg.sql.Identifier(schema, table_name)
else:
table = psycopg.sql.Identifier(table_name)

columns_sql = [psycopg.sql.Identifier(col) for col in columns]

column_placeholders = ("{}, " * len(columns))[0:-2]
values_placeholders = ("%s, " * len(columns))[0:-2]

query = psycopg.sql.SQL(
"INSERT INTO {} ("
+ column_placeholders
+ ") VALUES ("
+ values_placeholders
+ ");"
).format(
table,
*columns_sql,
)

async with await get_db_connect() as aconn:
async with aconn.cursor() as acursor:
for row in data:
values = [row.get(col, None) for col in columns]
await acursor.execute(query, values)
50 changes: 1 addition & 49 deletions src/lvmapi/tools/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import psycopg
from astropy.time import Time
from jinja2 import Environment, FileSystemLoader
from psycopg.rows import dict_row
from psycopg.sql import SQL, Identifier
from pydantic import BaseModel

from sdsstools import get_sjd, run_in_executor

from lvmapi import config
from lvmapi.tools.notifications import get_notifications
from lvmapi.tools.rabbitmq import CluClient
from lvmapi.tools.slack import post_message

Expand Down Expand Up @@ -724,51 +724,3 @@ async def email_night_log(
await post_message(
f"The night log for MJD {sjd} can be found <{lvmweb_url}|here>."
)


async def fill_notifications_mjd():
"""Fills MJD field for records in the notifications table.
This is a temporary function to fill the MJD field in the notifications table and
only needed due to a bug in the Overwatcher which was not filling the MJD field.
"""

uri = config["database.uri"]
table = Identifier(*config["database.tables.notification"].split("."))

# Start by getting a list of records with null MJD.
query = SQL("SELECT pk, date FROM {table} WHERE mjd IS NULL").format(table=table)
async with await psycopg.AsyncConnection.connect(uri) as aconn:
async with aconn.cursor() as acursor:
await acursor.execute(query)
records = await acursor.fetchall()

for pk, date in records:
sjd = get_sjd("LCO", date=date)
query = SQL("UPDATE {table} SET mjd = %s WHERE pk = %s").format(table=table)
async with await psycopg.AsyncConnection.connect(uri) as aconn:
async with aconn.cursor() as acursor:
await acursor.execute(query, (sjd, pk))
await aconn.commit()


async def get_notifications(sjd: int | None = None):
"""Returns the notifications for an SJD."""

sjd = sjd or get_sjd("LCO")

uri = config["database.uri"]
table = Identifier(*config["database.tables.notification"].split("."))

query = SQL("""
SELECT * FROM {table}
WHERE mjd = %s AND message != %s ORDER BY date ASC
""")

async with await psycopg.AsyncConnection.connect(uri) as aconn:
async with aconn.cursor(row_factory=dict_row) as acursor:
await acursor.execute(query.format(table=table), (sjd, "I am alive!"))
notifications = await acursor.fetchall()

return notifications
Loading

0 comments on commit 4811ad1

Please sign in to comment.