diff --git a/src/seer/anomaly_detection/accessors.py b/src/seer/anomaly_detection/accessors.py
index 328a5ef1d..3e1a4cca7 100644
--- a/src/seer/anomaly_detection/accessors.py
+++ b/src/seer/anomaly_detection/accessors.py
@@ -69,7 +69,7 @@ class DbAlertDataAccessor(AlertDataAccessor):
     @sentry_sdk.trace
     def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
         # TODO: Add some some random noise (0-48 hours) to threshold
-        timestamp_threshold = datetime.datetime.now() - datetime.timedelta(days=28)
+        timestamp_threshold = (datetime.datetime.now() - datetime.timedelta(days=28)).timestamp()
         num_old_points = 0
         window_size = db_alert.anomaly_algo_data.get("window_size")
         flags = []
@@ -87,7 +87,7 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
                     point.anomaly_algo_data
                 )
                 mp.append([dist, idx, l_idx, r_idx])
-            if point.timestamp.timestamp < timestamp_threshold:
+            if point.timestamp.timestamp() < timestamp_threshold:
                 num_old_points += 1
 
         anomalies = MPTimeSeriesAnomalies(
@@ -236,9 +236,12 @@ def set_data_purge_flag(self, alert_id: int):
 
         # Update data_purge_flag
         with Session() as session:
-            session.update(DbDynamicAlert).where(external_alert_id=alert_id).values(
-                data_purge_flag="queued"
-            )
+            dynamic_alert = session.query(DbDynamicAlert).where(
+                DbDynamicAlert.external_alert_id == alert_id
+            ).one()
+
+            dynamic_alert.data_purge_flag = "queued"
+
             session.commit()
 
         # Create cleanup task
diff --git a/src/seer/anomaly_detection/anomaly_detection.py b/src/seer/anomaly_detection/anomaly_detection.py
index f77fdd5a2..ccd2126d1 100644
--- a/src/seer/anomaly_detection/anomaly_detection.py
+++ b/src/seer/anomaly_detection/anomaly_detection.py
@@ -155,10 +155,10 @@ def _online_detect(
         # Set flag and create new task for cleanup
         cleanup_config = historic.cleanup_config
         if cleanup_config.num_old_points >= cleanup_config.num_acceptable_points:
-            alert_data_accessor.set_data_purge_flag()
-            cleanup_timeseries.apply_async(
-                historic.external_alert_id, cleanup_config.timestamp_threshold
-            )
+            alert_data_accessor.set_data_purge_flag(historic.external_alert_id)
+            cleanup_timeseries.apply_async(
+                args=[historic.external_alert_id, cleanup_config.timestamp_threshold]
+            )
 
         return ts_external, streamed_anomalies
diff --git a/src/seer/anomaly_detection/models/dynamic_alert.py b/src/seer/anomaly_detection/models/dynamic_alert.py
index f7cd4db2b..73b51ce7e 100644
--- a/src/seer/anomaly_detection/models/dynamic_alert.py
+++ b/src/seer/anomaly_detection/models/dynamic_alert.py
@@ -17,4 +17,3 @@ class DynamicAlert(BaseModel):
     timeseries: TimeSeries
     anomalies: TimeSeriesAnomalies
     cleanup_config: CleanupConfig
-    data_purge_flag: str
diff --git a/src/seer/anomaly_detection/tasks.py b/src/seer/anomaly_detection/tasks.py
index 7124de736..d07016c90 100644
--- a/src/seer/anomaly_detection/tasks.py
+++ b/src/seer/anomaly_detection/tasks.py
@@ -14,7 +14,7 @@
 
 @sentry_sdk.trace
 @celery_app.task(timelimit=5)
-def cleanup_timeseries(alert_id: int, date_threshold: datetime = datetime.datetime):
+def cleanup_timeseries(alert_id: int, date_threshold: float):
     logger.info("Deleting timeseries points over 28 days old and updating matrix profiles")
 
@@ -22,11 +22,14 @@ def cleanup_timeseries(alert_id: int, date_threshold: datetime = datetime.dateti
     deleted_timeseries_points = delete_old_timeseries_points(alert_id, date_threshold)
     update_matrix_profiles(alert_id)
     toggle_data_purge_flag(alert_id)
+
     logger.info(f"Deleted {deleted_timeseries_points} timeseries points")
     logger.info(f"Updated matrix profiles for alertd id {alert_id}")
 
 
-def delete_old_timeseries_points(alert_id: int, date_threshold: datetime, batch_size: int = 1000):
+def delete_old_timeseries_points(alert_id: int, date_threshold: float, batch_size: int = 1000):
+
+    date_threshold = datetime.datetime.fromtimestamp(date_threshold)
 
     # Delete respective timeseries points
     deleted_count = 0
@@ -36,7 +39,7 @@ def delete_old_timeseries_points(alert_id: int, date_threshold: datetime, batch_
         subquery = (
            session.query(DbDynamicAlertTimeSeries.id)
            .filter(
-                DbDynamicAlert.id == alert_id,
+                DbDynamicAlert.external_alert_id == alert_id,
+                DbDynamicAlertTimeSeries.dynamic_alert_id == DbDynamicAlert.id,
                 DbDynamicAlertTimeSeries.timestamp < date_threshold,
             )
             .limit(batch_size)
@@ -44,7 +47,7 @@ def delete_old_timeseries_points(alert_id: int, date_threshold: datetime, batch_
         )
         count = (
             session.query(DbDynamicAlertTimeSeries)
-            .filter(sql.exists().where(DbDynamicAlertTimeSeries.id == subquery.c.group_id))
+            .filter(sql.exists().where(DbDynamicAlertTimeSeries.id == subquery.c.id))
             .delete()
         )
         session.commit()
@@ -61,7 +64,7 @@ def update_matrix_profiles(alert_id: int):
 
     with Session() as session:
-        timeseries = session.query(DbDynamicAlert.timeseries).filter(
-            DbDynamicAlert.dynamic_alert_id == alert_id
-        )
+        dynamic_alert = session.query(DbDynamicAlert).where(
+            DbDynamicAlert.external_alert_id == alert_id
+        ).one()
+        timeseries = dynamic_alert.timeseries
         config = AnomalyDetectionConfig(
             time_period=5, sensitivity="high", direction="both", expected_seasonality="auto"
         )
@@ -70,10 +73,12 @@ def update_matrix_profiles(alert_id: int):
             timeseries=timeseries, config=config
         )
 
-        session.update(DbDynamicAlert).where(DbDynamicAlert.id == alert_id).values(
-            anomaly_algo_data=updated_mp
-        )
+        dynamic_alert.anomaly_algo_data = updated_mp
+
         session.commit()
@@ -82,7 +87,7 @@ def toggle_data_purge_flag(alert_id: int):
 
     with Session() as session:
         data_purge_flag = session.query(DbDynamicAlert.data_purge_flag).where(
-            DbDynamicAlert.id == alert_id
-        )
+            DbDynamicAlert.external_alert_id == alert_id
+        ).scalar()
 
         # TODO: Confirm that this is right (chance there might be empty string)
         if data_purge_flag == "queued":
             new_flag = "processing"
         elif data_purge_flag == "processing":
             new_flag = "not_queued"
@@ -92,7 +97,9 @@ def toggle_data_purge_flag(alert_id: int):
 
-        session.update(DbDynamicAlert).where(DbDynamicAlert.id == alert_id).values(
-            data_purge_flag=new_flag
-        )
+        dynamic_alert = session.query(DbDynamicAlert).where(
+            DbDynamicAlert.external_alert_id == alert_id
+        ).one()
+
+        dynamic_alert.data_purge_flag = new_flag
+
         session.commit()
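
Some notes on the patterns this patch leans on follow, each with a small runnable sketch. The model, task, and session names mirror the diff; everything else (engine, broker, sample values) is illustrative only.

First, the threshold handling. The patch switches timestamp_threshold to epoch seconds so it can be compared against point.timestamp.timestamp() and serialized into the Celery task, with delete_old_timeseries_points converting back via fromtimestamp. One caveat, flagged as an assumption because the column types are not visible in the hunks: .timestamp() and fromtimestamp() interpret naive datetimes in local time, so timezone-aware UTC datetimes would be the safer choice if the stored timestamps are UTC.

    import datetime

    # Epoch-seconds threshold, as in _hydrate_alert: 28 days before "now".
    threshold = (datetime.datetime.now() - datetime.timedelta(days=28)).timestamp()

    # Back to a datetime for the SQL comparison, as in delete_old_timeseries_points.
    as_datetime = datetime.datetime.fromtimestamp(threshold)

    # A point is "old" when its epoch value falls below the threshold.
    point_ts = datetime.datetime.now() - datetime.timedelta(days=30)
    assert point_ts.timestamp() < threshold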
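
Second, the ORM writes. Session has no update() method in SQLAlchemy, which is why the old session.update(DbDynamicAlert)... calls had to go; the replacement pattern is fetch, mutate, commit. The fetch must materialize a row: .one() (or .one_or_none()) executes the query, whereas assigning to a bare Query object would silently set a Python attribute on the query and never touch the database. A minimal SQLAlchemy 1.4-style sketch, assuming a DbDynamicAlert model trimmed down to external_alert_id and data_purge_flag columns:

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class DbDynamicAlert(Base):
        __tablename__ = "dynamic_alert"
        id = Column(Integer, primary_key=True)
        external_alert_id = Column(Integer, index=True)
        data_purge_flag = Column(String, default="not_queued")

    def set_data_purge_flag(session: Session, alert_id: int) -> None:
        # .one() runs the query and returns exactly one mapped row, raising
        # if the alert is missing or duplicated.
        dynamic_alert = (
            session.query(DbDynamicAlert)
            .where(DbDynamicAlert.external_alert_id == alert_id)
            .one()
        )
        dynamic_alert.data_purge_flag = "queued"  # tracked by the unit of work
        session.commit()                          # flushes a single UPDATE

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add(DbDynamicAlert(external_alert_id=123))
        session.commit()
        set_data_purge_flag(session, 123)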
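
Third, the task enqueue in _online_detect. Writing cleanup_timeseries(...).apply_async() calls the task function synchronously in the calling process and then invokes apply_async() on its return value (None), so the hunk above passes the arguments to apply_async directly instead. Separately, and hedged because the Celery app config is not shown: timelimit=5 is not a Celery option; the keyword is time_limit (in seconds), and an unrecognized keyword is silently stored on the task class rather than enforced.

    from celery import Celery

    celery_app = Celery("seer", broker="memory://")  # placeholder broker URL

    @celery_app.task(time_limit=5)  # Celery's keyword, not timelimit
    def cleanup_timeseries(alert_id: int, date_threshold: float) -> None:
        print(f"cleaning alert {alert_id} before {date_threshold}")

    # Enqueue without executing locally; args travel to the broker as a list.
    cleanup_timeseries.apply_async(args=[42, 1_700_000_000.0])

    # delay() is shorthand for the same call.
    cleanup_timeseries.delay(42, 1_700_000_000.0)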
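
Finally, the batched delete. The EXISTS formulation only scopes the delete to a single alert if the timeseries rows are actually joined to their alert, and bulk Query.delete() calls generally want synchronize_session=False. The sketch below selects one batch of ids and deletes by IN, which is portable and easy to verify; it assumes, as the patch appears to, that dynamic_alert_id on the timeseries table is a foreign key to DbDynamicAlert.id. Committing per batch keeps each transaction short, and the loop stops when a batch comes back empty.

    import datetime

    from sqlalchemy import Column, DateTime, ForeignKey, Integer
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class DbDynamicAlert(Base):
        __tablename__ = "dynamic_alert"
        id = Column(Integer, primary_key=True)
        external_alert_id = Column(Integer, index=True)

    class DbDynamicAlertTimeSeries(Base):
        __tablename__ = "dynamic_alert_time_series"
        id = Column(Integer, primary_key=True)
        dynamic_alert_id = Column(Integer, ForeignKey("dynamic_alert.id"), index=True)
        timestamp = Column(DateTime)

    def delete_old_timeseries_points(
        session: Session, alert_id: int, date_threshold: float, batch_size: int = 1000
    ) -> int:
        threshold = datetime.datetime.fromtimestamp(date_threshold)
        deleted = 0
        while True:
            # One batch of ids, scoped to the alert via an explicit join.
            ids = [
                row.id
                for row in session.query(DbDynamicAlertTimeSeries.id)
                .join(
                    DbDynamicAlert,
                    DbDynamicAlertTimeSeries.dynamic_alert_id == DbDynamicAlert.id,
                )
                .filter(
                    DbDynamicAlert.external_alert_id == alert_id,
                    DbDynamicAlertTimeSeries.timestamp < threshold,
                )
                .limit(batch_size)
            ]
            if not ids:
                return deleted
            deleted += (
                session.query(DbDynamicAlertTimeSeries)
                .filter(DbDynamicAlertTimeSeries.id.in_(ids))
                .delete(synchronize_session=False)  # skip in-session bookkeeping
            )
            session.commit()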