chore(anomaly_detection): Handle timeseries with very low variance
ram-senth committed Sep 21, 2024
1 parent 0104de0 commit 3a587ed
Showing 10 changed files with 221 additions and 40 deletions.
3 changes: 3 additions & 0 deletions codecov.yml
@@ -16,6 +16,9 @@ coverage:
trend_detection:
paths:
- 'src/seer/trend_detection/'
anomaly_detection:
paths:
- 'src/seer/anomaly_detection/'
app:
paths:
- 'src/seer/app.py'
13 changes: 8 additions & 5 deletions src/seer/anomaly_detection/anomaly_detection.py
@@ -26,6 +26,7 @@
)
from seer.dependency_injection import inject, injected
from seer.exceptions import ClientError, ServerError
from seer.tags import AnomalyDetectionModes, AnomalyDetectionTags

anomaly_detection_module.enable()
logger = logging.getLogger(__name__)
@@ -226,16 +227,16 @@ def detect_anomalies(self, request: DetectAnomaliesRequest) -> DetectAnomaliesResponse:
Anomaly detection request that has either a complete time series or an alert reference.
"""
if isinstance(request.context, AlertInSeer):
mode = "streaming.alert"
mode = AnomalyDetectionModes.STREAMING_ALERT
elif isinstance(request.context, TimeSeriesWithHistory):
mode = "streaming.ts_with_history"
mode = AnomalyDetectionModes.STREAMING_TS_WITH_HISTORY
else:
mode = "batch.ts_full"
mode = AnomalyDetectionModes.BATCH_TS_FULL

sentry_sdk.set_tag("ad_mode", mode)
sentry_sdk.set_tag(AnomalyDetectionTags.MODE, mode)

if isinstance(request.context, AlertInSeer):
sentry_sdk.set_tag("alert_id", request.context.id)
sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.context.id)
ts, anomalies = self._online_detect(request.context, request.config)
elif isinstance(request.context, TimeSeriesWithHistory):
ts, anomalies = self._combo_detect(request.context, request.config)
@@ -255,6 +256,7 @@ def store_data(
request: StoreDataRequest
Alert information along with underlying time series data
"""
sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id)
# Ensure we have at least 7 days of data in the time series
min_len = self._min_required_timesteps(request.config.time_period)
if len(request.timeseries) < min_len:
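For reference, if time_period is the alert granularity in minutes, seven days of data works out to 7 * 24 * 60 / time_period points. A minimal sketch of that arithmetic (hypothetical helper; the committed _min_required_timesteps may compute it differently):

```python
def min_required_timesteps(time_period_minutes: int, days: int = 7) -> int:
    # Seven days of data at the alert granularity, e.g. 672 points at 15 minutes.
    return days * 24 * 60 // time_period_minutes

assert min_required_timesteps(15) == 672
assert min_required_timesteps(60) == 168
```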
@@ -302,5 +304,6 @@ def delete_alert_data(
request: DeleteAlertDataRequest
Alert to clear
"""
sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id)
alert_data_accessor.delete_alert_data(external_alert_id=request.alert.id)
return DeleteAlertDataResponse(success=True)
1 change: 0 additions & 1 deletion src/seer/anomaly_detection/detectors/anomaly_detectors.py
@@ -83,7 +83,6 @@ def _compute_matrix_profile(
"""
ts_values = timeseries.values # np.array([np.float64(point.value) for point in timeseries])
window_size = ws_selector.optimal_window_size(ts_values)
logger.debug(f"window_size: {window_size}")
if window_size <= 0:
# TODO: Add sentry logging of this error
raise ServerError("Invalid window size")
70 changes: 49 additions & 21 deletions src/seer/anomaly_detection/detectors/mp_scorers.py
@@ -3,9 +3,11 @@

import numpy as np
import numpy.typing as npt
import sentry_sdk
from pydantic import BaseModel

from seer.anomaly_detection.models import AnomalyFlags, Directions, Sensitivities
from seer.tags import AnomalyDetectionTags

logger = logging.getLogger(__name__)

@@ -41,6 +43,21 @@ def stream_score(


class MPIRQScorer(MPScorer):
def _flag_if_low_variance(
self, ts_value: float, variance: float, mean: float
) -> AnomalyFlags | None:
if variance <= 0.0001:
logger.info("Timeseries has very low variance")
sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 1)
# On a near-flat series, flag values whose magnitude is at least twice the mean's as high-confidence anomalies.
if abs(ts_value) >= 2 * abs(mean):
return "anomaly_higher_confidence"
else:
return "none"
# Enough variance in the data; no need to compare against the mean.
sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 0)
return None

def batch_score(
self,
@@ -86,13 +103,17 @@ def batch_score(
# Compute score and anomaly flags
scores = []
flags = []
ts_variance = ts.var()
ts_mean = ts.mean()
for i, val in enumerate(mp_dist):
scores.append(0.0 if np.isnan(val) or np.isinf(val) else val - threshold_upper)
flag = self._to_flag(val, threshold_lower, threshold_upper)
if i > 2 * window_size:
flag = self._adjust_flag_for_vicinity(
flag=flag, ts_value=ts[i], context=ts[i - 2 * window_size : i - 1]
)
flag = self._flag_if_low_variance(ts_value=ts[i], variance=ts_variance, mean=ts_mean)
if flag is None:
flag = self._to_flag(val, threshold_lower, threshold_upper)
if i > 2 * window_size:
flag = self._adjust_flag_for_vicinity(
flag=flag, ts_value=ts[i], context=ts[i - 2 * window_size : i - 1]
)
flags.append(flag)

return scores, flags
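A worked example of the IQR thresholding above, with illustrative numbers: matrix-profile distances beyond Q3 + 1.5 * IQR are treated as anomalous.

```python
import numpy as np

# Illustrative matrix-profile distances with one outlier.
mp_dist = np.array([0.9, 1.0, 1.0, 1.1, 1.2, 5.0])
q1, q3 = np.quantile(mp_dist, [0.25, 0.75])
threshold_lower = q3 + 1.5 * (q3 - q1)
print(round(threshold_lower, 2))  # 1.44; only the 5.0 distance exceeds it
```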
@@ -130,23 +151,30 @@ def stream_score(
* "anomaly_lower_confidence" - indicating anomaly but only with a lower threshold
* "anomaly_higher_confidence" - indicating anomaly with a higher threshold
"""
# Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring.
mp_dist_baseline_finite = mp_dist_baseline[np.isfinite(mp_dist_baseline)]

# Compute the quantiles for two different threshold levels
[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.25, 0.75])
IQR_L = Q3 - Q1
threshold_lower = Q3 + (1.5 * IQR_L)
context = ts_baseline[-2 * window_size :]
flag = self._flag_if_low_variance(
ts_value=ts_value, variance=context.var(), mean=context.mean()
)
score = 0.0
if flag is None:
# Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring.
mp_dist_baseline_finite = mp_dist_baseline[np.isfinite(mp_dist_baseline)]

# Compute the quantiles for two different threshold levels
[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.25, 0.75])
IQR_L = Q3 - Q1
threshold_lower = Q3 + (1.5 * IQR_L)

[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.15, 0.85])
IQR_L = Q3 - Q1
threshold_upper = Q3 + (1.5 * IQR_L)

# Compute score and anomaly flags
score = 0.0 if np.isnan(mp_dist) or np.isinf(mp_dist) else mp_dist - threshold_upper
flag = self._to_flag(mp_dist, threshold_lower, threshold_upper)
# anomaly identified. apply logic to check for peak and trough
flag = self._adjust_flag_for_vicinity(ts_value, flag, ts_baseline[-2 * window_size :])

[Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.15, 0.85])
IQR_L = Q3 - Q1
threshold_upper = Q3 + (1.5 * IQR_L)

# Compute score and anomaly flags
score = 0.0 if np.isnan(mp_dist) or np.isinf(mp_dist) else mp_dist - threshold_upper
flag = self._to_flag(mp_dist, threshold_lower, threshold_upper)
# anomaly identified. apply logic to check for peak and trough
flag = self._adjust_flag_for_vicinity(ts_value, flag, ts_baseline[-2 * window_size :])
return [score], [flag]

def _to_flag(self, mp_dist: float, threshold_lower: float, threshold_upper: float):
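The net effect of the scorer changes: when the baseline window is nearly flat, matrix-profile distances carry little signal, so scoring short-circuits to a simple deviation-from-mean rule. A standalone sketch of that rule (hypothetical function, not the committed API):

```python
import numpy as np

def flag_low_variance(value: float, context: np.ndarray) -> str | None:
    """Flag for near-flat baselines; None means fall through to IQR scoring."""
    if context.var() <= 0.0001:
        # Flat baseline: values at least twice the mean magnitude are anomalous.
        return "anomaly_higher_confidence" if abs(value) >= 2 * abs(context.mean()) else "none"
    return None

flat = np.full(20, 0.5)
print(flag_low_variance(0.5, flat))  # none
print(flag_low_variance(1.2, flat))  # anomaly_higher_confidence
```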
5 changes: 5 additions & 0 deletions src/seer/anomaly_detection/detectors/normalizers.py
@@ -18,4 +18,9 @@ def normalize(self, array: npt.NDArray) -> npt.NDArray:
class MinMaxNormalizer(Normalizer):
def normalize(self, array: npt.NDArray) -> npt.NDArray:
"""Applies min-max normalization to input array"""
if array.var() == 0:
if array[0] == 0:
return array
else:
return np.full_like(array, 1.0)
return (array - np.min(array)) / (np.max(array) - np.min(array))
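For a constant series, max minus min is zero and the min-max formula would divide by zero, yielding NaNs. The guard above maps an all-zero series to itself and any other constant series to all ones. A quick standalone check of that behavior:

```python
import numpy as np

def min_max_normalize(array: np.ndarray) -> np.ndarray:
    # Constant input would make max - min zero; mirror the guard above.
    if array.var() == 0:
        return array if array[0] == 0 else np.full_like(array, 1.0)
    return (array - np.min(array)) / (np.max(array) - np.min(array))

print(min_max_normalize(np.array([0.5, 0.5, 0.5])))  # [1. 1. 1.]
print(min_max_normalize(np.array([0.0, 0.0])))       # [0. 0.]
print(min_max_normalize(np.array([1.0, 2.0, 3.0])))  # [0.  0.5 1. ]
```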
22 changes: 17 additions & 5 deletions src/seer/anomaly_detection/detectors/window_size_selectors.py
@@ -3,9 +3,11 @@
import numpy as np
import numpy.typing as npt
import pandas as pd
import sentry_sdk
from pydantic import BaseModel, Field

from seer.anomaly_detection.detectors.normalizers import MinMaxNormalizer, Normalizer
from seer.tags import AnomalyDetectionTags


class WindowSizeSelector(BaseModel, abc.ABC):
@@ -84,9 +86,13 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int:
The time series as a sequence of float values.
"""
if time_series.var() == 0:
sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1)
sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 1)
return 3
time_series = self.normalizer.normalize(time_series)
ts_mean = np.mean(time_series)
ts_std = np.std(time_series)
ts_mean = time_series.mean()
ts_std = time_series.std()
ts_min_max = np.max(time_series) - np.min(time_series)

lbound = self.lbound
@@ -119,7 +125,9 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int:
exp += 1

if not found_window:
raise Exception("Search for optimal window failed.")
sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1)
return 3

lbound = max(lbound, 2 ** (exp - 1))

@@ -136,5 +144,9 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int:
ubound = window_size - 1
else:
break

return 2 * lbound
window_size = 2 * lbound
if window_size >= len(time_series):
sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1)
return 3
sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 0)
return window_size
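The selector now degrades gracefully instead of raising: zero-variance input, a failed search, and a window that spans the whole series all fall back to a minimal window of 3, each case tagged in Sentry. A condensed sketch of that policy (hypothetical wrapper, not the committed class):

```python
import numpy as np
from typing import Callable, Optional

MIN_WINDOW = 3  # fallback value hardcoded in the diff above

def safe_window_size(ts: np.ndarray, search: Callable[[np.ndarray], Optional[int]]) -> int:
    """Wrap a window-size search so every failure mode degrades to MIN_WINDOW."""
    if ts.var() == 0:
        return MIN_WINDOW  # flat series: no structure to window over
    window = search(ts)  # e.g. a SuSS-style binary search
    if window is None or window >= len(ts):
        return MIN_WINDOW  # search failed or window spans the series
    return window
```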
14 changes: 14 additions & 0 deletions src/seer/tags.py
@@ -0,0 +1,14 @@
from enum import StrEnum


class AnomalyDetectionTags(StrEnum):
ALERT_ID = "alert_id"
MODE = "mode"
LOW_VARIANCE_TS = "low_variance_ts"
WINDOW_SEARCH_FAILED = "window_search_failed"


class AnomalyDetectionModes(StrEnum):
STREAMING_ALERT = "streaming.alert"
STREAMING_TS_WITH_HISTORY = "streaming.ts_with_history"
BATCH_TS_FULL = "batch.ts_full"
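Because StrEnum members are real strings (Python 3.11+), swapping the tag literals for enum members is behavior-preserving: Sentry receives the same tag keys and values as before. A quick check:

```python
from enum import StrEnum  # requires Python 3.11+

class AnomalyDetectionTags(StrEnum):
    MODE = "mode"

assert AnomalyDetectionTags.MODE == "mode"       # compares equal to the literal
assert f"{AnomalyDetectionTags.MODE}" == "mode"  # formats as the plain string
```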
119 changes: 118 additions & 1 deletion tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py
@@ -3,11 +3,11 @@

import numpy as np

from seer.anomaly_detection.detectors import MPConfig, MPIRQScorer, MPUtils, SuSSWindowSizeSelector
from seer.anomaly_detection.detectors.anomaly_detectors import (
MPBatchAnomalyDetector,
MPStreamAnomalyDetector,
)
from seer.anomaly_detection.detectors.mp_config import MPConfig
from seer.anomaly_detection.models import MPTimeSeriesAnomalies
from seer.anomaly_detection.models.external import AnomalyDetectionConfig
from seer.anomaly_detection.models.timeseries import TimeSeries
@@ -92,6 +92,7 @@ def setUp(self):
self.config = AnomalyDetectionConfig(
time_period=15, sensitivity="low", direction="up", expected_seasonality="auto"
) # TODO: Placeholder values as not used in detection yet
self.mp_config = MPConfig(ignore_trivial=False, normalize_mp=False)

@patch("stumpy.stumpi")
@patch("seer.anomaly_detection.detectors.MPScorer")
@@ -123,3 +124,119 @@ def test_detect(self, MockMPUtils, MockMPScorer, MockStumpi):
assert len(anomalies.matrix_profile) == 3
mock_scorer.stream_score.assert_called()
mock_stream.update.assert_called()

def _detect_anomalies(self, history_ts, stream_ts):
batch_detector = MPBatchAnomalyDetector()
history_ts_timestamps = np.arange(1.0, len(history_ts) + 1)
stream_ts_timestamps = np.array(list(range(1, len(stream_ts) + 1))) + len(history_ts)
batch_anomalies = batch_detector._compute_matrix_profile(
timeseries=TimeSeries(timestamps=history_ts_timestamps, values=np.array(history_ts)),
config=self.config,
ws_selector=SuSSWindowSizeSelector(),
mp_config=self.mp_config,
scorer=MPIRQScorer(),
mp_utils=MPUtils(),
)
stream_detector = MPStreamAnomalyDetector(
base_timestamps=np.array(history_ts_timestamps),
base_values=np.array(history_ts),
base_mp=batch_anomalies.matrix_profile,
window_size=batch_anomalies.window_size,
)
stream_anomalies = stream_detector.detect(
timeseries=TimeSeries(timestamps=stream_ts_timestamps, values=np.array(stream_ts)),
config=self.config,
scorer=MPIRQScorer(),
mp_utils=MPUtils(),
)
return batch_anomalies, stream_anomalies

def test_stream_detect_spiked_history_spiked_stream_long_ts(self):
history_ts = [0.5] * 200
history_ts[-115] = 1.0
stream_ts = [0.5, 0.5, 1.2, *[0.5] * 10]
expected_stream_flags = [
"none",
"none",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"none",
"none",
"none",
"none",
"none",
"none",
"none",
]
history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts)
assert history_anomalies.window_size == 90
assert stream_anomalies.flags == expected_stream_flags

def test_stream_detect_spiked_history_spiked_stream(self):
history_ts = [0.5] * 20
history_ts[-15] = 1.0 # Spiked history
stream_ts = [0.5, 0.5, 1.0, *[0.5] * 10] # Spiked stream
expected_stream_flags = [
"none",
"none",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"none",
"none",
"none",
"none",
]
history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts)
assert history_anomalies.window_size == 3
assert stream_anomalies.flags == expected_stream_flags

def test_stream_detect_flat_history_flat_stream(self):
history_ts = [0.5] * 200 # Flat history
stream_ts = [0.5] * 10 # Flat stream
expected_stream_flags = ["none"] * len(stream_ts)

history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts)
assert history_anomalies.window_size == 3
assert stream_anomalies.flags == expected_stream_flags

def test_stream_detect_flat_history_spiked_stream(self):
history_ts = [0.5] * 200 # Flat history
stream_ts = [0.5, 0.5, 1.0, *[0.5] * 10] # Spiked stream
expected_stream_flags = [
"none",
"none",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"anomaly_higher_confidence",
"none",
"none",
"none",
"none",
]

history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts)
assert history_anomalies.window_size == 3
assert stream_anomalies.flags == expected_stream_flags

def test_stream_detect_spiked_history_flat_stream(self):
history_ts = [0.5] * 200
history_ts[-15] = 1.0 # Spiked history
stream_ts = [0.5] * 10 # Flat stream
expected_stream_flags = ["none"] * 10

history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts)
assert history_anomalies.window_size == 132
assert stream_anomalies.flags == expected_stream_flags