Anomaly detection - peak/trough detection and caching of matrix profile (#1081)

This PR contains multiple changes:
- Trough and peak detection using the interquartile range (see the sketch after this list)
- Caching of the matrix profile
- Changes to the detect endpoint to support historic time steps separated from the current time steps of a time series
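As a rough illustration of the first bullet: interquartile-range (IQR) detection flags points that fall outside a band derived from the 25th/75th percentiles. This is a minimal sketch only; the PR's actual multiplier, flag labels, and the series it is applied to are assumptions here, not taken from the diff.

```python
import numpy as np

def flag_peaks_and_troughs(values: np.ndarray) -> list[str]:
    # Classic IQR fence: points above Q3 + 1.5*IQR are flagged as peaks,
    # points below Q1 - 1.5*IQR as troughs (1.5 is the conventional
    # multiplier, assumed for illustration).
    q1, q3 = np.percentile(values, [25, 75])
    iqr = q3 - q1
    upper, lower = q3 + 1.5 * iqr, q1 - 1.5 * iqr
    return [
        "peak" if v > upper else "trough" if v < lower else "none"
        for v in values
    ]
```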

---------

Co-authored-by: Zachary Collins <zachary.collins@sentry.io>
ram-senth and corps authored Sep 3, 2024
1 parent 4527b48 commit 7a3ee44
Showing 16 changed files with 643 additions and 122 deletions.
5 changes: 5 additions & 0 deletions Makefile
@@ -21,12 +21,17 @@ shell: .env # Opens a bash shell in the context of the project
.PHONY: update
update: .env # Updates the project's docker compose image.
docker compose build
docker compose run app flask db history
docker compose run app flask db upgrade

.PHONY: db_downgrade
db_downgrade: .env # Downgrades the db by one upgrade script each time it is run.
docker compose run app flask db downgrade

.PHONY: db_reset
db_reset: .env
docker compose down --volumes

.PHONY: dev
dev: .env # Starts the webserver based on the current src on port 9091
docker compose up --build
2 changes: 1 addition & 1 deletion requirements.txt
@@ -103,5 +103,5 @@ google-cloud-aiplatform==1.60.0
anthropic[vertex]==0.31.2
langfuse @ git+https://github.com/jennmueng/langfuse-python.git@9d9350de1e4e84fa548fe84f82c1b826be17956e
watchdog
-stumpy==1.12.0
+stumpy==1.13.0
pytest_alembic==0.11.1
42 changes: 42 additions & 0 deletions src/migrations/versions/00a7fb4f4911_migration.py
@@ -0,0 +1,42 @@
"""Migration
Revision ID: 00a7fb4f4911
Revises: 96e56e375579
Create Date: 2024-08-27 21:59:23.740336
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "00a7fb4f4911"
down_revision = "96e56e375579"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("dynamic_alert_time_series", schema=None) as batch_op:
batch_op.add_column(sa.Column("anomaly_type", sa.String(), nullable=False))
batch_op.add_column(sa.Column("anomaly_score", sa.Float(), nullable=False))
batch_op.add_column(sa.Column("anomaly_algo_data", sa.JSON(), nullable=True))

with op.batch_alter_table("dynamic_alerts", schema=None) as batch_op:
batch_op.add_column(sa.Column("anomaly_algo_data", sa.JSON(), nullable=False))

# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("dynamic_alerts", schema=None) as batch_op:
batch_op.drop_column("anomaly_algo_data")

with op.batch_alter_table("dynamic_alert_time_series", schema=None) as batch_op:
batch_op.drop_column("anomaly_algo_data")
batch_op.drop_column("anomaly_score")
batch_op.drop_column("anomaly_type")

# ### end Alembic commands ###
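For orientation, the columns this migration adds line up with the fields the new accessor code reads and writes: anomaly_type, anomaly_score, and anomaly_algo_data per time-series point, plus an alert-level anomaly_algo_data. Below is a hypothetical mirror of the ORM side; the real DbDynamicAlertTimeSeries model in seer.db is not part of this diff, so the pre-existing column names here are assumptions.

```python
import sqlalchemy as sa
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class DbDynamicAlertTimeSeries(Base):
    # Hypothetical sketch of the model after this migration; only the
    # three anomaly_* columns are confirmed by the migration itself.
    __tablename__ = "dynamic_alert_time_series"

    id = sa.Column(sa.Integer, primary_key=True)
    dynamic_alert_id = sa.Column(sa.Integer, sa.ForeignKey("dynamic_alerts.id"))
    timestamp = sa.Column(sa.DateTime, nullable=False)
    value = sa.Column(sa.Float, nullable=False)
    # Columns added by this migration:
    anomaly_type = sa.Column(sa.String, nullable=False)
    anomaly_score = sa.Column(sa.Float, nullable=False)
    anomaly_algo_data = sa.Column(sa.JSON, nullable=True)
```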
21 changes: 21 additions & 0 deletions src/migrations/versions/141e9df277be_.py
@@ -0,0 +1,21 @@
"""empty message
Revision ID: 141e9df277be
Revises: 00a7fb4f4911
Create Date: 2024-09-03 18:26:51.007139
"""

# revision identifiers, used by Alembic.
revision = "141e9df277be"
down_revision = "00a7fb4f4911"
branch_labels = None
depends_on = None


def upgrade():
pass


def downgrade():
pass
21 changes: 21 additions & 0 deletions src/migrations/versions/aeeaf482e659_.py
@@ -0,0 +1,21 @@
"""empty message
Revision ID: aeeaf482e659
Revises: 141e9df277be, 2e22d272d3b8
Create Date: 2024-09-03 19:07:16.527261
"""

# revision identifiers, used by Alembic.
revision = "aeeaf482e659"
down_revision = ("141e9df277be", "2e22d272d3b8")
branch_labels = None
depends_on = None


def upgrade():
pass


def downgrade():
pass
105 changes: 83 additions & 22 deletions src/seer/anomaly_detection/accessors.py
@@ -1,13 +1,20 @@
import abc
import datetime
import logging
from typing import List
from typing import List, Optional

import numpy as np
import sentry_sdk
import stumpy # type: ignore # mypy throws "missing library stubs"
from pydantic import BaseModel
from sqlalchemy import delete

from seer.anomaly_detection.models import DynamicAlert, TimeSeries
from seer.anomaly_detection.models import (
DynamicAlert,
MPTimeSeries,
MPTimeSeriesAnomalies,
TimeSeriesAnomalies,
)
from seer.anomaly_detection.models.external import AnomalyDetectionConfig, TimeSeriesPoint
from seer.db import DbDynamicAlert, DbDynamicAlertTimeSeries, Session

@@ -27,15 +34,67 @@ def save_alert(
external_alert_id: int,
config: AnomalyDetectionConfig,
timeseries: List[TimeSeriesPoint],
anomalies: TimeSeriesAnomalies,
anomaly_algo_data: dict,
):
return NotImplemented

@abc.abstractmethod
def save_timepoint(self, external_alert_id: int, timepoint: TimeSeriesPoint):
def save_timepoint(
self,
external_alert_id: int,
timepoint: TimeSeriesPoint,
anomaly: TimeSeriesAnomalies,
anomaly_algo_data: Optional[dict],
):
return NotImplemented


class DbAlertDataAccessor(AlertDataAccessor):

@sentry_sdk.trace
def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
window_size = db_alert.anomaly_algo_data.get("window_size")
flags = []
scores = []
mp = []
ts = []
values = []
for point in db_alert.timeseries:
ts.append(point.timestamp.timestamp())
values.append(point.value)
flags.append(point.anomaly_type)
scores.append(point.anomaly_score)
if point.anomaly_algo_data is not None:
dist, idx, l_idx, r_idx = MPTimeSeriesAnomalies.extract_algo_data(
point.anomaly_algo_data
)
mp.append([dist, idx, l_idx, r_idx])

anomalies = MPTimeSeriesAnomalies(
flags=flags,
scores=scores,
matrix_profile=stumpy.mparray.mparray(
mp,
k=1,
m=window_size,
excl_zone_denom=stumpy.config.STUMPY_EXCL_ZONE_DENOM,
),
window_size=window_size,
)
return DynamicAlert(
organization_id=db_alert.organization_id,
project_id=db_alert.project_id,
external_alert_id=db_alert.external_alert_id,
config=AnomalyDetectionConfig.model_validate(db_alert.config),
timeseries=MPTimeSeries(
timestamps=np.array(ts),
values=np.array(values),
),
anomalies=anomalies,
)

@sentry_sdk.trace
def query(self, external_alert_id: int) -> DynamicAlert | None:
with Session() as session:
alert_info = (
@@ -51,30 +110,18 @@ def query(self, external_alert_id: int) -> DynamicAlert | None:
},
)
return None
return self._hydrate_alert(alert_info)

timeseries: TimeSeries = TimeSeries(
timestamps=np.empty([len(alert_info.timeseries)]),
values=np.empty([len(alert_info.timeseries)]),
)
for i, point in enumerate(alert_info.timeseries):
np.put(timeseries.timestamps, i, point.timestamp.timestamp())
np.put(timeseries.values, i, point.value)

return DynamicAlert(
organization_id=alert_info.organization_id,
project_id=alert_info.project_id,
external_alert_id=external_alert_id,
config=AnomalyDetectionConfig.model_validate(alert_info.config),
timeseries=timeseries,
)

@sentry_sdk.trace
def save_alert(
self,
organization_id: int,
project_id: int,
external_alert_id: int,
config: AnomalyDetectionConfig,
timeseries: List[TimeSeriesPoint],
anomalies: TimeSeriesAnomalies,
anomaly_algo_data: dict,
):
with Session() as session:
existing_records = (
@@ -96,7 +143,7 @@ def save_alert(
DbDynamicAlert.external_alert_id == external_alert_id
)
session.execute(delete_q)

algo_data = anomalies.get_anomaly_algo_data(len(timeseries))
new_record = DbDynamicAlert(
organization_id=organization_id,
project_id=project_id,
@@ -106,14 +153,25 @@ def save_alert(
DbDynamicAlertTimeSeries(
timestamp=datetime.datetime.fromtimestamp(point.timestamp),
value=point.value,
anomaly_type=anomalies.flags[i],
anomaly_score=anomalies.scores[i],
anomaly_algo_data=algo_data[i],
)
for point in timeseries
for i, point in enumerate(timeseries)
],
anomaly_algo_data=anomaly_algo_data,
)
session.add(new_record)
session.commit()

def save_timepoint(self, external_alert_id: int, timepoint: TimeSeriesPoint):
@sentry_sdk.trace
def save_timepoint(
self,
external_alert_id: int,
timepoint: TimeSeriesPoint,
anomaly: TimeSeriesAnomalies,
anomaly_algo_data: Optional[dict],
):
with Session() as session:
existing = (
session.query(DbDynamicAlert)
@@ -127,6 +185,9 @@ def save_timepoint(self, external_alert_id: int, timepoint: TimeSeriesPoint):
dynamic_alert_id=existing.id,
timestamp=datetime.datetime.fromtimestamp(timepoint.timestamp),
value=timepoint.value,
anomaly_type=anomaly.flags[0],
anomaly_score=anomaly.scores[0],
anomaly_algo_data=anomaly_algo_data,
)
session.add(new_record)
session.commit()
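The caching scheme is visible in _hydrate_alert above: window_size is stored once at the alert level, while each stored point carries a [dist, idx, l_idx, r_idx] row in anomaly_algo_data, and those rows are stitched back into a stumpy matrix-profile array on load. Below is a minimal sketch of that round trip, assuming stumpy.stump as the profile computation; the PR's actual detection pipeline lives in files not shown in this excerpt.

```python
import numpy as np
import stumpy  # pinned to 1.13.0 in requirements.txt

def profile_rows(values: np.ndarray, window_size: int) -> list[list]:
    # Compute the matrix profile once; stumpy.stump returns one
    # [dist, idx, l_idx, r_idx] row per subsequence, which is what
    # gets cached per time-series point.
    mp = stumpy.stump(values, m=window_size)
    return [[float(r[0]), int(r[1]), int(r[2]), int(r[3])] for r in mp]

def rehydrate(rows: list[list], window_size: int):
    # Rebuild the stumpy mparray from the cached rows, mirroring
    # what _hydrate_alert does when an alert is loaded from the db.
    return stumpy.mparray.mparray(
        rows,
        k=1,
        m=window_size,
        excl_zone_denom=stumpy.config.STUMPY_EXCL_ZONE_DENOM,
    )
```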