Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event-specific dedupe window #375

Merged
merged 3 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 29 additions & 20 deletions eagleproject/notify/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from core.common import Severity
from notify.models import EventSeen

EVENT_DUPE_WINDOW_SECONDS = timedelta(seconds=3600) # 1hr

def event_order_comp(a, b) -> int:
""" Compare to Events for ordering """
Expand Down Expand Up @@ -33,7 +34,10 @@ class Event:
""" An event worthy of an action """
def __init__(self, title, details, severity=Severity.NORMAL,
stamp=datetime.utcnow(), tags=['default'], block_number=0,
transaction_index=0, log_index=0):
transaction_index=0, log_index=0, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS):

assert isinstance(deduplicate_time_window, timedelta), "since is not a timedelta object"

self._severity = severity or Severity.NORMAL
self._title = title
self._details = details
Expand All @@ -43,6 +47,7 @@ def __init__(self, title, details, severity=Severity.NORMAL,
self._transaction_index = transaction_index
self._log_index = log_index
self.vague_hash = False
self.deduplicate_time_window = deduplicate_time_window

def __str__(self):
return "{} [{}] {}: {}".format(
Expand Down Expand Up @@ -109,7 +114,7 @@ def tags(self):

def event_critical(title, details, stamp=datetime.utcnow(), tags=None,
block_number=0, transaction_index=0, log_index=0,
log_model=None):
log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS):
""" Create a critical severity event """

if log_model is not None:
Expand All @@ -126,12 +131,13 @@ def event_critical(title, details, stamp=datetime.utcnow(), tags=None,
block_number=block_number,
transaction_index=transaction_index,
log_index=log_index,
deduplicate_time_window=deduplicate_time_window,
)


def event_high(title, details, stamp=datetime.utcnow(), tags=None,
block_number=0, transaction_index=0, log_index=0,
log_model=None):
log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS):
""" Create a high severity event """

if log_model is not None:
Expand All @@ -148,12 +154,13 @@ def event_high(title, details, stamp=datetime.utcnow(), tags=None,
block_number=block_number,
transaction_index=transaction_index,
log_index=log_index,
deduplicate_time_window=deduplicate_time_window,
)


def event_normal(title, details, stamp=datetime.utcnow(), tags=None,
block_number=0, transaction_index=0, log_index=0,
log_model=None):
log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS):
""" Create a normal severity event """

if log_model is not None:
Expand All @@ -170,12 +177,13 @@ def event_normal(title, details, stamp=datetime.utcnow(), tags=None,
block_number=block_number,
transaction_index=transaction_index,
log_index=log_index,
deduplicate_time_window=deduplicate_time_window,
)


def event_low(title, details, stamp=datetime.utcnow(), tags=None,
block_number=0, transaction_index=0, log_index=0,
log_model=None):
log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS):
""" Create a low severity event """

if log_model is not None:
Expand All @@ -192,21 +200,13 @@ def event_low(title, details, stamp=datetime.utcnow(), tags=None,
block_number=block_number,
transaction_index=transaction_index,
log_index=log_index,
deduplicate_time_window=deduplicate_time_window,
)


def seen_filter(events, since):
def seen_filter(events):
""" Filter out any events seen since `since` and add newly discovered hashes
to the DB """
assert isinstance(since, timedelta), "since is not a timedelta object"

hashes_since = [
x.event_hash
for x in EventSeen.objects.filter(
last_seen__gt=datetime.now(tz=timezone.utc) - since
).only('event_hash')
]

filtered = []
events_parsed = []

Expand All @@ -216,13 +216,22 @@ def seen_filter(events, since):
# Deduplicate unprocessed events
if event_hash in events_parsed:
continue

events_parsed.append(event_hash)

if event_hash not in hashes_since:
filtered.append(ev)

EventSeen.objects.update_or_create(event_hash=event_hash, defaults={
_, created = EventSeen.objects.get_or_create(
event_hash=event_hash,
last_seen__gt=datetime.now(tz=timezone) - ev.deduplicate_time_window,
defaults={
'event_hash': event_hash,
'last_seen': datetime.now(tz=timezone.utc)
})
}
)

if not created:
# Do not send duplicate alert within the defined time window
continue

filtered.append(ev)

return filtered
7 changes: 1 addition & 6 deletions eagleproject/notify/main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
from datetime import timedelta

from notify.actions import execute_all_actions, create_actions_from_events
from notify.events import seen_filter
from notify.triggers import run_all_triggers

EVENT_DUPE_WINDOW_SECONDS = timedelta(seconds=3600) # 1hr


def run_all():
events = run_all_triggers()
events = seen_filter(events, since=EVENT_DUPE_WINDOW_SECONDS)
events = seen_filter(events)
actions = create_actions_from_events(events)
execute_all_actions(actions)
14 changes: 12 additions & 2 deletions eagleproject/notify/triggers/vault_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from core.blockchain.strategies import OUSD_BACKING_ASSETS, OETH_BACKING_ASSETS

from time import sleep
from datetime import timedelta

DEDUPE_WINDOW_SECONDS = timedelta(days=1) # 1d

# USD-pegged stable coins drift thresholds
MAX_USD_PRICE = Decimal("1.05")
Expand Down Expand Up @@ -63,13 +66,20 @@ def run_trigger(transfers, new_transfers):
continue
except AssertionError as e:
events.append(
event_high("Exceptional Oracle Price 🧙‍♀️", str(e))
event_high(
"Exceptional Oracle Price 🧙‍♀️", str(e),
deduplicate_time_window=DEDUPE_WINDOW_SECONDS,
)
)
continue
except RPCError as e:
print("RPC Error when reading price for {}".format(symbol), e)
sleep(3)
if retries <= 0:
events.append(event_high("RPC Error when reading price for {} 🔴".format(symbol), str(e)))
events.append(
event_high(
"RPC Error when reading price for {} 🔴".format(symbol), str(e)),
deduplicate_time_window=DEDUPE_WINDOW_SECONDS,
)

return events
Loading