-
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create task for cleaning and updating MP
- Loading branch information
Showing
2 changed files
with
94 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
import datetime | ||
import logging | ||
|
||
# import sentry_sdk | ||
import sqlalchemy.sql as sql | ||
|
||
from celery_app.app import celery_app | ||
from seer.anomaly_detection.detectors.anomaly_detectors import MPBatchAnomalyDetector | ||
from seer.anomaly_detection.models import AnomalyDetectionConfig | ||
from seer.db import DbDynamicAlert, DbDynamicAlertTimeSeries, Session | ||
|
||
# from typing import List | ||
|
||
|
||
# import stumpy | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@celery_app.task(timelimit=5) | ||
def cleanup_timeseries(alert_id: int, date_threshold: datetime): | ||
|
||
logger.info("Deleting timeseries points over 28 days old and updating matrix profiles") | ||
|
||
toggle_task_flag() | ||
deleted_timeseries_points = delete_old_timeseries_points(alert_id, date_threshold) | ||
updated_matrix_profiles, num_alert_ids = update_matrix_profiles(alert_id) | ||
toggle_task_flag() | ||
logger.info(f"Deleted {deleted_timeseries_points} timeseries points") | ||
logger.info(f"Updated {updated_matrix_profiles} matrix profiles for {num_alert_ids} alert ids") | ||
|
||
|
||
def delete_old_timeseries_points(alert_id: int, date_threshold: datetime, batch_size: int = 1000): | ||
|
||
# Delete respective timeseries points and get the alert ids that are being deleted | ||
deleted_count = 0 | ||
while True: | ||
with Session() as session: | ||
|
||
subquery = ( | ||
session.query(DbDynamicAlertTimeSeries.id) | ||
.filter( | ||
DbDynamicAlert.id == alert_id, | ||
DbDynamicAlertTimeSeries.timestamp < date_threshold, | ||
) | ||
.limit(batch_size) | ||
.subquery() | ||
) | ||
|
||
count = ( | ||
session.query(DbDynamicAlertTimeSeries) | ||
.filter(sql.exists().where(DbDynamicAlertTimeSeries.id == subquery.c.group_id)) | ||
.delete() | ||
) | ||
|
||
session.commit() | ||
|
||
deleted_count += count | ||
if count == 0: | ||
break | ||
|
||
return deleted_count | ||
|
||
|
||
def update_matrix_profiles(alert_id: int): | ||
|
||
with Session() as session: | ||
|
||
timeseries = session.query(DbDynamicAlert.timeseries).filter( | ||
DbDynamicAlert.dynamic_alert_id == alert_id | ||
) | ||
|
||
config = AnomalyDetectionConfig( | ||
time_period=5, sensitivity="high", direction="both", expected_seasonality="auto" | ||
) | ||
|
||
updated_mp = MPBatchAnomalyDetector()._compute_matrix_profile( | ||
timeseries=timeseries, config=config | ||
) | ||
|
||
session.update(DbDynamicAlert).where(DbDynamicAlert.id == alert_id).values( | ||
anomaly_algo_data=updated_mp | ||
) | ||
|
||
pass | ||
|
||
|
||
def toggle_task_flag(): | ||
|
||
# with Session() as session: | ||
|
||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters