Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remodeling Structures #17

Merged
merged 14 commits into from
Aug 29, 2022
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[MASTER]
disable=fixme,logging-fstring-interpolation,too-many-arguments
disable=fixme,logging-fstring-interpolation,too-many-arguments,too-few-public-methods
31 changes: 31 additions & 0 deletions src/alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Enum for alert levels and data class of type
(AlertLevel, String) containing the alert message
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum


class AlertLevel(Enum):
"""
Alert Levels ranging from
None (the lowest) to Slack (the highest)
"""

NONE = 0
LOG = 1
SLACK = 2


@dataclass
class Alert:
"""Encodes a "tuple" of AlertLevel with a message"""

kind: AlertLevel
bh2smith marked this conversation as resolved.
Show resolved Hide resolved
value: str
bh2smith marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def default(cls) -> Alert:
"""Default alert level is non with no message."""
return Alert(AlertLevel.NONE, "")
106 changes: 0 additions & 106 deletions src/models.py

This file was deleted.

87 changes: 27 additions & 60 deletions src/query_monitor/base.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,54 @@
"""
Abstract class containing Base/Default QueryMonitor attributes.
"""
from __future__ import annotations

import logging.config
from abc import ABC
from dataclasses import dataclass
from typing import Optional

from duneapi.api import DuneAPI
from duneapi.types import QueryParameter, DuneRecord
from slack.web.client import WebClient

log = logging.getLogger(__name__)
logging.config.fileConfig(fname="logging.conf", disable_existing_loggers=False)
from src.alert import Alert


@dataclass
class QueryData:
"""Basic data structure constituting a Dune Analytics Query."""

name: str
query_id: int
params: Optional[list[QueryParameter]] = None


class BaseQueryMonitor(ABC):
class QueryBase(ABC):
Comment on lines +14 to +22
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we split the query data into its own class and the base is now a true abstract class. Furthermore, run_loop method has been moved out of this class an into a new class called QueryRunner which has all the third party communication fields like dune and slack client (i.e. those which are required to run query and alert).

"""
Abstract base class for Dune Query Monitoring.
Contains several default/fallback methods,
Base class for Dune Query.
that are extended on in some implementations.
"""

def __init__(
self,
name: str,
query_id: int,
params: Optional[list[QueryParameter]] = None,
threshold: int = 0,
):
self.query_id = query_id
self.fixed_params = params if params else []
self.name = name
# Threshold for alert worthy number of results.
self.threshold = threshold
def __init__(self, query: QueryData):
self.query = query

def result_url(self) -> str:
"""Returns a link to query results excluding fixed parameters"""
return f"https://dune.com/queries/{self.query_id}"
@property
def query_id(self) -> int:
"""Returns (nested) query ID - for easier access"""
return self.query.query_id

def refresh(self, dune: DuneAPI) -> list[DuneRecord]:
"""Executes dune query by ID, and fetches the results by job ID returned"""
# TODO - this could probably live in the base duneapi library.
job_id = dune.execute(self.query_id, self.parameters())
return dune.get_results(job_id)
@property
def name(self) -> str:
"""Returns (nested) query name - for easier access"""
return self.query.name

def parameters(self) -> list[QueryParameter]:
"""
Base implementation only has fixed parameters,
extensions (like WindowedQueryMonitor) would append additional parameters to the fixed ones
"""
return self.fixed_params

def alert_message(self, num_results: int) -> str:
def result_url(self) -> str:
"""Returns a link to query results excluding fixed parameters"""

def alert_message(self, results: list[DuneRecord]) -> Alert:
bh2smith marked this conversation as resolved.
Show resolved Hide resolved
"""
Default Alert message if not special implementation is provided.
Says which query returned how many results along with a link to Dune.
"""
return (
f"{self.name} - detected {num_results} cases. "
f"Results available at {self.result_url()}"
)

def run_loop(
self, dune: DuneAPI, slack_client: WebClient, alert_channel: str
) -> None:
"""
Standard run-loop refreshing query, fetching results and alerting if necessary.
"""
log.info(f'Refreshing "{self.name}" query {self.result_url()}')
results = self.refresh(dune)
if len(results) > self.threshold:
log.error(self.alert_message(len(results)))
slack_client.chat_postMessage(
channel=alert_channel,
text=self.alert_message(len(results)),
# Do not show link preview!
# https://api.slack.com/reference/messaging/link-unfurling
unfurl_media=False,
)
else:
log.info(f"No {self.name} detected")


class QueryMonitor(BaseQueryMonitor):
"""This is essentially the base query monitor with all default methods"""
29 changes: 17 additions & 12 deletions src/query_monitor/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,34 @@
import yaml
from duneapi.types import QueryParameter

from src.models import TimeWindow, LeftBound
from src.query_monitor.base import BaseQueryMonitor, QueryMonitor
from src.query_monitor.left_bounded import LeftBoundedQueryMonitor
from src.query_monitor.windowed import WindowedQueryMonitor
from src.query_monitor.base import QueryBase, QueryData
from src.query_monitor.left_bounded import LeftBoundedQueryMonitor, LeftBound
from src.query_monitor.result_threshold import ResultThresholdQuery
from src.query_monitor.windowed import WindowedQueryMonitor, TimeWindow


def load_from_config(config_yaml: str) -> BaseQueryMonitor:
def load_from_config(config_yaml: str) -> QueryBase:
"""Loads a QueryMonitor object from yaml configuration file"""
with open(config_yaml, "r", encoding="utf-8") as yaml_file:
cfg = yaml.load(yaml_file, yaml.Loader)

name, query_id = cfg["name"], cfg["id"]
query = QueryData(
name=cfg["name"],
query_id=cfg["id"],
params=[
QueryParameter.from_dict(param_cfg)
for param_cfg in cfg.get("parameters", [])
],
)

threshold = cfg.get("threshold", 0)
params = [
QueryParameter.from_dict(param_cfg) for param_cfg in cfg.get("parameters", [])
]
if "window" in cfg:
# Windowed Query
window = TimeWindow.from_cfg(cfg["window"])
return WindowedQueryMonitor(name, query_id, window, params, threshold)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the passing of QueryData object here instead of the many arguments. Another cool quote from Clean Code:

The ideal number of arguments for a function is
zero (niladic). Next comes one (monadic), followed
closely by two (dyadic). Three arguments (triadic)
should be avoided where possible. More than three
(polyadic) requires very special justification—and
then shouldn’t be used anyway.

return WindowedQueryMonitor(query, window, threshold)
if "left_bound" in cfg:
# Left Bounded Query
left_bound = LeftBound.from_cfg(cfg["left_bound"])
return LeftBoundedQueryMonitor(name, query_id, left_bound, params, threshold)
return LeftBoundedQueryMonitor(query, left_bound, threshold)

return QueryMonitor(name, query_id, params, threshold)
return ResultThresholdQuery(query, threshold)
67 changes: 58 additions & 9 deletions src/query_monitor/left_bounded.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,82 @@
"""Implementation of BaseQueryMonitor for queries beginning from StartTime"""
from __future__ import annotations
import urllib.parse
from typing import Optional
from enum import Enum

from duneapi.types import QueryParameter

from src.models import LeftBound
from src.query_monitor.base import BaseQueryMonitor
from src.query_monitor.base import QueryData
from src.query_monitor.result_threshold import ResultThresholdQuery


class LeftBoundedQueryMonitor(BaseQueryMonitor):
class TimeUnit(Enum):
"""
Enum representing SQL interval time units
"""

SECONDS = "seconds"
MINUTES = "minutes"
HOURS = "hours"
DAYS = "days"
WEEKS = "weeks"
MONTHS = "months"

@classmethod
def options(cls) -> list[str]:
"""Returns a list of all available enum items"""
return [str(e.value) for e in cls]


class LeftBound:
"""Left Bound for Query Monitor"""

def __init__(self, units: TimeUnit, offset: int):
self.units = units
self.offset = offset

def as_query_parameters(self) -> list[QueryParameter]:
"""Returns DuneQueryParameters for object instance"""
return [
QueryParameter.enum_type(
name="TimeUnits",
value=str(self.units.value),
options=TimeUnit.options(),
),
QueryParameter.number_type(name="Offset", value=self.offset),
]

@classmethod
def from_cfg(cls, cfg: dict[str, int]) -> LeftBound:
"""Loads LeftBound based on dict containing keys units and offset"""
return cls(
units=TimeUnit(cfg["units"]),
offset=int(cfg["offset"]),
)

def __eq__(self, other: object) -> bool:
if isinstance(other, LeftBound):
return self.units == other.units and self.offset == other.offset
raise ValueError(f"Can't compare LeftBound with {type(other)}")


class LeftBoundedQueryMonitor(ResultThresholdQuery):
"""
All queries here, must have `StartTime` as parameter.
This is set by an instance's left_bound attribute.
"""

def __init__(
self,
name: str,
query_id: int,
query: QueryData,
left_bound: LeftBound,
params: Optional[list[QueryParameter]] = None,
threshold: int = 0,
):
super().__init__(name, query_id, params, threshold)
super().__init__(query, threshold)
self.left_bound = left_bound

def parameters(self) -> list[QueryParameter]:
"""Similar to the base model, but with left bound parameter appended"""
return self.fixed_params + self.left_bound.as_query_parameters()
return (self.query.params or []) + self.left_bound.as_query_parameters()
Comment on lines -30 to +29
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will be apparent in a few places because python type checking does not allow you to set mutable defaults. This means that the QueryData.params class has to use an Optional[list] defaulting to None instead of a list defaulting to []. The parameters methods expect a list returned and one cannot concatenate None with a list type... this is how we roll.


def result_url(self) -> str:
"""Returns a link to the query"""
Expand Down
Loading