diff --git a/src/migrations/versions/480ca2916d86_migration.py b/src/migrations/versions/480ca2916d86_migration.py new file mode 100644 index 00000000..03bfedbd --- /dev/null +++ b/src/migrations/versions/480ca2916d86_migration.py @@ -0,0 +1,32 @@ +"""Migration + +Revision ID: 480ca2916d86 +Revises: da0a9c9f1bb4 +Create Date: 2024-09-20 20:50:57.331837 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "480ca2916d86" +down_revision = "da0a9c9f1bb4" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("dynamic_alerts", schema=None) as batch_op: + batch_op.add_column(sa.Column("data_purge_flag", sa.String(), nullable=False)) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("dynamic_alerts", schema=None) as batch_op: + batch_op.drop_column("data_purge_flag") + + # ### end Alembic commands ### diff --git a/src/seer/anomaly_detection/accessors.py b/src/seer/anomaly_detection/accessors.py index 13400e43..328a5ef1 100644 --- a/src/seer/anomaly_detection/accessors.py +++ b/src/seer/anomaly_detection/accessors.py @@ -41,7 +41,7 @@ def save_alert( timeseries: List[TimeSeriesPoint], anomalies: TimeSeriesAnomalies, anomaly_algo_data: dict, - task_flag: str, + data_purge_flag: str, ): return NotImplemented @@ -60,7 +60,7 @@ def delete_alert_data(self, external_alert_id: int): return NotImplemented @abc.abstractmethod - def set_task_flag(self, external_alert_id: int): + def set_data_purge_flag(self, external_alert_id: int): return NotImplemented @@ -146,7 +146,7 @@ def save_alert( timeseries: List[TimeSeriesPoint], anomalies: TimeSeriesAnomalies, anomaly_algo_data: dict, - task_flag: str, + data_purge_flag: str, ): with Session() as session: existing_records = ( @@ -185,7 +185,7 @@ def save_alert( for i, point in enumerate(timeseries) ], anomaly_algo_data=anomaly_algo_data, - task_flag=task_flag, + data_purge_flag=data_purge_flag, ) session.add(new_record) session.commit() @@ -232,12 +232,12 @@ def delete_alert_data(self, external_alert_id: int): session.commit() @sentry_sdk.trace - def set_task_flag(self, alert_id: int): + def set_data_purge_flag(self, alert_id: int): - # Update task_flag + # Update data_purge_flag with Session() as session: session.update(DbDynamicAlert).where(external_alert_id=alert_id).values( - task_flag="queued" + data_purge_flag="queued" ) session.commit() diff --git a/src/seer/anomaly_detection/anomaly_detection.py b/src/seer/anomaly_detection/anomaly_detection.py index 7736b456..f77fdd5a 100644 --- a/src/seer/anomaly_detection/anomaly_detection.py +++ b/src/seer/anomaly_detection/anomaly_detection.py @@ -24,7 +24,6 @@ TimeSeriesPoint, TimeSeriesWithHistory, ) -from seer.anomaly_detection.tasks import cleanup_timeseries from seer.dependency_injection import inject, injected from seer.exceptions import ClientError, ServerError @@ -151,10 +150,12 @@ def _online_detect( anomaly_algo_data=streamed_anomalies.get_anomaly_algo_data(len(ts_external))[0], ) + from seer.anomaly_detection.tasks import cleanup_timeseries + # Set flag and create new task for cleanup cleanup_config = historic.cleanup_config if cleanup_config.num_old_points >= cleanup_config.num_acceptable_points: - alert_data_accessor.set_task_flag() + alert_data_accessor.set_data_purge_flag() cleanup_timeseries.apply_async( historic.external_alert_id, cleanup_config.timestamp_threshold ) @@ -297,7 +298,7 @@ def store_data( timeseries=ts, anomalies=anomalies, anomaly_algo_data={"window_size": anomalies.window_size}, - task_flag="not_queued", + data_purge_flag="not_queued", ) return StoreDataResponse(success=True) diff --git a/src/seer/anomaly_detection/models/dynamic_alert.py b/src/seer/anomaly_detection/models/dynamic_alert.py index 73b51ce7..f7cd4db2 100644 --- a/src/seer/anomaly_detection/models/dynamic_alert.py +++ b/src/seer/anomaly_detection/models/dynamic_alert.py @@ -17,3 +17,4 @@ class DynamicAlert(BaseModel): timeseries: TimeSeries anomalies: TimeSeriesAnomalies cleanup_config: CleanupConfig + data_purge_flag: str diff --git a/src/seer/anomaly_detection/tasks.py b/src/seer/anomaly_detection/tasks.py index 995019a3..7124de73 100644 --- a/src/seer/anomaly_detection/tasks.py +++ b/src/seer/anomaly_detection/tasks.py @@ -18,10 +18,10 @@ def cleanup_timeseries(alert_id: int, date_threshold: datetime = datetime.dateti logger.info("Deleting timeseries points over 28 days old and updating matrix profiles") - toggle_task_flag(alert_id) + toggle_data_purge_flag(alert_id) deleted_timeseries_points = delete_old_timeseries_points(alert_id, date_threshold) update_matrix_profiles(alert_id) - toggle_task_flag(alert_id) + toggle_data_purge_flag(alert_id) logger.info(f"Deleted {deleted_timeseries_points} timeseries points") logger.info(f"Updated matrix profiles for alertd id {alert_id}") @@ -77,20 +77,22 @@ def update_matrix_profiles(alert_id: int): session.commit() -def toggle_task_flag(alert_id: int): +def toggle_data_purge_flag(alert_id: int): with Session() as session: - task_flag = session.query(DbDynamicAlert.task_flag).where(DbDynamicAlert.id == alert_id) + data_purge_flag = session.query(DbDynamicAlert.data_purge_flag).where( + DbDynamicAlert.id == alert_id + ) # TODO: Confirm that this is right (chance there might be empty string) new_flag = "" - if task_flag == "queued": + if data_purge_flag == "queued": new_flag = "processing" - elif task_flag == "processing": + elif data_purge_flag == "processing": new_flag = "not_queued" session.update(DbDynamicAlert).where(DbDynamicAlert.id == alert_id).values( - task_flag=new_flag + data_purge_flag=new_flag ) session.commit() diff --git a/src/seer/db.py b/src/seer/db.py index 003f845e..5c2fa3af 100644 --- a/src/seer/db.py +++ b/src/seer/db.py @@ -286,7 +286,7 @@ class DbDynamicAlert(Base): cascade="all, delete", passive_deletes=True, ) - task_flag: Mapped[str] = mapped_column(String, nullable=False) + data_purge_flag: Mapped[str] = mapped_column(String, nullable=False) class DbDynamicAlertTimeSeries(Base):