Commit

Call cleanup task upon passing threshold
aayush-se committed Sep 19, 2024
1 parent 3232528 commit 341526b
Showing 2 changed files with 49 additions and 21 deletions.
27 changes: 27 additions & 0 deletions src/seer/anomaly_detection/accessors.py
@@ -16,6 +16,7 @@
TimeSeriesAnomalies,
)
from seer.anomaly_detection.models.external import AnomalyDetectionConfig, TimeSeriesPoint
from seer.anomaly_detection.tasks import cleanup_timeseries
from seer.db import DbDynamicAlert, DbDynamicAlertTimeSeries, Session
from seer.exceptions import ClientError

@@ -54,11 +55,18 @@ def save_timepoint(
def delete_alert_data(self, external_alert_id: int):
return NotImplemented

@abc.abstractmethod
def cleanup_data(self, external_alert_id: int, date_threshold: datetime.datetime):
    return NotImplemented


class DbAlertDataAccessor(AlertDataAccessor):

@sentry_sdk.trace
def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
# TODO: Add some random noise (0-48 hours) to threshold
timestamp_threshold = datetime.datetime.now() - datetime.timedelta(days=28)
num_points_older = 0
window_size = db_alert.anomaly_algo_data.get("window_size")
flags = []
scores = []
@@ -75,6 +83,12 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
point.anomaly_algo_data
)
mp.append([dist, idx, l_idx, r_idx])
if point.timestamp < timestamp_threshold:
    num_points_older += 1

cleanup_threshold = 24 * 4 * 2  # number of points in two days (96 per day)
if num_points_older >= cleanup_threshold:
self.cleanup_data(db_alert.external_alert_id, timestamp_threshold)

anomalies = MPTimeSeriesAnomalies(
flags=flags,
@@ -209,3 +223,16 @@ def delete_alert_data(self, external_alert_id: int):
raise ClientError(f"Alert with id {external_alert_id} not found")
session.delete(existing)
session.commit()

@sentry_sdk.trace
def cleanup_data(self, alert_id: int, date_threshold: datetime.datetime):
    # Update task_flag so the alert is marked as queued for cleanup
    with Session() as session:
        session.query(DbDynamicAlert).filter(
            DbDynamicAlert.external_alert_id == alert_id
        ).update({"task_flag": "queued"})
        session.commit()

    # Create cleanup task
    cleanup_timeseries.apply_async((alert_id, date_threshold))
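
The TODO in `_hydrate_alert` above asks for 0-48 hours of random noise on the 28-day threshold, presumably so that alerts hydrated in the same window don't all cross the cutoff and enqueue cleanup tasks at once. A minimal sketch of what that could look like; the helper name `jittered_threshold` is hypothetical, not part of this commit:

```python
import datetime
import random


def jittered_threshold(days: int = 28) -> datetime.datetime:
    # Base cutoff: points older than this are candidates for cleanup.
    base = datetime.datetime.now() - datetime.timedelta(days=days)
    # Hypothetical jitter per the TODO: pushing the cutoff back by a random
    # 0-48 hours spreads cleanup enqueueing across a two-day window.
    return base - datetime.timedelta(hours=random.uniform(0, 48))
```

With this helper, `_hydrate_alert` could set `timestamp_threshold = jittered_threshold()` instead of using the fixed 28-day delta.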
43 changes: 22 additions & 21 deletions src/seer/anomaly_detection/tasks.py
@@ -1,39 +1,34 @@
import datetime
import logging

-# import sentry_sdk
+import sentry_sdk
import sqlalchemy.sql as sql

from celery_app.app import celery_app
from seer.anomaly_detection.detectors.anomaly_detectors import MPBatchAnomalyDetector
from seer.anomaly_detection.models import AnomalyDetectionConfig
from seer.db import DbDynamicAlert, DbDynamicAlertTimeSeries, Session

-# from typing import List
-# import stumpy

logger = logging.getLogger(__name__)


@sentry_sdk.trace
@celery_app.task(time_limit=5)
-def cleanup_timeseries(alert_id: int, date_threshold: datetime):
+def cleanup_timeseries(alert_id: int, date_threshold: datetime.datetime):

logger.info("Deleting timeseries points over 28 days old and updating matrix profiles")

-toggle_task_flag()
+toggle_task_flag(alert_id)
deleted_timeseries_points = delete_old_timeseries_points(alert_id, date_threshold)
-updated_matrix_profiles, num_alert_ids = update_matrix_profiles(alert_id)
-toggle_task_flag()
+update_matrix_profiles(alert_id)
+toggle_task_flag(alert_id)
logger.info(f"Deleted {deleted_timeseries_points} timeseries points")
logger.info(f"Updated {updated_matrix_profiles} matrix profiles for {num_alert_ids} alert ids")
logger.info(f"Updated matrix profiles for alertd id {alert_id}")


def delete_old_timeseries_points(alert_id: int, date_threshold: datetime.datetime, batch_size: int = 1000):

-# Delete respective timeseries points and get the alert ids that are being deleted
+# Delete respective timeseries points
deleted_count = 0
while True:
with Session() as session:
@@ -47,13 +42,11 @@ def delete_old_timeseries_points(alert_id: int, date_threshold: datetime, batch_
.limit(batch_size)
.subquery()
)

count = (
session.query(DbDynamicAlertTimeSeries)
.filter(sql.exists().where(DbDynamicAlertTimeSeries.id == subquery.c.group_id))
.delete()
)

session.commit()

deleted_count += count
@@ -70,11 +63,9 @@ def update_matrix_profiles(alert_id: int):
timeseries = session.query(DbDynamicAlert.timeseries).filter(
DbDynamicAlert.dynamic_alert_id == alert_id
)

config = AnomalyDetectionConfig(
time_period=5, sensitivity="high", direction="both", expected_seasonality="auto"
)

updated_mp = MPBatchAnomalyDetector()._compute_matrix_profile(
timeseries=timeseries, config=config
)
@@ -83,11 +74,21 @@ def update_matrix_profiles(alert_id: int):
anomaly_algo_data=updated_mp
)

-pass
+session.commit()


-def toggle_task_flag():
-    # with Session() as session:
-    pass
+def toggle_task_flag(alert_id: int):
+    with Session() as session:
+        task_flag = (
+            session.query(DbDynamicAlert.task_flag)
+            .filter(DbDynamicAlert.id == alert_id)
+            .scalar()
+        )
+        new_flag = ""
+        if task_flag == "queued":
+            new_flag = "processing"
+        elif task_flag == "processing":
+            new_flag = "not_queued"
+        session.query(DbDynamicAlert).filter(
+            DbDynamicAlert.id == alert_id
+        ).update({"task_flag": new_flag})
+        session.commit()
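
Taken together, `cleanup_data` in accessors.py and `toggle_task_flag` above imply a small state machine for `task_flag`: "queued" when the cleanup task is enqueued, "processing" while it runs, and "not_queued" once it finishes. A sketch of that lifecycle; the transition table is illustrative and not part of the commit:

```python
# Flag values as they appear in this diff.
TASK_FLAG_TRANSITIONS = {
    "queued": "processing",      # first toggle: the task has started
    "processing": "not_queued",  # second toggle: the task has finished
}


def next_task_flag(current: str) -> str:
    # The diff's if/elif leaves new_flag as "" for unexpected states;
    # returning the current flag unchanged is a slightly safer variant.
    return TASK_FLAG_TRANSITIONS.get(current, current)


assert next_task_flag("queued") == "processing"
assert next_task_flag("processing") == "not_queued"
```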
