From 9d4f5831c874581f055b0b2fc5fc9b0bf6bfec5c Mon Sep 17 00:00:00 2001
From: Aayush Seth
Date: Thu, 19 Sep 2024 13:36:37 -0700
Subject: [PATCH] Setup tasks

Move scheduling of the time-series cleanup task out of the data accessor
and into the online detection path:

- DbAlertDataAccessor._hydrate_alert no longer triggers cleanup itself.
  It counts the points older than the 28-day threshold and returns that
  count, the threshold and the acceptable number of old points on the
  DynamicAlert as a new CleanupConfig model.
- AlertDataAccessor.cleanup_data is renamed to set_task_flag and no
  longer enqueues the Celery task.
- _online_detect sets the task flag and enqueues cleanup_timeseries once
  the number of old points reaches the acceptable number.
- The `import seer.app` line in celery_app/tasks.py is commented out.

---
Review notes (ignored by `git am`):

* set_task_flag() was called without the alert id that both of its
  signatures require; it now receives historic.external_alert_id.
* apply_async() takes the task's positional arguments as one `args`
  tuple (as the removed call in accessors.py did); the new call passed
  them as apply_async's own `args` and `kwargs` parameters.
* CleanupConfig.timestamp_threshold is declared `float` but was given a
  datetime; it is now converted with .timestamp(). TODO confirm that
  cleanup_timeseries accepts epoch seconds.
* Not changed here (context line): in tasks.py `task_flag` is the query
  object returned by session.query(...).where(...), so comparing it to
  "queued" needs a follow-up.
* The `index` hashes below are those of the original commit.

 src/celery_app/tasks.py                       |  2 +-
 src/seer/anomaly_detection/accessors.py       | 25 +++++++++++--------
 .../anomaly_detection/anomaly_detection.py    | 12 +++++++--
 .../anomaly_detection/models/dynamic_alert.py |  3 ++-
 src/seer/anomaly_detection/models/external.py |  6 +++++
 src/seer/anomaly_detection/tasks.py           |  2 ++
 6 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/src/celery_app/tasks.py b/src/celery_app/tasks.py
index 695e42ea..98cc283b 100644
--- a/src/celery_app/tasks.py
+++ b/src/celery_app/tasks.py
@@ -2,7 +2,7 @@
 # See the test_seer:test_detected_celery_jobs test
 from celery.schedules import crontab
 
-import seer.app  # noqa: F401
+# import seer.app  # noqa: F401
 from celery_app.app import celery_app as celery  # noqa: F401
 from celery_app.config import CeleryQueues
 from seer.automation.autofix.tasks import check_and_mark_recent_autofix_runs
diff --git a/src/seer/anomaly_detection/accessors.py b/src/seer/anomaly_detection/accessors.py
index f4a11f21..95ef132b 100644
--- a/src/seer/anomaly_detection/accessors.py
+++ b/src/seer/anomaly_detection/accessors.py
@@ -15,8 +15,11 @@
     MPTimeSeriesAnomalies,
     TimeSeriesAnomalies,
 )
-from seer.anomaly_detection.models.external import AnomalyDetectionConfig, TimeSeriesPoint
-from seer.anomaly_detection.tasks import cleanup_timeseries
+from seer.anomaly_detection.models.external import (
+    AnomalyDetectionConfig,
+    CleanupConfig,
+    TimeSeriesPoint,
+)
 from seer.db import DbDynamicAlert, DbDynamicAlertTimeSeries, Session
 from seer.exceptions import ClientError
 
@@ -56,7 +59,7 @@ def delete_alert_data(self, external_alert_id: int):
         return NotImplemented
 
     @abc.abstractmethod
-    def cleanup_data(self, external_alert_id: int):
+    def set_task_flag(self, external_alert_id: int):
         return NotImplemented
 
 
@@ -66,7 +69,7 @@ class DbAlertDataAccessor(AlertDataAccessor):
     def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
         # TODO: Add some some random noise (0-48 hours) to threshold
         timestamp_threshold = datetime.datetime.now() - datetime.timedelta(days=28)
-        num_points_older = 0
+        num_old_points = 0
         window_size = db_alert.anomaly_algo_data.get("window_size")
         flags = []
         scores = []
@@ -84,11 +87,7 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
                 )
                 mp.append([dist, idx, l_idx, r_idx])
             if point.timestamp.timestamp < timestamp_threshold:
-                num_points_older += 1
-
-        cleanup_threshold = 24 * 4 * 2  # num alerts over 2 days
-        if num_points_older >= cleanup_threshold:
-            self.cleanup_data(db_alert.external_alert_id, timestamp_threshold)
+                num_old_points += 1
 
         anomalies = MPTimeSeriesAnomalies(
             flags=flags,
@@ -111,6 +110,11 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
                 values=np.array(values),
             ),
             anomalies=anomalies,
+            cleanup_config=CleanupConfig(
+                num_old_points=num_old_points,
+                timestamp_threshold=timestamp_threshold.timestamp(),
+                num_acceptable_points=24 * 4 * 2,  # Num alerts for 2 days
+            ),
         )
 
     @sentry_sdk.trace
@@ -225,7 +229,7 @@ def delete_alert_data(self, external_alert_id: int):
             session.commit()
 
     @sentry_sdk.trace
-    def cleanup_data(self, alert_id: int, date_threshold: datetime):
+    def set_task_flag(self, alert_id: int):
 
         # Update task_flag
         with Session() as session:
@@ -235,4 +239,3 @@ def cleanup_data(self, alert_id: int, date_threshold: datetime):
             session.commit()
 
         # Create cleanup task
-        cleanup_timeseries.apply_async((alert_id, date_threshold))
diff --git a/src/seer/anomaly_detection/anomaly_detection.py b/src/seer/anomaly_detection/anomaly_detection.py
index 670c5dcc..e25c4468 100644
--- a/src/seer/anomaly_detection/anomaly_detection.py
+++ b/src/seer/anomaly_detection/anomaly_detection.py
@@ -24,6 +24,7 @@
     TimeSeriesPoint,
     TimeSeriesWithHistory,
 )
+from seer.anomaly_detection.tasks import cleanup_timeseries
 from seer.dependency_injection import inject, injected
 from seer.exceptions import ClientError, ServerError
 
@@ -96,7 +97,6 @@ def _online_detect(
         Returns:
             Tuple with input timeseries and identified anomalies
         """
-
         logger.info(f"Detecting anomalies for alert ID: {alert.id}")
         ts_external: List[TimeSeriesPoint] = []
         if alert.cur_window:
@@ -149,7 +149,15 @@ def _online_detect(
             anomaly=streamed_anomalies,
             anomaly_algo_data=streamed_anomalies.get_anomaly_algo_data(len(ts_external))[0],
         )
-        # TODO: Clean up old data
+
+        # Set flag and create new task for cleanup
+        cleanup_config = historic.cleanup_config
+        if cleanup_config.num_old_points >= cleanup_config.num_acceptable_points:
+            alert_data_accessor.set_task_flag(historic.external_alert_id)
+            cleanup_timeseries.apply_async(
+                args=(historic.external_alert_id, cleanup_config.timestamp_threshold),
+            )
+
         return ts_external, streamed_anomalies
 
     def _min_required_timesteps(self, time_period, min_num_days=7):
diff --git a/src/seer/anomaly_detection/models/dynamic_alert.py b/src/seer/anomaly_detection/models/dynamic_alert.py
index 0a25c129..73b51ce7 100644
--- a/src/seer/anomaly_detection/models/dynamic_alert.py
+++ b/src/seer/anomaly_detection/models/dynamic_alert.py
@@ -2,7 +2,7 @@
 
 from pydantic import BaseModel
 
-from seer.anomaly_detection.models.external import AnomalyDetectionConfig
+from seer.anomaly_detection.models.external import AnomalyDetectionConfig, CleanupConfig
 from seer.anomaly_detection.models.timeseries import TimeSeries
 from seer.anomaly_detection.models.timeseries_anomalies import TimeSeriesAnomalies
 
@@ -16,3 +16,4 @@ class DynamicAlert(BaseModel):
     config: AnomalyDetectionConfig
     timeseries: TimeSeries
     anomalies: TimeSeriesAnomalies
+    cleanup_config: CleanupConfig
diff --git a/src/seer/anomaly_detection/models/external.py b/src/seer/anomaly_detection/models/external.py
index 43d2f910..7fccd9a8 100644
--- a/src/seer/anomaly_detection/models/external.py
+++ b/src/seer/anomaly_detection/models/external.py
@@ -99,3 +99,9 @@ class DeleteAlertDataRequest(BaseModel):
 class DeleteAlertDataResponse(BaseModel):
     success: bool
     message: Optional[str] = Field(None)
+
+
+class CleanupConfig(BaseModel):
+    num_old_points: int
+    timestamp_threshold: float
+    num_acceptable_points: int
diff --git a/src/seer/anomaly_detection/tasks.py b/src/seer/anomaly_detection/tasks.py
index 1dc8a27a..995019a3 100644
--- a/src/seer/anomaly_detection/tasks.py
+++ b/src/seer/anomaly_detection/tasks.py
@@ -82,6 +82,8 @@ def toggle_task_flag(alert_id: int):
 
     with Session() as session:
         task_flag = session.query(DbDynamicAlert.task_flag).where(DbDynamicAlert.id == alert_id)
+
+        # TODO: Confirm that this is right (chance there might be empty string)
         new_flag = ""
         if task_flag == "queued":
             new_flag = "processing"