diff --git a/eagleproject/notify/events/__init__.py b/eagleproject/notify/events/__init__.py index ecd4dfff..274aa03c 100644 --- a/eagleproject/notify/events/__init__.py +++ b/eagleproject/notify/events/__init__.py @@ -4,6 +4,7 @@ from core.common import Severity from notify.models import EventSeen +EVENT_DUPE_WINDOW_SECONDS = timedelta(seconds=3600) # 1hr def event_order_comp(a, b) -> int: """ Compare to Events for ordering """ @@ -33,7 +34,10 @@ class Event: """ An event worthy of an action """ def __init__(self, title, details, severity=Severity.NORMAL, stamp=datetime.utcnow(), tags=['default'], block_number=0, - transaction_index=0, log_index=0): + transaction_index=0, log_index=0, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS): + + assert isinstance(deduplicate_time_window, timedelta), "since is not a timedelta object" + self._severity = severity or Severity.NORMAL self._title = title self._details = details @@ -43,6 +47,7 @@ def __init__(self, title, details, severity=Severity.NORMAL, self._transaction_index = transaction_index self._log_index = log_index self.vague_hash = False + self.deduplicate_time_window = deduplicate_time_window def __str__(self): return "{} [{}] {}: {}".format( @@ -109,7 +114,7 @@ def tags(self): def event_critical(title, details, stamp=datetime.utcnow(), tags=None, block_number=0, transaction_index=0, log_index=0, - log_model=None): + log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS): """ Create a critical severity event """ if log_model is not None: @@ -126,12 +131,13 @@ def event_critical(title, details, stamp=datetime.utcnow(), tags=None, block_number=block_number, transaction_index=transaction_index, log_index=log_index, + deduplicate_time_window=deduplicate_time_window, ) def event_high(title, details, stamp=datetime.utcnow(), tags=None, block_number=0, transaction_index=0, log_index=0, - log_model=None): + log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS): """ Create a high severity event """ if log_model is not None: @@ -148,12 +154,13 @@ def event_high(title, details, stamp=datetime.utcnow(), tags=None, block_number=block_number, transaction_index=transaction_index, log_index=log_index, + deduplicate_time_window=deduplicate_time_window, ) def event_normal(title, details, stamp=datetime.utcnow(), tags=None, block_number=0, transaction_index=0, log_index=0, - log_model=None): + log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS): """ Create a normal severity event """ if log_model is not None: @@ -170,12 +177,13 @@ def event_normal(title, details, stamp=datetime.utcnow(), tags=None, block_number=block_number, transaction_index=transaction_index, log_index=log_index, + deduplicate_time_window=deduplicate_time_window, ) def event_low(title, details, stamp=datetime.utcnow(), tags=None, block_number=0, transaction_index=0, log_index=0, - log_model=None): + log_model=None, deduplicate_time_window=EVENT_DUPE_WINDOW_SECONDS): """ Create a low severity event """ if log_model is not None: @@ -192,21 +200,13 @@ def event_low(title, details, stamp=datetime.utcnow(), tags=None, block_number=block_number, transaction_index=transaction_index, log_index=log_index, + deduplicate_time_window=deduplicate_time_window, ) -def seen_filter(events, since): - """ Filter out any events seen since `since` and add newly discovered hashes - to the DB """ - assert isinstance(since, timedelta), "since is not a timedelta object" - - hashes_since = [ - x.event_hash - for x in EventSeen.objects.filter( - last_seen__gt=datetime.now(tz=timezone.utc) - since - ).only('event_hash') - ] - +def seen_filter(events): + """ Filter out any events seen since `event.deduplicate_time_window` and + add newly discovered hashes to the DB """ filtered = [] events_parsed = [] @@ -216,13 +216,22 @@ def seen_filter(events, since): # Deduplicate unprocessed events if event_hash in events_parsed: continue + events_parsed.append(event_hash) - - if event_hash not in hashes_since: - filtered.append(ev) - EventSeen.objects.update_or_create(event_hash=event_hash, defaults={ + _, created = EventSeen.objects.get_or_create( + event_hash=event_hash, + last_seen__gt=datetime.now(tz=timezone.utc) - ev.deduplicate_time_window, + defaults={ + 'event_hash': event_hash, 'last_seen': datetime.now(tz=timezone.utc) - }) + } + ) + + if not created: + # Do not send duplicate alert within the defined time window + continue + + filtered.append(ev) return filtered diff --git a/eagleproject/notify/main.py b/eagleproject/notify/main.py index 1e378cc4..8e2870dc 100644 --- a/eagleproject/notify/main.py +++ b/eagleproject/notify/main.py @@ -1,14 +1,9 @@ -from datetime import timedelta - from notify.actions import execute_all_actions, create_actions_from_events from notify.events import seen_filter from notify.triggers import run_all_triggers -EVENT_DUPE_WINDOW_SECONDS = timedelta(seconds=3600) # 1hr - - def run_all(): events = run_all_triggers() - events = seen_filter(events, since=EVENT_DUPE_WINDOW_SECONDS) + events = seen_filter(events) actions = create_actions_from_events(events) execute_all_actions(actions) diff --git a/eagleproject/notify/triggers/vault_oracle.py b/eagleproject/notify/triggers/vault_oracle.py index b1e2ef5f..e3202c18 100644 --- a/eagleproject/notify/triggers/vault_oracle.py +++ b/eagleproject/notify/triggers/vault_oracle.py @@ -9,6 +9,9 @@ from core.blockchain.strategies import OUSD_BACKING_ASSETS, OETH_BACKING_ASSETS from time import sleep +from datetime import timedelta + +DEDUPE_WINDOW_SECONDS = timedelta(days=1) # 1d # USD-pegged stable coins drift thresholds MAX_USD_PRICE = Decimal("1.05") @@ -63,13 +66,25 @@ def run_trigger(transfers, new_transfers): continue except AssertionError as e: events.append( - event_high("Exceptional Oracle Price 🧙‍♀️", str(e)) + event_high( + "Exceptional Oracle Price 🧙‍♀️", + str(e), + deduplicate_time_window=DEDUPE_WINDOW_SECONDS, + ) ) continue except RPCError as e: print("RPC Error when reading price for {}".format(symbol), e) - sleep(3) - if retries <= 0: - events.append(event_high("RPC Error when reading price for {} 🔴".format(symbol), str(e))) + if "below peg" in e.message or retries <= 0: + events.append( + event_high( + "RPC Error when reading price for {} 🔴".format(symbol), + str(e), + deduplicate_time_window=DEDUPE_WINDOW_SECONDS, + ), + ) + break + elif retries > 0: + sleep(3) return events diff --git a/eagleproject/notify/views.py b/eagleproject/notify/views.py index be15b517..a7193987 100644 --- a/eagleproject/notify/views.py +++ b/eagleproject/notify/views.py @@ -19,7 +19,7 @@ def gc(request): # event_seen is time sensitive and old entries are irrelevant try: EventSeen.objects.filter( - last_seen__gt=datetime.utcnow() - timedelta(hours=2) + last_seen__gt=datetime.utcnow() - timedelta(hours=24) ).delete() except Exception: log.exception( diff --git a/eagleproject/scripts/gc.py b/eagleproject/scripts/gc.py index 0a44aeec..f2c33bbb 100644 --- a/eagleproject/scripts/gc.py +++ b/eagleproject/scripts/gc.py @@ -11,7 +11,7 @@ def run(): # event_seen is time sensitive and old entries are irrelevant try: EventSeen.objects.filter( - last_seen__gt=datetime.utcnow() - timedelta(hours=2) + last_seen__gt=datetime.utcnow() - timedelta(hours=24) ).delete() except Exception: log.exception(