diff --git a/codecov.yml b/codecov.yml index b5d971b2d..6bcc46250 100644 --- a/codecov.yml +++ b/codecov.yml @@ -16,6 +16,9 @@ coverage: trend_detection: paths: - 'src/seer/trend_detection/' + anomaly_detection: + paths: + - 'src/seer/anomaly_detection/' app: paths: - 'src/seer/app.py' diff --git a/src/seer/anomaly_detection/anomaly_detection.py b/src/seer/anomaly_detection/anomaly_detection.py index 670c5dcc1..86f85eeba 100644 --- a/src/seer/anomaly_detection/anomaly_detection.py +++ b/src/seer/anomaly_detection/anomaly_detection.py @@ -26,6 +26,7 @@ ) from seer.dependency_injection import inject, injected from seer.exceptions import ClientError, ServerError +from seer.tags import AnomalyDetectionModes, AnomalyDetectionTags anomaly_detection_module.enable() logger = logging.getLogger(__name__) @@ -226,16 +227,16 @@ def detect_anomalies(self, request: DetectAnomaliesRequest) -> DetectAnomaliesRe Anomaly detection request that has either a complete time series or an alert reference. """ if isinstance(request.context, AlertInSeer): - mode = "streaming.alert" + mode = AnomalyDetectionModes.STREAMING_ALERT elif isinstance(request.context, TimeSeriesWithHistory): - mode = "streaming.ts_with_history" + mode = AnomalyDetectionModes.STREAMING_TS_WITH_HISTORY else: - mode = "batch.ts_full" + mode = AnomalyDetectionModes.BATCH_TS_FULL - sentry_sdk.set_tag("ad_mode", mode) + sentry_sdk.set_tag(AnomalyDetectionTags.MODE, mode) if isinstance(request.context, AlertInSeer): - sentry_sdk.set_tag("alert_id", request.context.id) + sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.context.id) ts, anomalies = self._online_detect(request.context, request.config) elif isinstance(request.context, TimeSeriesWithHistory): ts, anomalies = self._combo_detect(request.context, request.config) @@ -255,6 +256,7 @@ def store_data( request: StoreDataRequest Alert information along with underlying time series data """ + sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id) # Ensure we have at least 7 days of data in the time series min_len = self._min_required_timesteps(request.config.time_period) if len(request.timeseries) < min_len: @@ -302,5 +304,6 @@ def delete_alert_data( request: DeleteAlertDataRequest Alert to clear """ + sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id) alert_data_accessor.delete_alert_data(external_alert_id=request.alert.id) return DeleteAlertDataResponse(success=True) diff --git a/src/seer/anomaly_detection/detectors/anomaly_detectors.py b/src/seer/anomaly_detection/detectors/anomaly_detectors.py index b3e8af558..5453bf6c2 100644 --- a/src/seer/anomaly_detection/detectors/anomaly_detectors.py +++ b/src/seer/anomaly_detection/detectors/anomaly_detectors.py @@ -83,7 +83,6 @@ def _compute_matrix_profile( """ ts_values = timeseries.values # np.array([np.float64(point.value) for point in timeseries]) window_size = ws_selector.optimal_window_size(ts_values) - logger.debug(f"window_size: {window_size}") if window_size <= 0: # TODO: Add sentry logging of this error raise ServerError("Invalid window size") diff --git a/src/seer/anomaly_detection/detectors/mp_scorers.py b/src/seer/anomaly_detection/detectors/mp_scorers.py index 660bcbcd9..b8445daf3 100644 --- a/src/seer/anomaly_detection/detectors/mp_scorers.py +++ b/src/seer/anomaly_detection/detectors/mp_scorers.py @@ -3,9 +3,11 @@ import numpy as np import numpy.typing as npt +import sentry_sdk from pydantic import BaseModel from seer.anomaly_detection.models import AnomalyFlags, Directions, Sensitivities +from seer.tags import AnomalyDetectionTags logger = logging.getLogger(__name__) @@ -41,6 +43,21 @@ def stream_score( class MPIRQScorer(MPScorer): + def _flag_if_low_variance( + self, ts_value: float, variance: float, mean: float + ) -> AnomalyFlags | None: + if variance <= 0.0001: + logger.info("Timeseries has very low variance") + sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 1) + # mean = np.mean(context) + # if current value is significantly higher or lower than the mean then mark it as high anomaly else mark it as no anomaly + if abs(ts_value) >= 2 * abs(mean): + return "anomaly_higher_confidence" + else: + return "none" + # If there is enough variance in the data then we do not need to compare to mean + sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 0) + return None def batch_score( self, @@ -86,13 +103,17 @@ def batch_score( # Compute score and anomaly flags scores = [] flags = [] + ts_variance = ts.var() + ts_mean = ts.mean() for i, val in enumerate(mp_dist): scores.append(0.0 if np.isnan(val) or np.isinf(val) else val - threshold_upper) - flag = self._to_flag(val, threshold_lower, threshold_upper) - if i > 2 * window_size: - flag = self._adjust_flag_for_vicinity( - flag=flag, ts_value=ts[i], context=ts[i - 2 * window_size : i - 1] - ) + flag = self._flag_if_low_variance(ts_value=ts[i], variance=ts_variance, mean=ts_mean) + if flag is None: + flag = self._to_flag(val, threshold_lower, threshold_upper) + if i > 2 * window_size: + flag = self._adjust_flag_for_vicinity( + flag=flag, ts_value=ts[i], context=ts[i - 2 * window_size : i - 1] + ) flags.append(flag) return scores, flags @@ -130,23 +151,30 @@ def stream_score( * "anomaly_lower_confidence" - indicating anomaly but only with a lower threshold * "anomaly_higher_confidence" - indicating anomaly with a higher threshold """ - # Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring. - mp_dist_baseline_finite = mp_dist_baseline[np.isfinite(mp_dist_baseline)] - - # Compute the quantiles for two different threshold levels - [Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.25, 0.75]) - IQR_L = Q3 - Q1 - threshold_lower = Q3 + (1.5 * IQR_L) + context = ts_baseline[-2 * window_size :] + flag = self._flag_if_low_variance( + ts_value=ts_value, variance=context.var(), mean=context.mean() + ) + score = 0.0 + if flag is None: + # Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring. + mp_dist_baseline_finite = mp_dist_baseline[np.isfinite(mp_dist_baseline)] + + # Compute the quantiles for two different threshold levels + [Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.25, 0.75]) + IQR_L = Q3 - Q1 + threshold_lower = Q3 + (1.5 * IQR_L) + + [Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.15, 0.85]) + IQR_L = Q3 - Q1 + threshold_upper = Q3 + (1.5 * IQR_L) + + # Compute score and anomaly flags + score = 0.0 if np.isnan(mp_dist) or np.isinf(mp_dist) else mp_dist - threshold_upper + flag = self._to_flag(mp_dist, threshold_lower, threshold_upper) + # anomaly identified. apply logic to check for peak and trough + flag = self._adjust_flag_for_vicinity(ts_value, flag, ts_baseline[-2 * window_size :]) - [Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.15, 0.85]) - IQR_L = Q3 - Q1 - threshold_upper = Q3 + (1.5 * IQR_L) - - # Compute score and anomaly flags - score = 0.0 if np.isnan(mp_dist) or np.isinf(mp_dist) else mp_dist - threshold_upper - flag = self._to_flag(mp_dist, threshold_lower, threshold_upper) - # anomaly identified. apply logic to check for peak and trough - flag = self._adjust_flag_for_vicinity(ts_value, flag, ts_baseline[-2 * window_size :]) return [score], [flag] def _to_flag(self, mp_dist: float, threshold_lower: float, threshold_upper: float): diff --git a/src/seer/anomaly_detection/detectors/normalizers.py b/src/seer/anomaly_detection/detectors/normalizers.py index ea6506546..d20aff78d 100644 --- a/src/seer/anomaly_detection/detectors/normalizers.py +++ b/src/seer/anomaly_detection/detectors/normalizers.py @@ -18,4 +18,9 @@ def normalize(self, array: npt.NDArray) -> npt.NDArray: class MinMaxNormalizer(Normalizer): def normalize(self, array: npt.NDArray) -> npt.NDArray: """Applies min-max normalization to input array""" + if array.var() == 0: + if array[0] == 0: + return array + else: + return np.full_like(array, 1.0) return (array - np.min(array)) / (np.max(array) - np.min(array)) diff --git a/src/seer/anomaly_detection/detectors/window_size_selectors.py b/src/seer/anomaly_detection/detectors/window_size_selectors.py index f2d83539d..ac5b607b6 100644 --- a/src/seer/anomaly_detection/detectors/window_size_selectors.py +++ b/src/seer/anomaly_detection/detectors/window_size_selectors.py @@ -3,9 +3,11 @@ import numpy as np import numpy.typing as npt import pandas as pd +import sentry_sdk from pydantic import BaseModel, Field from seer.anomaly_detection.detectors.normalizers import MinMaxNormalizer, Normalizer +from seer.tags import AnomalyDetectionTags class WindowSizeSelector(BaseModel, abc.ABC): @@ -84,9 +86,13 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int: The time series as a seaquence of float values. """ + if time_series.var() == 0: + sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1) + sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 1) + return 3 time_series = self.normalizer.normalize(time_series) - ts_mean = np.mean(time_series) - ts_std = np.std(time_series) + ts_mean = time_series.mean() + ts_std = time_series.std() ts_min_max = np.max(time_series) - np.min(time_series) lbound = self.lbound @@ -119,7 +125,9 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int: exp += 1 if not found_window: - raise Exception("Search for optimal window failed.") + # raise Exception("Search for optimal window failed.") + sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1) + return 3 lbound = max(lbound, 2 ** (exp - 1)) @@ -136,5 +144,9 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int: ubound = window_size - 1 else: break - - return 2 * lbound + window_size = 2 * lbound + if window_size >= len(time_series): + sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1) + return 3 + sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 0) + return window_size diff --git a/src/seer/tags.py b/src/seer/tags.py new file mode 100644 index 000000000..f40c8ae4b --- /dev/null +++ b/src/seer/tags.py @@ -0,0 +1,14 @@ +from enum import StrEnum + + +class AnomalyDetectionTags(StrEnum): + ALERT_ID = "alert_id" + MODE = "mode" + LOW_VARIANCE_TS = "low_variance_ts" + WINDOW_SEARCH_FAILED = "window_search_failed" + + +class AnomalyDetectionModes(StrEnum): + STREAMING_ALERT = "streaming.alert" + STREAMING_TS_WITH_HISTORY = "streaming.ts_with_history" + BATCH_TS_FULL = "batch.ts_full" diff --git a/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py b/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py index f0aad12a0..2d12933ba 100644 --- a/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py +++ b/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py @@ -3,14 +3,15 @@ import numpy as np +from seer.anomaly_detection.detectors import MPConfig, MPIRQScorer, MPUtils, SuSSWindowSizeSelector from seer.anomaly_detection.detectors.anomaly_detectors import ( MPBatchAnomalyDetector, MPStreamAnomalyDetector, ) -from seer.anomaly_detection.detectors.mp_config import MPConfig from seer.anomaly_detection.models import MPTimeSeriesAnomalies from seer.anomaly_detection.models.external import AnomalyDetectionConfig from seer.anomaly_detection.models.timeseries import TimeSeries +from seer.exceptions import ServerError from tests.seer.anomaly_detection.test_utils import convert_synthetic_ts @@ -76,6 +77,27 @@ def test_compute_matrix_profile(self, mock_stump): self.ws_selector.optimal_window_size.assert_called_once() self.mp_utils.get_mp_dist_from_mp.assert_called_once() + def test_invalid_window_size(self): + + timeseries, mp_dists, window_sizes = convert_synthetic_ts( + "tests/seer/anomaly_detection/test_data/synthetic_series", as_ts_datatype=False + ) + + ts_values, _, _ = timeseries[0], mp_dists[0], window_sizes[0] + ts = TimeSeries(timestamps=np.array([]), values=ts_values) + + self.ws_selector.optimal_window_size = MagicMock(return_value=-1) + + with self.assertRaises(ServerError, msg="Invalid window size"): + self.detector._compute_matrix_profile( + ts, + self.config, + ws_selector=self.ws_selector, + mp_config=self.mp_config, + scorer=self.scorer, + mp_utils=self.mp_utils, + ) + class TestMPStreamAnomalyDetector(unittest.TestCase): @@ -92,6 +114,7 @@ def setUp(self): self.config = AnomalyDetectionConfig( time_period=15, sensitivity="low", direction="up", expected_seasonality="auto" ) # TODO: Placeholder values as not used in detection yet + self.mp_config = MPConfig(ignore_trivial=False, normalize_mp=False) @patch("stumpy.stumpi") @patch("seer.anomaly_detection.detectors.MPScorer") @@ -123,3 +146,119 @@ def test_detect(self, MockMPUtils, MockMPScorer, MockStumpi): assert len(anomalies.matrix_profile) == 3 mock_scorer.stream_score.assert_called() mock_stream.update.assert_called() + + def _detect_anomalies(self, history_ts, stream_ts): + batch_detector = MPBatchAnomalyDetector() + history_ts_timestamps = np.arange(1.0, len(history_ts) + 1) + stream_ts_timestamps = np.array(list(range(1, len(stream_ts) + 1))) + len(history_ts) + batch_anomalies = batch_detector._compute_matrix_profile( + timeseries=TimeSeries(timestamps=history_ts_timestamps, values=np.array(history_ts)), + config=self.config, + ws_selector=SuSSWindowSizeSelector(), + mp_config=self.mp_config, + scorer=MPIRQScorer(), + mp_utils=MPUtils(), + ) + stream_detector = MPStreamAnomalyDetector( + base_timestamps=np.array(history_ts_timestamps), + base_values=np.array(history_ts), + base_mp=batch_anomalies.matrix_profile, + window_size=batch_anomalies.window_size, + ) + stream_anomalies = stream_detector.detect( + timeseries=TimeSeries(timestamps=stream_ts_timestamps, values=np.array(stream_ts)), + config=self.config, + scorer=MPIRQScorer(), + mp_utils=MPUtils(), + ) + return batch_anomalies, stream_anomalies + + def test_stream_detect_spiked_history_spiked_stream_long_ts(self): + history_ts = [0.5] * 200 + history_ts[-115] = 1.0 + stream_ts = [0.5, 0.5, 1.2, *[0.5] * 10] + expected_stream_flags = [ + "none", + "none", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "none", + "none", + "none", + "none", + "none", + "none", + "none", + ] + history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts) + print(stream_anomalies.flags) + assert history_anomalies.window_size == 90 + assert stream_anomalies.flags == expected_stream_flags + + def test_stream_detect_spiked_history_spiked_stream(self): + history_ts = [0.5] * 20 + history_ts[-15] = 1.0 # Spiked history + stream_ts = [0.5, 0.5, 1.0, *[0.5] * 10] # Spiked stream + expected_stream_flags = [ + "none", + "none", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "none", + "none", + "none", + "none", + ] + history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts) + print(stream_anomalies.flags) + assert history_anomalies.window_size == 3 + assert stream_anomalies.flags == expected_stream_flags + + def test_stream_detect_flat_history_flat_stream(self): + history_ts = [0.5] * 200 # Flat history + stream_ts = [0.5] * 10 # Flat stream + expected_stream_flags = ["none"] * len(stream_ts) + + history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts) + assert history_anomalies.window_size == 3 + assert stream_anomalies.flags == expected_stream_flags + + def test_stream_detect_flat_history_spiked_stream(self): + history_ts = [0.5] * 200 # Flat history + stream_ts = [0.5, 0.5, 1.0, *[0.5] * 10] # Spiked stream + expected_stream_flags = [ + "none", + "none", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "anomaly_higher_confidence", + "none", + "none", + "none", + "none", + ] + + history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts) + assert history_anomalies.window_size == 3 + assert stream_anomalies.flags == expected_stream_flags + + def test_stream_detect_spliked_history_flat_stream(self): + history_ts = [0.5] * 200 + history_ts[-15] = 1.0 # Spiked history + stream_ts = [0.5] * 10 # Flat stream + expected_stream_flags = ["none"] * 10 + + history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts) + assert history_anomalies.window_size == 132 + assert stream_anomalies.flags == expected_stream_flags diff --git a/tests/seer/anomaly_detection/detectors/test_nomalizers.py b/tests/seer/anomaly_detection/detectors/test_nomalizers.py index 194643b62..c039faab2 100644 --- a/tests/seer/anomaly_detection/detectors/test_nomalizers.py +++ b/tests/seer/anomaly_detection/detectors/test_nomalizers.py @@ -18,8 +18,14 @@ def test_normalize_standard_array(self): np.testing.assert_equal(result, expected) def test_normalize_identical_values(self): - array = np.array([3, 3, 3, 3]) - expected = np.array([np.nan, np.nan, np.nan, np.nan]) + array = np.array([3] * 4) + expected = np.array([1.0] * 4) + result = self.normalizer.normalize(array) + np.testing.assert_equal(result, expected) + + def test_normalize_zeroes(self): + array = np.array([0] * 4) + expected = np.array([0.0] * 4) result = self.normalizer.normalize(array) np.testing.assert_equal(result, expected) @@ -29,6 +35,6 @@ def test_empty_array(self): def test_single_element_array(self): array = np.array([10]) - expected = np.array([np.nan]) + expected = np.array([1.0]) result = self.normalizer.normalize(array) np.testing.assert_equal(result, expected) diff --git a/tests/seer/anomaly_detection/detectors/test_window_size_selectors.py b/tests/seer/anomaly_detection/detectors/test_window_size_selectors.py index e0d51447e..4e34c2e15 100644 --- a/tests/seer/anomaly_detection/detectors/test_window_size_selectors.py +++ b/tests/seer/anomaly_detection/detectors/test_window_size_selectors.py @@ -13,8 +13,8 @@ def setUp(self): def test_optimal_window_size_constant_series(self): ts = np.array([5.0] * 700) - with self.assertRaises(Exception, msg="Search for optimal window failed."): - self.selector.optimal_window_size(ts) + window_size = self.selector.optimal_window_size(ts) + assert window_size == 3 def test_optimal_window_size_linear_series(self): ts = np.linspace(1, 100, 100) @@ -25,8 +25,8 @@ def test_optimal_window_size_linear_series(self): def test_optimal_window_size_short_series(self): ts = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) - with self.assertRaises(Exception, msg="Search for optimal window failed."): - self.selector.optimal_window_size(ts) + window_size = self.selector.optimal_window_size(ts) + assert window_size == 3 def test_optimal_window_size(self):