Commit: Update flag name

aayush-se committed Sep 20, 2024
1 parent b4b0ad6 commit 1bdb5c6
Showing 6 changed files with 54 additions and 18 deletions.
32 changes: 32 additions & 0 deletions src/migrations/versions/480ca2916d86_migration.py
@@ -0,0 +1,32 @@
"""Migration
Revision ID: 480ca2916d86
Revises: da0a9c9f1bb4
Create Date: 2024-09-20 20:50:57.331837
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "480ca2916d86"
down_revision = "da0a9c9f1bb4"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("dynamic_alerts", schema=None) as batch_op:
batch_op.add_column(sa.Column("data_purge_flag", sa.String(), nullable=False))

# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("dynamic_alerts", schema=None) as batch_op:
batch_op.drop_column("data_purge_flag")

# ### end Alembic commands ###
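
Note: the new data_purge_flag column is declared NOT NULL, but the auto-generated migration adds it without a server default, which would fail if the dynamic_alerts table already contains rows. A minimal sketch of the same upgrade with a server-side default follows; using "not_queued" as the default mirrors the value store_data writes later in this commit, but treating it as the column default is an assumption, not part of the committed migration.

import sqlalchemy as sa
from alembic import op


def upgrade():
    with op.batch_alter_table("dynamic_alerts", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column(
                "data_purge_flag",
                sa.String(),
                nullable=False,
                # Assumed default so existing rows can satisfy the NOT NULL constraint.
                server_default="not_queued",
            )
        )


def downgrade():
    with op.batch_alter_table("dynamic_alerts", schema=None) as batch_op:
        batch_op.drop_column("data_purge_flag")
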
14 changes: 7 additions & 7 deletions src/seer/anomaly_detection/accessors.py
@@ -41,7 +41,7 @@ def save_alert(
timeseries: List[TimeSeriesPoint],
anomalies: TimeSeriesAnomalies,
anomaly_algo_data: dict,
task_flag: str,
data_purge_flag: str,
):
return NotImplemented

@@ -60,7 +60,7 @@ def delete_alert_data(self, external_alert_id: int):
return NotImplemented

@abc.abstractmethod
def set_task_flag(self, external_alert_id: int):
def set_data_purge_flag(self, external_alert_id: int):
return NotImplemented


@@ -146,7 +146,7 @@ def save_alert(
timeseries: List[TimeSeriesPoint],
anomalies: TimeSeriesAnomalies,
anomaly_algo_data: dict,
task_flag: str,
data_purge_flag: str,
):
with Session() as session:
existing_records = (
@@ -185,7 +185,7 @@ def save_alert(
for i, point in enumerate(timeseries)
],
anomaly_algo_data=anomaly_algo_data,
task_flag=task_flag,
data_purge_flag=data_purge_flag,
)
session.add(new_record)
session.commit()
@@ -232,12 +232,12 @@ def delete_alert_data(self, external_alert_id: int):
session.commit()

@sentry_sdk.trace
def set_task_flag(self, alert_id: int):
def set_data_purge_flag(self, alert_id: int):

# Update task_flag
# Update data_purge_flag
with Session() as session:
session.update(DbDynamicAlert).where(external_alert_id=alert_id).values(
task_flag="queued"
data_purge_flag="queued"
)
session.commit()

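
As written above, session.update(DbDynamicAlert) is not a SQLAlchemy Session method, so set_data_purge_flag would raise at runtime. A minimal sketch of the equivalent 2.0-style statement follows; it assumes DbDynamicAlert.external_alert_id is the column being matched and that Session and DbDynamicAlert come from seer.db, as elsewhere in the module.

from sqlalchemy import update

from seer.db import DbDynamicAlert, Session  # assumed to match the module's existing imports


def set_data_purge_flag(alert_id: int) -> None:
    # Mark the alert's cleanup work as queued with an explicit UPDATE statement.
    with Session() as session:
        session.execute(
            update(DbDynamicAlert)
            .where(DbDynamicAlert.external_alert_id == alert_id)
            .values(data_purge_flag="queued")
        )
        session.commit()
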
7 changes: 4 additions & 3 deletions src/seer/anomaly_detection/anomaly_detection.py
@@ -24,7 +24,6 @@
TimeSeriesPoint,
TimeSeriesWithHistory,
)
from seer.anomaly_detection.tasks import cleanup_timeseries
from seer.dependency_injection import inject, injected
from seer.exceptions import ClientError, ServerError

@@ -151,10 +150,12 @@ def _online_detect(
anomaly_algo_data=streamed_anomalies.get_anomaly_algo_data(len(ts_external))[0],
)

from seer.anomaly_detection.tasks import cleanup_timeseries

# Set flag and create new task for cleanup
cleanup_config = historic.cleanup_config
if cleanup_config.num_old_points >= cleanup_config.num_acceptable_points:
alert_data_accessor.set_task_flag()
alert_data_accessor.set_data_purge_flag()
cleanup_timeseries.apply_async(
historic.external_alert_id, cleanup_config.timestamp_threshold
)
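
Two details in this block are worth flagging: set_data_purge_flag() is called without an alert id even though the accessor's signature takes one, and Celery's apply_async expects the task arguments bundled in args= rather than passed positionally. A hedged sketch of how the same block might read, assuming cleanup_timeseries is a Celery task accepting (alert_id, date_threshold) and reusing the names already in scope in _online_detect:

from seer.anomaly_detection.tasks import cleanup_timeseries

# Set flag and enqueue a cleanup task for this alert.
cleanup_config = historic.cleanup_config
if cleanup_config.num_old_points >= cleanup_config.num_acceptable_points:
    alert_data_accessor.set_data_purge_flag(historic.external_alert_id)
    cleanup_timeseries.apply_async(
        args=(historic.external_alert_id, cleanup_config.timestamp_threshold)
    )
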
@@ -297,7 +298,7 @@ def store_data(
timeseries=ts,
anomalies=anomalies,
anomaly_algo_data={"window_size": anomalies.window_size},
task_flag="not_queued",
data_purge_flag="not_queued",
)
return StoreDataResponse(success=True)

1 change: 1 addition & 0 deletions src/seer/anomaly_detection/models/dynamic_alert.py
@@ -17,3 +17,4 @@ class DynamicAlert(BaseModel):
timeseries: TimeSeries
anomalies: TimeSeriesAnomalies
cleanup_config: CleanupConfig
data_purge_flag: str
16 changes: 9 additions & 7 deletions src/seer/anomaly_detection/tasks.py
@@ -18,10 +18,10 @@ def cleanup_timeseries(alert_id: int, date_threshold: datetime = datetime.dateti

logger.info("Deleting timeseries points over 28 days old and updating matrix profiles")

toggle_task_flag(alert_id)
toggle_data_purge_flag(alert_id)
deleted_timeseries_points = delete_old_timeseries_points(alert_id, date_threshold)
update_matrix_profiles(alert_id)
toggle_task_flag(alert_id)
toggle_data_purge_flag(alert_id)
logger.info(f"Deleted {deleted_timeseries_points} timeseries points")
logger.info(f"Updated matrix profiles for alertd id {alert_id}")

@@ -77,20 +77,22 @@ def update_matrix_profiles(alert_id: int):
session.commit()


def toggle_task_flag(alert_id: int):
def toggle_data_purge_flag(alert_id: int):

with Session() as session:

task_flag = session.query(DbDynamicAlert.task_flag).where(DbDynamicAlert.id == alert_id)
data_purge_flag = session.query(DbDynamicAlert.data_purge_flag).where(
DbDynamicAlert.id == alert_id
)

# TODO: Confirm that this is right (chance there might be empty string)
new_flag = ""
if task_flag == "queued":
if data_purge_flag == "queued":
new_flag = "processing"
elif task_flag == "processing":
elif data_purge_flag == "processing":
new_flag = "not_queued"

session.update(DbDynamicAlert).where(DbDynamicAlert.id == alert_id).values(
task_flag=new_flag
data_purge_flag=new_flag
)
session.commit()
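
In the version above, the session.query(...) object is compared directly against the strings "queued" and "processing" without ever being executed, and session.update(...) is again not a Session method. A sketch of the same state machine in 2.0-style SQLAlchemy follows; the early return for any other state is an assumption about the intended behavior behind the TODO, not something the commit specifies.

from sqlalchemy import select, update

from seer.db import DbDynamicAlert, Session  # assumed to match the module's existing imports


def toggle_data_purge_flag(alert_id: int) -> None:
    with Session() as session:
        # Read the current flag value for this alert.
        current_flag = session.execute(
            select(DbDynamicAlert.data_purge_flag).where(DbDynamicAlert.id == alert_id)
        ).scalar_one()

        if current_flag == "queued":
            new_flag = "processing"
        elif current_flag == "processing":
            new_flag = "not_queued"
        else:
            return  # assumed: leave any other state untouched instead of writing ""

        session.execute(
            update(DbDynamicAlert)
            .where(DbDynamicAlert.id == alert_id)
            .values(data_purge_flag=new_flag)
        )
        session.commit()
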
2 changes: 1 addition & 1 deletion src/seer/db.py
@@ -286,7 +286,7 @@ class DbDynamicAlert(Base):
cascade="all, delete",
passive_deletes=True,
)
task_flag: Mapped[str] = mapped_column(String, nullable=False)
data_purge_flag: Mapped[str] = mapped_column(String, nullable=False)


class DbDynamicAlertTimeSeries(Base):
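
Across the commit, the flag is stored as a raw string with three values: "not_queued" (written by store_data), "queued" (written by set_data_purge_flag), and "processing" (written by toggle_data_purge_flag mid-cleanup). A small illustrative sketch of a str-backed Enum that could centralize those values; it is hypothetical and not part of this change.

import enum


class DataPurgeFlag(str, enum.Enum):
    # The three states written as raw strings elsewhere in this commit.
    NOT_QUEUED = "not_queued"
    QUEUED = "queued"
    PROCESSING = "processing"


# str-backed members compare equal to the stored column values.
assert DataPurgeFlag.QUEUED == "queued"
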
