Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Counter Query Monitor (Take 2) #18

Merged
merged 15 commits into from
Aug 29, 2022
Merged
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[MASTER]
disable=fixme,logging-fstring-interpolation,too-many-arguments
disable=fixme,logging-fstring-interpolation,too-many-arguments,too-few-public-methods
31 changes: 31 additions & 0 deletions src/alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Enum for alert levels and data class of type
(AlertLevel, String) containing the alert message
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum


class AlertLevel(Enum):
"""
Alert Levels ranging from
None (the lowest) to Slack (the highest)
"""

NONE = 0
LOG = 1
SLACK = 2


@dataclass
class Alert:
"""Encodes a "tuple" of AlertLevel with a message"""

kind: AlertLevel
value: str

@classmethod
def default(cls) -> Alert:
"""Default alert level is non with no message."""
return Alert(AlertLevel.NONE, "")
92 changes: 31 additions & 61 deletions src/query_monitor/base.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,57 @@
"""
Abstract class containing Base/Default QueryMonitor attributes.
"""
from __future__ import annotations

import logging.config
from abc import ABC
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional

from duneapi.api import DuneAPI
from duneapi.types import QueryParameter, DuneRecord
from slack.web.client import WebClient

log = logging.getLogger(__name__)
logging.config.fileConfig(fname="logging.conf", disable_existing_loggers=False)
from src.alert import Alert


@dataclass
class QueryData:
"""Basic data structure constituting a Dune Analytics Query."""

name: str
query_id: int
params: Optional[list[QueryParameter]] = None


class BaseQueryMonitor(ABC):
class QueryBase(ABC):
"""
Abstract base class for Dune Query Monitoring.
Contains several default/fallback methods,
Base class for Dune Query.
that are extended on in some implementations.
"""

def __init__(
self,
name: str,
query_id: int,
params: Optional[list[QueryParameter]] = None,
threshold: int = 0,
):
self.query_id = query_id
self.fixed_params = params if params else []
self.name = name
# Threshold for alert worthy number of results.
self.threshold = threshold
def __init__(self, query: QueryData):
self.query = query

def result_url(self) -> str:
"""Returns a link to query results excluding fixed parameters"""
return f"https://dune.com/queries/{self.query_id}"
@property
def query_id(self) -> int:
"""Returns (nested) query ID - for easier access"""
return self.query.query_id

def refresh(self, dune: DuneAPI) -> list[DuneRecord]:
"""Executes dune query by ID, and fetches the results by job ID returned"""
# TODO - this could probably live in the base duneapi library.
job_id = dune.execute(self.query_id, self.parameters())
return dune.get_results(job_id)
@property
def name(self) -> str:
"""Returns (nested) query name - for easier access"""
return self.query.name

def parameters(self) -> list[QueryParameter]:
"""
Base implementation only has fixed parameters,
extensions (like WindowedQueryMonitor) would append additional parameters to the fixed ones
"""
return self.fixed_params
return self.query.params or []

def result_url(self) -> str:
"""Returns a link to query results excluding fixed parameters"""
return f"https://dune.com/queries/{self.query_id}"

def alert_message(self, num_results: int) -> str:
@abstractmethod
def alert_message(self, results: list[DuneRecord]) -> Alert:
"""
Default Alert message if not special implementation is provided.
Says which query returned how many results along with a link to Dune.
"""
return (
f"{self.name} - detected {num_results} cases. "
f"Results available at {self.result_url()}"
)

def run_loop(
self, dune: DuneAPI, slack_client: WebClient, alert_channel: str
) -> None:
"""
Standard run-loop refreshing query, fetching results and alerting if necessary.
"""
log.info(f'Refreshing "{self.name}" query {self.result_url()}')
results = self.refresh(dune)
if len(results) > self.threshold:
log.error(self.alert_message(len(results)))
slack_client.chat_postMessage(
channel=alert_channel,
text=self.alert_message(len(results)),
# Do not show link preview!
# https://api.slack.com/reference/messaging/link-unfurling
unfurl_media=False,
)
else:
log.info(f"No {self.name} detected")


class QueryMonitor(BaseQueryMonitor):
"""This is essentially the base query monitor with all default methods"""
40 changes: 40 additions & 0 deletions src/query_monitor/counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""QueryMonitor for Counters. Alert set to valuation"""

from duneapi.types import DuneRecord

from src.alert import Alert, AlertLevel
from src.query_monitor.base import QueryBase, QueryData


class CounterQueryMonitor(QueryBase):
"""
All queries here, must return a single record specifying a column with numeric type.
"""

def __init__(
self,
query: QueryData,
column: str,
alert_value: float = 0.0,
):
super().__init__(query)
self.column = column
self.alert_value = alert_value

def _result_value(self, results: list[DuneRecord]) -> float:
assert len(results) == 1, f"Expected single record, got {results}"
return float(results[0][self.column])

def alert_message(self, results: list[DuneRecord]) -> Alert:
result_value = self._result_value(results)
if result_value > self.alert_value:
return Alert(
kind=AlertLevel.SLACK,
value=f"Query {self.name}: {self.column} exceeds {self.alert_value} "
f"with {self._result_value(results)} (cf. {self.result_url()})",
)
return Alert(
kind=AlertLevel.LOG,
value=f"value of {self.column} = {result_value} "
f"does not exceed {self.alert_value}",
)
30 changes: 21 additions & 9 deletions src/query_monitor/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,40 @@
from duneapi.types import QueryParameter

from src.models import TimeWindow, LeftBound
from src.query_monitor.base import BaseQueryMonitor, QueryMonitor
from src.query_monitor.base import QueryBase, QueryData
from src.query_monitor.counter import CounterQueryMonitor
from src.query_monitor.left_bounded import LeftBoundedQueryMonitor
from src.query_monitor.result_threshold import ResultThresholdQuery
from src.query_monitor.windowed import WindowedQueryMonitor


def load_from_config(config_yaml: str) -> BaseQueryMonitor:
def load_from_config(config_yaml: str) -> QueryBase:
"""Loads a QueryMonitor object from yaml configuration file"""
with open(config_yaml, "r", encoding="utf-8") as yaml_file:
cfg = yaml.load(yaml_file, yaml.Loader)

name, query_id = cfg["name"], cfg["id"]
query = QueryData(
name=cfg["name"],
query_id=cfg["id"],
params=[
QueryParameter.from_dict(param_cfg)
for param_cfg in cfg.get("parameters", [])
],
)

threshold = cfg.get("threshold", 0)
params = [
QueryParameter.from_dict(param_cfg) for param_cfg in cfg.get("parameters", [])
]
if "window" in cfg:
# Windowed Query
window = TimeWindow.from_cfg(cfg["window"])
return WindowedQueryMonitor(name, query_id, window, params, threshold)
return WindowedQueryMonitor(query, window, threshold)
if "left_bound" in cfg:
# Left Bounded Query
left_bound = LeftBound.from_cfg(cfg["left_bound"])
return LeftBoundedQueryMonitor(name, query_id, left_bound, params, threshold)
return LeftBoundedQueryMonitor(query, left_bound, threshold)

if "column" in cfg and "alert_value" in cfg:
# Counter Query
column, alert_value = cfg["column"], float(cfg["alert_value"])
return CounterQueryMonitor(query, column, alert_value)

return QueryMonitor(name, query_id, params, threshold)
return ResultThresholdQuery(query, threshold)
15 changes: 7 additions & 8 deletions src/query_monitor/left_bounded.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,32 @@
"""Implementation of BaseQueryMonitor for queries beginning from StartTime"""
from __future__ import annotations
import urllib.parse
from typing import Optional

from duneapi.types import QueryParameter

from src.models import LeftBound
from src.query_monitor.base import BaseQueryMonitor
from src.query_monitor.base import QueryData
from src.query_monitor.result_threshold import ResultThresholdQuery


class LeftBoundedQueryMonitor(BaseQueryMonitor):
class LeftBoundedQueryMonitor(ResultThresholdQuery):
"""
All queries here, must have `StartTime` as parameter.
This is set by an instance's left_bound attribute.
"""

def __init__(
self,
name: str,
query_id: int,
query: QueryData,
left_bound: LeftBound,
params: Optional[list[QueryParameter]] = None,
threshold: int = 0,
):
super().__init__(name, query_id, params, threshold)
super().__init__(query, threshold)
self.left_bound = left_bound

def parameters(self) -> list[QueryParameter]:
"""Similar to the base model, but with left bound parameter appended"""
return self.fixed_params + self.left_bound.as_query_parameters()
return (self.query.params or []) + self.left_bound.as_query_parameters()

def result_url(self) -> str:
"""Returns a link to the query"""
Expand Down
30 changes: 30 additions & 0 deletions src/query_monitor/result_threshold.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
Elementary implementation of QueryBase that alerts when
number of results returned is > `threshold`
"""
from duneapi.types import DuneRecord

from src.alert import Alert, AlertLevel
from src.query_monitor.base import QueryBase, QueryData


class ResultThresholdQuery(QueryBase):
"""This is essentially the base query monitor with all default methods"""

def __init__(self, query: QueryData, threshold: int = 0):
super().__init__(query)
self.threshold = threshold

def alert_message(self, results: list[DuneRecord]) -> Alert:
"""
Default Alert message if not special implementation is provided.
Says which query returned how many results along with a link to Dune.
"""
num_results = len(results)
if num_results > self.threshold:
return Alert(
kind=AlertLevel.SLACK,
value=f"{self.name} - detected {num_results} cases. "
f"Results available at {self.result_url()}",
)
return Alert.default()
15 changes: 7 additions & 8 deletions src/query_monitor/windowed.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
"""
Implementation of BaseQueryMonitor for "windowed" queries having StartTime and EndTime
"""
from __future__ import annotations
import logging.config
import urllib.parse
from datetime import datetime, timedelta
from typing import Optional

from duneapi.types import QueryParameter

from src.models import TimeWindow
from src.query_monitor.base import BaseQueryMonitor
from src.query_monitor.base import QueryData
from src.query_monitor.result_threshold import ResultThresholdQuery

log = logging.getLogger(__name__)
logging.config.fileConfig(fname="logging.conf", disable_existing_loggers=False)


class WindowedQueryMonitor(BaseQueryMonitor):
class WindowedQueryMonitor(ResultThresholdQuery):
"""
All queries here, must have `StartTime` and `EndTime` as parameters,
set by an instance's window attribute via window.as_query_parameters()
Expand All @@ -25,18 +26,16 @@ class WindowedQueryMonitor(BaseQueryMonitor):

def __init__(
self,
name: str,
query_id: int,
query: QueryData,
window: TimeWindow,
params: Optional[list[QueryParameter]] = None,
threshold: int = 0,
):
super().__init__(name, query_id, params, threshold)
super().__init__(query, threshold)
self._set_window(window)

def parameters(self) -> list[QueryParameter]:
"""Similar to the base model, but with window parameters appended"""
return self.fixed_params + self.window.as_query_parameters()
return (self.query.params or []) + self.window.as_query_parameters()

def result_url(self) -> str:
"""Returns a link to the query"""
Expand Down
Loading