Fix incorrect queries and timestamps
aayush-se committed Sep 20, 2024
1 parent 1bdb5c6 commit bf818da
Showing 4 changed files with 27 additions and 18 deletions.
11 changes: 7 additions & 4 deletions src/seer/anomaly_detection/accessors.py
@@ -69,7 +69,7 @@ class DbAlertDataAccessor(AlertDataAccessor):
     @sentry_sdk.trace
     def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
         # TODO: Add some random noise (0-48 hours) to threshold
-        timestamp_threshold = datetime.datetime.now() - datetime.timedelta(days=28)
+        timestamp_threshold = (datetime.datetime.now() - datetime.timedelta(days=28)).timestamp()
         num_old_points = 0
         window_size = db_alert.anomaly_algo_data.get("window_size")
         flags = []
@@ -87,7 +87,7 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
                     point.anomaly_algo_data
                 )
                 mp.append([dist, idx, l_idx, r_idx])
-                if point.timestamp.timestamp < timestamp_threshold:
+                if point.timestamp.timestamp() < timestamp_threshold:
                     num_old_points += 1
 
         anomalies = MPTimeSeriesAnomalies(
@@ -236,9 +236,12 @@ def set_data_purge_flag(self, alert_id: int):
 
         # Update data_purge_flag
         with Session() as session:
-            session.update(DbDynamicAlert).where(external_alert_id=alert_id).values(
-                data_purge_flag="queued"
+            dynamic_alert = session.query(DbDynamicAlert).where(
+                DbDynamicAlert.dynamic_alert_id == alert_id
             )
+
+            dynamic_alert.data_purge_flag = "queued"
+
             session.commit()
 
         # Create cleanup task
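Note on the two timestamp fixes above: datetime.timestamp() converts a datetime to epoch seconds as a float, and the old code referenced the method without calling it. A minimal sketch of the corrected comparison (the stored point is a hypothetical example):

import datetime

# Both sides are epoch seconds (floats), so `<` is well-defined. The old
# code compared the bound method itself against a datetime, which raises
# TypeError in Python 3.
timestamp_threshold = (datetime.datetime.now() - datetime.timedelta(days=28)).timestamp()

point_ts = datetime.datetime.now() - datetime.timedelta(days=40)  # hypothetical stored timestamp
if point_ts.timestamp() < timestamp_threshold:
    print("older than 28 days; counted toward num_old_points")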
6 changes: 3 additions & 3 deletions src/seer/anomaly_detection/anomaly_detection.py
@@ -155,10 +155,10 @@ def _online_detect(
         # Set flag and create new task for cleanup
         cleanup_config = historic.cleanup_config
         if cleanup_config.num_old_points >= cleanup_config.num_acceptable_points:
-            alert_data_accessor.set_data_purge_flag()
-            cleanup_timeseries.apply_async(
+            alert_data_accessor.set_data_purge_flag(historic.external_alert_id)
+            cleanup_timeseries(
                 historic.external_alert_id, cleanup_config.timestamp_threshold
-            )
+            ).apply_async()
 
         return ts_external, streamed_anomalies

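The dispatch change above concerns how the cleanup task is handed to Celery. For reference, a sketch of the standard Celery idioms for queueing this task, assuming cleanup_timeseries is the @celery_app.task-decorated function from tasks.py (calling the task function directly runs it synchronously in the current process instead):

# Pass the arguments to apply_async:
cleanup_timeseries.apply_async(
    args=(historic.external_alert_id, cleanup_config.timestamp_threshold)
)

# Equivalent shorthand:
cleanup_timeseries.delay(historic.external_alert_id, cleanup_config.timestamp_threshold)

# Or build a signature first and queue it:
cleanup_timeseries.s(
    historic.external_alert_id, cleanup_config.timestamp_threshold
).apply_async()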
1 change: 0 additions & 1 deletion src/seer/anomaly_detection/models/dynamic_alert.py
@@ -17,4 +17,3 @@ class DynamicAlert(BaseModel):
     timeseries: TimeSeries
     anomalies: TimeSeriesAnomalies
     cleanup_config: CleanupConfig
-    data_purge_flag: str
27 changes: 17 additions & 10 deletions src/seer/anomaly_detection/tasks.py
@@ -14,19 +14,22 @@
 
 @sentry_sdk.trace
 @celery_app.task(timelimit=5)
-def cleanup_timeseries(alert_id: int, date_threshold: datetime = datetime.datetime):
+def cleanup_timeseries(alert_id: int, date_threshold: float):
 
     logger.info("Deleting timeseries points over 28 days old and updating matrix profiles")
 
     toggle_data_purge_flag(alert_id)
     deleted_timeseries_points = delete_old_timeseries_points(alert_id, date_threshold)
     update_matrix_profiles(alert_id)
     toggle_data_purge_flag(alert_id)
 
     logger.info(f"Deleted {deleted_timeseries_points} timeseries points")
     logger.info(f"Updated matrix profiles for alert id {alert_id}")
 
 
-def delete_old_timeseries_points(alert_id: int, date_threshold: datetime, batch_size: int = 1000):
+def delete_old_timeseries_points(alert_id: int, date_threshold: float, batch_size: int = 1000):
+
+    date_threshold = datetime.datetime.fromtimestamp(date_threshold)
 
     # Delete respective timeseries points
     deleted_count = 0
@@ -36,15 +39,15 @@ def delete_old_timeseries_points(alert_id: int, date_threshold: datetime, batch_size: int = 1000):
         subquery = (
             session.query(DbDynamicAlertTimeSeries.id)
             .filter(
-                DbDynamicAlert.id == alert_id,
+                DbDynamicAlert.dynamic_alert_id == alert_id,
                 DbDynamicAlertTimeSeries.timestamp < date_threshold,
             )
             .limit(batch_size)
             .subquery()
         )
         count = (
             session.query(DbDynamicAlertTimeSeries)
-            .filter(sql.exists().where(DbDynamicAlertTimeSeries.id == subquery.c.group_id))
+            .filter(sql.exists().where(DbDynamicAlertTimeSeries.id == subquery.c.id))
             .delete()
         )
         session.commit()
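The subquery.c.group_id to subquery.c.id fix above works because the subquery selects only the id column, so id is the only attribute exposed on its .c collection. A self-contained sketch of the same limit-then-delete batching idiom, using a hypothetical Point table in place of the repo's models:

import datetime

from sqlalchemy import Column, DateTime, Integer, create_engine, exists
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Point(Base):  # hypothetical stand-in for DbDynamicAlertTimeSeries
    __tablename__ = "points"
    id = Column(Integer, primary_key=True)
    timestamp = Column(DateTime)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
Session = sessionmaker(engine)

def delete_stale_points(date_threshold: datetime.datetime, batch_size: int = 1000) -> int:
    with Session() as session:
        # Gather at most batch_size ids of stale rows into a subquery...
        subquery = (
            session.query(Point.id)
            .filter(Point.timestamp < date_threshold)
            .limit(batch_size)
            .subquery()
        )
        # ...then delete exactly the rows whose id appears in it; `id` is
        # the selected column, hence subquery.c.id.
        count = (
            session.query(Point)
            .filter(exists().where(Point.id == subquery.c.id))
            .delete(synchronize_session=False)
        )
        session.commit()
        return count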
@@ -61,7 +64,7 @@ def update_matrix_profiles(alert_id: int):
     with Session() as session:
 
         timeseries = session.query(DbDynamicAlert.timeseries).filter(
-            DbDynamicAlert.dynamic_alert_id == alert_id
+            DbDynamicAlert.external_alert_id == alert_id
         )
         config = AnomalyDetectionConfig(
             time_period=5, sensitivity="high", direction="both", expected_seasonality="auto"
@@ -70,10 +73,12 @@ def update_matrix_profiles(alert_id: int):
             timeseries=timeseries, config=config
         )
 
-        session.update(DbDynamicAlert).where(DbDynamicAlert.id == alert_id).values(
-            anomaly_algo_data=updated_mp
+        dynamic_alert = session.query(DbDynamicAlert).where(
+            DbDynamicAlert.dynamic_alert_id == alert_id
         )
 
+        dynamic_alert.anomaly_algo_data = updated_mp
+
         session.commit()
 
 
@@ -82,7 +87,7 @@ def toggle_data_purge_flag(alert_id: int):
     with Session() as session:
 
         data_purge_flag = session.query(DbDynamicAlert.data_purge_flag).where(
-            DbDynamicAlert.id == alert_id
+            DbDynamicAlert.dynamic_alert_id == alert_id
         )
 
         # TODO: Confirm that this is right (chance there might be empty string)
@@ -92,7 +97,9 @@ def toggle_data_purge_flag(alert_id: int):
         elif data_purge_flag == "processing":
             new_flag = "not_queued"
 
-        session.update(DbDynamicAlert).where(DbDynamicAlert.id == alert_id).values(
-            data_purge_flag=new_flag
+        dynamic_alert = session.query(DbDynamicAlert).where(
+            DbDynamicAlert.dynamic_alert_id == alert_id
         )
+
+        dynamic_alert.data_purge_flag = new_flag
         session.commit()

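Several hunks above replace session.update(...), which is not a SQLAlchemy Session method, with a query-then-assign pattern. For reference, a sketch of the two standard ways to update one row in SQLAlchemy, assuming the repo's Session factory and DbDynamicAlert model with alert_id in scope (note the ORM style needs a loaded instance, e.g. via .one(), for the attribute assignment to be persisted):

from sqlalchemy import update

# ORM style: load the row, mutate the attribute, commit.
with Session() as session:
    dynamic_alert = (
        session.query(DbDynamicAlert)
        .filter(DbDynamicAlert.dynamic_alert_id == alert_id)
        .one()
    )
    dynamic_alert.data_purge_flag = "queued"
    session.commit()

# Core style: emit a single UPDATE statement.
with Session() as session:
    session.execute(
        update(DbDynamicAlert)
        .where(DbDynamicAlert.dynamic_alert_id == alert_id)
        .values(data_purge_flag="queued")
    )
    session.commit()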