chore(anomaly_detection) Handle timeseries with very low variance
ram-senth committed Sep 21, 2024
1 parent 0104de0 commit e4374b5
Showing 10 changed files with 248 additions and 40 deletions.
3 changes: 3 additions & 0 deletions codecov.yml
@@ -16,6 +16,9 @@ coverage:
     trend_detection:
       paths:
         - 'src/seer/trend_detection/'
+    anomaly_detection:
+      paths:
+        - 'src/seer/anomaly_detection/'
     app:
       paths:
         - 'src/seer/app.py'
13 changes: 8 additions & 5 deletions src/seer/anomaly_detection/anomaly_detection.py
@@ -26,6 +26,7 @@
 )
 from seer.dependency_injection import inject, injected
 from seer.exceptions import ClientError, ServerError
+from seer.tags import AnomalyDetectionModes, AnomalyDetectionTags
 
 anomaly_detection_module.enable()
 logger = logging.getLogger(__name__)
@@ -226,16 +227,16 @@ def detect_anomalies(self, request: DetectAnomaliesRequest) -> DetectAnomaliesRe
             Anomaly detection request that has either a complete time series or an alert reference.
         """
         if isinstance(request.context, AlertInSeer):
-            mode = "streaming.alert"
+            mode = AnomalyDetectionModes.STREAMING_ALERT
         elif isinstance(request.context, TimeSeriesWithHistory):
-            mode = "streaming.ts_with_history"
+            mode = AnomalyDetectionModes.STREAMING_TS_WITH_HISTORY
         else:
-            mode = "batch.ts_full"
+            mode = AnomalyDetectionModes.BATCH_TS_FULL
 
-        sentry_sdk.set_tag("ad_mode", mode)
+        sentry_sdk.set_tag(AnomalyDetectionTags.MODE, mode)
 
         if isinstance(request.context, AlertInSeer):
-            sentry_sdk.set_tag("alert_id", request.context.id)
+            sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.context.id)
             ts, anomalies = self._online_detect(request.context, request.config)
         elif isinstance(request.context, TimeSeriesWithHistory):
             ts, anomalies = self._combo_detect(request.context, request.config)
@@ -255,6 +256,7 @@ def store_data(
         request: StoreDataRequest
             Alert information along with underlying time series data
         """
+        sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id)
         # Ensure we have at least 7 days of data in the time series
         min_len = self._min_required_timesteps(request.config.time_period)
         if len(request.timeseries) < min_len:
@@ -302,5 +304,6 @@ def delete_alert_data(
         request: DeleteAlertDataRequest
             Alert to clear
         """
+        sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id)
         alert_data_accessor.delete_alert_data(external_alert_id=request.alert.id)
         return DeleteAlertDataResponse(success=True)
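For readers skimming the diff: the mode is derived purely from the type of `request.context`. Below is a stripped-down sketch of that dispatch, using hypothetical stand-in dataclasses rather than seer's actual request models, and the pre-change string values that the new enum members preserve.

```python
from dataclasses import dataclass, field


@dataclass
class AlertInSeer:            # hypothetical stand-in for seer's alert-reference context
    id: int


@dataclass
class TimeSeriesWithHistory:  # hypothetical stand-in for "current window plus history"
    history: list = field(default_factory=list)


def detection_mode(context: object) -> str:
    """Mirror the mode selection in detect_anomalies, using the pre-change string values."""
    if isinstance(context, AlertInSeer):
        return "streaming.alert"
    if isinstance(context, TimeSeriesWithHistory):
        return "streaming.ts_with_history"
    return "batch.ts_full"


print(detection_mode(AlertInSeer(id=123)))        # streaming.alert
print(detection_mode(TimeSeriesWithHistory()))    # streaming.ts_with_history
print(detection_mode([1.0, 2.0, 3.0]))            # batch.ts_full
```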
1 change: 0 additions & 1 deletion src/seer/anomaly_detection/detectors/anomaly_detectors.py
@@ -83,7 +83,6 @@ def _compute_matrix_profile(
         """
         ts_values = timeseries.values  # np.array([np.float64(point.value) for point in timeseries])
         window_size = ws_selector.optimal_window_size(ts_values)
-        logger.debug(f"window_size: {window_size}")
         if window_size <= 0:
             # TODO: Add sentry logging of this error
             raise ServerError("Invalid window size")
69 changes: 48 additions & 21 deletions src/seer/anomaly_detection/detectors/mp_scorers.py
@@ -3,9 +3,11 @@
 
 import numpy as np
 import numpy.typing as npt
+import sentry_sdk
 from pydantic import BaseModel
 
 from seer.anomaly_detection.models import AnomalyFlags, Directions, Sensitivities
+from seer.tags import AnomalyDetectionTags
 
 logger = logging.getLogger(__name__)
 
@@ -41,6 +43,20 @@ def stream_score(
 
 
 class MPIRQScorer(MPScorer):
+    def _flag_if_low_variance(
+        self, ts_value: float, variance: float, mean: float
+    ) -> AnomalyFlags | None:
+        if variance <= 0.0001:
+            logger.info("Timeseries has very low variance")
+            sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 1)
+            # If the current value is significantly higher or lower than the mean, mark it as a high-confidence anomaly; otherwise, no anomaly.
+            if abs(ts_value) >= 2 * abs(mean):
+                return "anomaly_higher_confidence"
+            else:
+                return "none"
+        # If there is enough variance in the data, there is no need to compare against the mean.
+        sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 0)
+        return None
 
     def batch_score(
         self,
@@ -86,13 +102,17 @@ def batch_score(
         # Compute score and anomaly flags
         scores = []
         flags = []
+        ts_variance = ts.var()
+        ts_mean = ts.mean()
         for i, val in enumerate(mp_dist):
             scores.append(0.0 if np.isnan(val) or np.isinf(val) else val - threshold_upper)
-            flag = self._to_flag(val, threshold_lower, threshold_upper)
-            if i > 2 * window_size:
-                flag = self._adjust_flag_for_vicinity(
-                    flag=flag, ts_value=ts[i], context=ts[i - 2 * window_size : i - 1]
-                )
+            flag = self._flag_if_low_variance(ts_value=ts[i], variance=ts_variance, mean=ts_mean)
+            if flag is None:
+                flag = self._to_flag(val, threshold_lower, threshold_upper)
+                if i > 2 * window_size:
+                    flag = self._adjust_flag_for_vicinity(
+                        flag=flag, ts_value=ts[i], context=ts[i - 2 * window_size : i - 1]
+                    )
             flags.append(flag)
 
         return scores, flags
@@ -130,23 +150,30 @@ def stream_score(
                 * "anomaly_lower_confidence" - indicating anomaly but only with a lower threshold
                 * "anomaly_higher_confidence" - indicating anomaly with a higher threshold
         """
-        # Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring.
-        mp_dist_baseline_finite = mp_dist_baseline[np.isfinite(mp_dist_baseline)]
-
-        # Compute the quantiles for two different threshold levels
-        [Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.25, 0.75])
-        IQR_L = Q3 - Q1
-        threshold_lower = Q3 + (1.5 * IQR_L)
+        context = ts_baseline[-2 * window_size :]
+        flag = self._flag_if_low_variance(
+            ts_value=ts_value, variance=context.var(), mean=context.mean()
+        )
+        score = 0.0
+        if flag is None:
+            # Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring.
+            mp_dist_baseline_finite = mp_dist_baseline[np.isfinite(mp_dist_baseline)]
+
+            # Compute the quantiles for two different threshold levels
+            [Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.25, 0.75])
+            IQR_L = Q3 - Q1
+            threshold_lower = Q3 + (1.5 * IQR_L)
+
+            [Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.15, 0.85])
+            IQR_L = Q3 - Q1
+            threshold_upper = Q3 + (1.5 * IQR_L)
+
+            # Compute score and anomaly flags
+            score = 0.0 if np.isnan(mp_dist) or np.isinf(mp_dist) else mp_dist - threshold_upper
+            flag = self._to_flag(mp_dist, threshold_lower, threshold_upper)
+            # anomaly identified. apply logic to check for peak and trough
+            flag = self._adjust_flag_for_vicinity(ts_value, flag, ts_baseline[-2 * window_size :])
-
-        [Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.15, 0.85])
-        IQR_L = Q3 - Q1
-        threshold_upper = Q3 + (1.5 * IQR_L)
-
-        # Compute score and anomaly flags
-        score = 0.0 if np.isnan(mp_dist) or np.isinf(mp_dist) else mp_dist - threshold_upper
-        flag = self._to_flag(mp_dist, threshold_lower, threshold_upper)
-        # anomaly identified. apply logic to check for peak and trough
-        flag = self._adjust_flag_for_vicinity(ts_value, flag, ts_baseline[-2 * window_size :])
         return [score], [flag]
 
     def _to_flag(self, mp_dist: float, threshold_lower: float, threshold_upper: float):
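To make the new control flow above concrete: when the variance of the baseline values is at or below 0.0001, the IQR/matrix-profile scoring is skipped entirely and the point is flagged only by comparing its magnitude to twice the magnitude of the mean. A minimal, self-contained sketch of that rule (an illustrative re-implementation, not the `MPIRQScorer` method itself):

```python
import numpy as np


def low_variance_flag(value: float, baseline: np.ndarray) -> str | None:
    """Return a flag for near-constant baselines, or None to fall through to IQR scoring."""
    if baseline.var() <= 0.0001:
        # Near-constant series: a point far from the mean is a high-confidence anomaly.
        return "anomaly_higher_confidence" if abs(value) >= 2 * abs(baseline.mean()) else "none"
    return None  # enough variance -> use the usual matrix-profile / IQR path


flat = np.full(48, 10.0)
print(low_variance_flag(30.0, flat))   # anomaly_higher_confidence (|30| >= 2 * |10|)
print(low_variance_flag(12.0, flat))   # none
noisy = np.random.default_rng(0).normal(10.0, 5.0, 48)
print(low_variance_flag(12.0, noisy))  # None -> caller computes IQR thresholds instead
```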
5 changes: 5 additions & 0 deletions src/seer/anomaly_detection/detectors/normalizers.py
@@ -18,4 +18,9 @@ def normalize(self, array: npt.NDArray) -> npt.NDArray:
 class MinMaxNormalizer(Normalizer):
     def normalize(self, array: npt.NDArray) -> npt.NDArray:
         """Applies min-max normalization to input array"""
+        if array.var() == 0:
+            if array[0] == 0:
+                return array
+            else:
+                return np.full_like(array, 1.0)
         return (array - np.min(array)) / (np.max(array) - np.min(array))
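The guard above exists because min-max normalization divides by `max - min`, which is zero for a constant series and would produce NaNs. With this change, an all-zero series is returned unchanged and any other constant series maps to all ones. A small sketch mirroring that behaviour outside the class:

```python
import numpy as np
import numpy.typing as npt


def min_max_normalize(array: npt.NDArray) -> npt.NDArray:
    """Min-max normalize; constant input becomes all zeros or all ones instead of NaN."""
    if array.var() == 0:
        return array if array[0] == 0 else np.full_like(array, 1.0)
    return (array - np.min(array)) / (np.max(array) - np.min(array))


print(min_max_normalize(np.zeros(5)))                # [0. 0. 0. 0. 0.]
print(min_max_normalize(np.full(5, 42.0)))           # [1. 1. 1. 1. 1.]
print(min_max_normalize(np.array([1.0, 3.0, 5.0])))  # [0.  0.5 1. ]
```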
22 changes: 17 additions & 5 deletions src/seer/anomaly_detection/detectors/window_size_selectors.py
@@ -3,9 +3,11 @@
 import numpy as np
 import numpy.typing as npt
 import pandas as pd
+import sentry_sdk
 from pydantic import BaseModel, Field
 
 from seer.anomaly_detection.detectors.normalizers import MinMaxNormalizer, Normalizer
+from seer.tags import AnomalyDetectionTags
 
 
 class WindowSizeSelector(BaseModel, abc.ABC):
@@ -84,9 +86,13 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int:
             The time series as a sequence of float values.
         """
+        if time_series.var() == 0:
+            sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1)
+            sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 1)
+            return 3
         time_series = self.normalizer.normalize(time_series)
-        ts_mean = np.mean(time_series)
-        ts_std = np.std(time_series)
+        ts_mean = time_series.mean()
+        ts_std = time_series.std()
         ts_min_max = np.max(time_series) - np.min(time_series)
 
         lbound = self.lbound
@@ -119,7 +125,9 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int:
                 exp += 1
 
         if not found_window:
-            raise Exception("Search for optimal window failed.")
+            # raise Exception("Search for optimal window failed.")
+            sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1)
+            return 3
 
         lbound = max(lbound, 2 ** (exp - 1))
 
@@ ... @@
                 ubound = window_size - 1
             else:
                 break
 
-        return 2 * lbound
+        window_size = 2 * lbound
+        if window_size >= len(time_series):
+            sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1)
+            return 3
+        sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 0)
+        return window_size
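Taken together, `optimal_window_size` now degrades to a small fixed window instead of raising: a zero-variance series, an exhausted exponential search, or a computed window that would not fit the series all tag `window_search_failed` and return 3. A condensed sketch of just those guard rails (the real method runs the SuSS search in between; `searched_window=None` below is an assumption standing in for a failed search):

```python
import numpy as np

FALLBACK_WINDOW = 3  # minimal window returned when no usable window can be found


def guarded_window_size(time_series: np.ndarray, searched_window: int | None) -> int:
    """Apply the commit's guard rails around a window-size search result."""
    if time_series.var() == 0:               # constant series: nothing to search over
        return FALLBACK_WINDOW
    if searched_window is None:              # search never converged on a candidate
        return FALLBACK_WINDOW
    if searched_window >= len(time_series):  # window would be larger than the series
        return FALLBACK_WINDOW
    return searched_window


print(guarded_window_size(np.full(100, 7.0), None))           # 3
print(guarded_window_size(np.arange(100, dtype=float), 250))  # 3
print(guarded_window_size(np.arange(100, dtype=float), 24))   # 24
```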
14 changes: 14 additions & 0 deletions src/seer/tags.py
@@ -0,0 +1,14 @@
+from enum import StrEnum
+
+
+class AnomalyDetectionTags(StrEnum):
+    ALERT_ID = "alert_id"
+    MODE = "mode"
+    LOW_VARIANCE_TS = "low_variance_ts"
+    WINDOW_SEARCH_FAILED = "window_search_failed"
+
+
+class AnomalyDetectionModes(StrEnum):
+    STREAMING_ALERT = "streaming.alert"
+    STREAMING_TS_WITH_HISTORY = "streaming.ts_with_history"
+    BATCH_TS_FULL = "batch.ts_full"
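One reason this refactor is transparent to existing dashboards: `StrEnum` members (Python 3.11+) are themselves strings, so `sentry_sdk.set_tag(AnomalyDetectionTags.MODE, mode)` sends the same tag keys and values as the old literals. A quick check, using a trimmed copy of the enums above for illustration:

```python
from enum import StrEnum


class AnomalyDetectionTags(StrEnum):
    MODE = "mode"


class AnomalyDetectionModes(StrEnum):
    BATCH_TS_FULL = "batch.ts_full"


# StrEnum members are instances of str, so tag keys/values are unchanged on the wire.
assert isinstance(AnomalyDetectionTags.MODE, str)
assert AnomalyDetectionTags.MODE == "mode"
assert f"{AnomalyDetectionModes.BATCH_TS_FULL}" == "batch.ts_full"
print("tag names and values match the old string literals")
```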