Skip to content

Commit

Permalink
chore(anomaly_detection) Handle timeseries with very low variance
Browse files Browse the repository at this point in the history
  • Loading branch information
ram-senth committed Sep 23, 2024
1 parent 0104de0 commit 447baeb
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 74 deletions.
3 changes: 3 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ coverage:
trend_detection:
paths:
- 'src/seer/trend_detection/'
anomaly_detection:
paths:
- 'src/seer/anomaly_detection/'
app:
paths:
- 'src/seer/app.py'
Expand Down
13 changes: 8 additions & 5 deletions src/seer/anomaly_detection/anomaly_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
from seer.dependency_injection import inject, injected
from seer.exceptions import ClientError, ServerError
from seer.tags import AnomalyDetectionModes, AnomalyDetectionTags

anomaly_detection_module.enable()
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -226,16 +227,16 @@ def detect_anomalies(self, request: DetectAnomaliesRequest) -> DetectAnomaliesRe
Anomaly detection request that has either a complete time series or an alert reference.
"""
if isinstance(request.context, AlertInSeer):
mode = "streaming.alert"
mode = AnomalyDetectionModes.STREAMING_ALERT
elif isinstance(request.context, TimeSeriesWithHistory):
mode = "streaming.ts_with_history"
mode = AnomalyDetectionModes.STREAMING_TS_WITH_HISTORY
else:
mode = "batch.ts_full"
mode = AnomalyDetectionModes.BATCH_TS_FULL

sentry_sdk.set_tag("ad_mode", mode)
sentry_sdk.set_tag(AnomalyDetectionTags.MODE, mode)

if isinstance(request.context, AlertInSeer):
sentry_sdk.set_tag("alert_id", request.context.id)
sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.context.id)
ts, anomalies = self._online_detect(request.context, request.config)
elif isinstance(request.context, TimeSeriesWithHistory):
ts, anomalies = self._combo_detect(request.context, request.config)
Expand All @@ -255,6 +256,7 @@ def store_data(
request: StoreDataRequest
Alert information along with underlying time series data
"""
sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id)
# Ensure we have at least 7 days of data in the time series
min_len = self._min_required_timesteps(request.config.time_period)
if len(request.timeseries) < min_len:
Expand Down Expand Up @@ -302,5 +304,6 @@ def delete_alert_data(
request: DeleteAlertDataRequest
Alert to clear
"""
sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id)
alert_data_accessor.delete_alert_data(external_alert_id=request.alert.id)
return DeleteAlertDataResponse(success=True)
1 change: 0 additions & 1 deletion src/seer/anomaly_detection/detectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
)

AnomalyDetector = anomaly_detectors.AnomalyDetector
DummyAnomalyDetector = anomaly_detectors.DummyAnomalyDetector
MPConfig = mp_config.MPConfig
MPBatchAnomalyDetector = anomaly_detectors.MPBatchAnomalyDetector
MPStreamAnomalyDetector = anomaly_detectors.MPStreamAnomalyDetector
Expand Down
16 changes: 0 additions & 16 deletions src/seer/anomaly_detection/detectors/anomaly_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ def _compute_matrix_profile(
"""
ts_values = timeseries.values # np.array([np.float64(point.value) for point in timeseries])
window_size = ws_selector.optimal_window_size(ts_values)
logger.debug(f"window_size: {window_size}")
if window_size <= 0:
# TODO: Add sentry logging of this error
raise ServerError("Invalid window size")
Expand Down Expand Up @@ -189,18 +188,3 @@ def detect(
),
window_size=self.window_size,
)


class DummyAnomalyDetector(AnomalyDetector):
"""
Dummy anomaly detector used during dev work
"""

def detect(self, timeseries: TimeSeries, config: AnomalyDetectionConfig) -> TimeSeriesAnomalies:
anomalies = MPTimeSeriesAnomalies(
flags=np.array(["none"] * len(timeseries.values)),
scores=np.array([np.float64(0.5)] * len(timeseries.values)),
matrix_profile=np.array([]),
window_size=0,
)
return anomalies
110 changes: 71 additions & 39 deletions src/seer/anomaly_detection/detectors/mp_scorers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import abc
import logging
from typing import Tuple

import numpy as np
import numpy.typing as npt
from pydantic import BaseModel
import sentry_sdk
from pydantic import BaseModel, Field

from seer.anomaly_detection.models import AnomalyFlags, Directions, Sensitivities
from seer.tags import AnomalyDetectionTags

logger = logging.getLogger(__name__)

Expand All @@ -15,6 +18,10 @@ class MPScorer(BaseModel, abc.ABC):
Abstract base class for calculating an anomaly score
"""

variance_threshold: float = Field(
0.0001, description="Minimum variance required in order to use IRQ based scoring"
)

@abc.abstractmethod
def batch_score(
self,
Expand All @@ -41,6 +48,12 @@ def stream_score(


class MPIRQScorer(MPScorer):
def _flag_if_low_variance(self, ts_value: float, mean: float) -> Tuple[AnomalyFlags, float]:
# if current value is significantly higher or lower than the mean then mark it as high anomaly else mark it as no anomaly
if abs(ts_value) >= 2 * abs(mean):
return "anomaly_higher_confidence", 0.9
else:
return "none", 0.0

def batch_score(
self,
Expand Down Expand Up @@ -71,29 +84,42 @@ def batch_score(
* "anomaly_lower_confidence" - indicating anomaly but only with a lower threshold
* "anomaly_higher_confidence" - indicating anomaly with a higher threshold
"""
# Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring.
mp_dist_baseline_finite = mp_dist[np.isfinite(mp_dist)]

# Compute the quantiles for two different threshold levels
[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.25, 0.75])
IQR_L = Q3 - Q1
threshold_lower = Q3 + (1.5 * IQR_L)

[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.15, 0.85])
IQR_L = Q3 - Q1
threshold_upper = Q3 + (1.5 * IQR_L)

# Compute score and anomaly flags
ts_variance = ts.var()
ts_mean = ts.mean()
scores = []
flags = []
for i, val in enumerate(mp_dist):
scores.append(0.0 if np.isnan(val) or np.isinf(val) else val - threshold_upper)
flag = self._to_flag(val, threshold_lower, threshold_upper)
if i > 2 * window_size:
flag = self._adjust_flag_for_vicinity(
flag=flag, ts_value=ts[i], context=ts[i - 2 * window_size : i - 1]
)
flags.append(flag)

if ts_variance <= self.variance_threshold:
sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 1)
for val in ts:
flag, score = self._flag_if_low_variance(ts_value=val, mean=ts_mean)
flags.append(flag)
scores.append(score)
else:
sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 0)
# Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring.
mp_dist_baseline_finite = mp_dist[np.isfinite(mp_dist)]

# Compute the quantiles for two different threshold levels
[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.25, 0.75])
IQR_L = Q3 - Q1
threshold_lower = Q3 + (1.5 * IQR_L)

[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.15, 0.85])
IQR_L = Q3 - Q1
threshold_upper = Q3 + (1.5 * IQR_L)

# Compute score and anomaly flags
scores = []
flags = []
for i, val in enumerate(mp_dist):
scores.append(0.0 if np.isnan(val) or np.isinf(val) else val - threshold_upper)
flag = self._to_flag(val, threshold_lower, threshold_upper)
if i > 2 * window_size:
flag = self._adjust_flag_for_vicinity(
flag=flag, ts_value=ts[i], context=ts[i - 2 * window_size : i - 1]
)
flags.append(flag)

return scores, flags

Expand Down Expand Up @@ -130,23 +156,29 @@ def stream_score(
* "anomaly_lower_confidence" - indicating anomaly but only with a lower threshold
* "anomaly_higher_confidence" - indicating anomaly with a higher threshold
"""
# Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring.
mp_dist_baseline_finite = mp_dist_baseline[np.isfinite(mp_dist_baseline)]

# Compute the quantiles for two different threshold levels
[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.25, 0.75])
IQR_L = Q3 - Q1
threshold_lower = Q3 + (1.5 * IQR_L)

[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.15, 0.85])
IQR_L = Q3 - Q1
threshold_upper = Q3 + (1.5 * IQR_L)

# Compute score and anomaly flags
score = 0.0 if np.isnan(mp_dist) or np.isinf(mp_dist) else mp_dist - threshold_upper
flag = self._to_flag(mp_dist, threshold_lower, threshold_upper)
# anomaly identified. apply logic to check for peak and trough
flag = self._adjust_flag_for_vicinity(ts_value, flag, ts_baseline[-2 * window_size :])
context = ts_baseline[-2 * window_size :]
context_var = context.var()
if context_var <= self.variance_threshold:
flag, score = self._flag_if_low_variance(ts_value=ts_value, mean=context.mean())
else:
# Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring.
mp_dist_baseline_finite = mp_dist_baseline[np.isfinite(mp_dist_baseline)]

# Compute the quantiles for two different threshold levels
[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.25, 0.75])
IQR_L = Q3 - Q1
threshold_lower = Q3 + (1.5 * IQR_L)

[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.15, 0.85])
IQR_L = Q3 - Q1
threshold_upper = Q3 + (1.5 * IQR_L)

# Compute score and anomaly flags
score = 0.0 if np.isnan(mp_dist) or np.isinf(mp_dist) else mp_dist - threshold_upper
flag = self._to_flag(mp_dist, threshold_lower, threshold_upper)
# anomaly identified. apply logic to check for peak and trough
flag = self._adjust_flag_for_vicinity(ts_value, flag, ts_baseline[-2 * window_size :])

return [score], [flag]

def _to_flag(self, mp_dist: float, threshold_lower: float, threshold_upper: float):
Expand Down
5 changes: 5 additions & 0 deletions src/seer/anomaly_detection/detectors/normalizers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ def normalize(self, array: npt.NDArray) -> npt.NDArray:
class MinMaxNormalizer(Normalizer):
def normalize(self, array: npt.NDArray) -> npt.NDArray:
"""Applies min-max normalization to input array"""
if array.var() == 0:
if array[0] == 0:
return array
else:
return np.full_like(array, 1.0)
return (array - np.min(array)) / (np.max(array) - np.min(array))
22 changes: 17 additions & 5 deletions src/seer/anomaly_detection/detectors/window_size_selectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import numpy as np
import numpy.typing as npt
import pandas as pd
import sentry_sdk
from pydantic import BaseModel, Field

from seer.anomaly_detection.detectors.normalizers import MinMaxNormalizer, Normalizer
from seer.tags import AnomalyDetectionTags


class WindowSizeSelector(BaseModel, abc.ABC):
Expand Down Expand Up @@ -84,9 +86,13 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int:
The time series as a seaquence of float values.
"""
if time_series.var() == 0:
sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1)
sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 1)
return 3
time_series = self.normalizer.normalize(time_series)
ts_mean = np.mean(time_series)
ts_std = np.std(time_series)
ts_mean = time_series.mean()
ts_std = time_series.std()
ts_min_max = np.max(time_series) - np.min(time_series)

lbound = self.lbound
Expand Down Expand Up @@ -119,7 +125,9 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int:
exp += 1

if not found_window:
raise Exception("Search for optimal window failed.")
# raise Exception("Search for optimal window failed.")
sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1)
return 3

lbound = max(lbound, 2 ** (exp - 1))

Expand All @@ -136,5 +144,9 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int:
ubound = window_size - 1
else:
break

return 2 * lbound
window_size = 2 * lbound
if window_size >= len(time_series):
sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1)
return 3
sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 0)
return window_size
14 changes: 14 additions & 0 deletions src/seer/tags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from enum import StrEnum


class AnomalyDetectionTags(StrEnum):
ALERT_ID = "alert_id"
MODE = "mode"
LOW_VARIANCE_TS = "low_variance_ts"
WINDOW_SEARCH_FAILED = "window_search_failed"


class AnomalyDetectionModes(StrEnum):
STREAMING_ALERT = "streaming.alert"
STREAMING_TS_WITH_HISTORY = "streaming.ts_with_history"
BATCH_TS_FULL = "batch.ts_full"
Loading

0 comments on commit 447baeb

Please sign in to comment.