Skip to content

Commit

Permalink
Setup tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
aayush-se committed Sep 19, 2024
1 parent 341526b commit 9d4f583
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/celery_app/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# See the test_seer:test_detected_celery_jobs test
from celery.schedules import crontab

import seer.app # noqa: F401
# import seer.app # noqa: F401
from celery_app.app import celery_app as celery # noqa: F401
from celery_app.config import CeleryQueues
from seer.automation.autofix.tasks import check_and_mark_recent_autofix_runs
Expand Down
25 changes: 14 additions & 11 deletions src/seer/anomaly_detection/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
MPTimeSeriesAnomalies,
TimeSeriesAnomalies,
)
from seer.anomaly_detection.models.external import AnomalyDetectionConfig, TimeSeriesPoint
from seer.anomaly_detection.tasks import cleanup_timeseries
from seer.anomaly_detection.models.external import (
AnomalyDetectionConfig,
CleanupConfig,
TimeSeriesPoint,
)
from seer.db import DbDynamicAlert, DbDynamicAlertTimeSeries, Session
from seer.exceptions import ClientError

Expand Down Expand Up @@ -56,7 +59,7 @@ def delete_alert_data(self, external_alert_id: int):
return NotImplemented

@abc.abstractmethod
def set_task_flag(self, external_alert_id: int):
    # Abstract hook: the DB-backed implementation updates the alert row's
    # task_flag so a cleanup task for this alert is marked as queued.
    #
    # NOTE(review): returns NotImplemented instead of raising
    # NotImplementedError — this matches the sibling abstract method
    # delete_alert_data in this class, and @abc.abstractmethod already
    # prevents instantiation without an override.
    return NotImplemented


Expand All @@ -66,7 +69,7 @@ class DbAlertDataAccessor(AlertDataAccessor):
def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
# TODO: Add some some random noise (0-48 hours) to threshold
timestamp_threshold = datetime.datetime.now() - datetime.timedelta(days=28)
num_points_older = 0
num_old_points = 0
window_size = db_alert.anomaly_algo_data.get("window_size")
flags = []
scores = []
Expand All @@ -84,11 +87,7 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
)
mp.append([dist, idx, l_idx, r_idx])
if point.timestamp.timestamp < timestamp_threshold:
num_points_older += 1

cleanup_threshold = 24 * 4 * 2 # num alerts over 2 days
if num_points_older >= cleanup_threshold:
self.cleanup_data(db_alert.external_alert_id, timestamp_threshold)
num_old_points += 1

anomalies = MPTimeSeriesAnomalies(
flags=flags,
Expand All @@ -111,6 +110,11 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
values=np.array(values),
),
anomalies=anomalies,
cleanup_config=CleanupConfig(
num_old_points=num_old_points,
timestamp_threshold=timestamp_threshold,
num_acceptable_points=24 * 4 * 2, # Num alerts for 2 days
),
)

@sentry_sdk.trace
Expand Down Expand Up @@ -225,7 +229,7 @@ def delete_alert_data(self, external_alert_id: int):
session.commit()

@sentry_sdk.trace
def cleanup_data(self, alert_id: int, date_threshold: datetime):
def set_task_flag(self, alert_id: int):

# Update task_flag
with Session() as session:
Expand All @@ -235,4 +239,3 @@ def cleanup_data(self, alert_id: int, date_threshold: datetime):
session.commit()

# Create cleanup task
cleanup_timeseries.apply_async((alert_id, date_threshold))
12 changes: 10 additions & 2 deletions src/seer/anomaly_detection/anomaly_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
TimeSeriesPoint,
TimeSeriesWithHistory,
)
from seer.anomaly_detection.tasks import cleanup_timeseries
from seer.dependency_injection import inject, injected
from seer.exceptions import ClientError, ServerError

Expand Down Expand Up @@ -96,7 +97,6 @@ def _online_detect(
Returns:
Tuple with input timeseries and identified anomalies
"""

logger.info(f"Detecting anomalies for alert ID: {alert.id}")
ts_external: List[TimeSeriesPoint] = []
if alert.cur_window:
Expand Down Expand Up @@ -149,7 +149,15 @@ def _online_detect(
anomaly=streamed_anomalies,
anomaly_algo_data=streamed_anomalies.get_anomaly_algo_data(len(ts_external))[0],
)
# TODO: Clean up old data

# Set flag and create new task for cleanup
cleanup_config = historic.cleanup_config
if cleanup_config.num_old_points >= cleanup_config.num_acceptable_points:
alert_data_accessor.set_task_flag()
cleanup_timeseries.apply_async(
historic.external_alert_id, cleanup_config.timestamp_threshold
)

return ts_external, streamed_anomalies

def _min_required_timesteps(self, time_period, min_num_days=7):
Expand Down
3 changes: 2 additions & 1 deletion src/seer/anomaly_detection/models/dynamic_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from pydantic import BaseModel

from seer.anomaly_detection.models.external import AnomalyDetectionConfig
from seer.anomaly_detection.models.external import AnomalyDetectionConfig, CleanupConfig
from seer.anomaly_detection.models.timeseries import TimeSeries
from seer.anomaly_detection.models.timeseries_anomalies import TimeSeriesAnomalies

Expand All @@ -16,3 +16,4 @@ class DynamicAlert(BaseModel):
config: AnomalyDetectionConfig
timeseries: TimeSeries
anomalies: TimeSeriesAnomalies
cleanup_config: CleanupConfig
6 changes: 6 additions & 0 deletions src/seer/anomaly_detection/models/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,9 @@ class DeleteAlertDataRequest(BaseModel):
class DeleteAlertDataResponse(BaseModel):
success: bool
message: Optional[str] = Field(None)


class CleanupConfig(BaseModel):
    """Per-alert bookkeeping used to decide when stale timeseries points
    should be purged by the cleanup task.

    Built by DbAlertDataAccessor._hydrate_alert and consumed in
    AnomalyDetection._online_detect, which queues a cleanup when
    num_old_points >= num_acceptable_points.
    """

    # Count of stored points older than timestamp_threshold.
    num_old_points: int
    # NOTE(review): declared float, but the producer assigns a
    # datetime (datetime.now() - timedelta(days=28)) and passes it in
    # unchanged — float vs datetime mismatch. Confirm whether this should
    # be epoch seconds (.timestamp()) or a datetime-typed field.
    timestamp_threshold: float
    # Threshold for triggering cleanup; producer sets 24 * 4 * 2
    # ("Num alerts for 2 days", i.e. 15-minute points over two days).
    num_acceptable_points: int
2 changes: 2 additions & 0 deletions src/seer/anomaly_detection/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ def toggle_task_flag(alert_id: int):
with Session() as session:

task_flag = session.query(DbDynamicAlert.task_flag).where(DbDynamicAlert.id == alert_id)

# TODO: Confirm that this is right (chance there might be empty string)
new_flag = ""
if task_flag == "queued":
new_flag = "processing"
Expand Down

0 comments on commit 9d4f583

Please sign in to comment.