diff --git a/.pylintrc b/.pylintrc index b2481f6..e3c7327 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,2 +1,2 @@ [MASTER] -disable=fixme,logging-fstring-interpolation,too-many-arguments \ No newline at end of file +disable=fixme,logging-fstring-interpolation,too-many-arguments,too-few-public-methods \ No newline at end of file diff --git a/src/alert.py b/src/alert.py new file mode 100644 index 0000000..2934edb --- /dev/null +++ b/src/alert.py @@ -0,0 +1,31 @@ +""" +Enum for alert levels and data class of type +(AlertLevel, String) containing the alert message +""" +from __future__ import annotations +from dataclasses import dataclass +from enum import Enum + + +class AlertLevel(Enum): + """ + Alert Levels ranging from + None (the lowest) to Slack (the highest) + """ + + NONE = 0 + LOG = 1 + SLACK = 2 + + +@dataclass +class Alert: + """Encodes a "tuple" of AlertLevel with a message""" + + level: AlertLevel + message: str + + @classmethod + def default(cls) -> Alert: + """Default alert level is non with no message.""" + return Alert(AlertLevel.NONE, "") diff --git a/src/query_monitor/base.py b/src/query_monitor/base.py index 9fdc1c7..cfe2167 100644 --- a/src/query_monitor/base.py +++ b/src/query_monitor/base.py @@ -1,87 +1,57 @@ """ Abstract class containing Base/Default QueryMonitor attributes. """ -from __future__ import annotations - -import logging.config -from abc import ABC +from abc import ABC, abstractmethod +from dataclasses import dataclass from typing import Optional -from duneapi.api import DuneAPI from duneapi.types import QueryParameter, DuneRecord -from slack.web.client import WebClient -log = logging.getLogger(__name__) -logging.config.fileConfig(fname="logging.conf", disable_existing_loggers=False) +from src.alert import Alert + + +@dataclass +class QueryData: + """Basic data structure constituting a Dune Analytics Query.""" + + name: str + query_id: int + params: Optional[list[QueryParameter]] = None -class BaseQueryMonitor(ABC): +class QueryBase(ABC): """ - Abstract base class for Dune Query Monitoring. - Contains several default/fallback methods, + Base class for Dune Query. that are extended on in some implementations. """ - def __init__( - self, - name: str, - query_id: int, - params: Optional[list[QueryParameter]] = None, - threshold: int = 0, - ): - self.query_id = query_id - self.fixed_params = params if params else [] - self.name = name - # Threshold for alert worthy number of results. - self.threshold = threshold + def __init__(self, query: QueryData): + self.query = query - def result_url(self) -> str: - """Returns a link to query results excluding fixed parameters""" - return f"https://dune.com/queries/{self.query_id}" + @property + def query_id(self) -> int: + """Returns (nested) query ID - for easier access""" + return self.query.query_id - def refresh(self, dune: DuneAPI) -> list[DuneRecord]: - """Executes dune query by ID, and fetches the results by job ID returned""" - # TODO - this could probably live in the base duneapi library. - job_id = dune.execute(self.query_id, self.parameters()) - return dune.get_results(job_id) + @property + def name(self) -> str: + """Returns (nested) query name - for easier access""" + return self.query.name def parameters(self) -> list[QueryParameter]: """ Base implementation only has fixed parameters, extensions (like WindowedQueryMonitor) would append additional parameters to the fixed ones """ - return self.fixed_params + return self.query.params or [] + + def result_url(self) -> str: + """Returns a link to query results excluding fixed parameters""" + return f"https://dune.com/queries/{self.query_id}" - def alert_message(self, num_results: int) -> str: + @abstractmethod + def get_alert(self, results: list[DuneRecord]) -> Alert: """ Default Alert message if not special implementation is provided. Says which query returned how many results along with a link to Dune. """ - return ( - f"{self.name} - detected {num_results} cases. " - f"Results available at {self.result_url()}" - ) - - def run_loop( - self, dune: DuneAPI, slack_client: WebClient, alert_channel: str - ) -> None: - """ - Standard run-loop refreshing query, fetching results and alerting if necessary. - """ - log.info(f'Refreshing "{self.name}" query {self.result_url()}') - results = self.refresh(dune) - if len(results) > self.threshold: - log.error(self.alert_message(len(results))) - slack_client.chat_postMessage( - channel=alert_channel, - text=self.alert_message(len(results)), - # Do not show link preview! - # https://api.slack.com/reference/messaging/link-unfurling - unfurl_media=False, - ) - else: - log.info(f"No {self.name} detected") - - -class QueryMonitor(BaseQueryMonitor): - """This is essentially the base query monitor with all default methods""" diff --git a/src/query_monitor/factory.py b/src/query_monitor/factory.py index 8524664..c91e5ed 100644 --- a/src/query_monitor/factory.py +++ b/src/query_monitor/factory.py @@ -7,28 +7,34 @@ from duneapi.types import QueryParameter from src.models import TimeWindow, LeftBound -from src.query_monitor.base import BaseQueryMonitor, QueryMonitor +from src.query_monitor.base import QueryBase, QueryData from src.query_monitor.left_bounded import LeftBoundedQueryMonitor +from src.query_monitor.result_threshold import ResultThresholdQuery from src.query_monitor.windowed import WindowedQueryMonitor -def load_from_config(config_yaml: str) -> BaseQueryMonitor: +def load_from_config(config_yaml: str) -> QueryBase: """Loads a QueryMonitor object from yaml configuration file""" with open(config_yaml, "r", encoding="utf-8") as yaml_file: cfg = yaml.load(yaml_file, yaml.Loader) - name, query_id = cfg["name"], cfg["id"] + query = QueryData( + name=cfg["name"], + query_id=cfg["id"], + params=[ + QueryParameter.from_dict(param_cfg) + for param_cfg in cfg.get("parameters", []) + ], + ) + threshold = cfg.get("threshold", 0) - params = [ - QueryParameter.from_dict(param_cfg) for param_cfg in cfg.get("parameters", []) - ] if "window" in cfg: # Windowed Query window = TimeWindow.from_cfg(cfg["window"]) - return WindowedQueryMonitor(name, query_id, window, params, threshold) + return WindowedQueryMonitor(query, window, threshold) if "left_bound" in cfg: # Left Bounded Query left_bound = LeftBound.from_cfg(cfg["left_bound"]) - return LeftBoundedQueryMonitor(name, query_id, left_bound, params, threshold) + return LeftBoundedQueryMonitor(query, left_bound, threshold) - return QueryMonitor(name, query_id, params, threshold) + return ResultThresholdQuery(query, threshold) diff --git a/src/query_monitor/left_bounded.py b/src/query_monitor/left_bounded.py index f875bb3..fd79cc3 100644 --- a/src/query_monitor/left_bounded.py +++ b/src/query_monitor/left_bounded.py @@ -1,14 +1,15 @@ """Implementation of BaseQueryMonitor for queries beginning from StartTime""" +from __future__ import annotations import urllib.parse -from typing import Optional from duneapi.types import QueryParameter from src.models import LeftBound -from src.query_monitor.base import BaseQueryMonitor +from src.query_monitor.base import QueryData +from src.query_monitor.result_threshold import ResultThresholdQuery -class LeftBoundedQueryMonitor(BaseQueryMonitor): +class LeftBoundedQueryMonitor(ResultThresholdQuery): """ All queries here, must have `StartTime` as parameter. This is set by an instance's left_bound attribute. @@ -16,18 +17,16 @@ class LeftBoundedQueryMonitor(BaseQueryMonitor): def __init__( self, - name: str, - query_id: int, + query: QueryData, left_bound: LeftBound, - params: Optional[list[QueryParameter]] = None, threshold: int = 0, ): - super().__init__(name, query_id, params, threshold) + super().__init__(query, threshold) self.left_bound = left_bound def parameters(self) -> list[QueryParameter]: """Similar to the base model, but with left bound parameter appended""" - return self.fixed_params + self.left_bound.as_query_parameters() + return (self.query.params or []) + self.left_bound.as_query_parameters() def result_url(self) -> str: """Returns a link to the query""" diff --git a/src/query_monitor/result_threshold.py b/src/query_monitor/result_threshold.py new file mode 100644 index 0000000..179d0d0 --- /dev/null +++ b/src/query_monitor/result_threshold.py @@ -0,0 +1,30 @@ +""" +Elementary implementation of QueryBase that alerts when +number of results returned is > `threshold` +""" +from duneapi.types import DuneRecord + +from src.alert import Alert, AlertLevel +from src.query_monitor.base import QueryBase, QueryData + + +class ResultThresholdQuery(QueryBase): + """This is essentially the base query monitor with all default methods""" + + def __init__(self, query: QueryData, threshold: int = 0): + super().__init__(query) + self.threshold = threshold + + def get_alert(self, results: list[DuneRecord]) -> Alert: + """ + Default Alert message if not special implementation is provided. + Says which query returned how many results along with a link to Dune. + """ + num_results = len(results) + if num_results > self.threshold: + return Alert( + level=AlertLevel.SLACK, + message=f"{self.name} - detected {num_results} cases. " + f"Results available at {self.result_url()}", + ) + return Alert.default() diff --git a/src/query_monitor/windowed.py b/src/query_monitor/windowed.py index d8fd7d2..1f0cdc3 100644 --- a/src/query_monitor/windowed.py +++ b/src/query_monitor/windowed.py @@ -1,21 +1,22 @@ """ Implementation of BaseQueryMonitor for "windowed" queries having StartTime and EndTime """ +from __future__ import annotations import logging.config import urllib.parse from datetime import datetime, timedelta -from typing import Optional from duneapi.types import QueryParameter from src.models import TimeWindow -from src.query_monitor.base import BaseQueryMonitor +from src.query_monitor.base import QueryData +from src.query_monitor.result_threshold import ResultThresholdQuery log = logging.getLogger(__name__) logging.config.fileConfig(fname="logging.conf", disable_existing_loggers=False) -class WindowedQueryMonitor(BaseQueryMonitor): +class WindowedQueryMonitor(ResultThresholdQuery): """ All queries here, must have `StartTime` and `EndTime` as parameters, set by an instance's window attribute via window.as_query_parameters() @@ -25,18 +26,16 @@ class WindowedQueryMonitor(BaseQueryMonitor): def __init__( self, - name: str, - query_id: int, + query: QueryData, window: TimeWindow, - params: Optional[list[QueryParameter]] = None, threshold: int = 0, ): - super().__init__(name, query_id, params, threshold) + super().__init__(query, threshold) self._set_window(window) def parameters(self) -> list[QueryParameter]: """Similar to the base model, but with window parameters appended""" - return self.fixed_params + self.window.as_query_parameters() + return (self.query.params or []) + self.window.as_query_parameters() def result_url(self) -> str: """Returns a link to the query""" diff --git a/src/runner.py b/src/runner.py new file mode 100644 index 0000000..023f031 --- /dev/null +++ b/src/runner.py @@ -0,0 +1,49 @@ +""" +Query Runner takes any implementation of QueryBase, a dune connection and slack client. +It is responsible for refreshing the query, fetching results, +passing results onto the query and alerting when necessary. +""" +from __future__ import annotations + +import logging.config + +from duneapi.api import DuneAPI +from duneapi.types import DuneRecord + +from src.alert import AlertLevel +from src.query_monitor.base import QueryBase +from src.slack_client import BasicSlackClient + +log = logging.getLogger(__name__) +logging.config.fileConfig(fname="logging.conf", disable_existing_loggers=False) + + +class QueryRunner: + """ + Refreshes a Dune Query, fetches results and alerts slack if necessary + """ + + def __init__(self, query: QueryBase, dune: DuneAPI, slack_client: BasicSlackClient): + self.query = query + self.dune = dune + self.slack_client = slack_client + + def refresh(self) -> list[DuneRecord]: + """Executes dune query by ID, and fetches the results by job ID returned""" + # TODO - this could probably live in the base duneapi library. + query = self.query + log.info(f'Refreshing "{query.name}" query {query.result_url()}') + job_id = self.dune.execute(query.query_id, query.parameters()) + return self.dune.get_results(job_id) + + def run_loop(self) -> None: + """ + Standard run-loop refreshing query, fetching results and alerting if necessary. + """ + results = self.refresh() + alert = self.query.get_alert(results) + if alert.level == AlertLevel.SLACK: + log.warning(alert.message) + self.slack_client.post(alert.message) + elif alert.level == AlertLevel.LOG: + log.info(alert.message) diff --git a/src/slack_client.py b/src/slack_client.py new file mode 100644 index 0000000..80c2b94 --- /dev/null +++ b/src/slack_client.py @@ -0,0 +1,34 @@ +""" +Since channel is fixed at the beginning and nobody wants to see unfurled media +(especially in an alert), this tiny class encapsulates a few things that would +otherwise be unnecessarily repeated. +""" +import ssl + +import certifi +from slack.web.client import WebClient + + +class BasicSlackClient: + """ + Basic Slack Client with message post functionality + constructed from an API token and channel + """ + + def __init__(self, token: str, channel: str) -> None: + self.client = WebClient( + token=token, + # https://stackoverflow.com/questions/59808346/python-3-slack-client-ssl-sslcertverificationerror + ssl=ssl.create_default_context(cafile=certifi.where()), + ) + self.channel = channel + + def post(self, message: str) -> None: + """Posts `message` to `self.channel` excluding link previews.""" + self.client.chat_postMessage( + channel=self.channel, + text=message, + # Do not show link preview! + # https://api.slack.com/reference/messaging/link-unfurling + unfurl_media=False, + ) diff --git a/src/slackbot.py b/src/slackbot.py index ecadc9b..8182ec1 100644 --- a/src/slackbot.py +++ b/src/slackbot.py @@ -3,14 +3,30 @@ """ import argparse import os -import ssl -import certifi import dotenv from duneapi.api import DuneAPI -from slack.web.client import WebClient from src.query_monitor.factory import load_from_config +from src.runner import QueryRunner +from src.slack_client import BasicSlackClient + + +def run_slackbot(config_yaml: str) -> None: + """ + This is the main method of the program. + Instantiate a query runner, and execute its run_loop + """ + dotenv.load_dotenv() + query_runner = QueryRunner( + query=load_from_config(config_yaml), + dune=DuneAPI.new_from_environment(), + slack_client=BasicSlackClient( + token=os.environ["SLACK_TOKEN"], channel=os.environ["SLACK_ALERT_CHANNEL"] + ), + ) + query_runner.run_loop() + if __name__ == "__main__": parser = argparse.ArgumentParser("Missing Tokens") @@ -21,16 +37,4 @@ required=True, ) args = parser.parse_args() - - dotenv.load_dotenv() - query_monitor = load_from_config(args.query_config) - - query_monitor.run_loop( - dune=DuneAPI.new_from_environment(), - slack_client=WebClient( - token=os.environ["SLACK_TOKEN"], - # https://stackoverflow.com/questions/59808346/python-3-slack-client-ssl-sslcertverificationerror - ssl=ssl.create_default_context(cafile=certifi.where()), - ), - alert_channel=os.environ["SLACK_ALERT_CHANNEL"], - ) + run_slackbot(args.query_config) diff --git a/tests/test_implementations.py b/tests/unit/test_implementations.py similarity index 66% rename from tests/test_implementations.py rename to tests/unit/test_implementations.py index 09ced2e..d3f1365 100644 --- a/tests/test_implementations.py +++ b/tests/unit/test_implementations.py @@ -3,10 +3,11 @@ from duneapi.types import QueryParameter -from src.models import TimeWindow -from src.query_monitor.base import QueryMonitor +from src.alert import Alert, AlertLevel +from src.query_monitor.base import QueryData from src.query_monitor.factory import load_from_config -from src.query_monitor.windowed import WindowedQueryMonitor +from src.query_monitor.result_threshold import ResultThresholdQuery +from src.query_monitor.windowed import WindowedQueryMonitor, TimeWindow from src.query_monitor.left_bounded import LeftBoundedQueryMonitor @@ -19,21 +20,18 @@ def setUp(self) -> None: QueryParameter.number_type("Text", 12), QueryParameter.date_type("Date", "2021-01-01 12:34:56"), ] - self.monitor = QueryMonitor( - name="Monitor", query_id=0, params=self.query_params - ) + query = QueryData(name="Monitor", query_id=0, params=self.query_params) + self.monitor = ResultThresholdQuery(query) self.windowed_monitor = WindowedQueryMonitor( - name="Windowed Monitor", - query_id=1, + query, window=TimeWindow(start=self.date), - params=self.query_params, ) def test_result_url(self): self.assertEqual(self.monitor.result_url(), "https://dune.com/queries/0") self.assertEqual( self.windowed_monitor.result_url(), - "https://dune.com/queries/1?StartTime=1985-03-10+00%3A00%3A00&EndTime=1985-03-10+06%3A00%3A00", + "https://dune.com/queries/0?StartTime=1985-03-10+00%3A00%3A00&EndTime=1985-03-10+06%3A00%3A00", ) def test_parameters(self): @@ -45,27 +43,33 @@ def test_parameters(self): def test_alert_message(self): self.assertEqual( - self.monitor.alert_message(1), - f"{self.monitor.name} - detected 1 cases. " - f"Results available at {self.monitor.result_url()}", + self.monitor.get_alert([{}]), + Alert( + level=AlertLevel.SLACK, + message=f"{self.monitor.name} - detected 1 cases. " + f"Results available at {self.monitor.result_url()}", + ), ) self.assertEqual( - self.windowed_monitor.alert_message(2), - f"{self.windowed_monitor.name} - detected 2 cases. " - f"Results available at {self.windowed_monitor.result_url()}", + self.windowed_monitor.get_alert([{}, {}]), + Alert( + level=AlertLevel.SLACK, + message=f"{self.windowed_monitor.name} - detected 2 cases. " + f"Results available at {self.windowed_monitor.result_url()}", + ), ) class TestFactory(unittest.TestCase): def test_load_from_config(self): no_params_monitor = load_from_config("./tests/data/no-params.yaml") - self.assertTrue(isinstance(no_params_monitor, QueryMonitor)) + self.assertTrue(isinstance(no_params_monitor, ResultThresholdQuery)) self.assertEqual(no_params_monitor.parameters(), []) with_params_monitor = load_from_config("./tests/data/with-params.yaml") self.assertGreater(len(with_params_monitor.parameters()), 0) - self.assertTrue(isinstance(with_params_monitor, QueryMonitor)) + self.assertTrue(isinstance(with_params_monitor, ResultThresholdQuery)) windowed_monitor = load_from_config("./tests/data/windowed-query.yaml") self.assertTrue(isinstance(windowed_monitor, WindowedQueryMonitor)) diff --git a/tests/test_models.py b/tests/unit/test_models.py similarity index 98% rename from tests/test_models.py rename to tests/unit/test_models.py index c821892..9302984 100644 --- a/tests/test_models.py +++ b/tests/unit/test_models.py @@ -4,7 +4,7 @@ from duneapi.types import QueryParameter -from src.models import TimeWindow, LeftBound, TimeUnit +from src.models import LeftBound, TimeUnit, TimeWindow class TestTimeWindow(unittest.TestCase):