diff --git a/codecov.yml b/codecov.yml
index b5d971b2d..6bcc46250 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -16,6 +16,9 @@ coverage:
     trend_detection:
       paths:
         - 'src/seer/trend_detection/'
+    anomaly_detection:
+      paths:
+        - 'src/seer/anomaly_detection/'
     app:
       paths:
         - 'src/seer/app.py'
diff --git a/src/seer/anomaly_detection/anomaly_detection.py b/src/seer/anomaly_detection/anomaly_detection.py
index 670c5dcc1..86f85eeba 100644
--- a/src/seer/anomaly_detection/anomaly_detection.py
+++ b/src/seer/anomaly_detection/anomaly_detection.py
@@ -26,6 +26,7 @@
 )
 from seer.dependency_injection import inject, injected
 from seer.exceptions import ClientError, ServerError
+from seer.tags import AnomalyDetectionModes, AnomalyDetectionTags
 
 anomaly_detection_module.enable()
 logger = logging.getLogger(__name__)
@@ -226,16 +227,16 @@ def detect_anomalies(self, request: DetectAnomaliesRequest) -> DetectAnomaliesRe
             Anomaly detection request that has either a complete time series or an alert reference.
         """
         if isinstance(request.context, AlertInSeer):
-            mode = "streaming.alert"
+            mode = AnomalyDetectionModes.STREAMING_ALERT
         elif isinstance(request.context, TimeSeriesWithHistory):
-            mode = "streaming.ts_with_history"
+            mode = AnomalyDetectionModes.STREAMING_TS_WITH_HISTORY
         else:
-            mode = "batch.ts_full"
+            mode = AnomalyDetectionModes.BATCH_TS_FULL
 
-        sentry_sdk.set_tag("ad_mode", mode)
+        sentry_sdk.set_tag(AnomalyDetectionTags.MODE, mode)
         if isinstance(request.context, AlertInSeer):
-            sentry_sdk.set_tag("alert_id", request.context.id)
+            sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.context.id)
             ts, anomalies = self._online_detect(request.context, request.config)
         elif isinstance(request.context, TimeSeriesWithHistory):
             ts, anomalies = self._combo_detect(request.context, request.config)
@@ -255,6 +256,7 @@ def store_data(
         request: StoreDataRequest
             Alert information along with underlying time series data
         """
+        sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id)
         # Ensure we have at least 7 days of data in the time series
         min_len = self._min_required_timesteps(request.config.time_period)
         if len(request.timeseries) < min_len:
@@ -302,5 +304,6 @@ def delete_alert_data(
         request: DeleteAlertDataRequest
             Alert to clear
         """
+        sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id)
         alert_data_accessor.delete_alert_data(external_alert_id=request.alert.id)
         return DeleteAlertDataResponse(success=True)
diff --git a/src/seer/anomaly_detection/anomaly_detection_di.py b/src/seer/anomaly_detection/anomaly_detection_di.py
index dd5f71bbb..e12d107e4 100644
--- a/src/seer/anomaly_detection/anomaly_detection_di.py
+++ b/src/seer/anomaly_detection/anomaly_detection_di.py
@@ -1,8 +1,8 @@
 from seer.anomaly_detection.accessors import AlertDataAccessor, DbAlertDataAccessor
 from seer.anomaly_detection.detectors import (
     MinMaxNormalizer,
+    MPCascadingScorer,
     MPConfig,
-    MPIRQScorer,
     MPScorer,
     MPUtils,
     Normalizer,
@@ -22,7 +22,7 @@ def alert_data_accessor_provider() -> AlertDataAccessor:
 
 @anomaly_detection_module.provider
 def mp_scorer_provider() -> MPScorer:
-    return MPIRQScorer()
+    return MPCascadingScorer()
 
 
 @anomaly_detection_module.provider
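A quick sketch of why the Sentry tag values are unchanged by the enum refactor above, assuming Python 3.11's StrEnum semantics (this snippet is illustrative and not part of the change):

    from seer.tags import AnomalyDetectionModes, AnomalyDetectionTags

    # StrEnum members are themselves strings, so the values recorded in
    # Sentry are identical to the raw literals they replace.
    assert AnomalyDetectionModes.STREAMING_ALERT == "streaming.alert"
    assert str(AnomalyDetectionTags.ALERT_ID) == "alert_id"
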
diff --git a/src/seer/anomaly_detection/detectors/__init__.py b/src/seer/anomaly_detection/detectors/__init__.py
index 6ffeb2b3e..ee045b64d 100644
--- a/src/seer/anomaly_detection/detectors/__init__.py
+++ b/src/seer/anomaly_detection/detectors/__init__.py
@@ -8,16 +8,16 @@
 )
 
 AnomalyDetector = anomaly_detectors.AnomalyDetector
-DummyAnomalyDetector = anomaly_detectors.DummyAnomalyDetector
 MPConfig = mp_config.MPConfig
 MPBatchAnomalyDetector = anomaly_detectors.MPBatchAnomalyDetector
 MPStreamAnomalyDetector = anomaly_detectors.MPStreamAnomalyDetector
 
 WindowSizeSelector = window_size_selectors.WindowSizeSelector
 SuSSWindowSizeSelector = window_size_selectors.SuSSWindowSizeSelector
-
+FlagsAndScores = mp_scorers.FlagsAndScores
 MPScorer = mp_scorers.MPScorer
-MPIRQScorer = mp_scorers.MPIRQScorer
+MPIRQScorer = mp_scorers.MPIQRScorer
+MPCascadingScorer = mp_scorers.MPCascadingScorer
 
 Normalizer = normalizers.Normalizer
 MinMaxNormalizer = normalizers.MinMaxNormalizer
diff --git a/src/seer/anomaly_detection/detectors/anomaly_detectors.py b/src/seer/anomaly_detection/detectors/anomaly_detectors.py
index b3e8af558..58598c103 100644
--- a/src/seer/anomaly_detection/detectors/anomaly_detectors.py
+++ b/src/seer/anomaly_detection/detectors/anomaly_detectors.py
@@ -13,6 +13,7 @@
 from seer.anomaly_detection.detectors.window_size_selectors import WindowSizeSelector
 from seer.anomaly_detection.models import (
     AnomalyDetectionConfig,
+    AnomalyFlags,
     MPTimeSeriesAnomalies,
     TimeSeries,
     TimeSeriesAnomalies,
@@ -83,7 +84,6 @@ def _compute_matrix_profile(
         """
         ts_values = timeseries.values  # np.array([np.float64(point.value) for point in timeseries])
         window_size = ws_selector.optimal_window_size(ts_values)
-        logger.debug(f"window_size: {window_size}")
        if window_size <= 0:
             # TODO: Add sentry logging of this error
             raise ServerError("Invalid window size")
@@ -98,17 +98,19 @@ def _compute_matrix_profile(
         # we do not normalize the matrix profile here as normalizing during stream detection later is not straightforward.
         mp_dist = mp_utils.get_mp_dist_from_mp(mp, pad_to_len=len(ts_values))
 
-        scores, flags = scorer.batch_score(
-            ts_values,
-            mp_dist,
+        flags_and_scores = scorer.batch_score(
+            ts=ts_values,
+            mp_dist=mp_dist,
             sensitivity=config.sensitivity,
             direction=config.direction,
             window_size=window_size,
         )
+        if flags_and_scores is None:
+            raise ServerError("Failed to score the matrix profile distance")
 
         return MPTimeSeriesAnomalies(
-            flags=flags,
-            scores=scores,
+            flags=flags_and_scores.flags,
+            scores=flags_and_scores.scores,
             matrix_profile=mp,
             window_size=window_size,
         )
@@ -151,9 +153,9 @@ def detect(
         )
 
         with sentry_sdk.start_span(description="Stream compute MP"):
-            scores = []
-            flags = []
-            streamed_mp = []
+            scores: list[float] = []
+            flags: list[AnomalyFlags] = []
+            streamed_mp: list[list[float]] = []
             for cur_val in timeseries.values:
                 # Update the stumpi stream processor with new data
                 stream.update(cur_val)
@@ -162,17 +164,19 @@
                 cur_mp = [stream.P_[-1], stream.I_[-1], stream.left_I_[-1], -1]
                 streamed_mp.append(cur_mp)
                 mp_dist_baseline = mp_utils.get_mp_dist_from_mp(self.base_mp, pad_to_len=None)
-                cur_scores, cur_flags = scorer.stream_score(
-                    ts_value=cur_val,
-                    mp_dist=stream.P_[-1],
+                flags_and_scores = scorer.stream_score(
+                    ts_streamed=cur_val,
+                    mp_dist_streamed=stream.P_[-1],
+                    ts_history=self.base_values,
+                    mp_dist_history=mp_dist_baseline,
                     sensitivity=config.sensitivity,
                     direction=config.direction,
                     window_size=self.window_size,
-                    ts_baseline=self.base_values,
-                    mp_dist_baseline=mp_dist_baseline,
                 )
-                scores.extend(cur_scores)
-                flags.extend(cur_flags)
+                if flags_and_scores is None:
+                    raise ServerError("Failed to score the matrix profile distance")
+                scores.extend(flags_and_scores.scores)
+                flags.extend(flags_and_scores.flags)
 
                 # Add new data point as well as its matrix profile to baseline
                 self.base_values = stream.T_
@@ -189,18 +193,3 @@ def detect(
             ),
             window_size=self.window_size,
         )
-
-
-class DummyAnomalyDetector(AnomalyDetector):
-    """
-    Dummy anomaly detector used during dev work
-    """
-
-    def detect(self, timeseries: TimeSeries, config: AnomalyDetectionConfig) -> TimeSeriesAnomalies:
-        anomalies = MPTimeSeriesAnomalies(
-            flags=np.array(["none"] * len(timeseries.values)),
-            scores=np.array([np.float64(0.5)] * len(timeseries.values)),
-            matrix_profile=np.array([]),
-            window_size=0,
-        )
-        return anomalies
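For context, MPStreamAnomalyDetector above primes a stumpy streaming object with the historical series and scores one point at a time. A minimal sketch of that pattern, assuming stumpy's stumpi API as used in the detector (the series, window size, and incoming values here are made up):

    import numpy as np
    import stumpy

    history = np.sin(np.linspace(0, 8 * np.pi, 200))  # hypothetical baseline series
    stream = stumpy.stumpi(history, m=10, egress=False)  # prime with history

    for new_val in (0.1, 0.2, 3.0):  # hypothetical incoming points
        stream.update(new_val)
        # P_[-1] is the matrix profile distance of the newest subsequence,
        # which is what stream_score receives as mp_dist_streamed.
        print(stream.P_[-1])
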
+ """ + + variance_threshold: float = Field( + 0.0001, description="Minimum variance required in order to use IQR based scoring" + ) + + def _to_flag_and_score( + self, val: np.float64, ts_mean: np.float64 + ) -> tuple[AnomalyFlags, float]: + if abs(val) >= 2 * abs(ts_mean): + return "anomaly_higher_confidence", 0.9 + return "none", 0.0 + + def batch_score( + self, + ts: npt.NDArray[np.float64], + mp_dist: npt.NDArray[np.float64], + sensitivity: Sensitivities, + direction: Directions, + window_size: int, + ) -> Optional[FlagsAndScores]: + ts_variance = ts.var() + ts_mean = ts.mean() + scores = [] + flags = [] + if ts_variance > self.variance_threshold: + sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 0) + return None + + sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 1) + for val in ts: + # if current value is significantly higher or lower than the mean then mark it as high anomaly else mark it as no anomaly + flag, score = self._to_flag_and_score(val, ts_mean) + flags.append(flag) + scores.append(score) + return FlagsAndScores(flags=flags, scores=scores) + + def stream_score( + self, + ts_streamed: np.float64, + mp_dist_streamed: np.float64, + ts_history: npt.NDArray[np.float64], + mp_dist_history: npt.NDArray[np.float64], + sensitivity: Sensitivities, + direction: Directions, + window_size: int, + ) -> Optional[FlagsAndScores]: + context = ts_history[-2 * window_size :] + context_var = context.var() + if context_var > self.variance_threshold: + return None + # if current value is significantly higher or lower than the mean then mark it as high anomaly else mark it as no anomaly + flag, score = self._to_flag_and_score(ts_streamed, context.mean()) + + return FlagsAndScores(flags=[flag], scores=[score]) + + +class MPIQRScorer(MPScorer): def batch_score( self, @@ -49,7 +117,7 @@ def batch_score( sensitivity: Sensitivities, direction: Directions, window_size: int, - ) -> tuple: + ) -> FlagsAndScores: """ Scores anomalies by computing the distance of the relevant MP distance from quartiles. This approach is not swayed by extreme values in MP distances. It also converts the score to a flag with a more meaningful interpretation of score. @@ -71,6 +139,9 @@ def batch_score( * "anomaly_lower_confidence" - indicating anomaly but only with a lower threshold * "anomaly_higher_confidence" - indicating anomaly with a higher threshold """ + scores: list[float] = [] + flags: list[AnomalyFlags] = [] + # Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring. mp_dist_baseline_finite = mp_dist[np.isfinite(mp_dist)] @@ -84,8 +155,6 @@ def batch_score( threshold_upper = Q3 + (1.5 * IQR_L) # Compute score and anomaly flags - scores = [] - flags = [] for i, val in enumerate(mp_dist): scores.append(0.0 if np.isnan(val) or np.isinf(val) else val - threshold_upper) flag = self._to_flag(val, threshold_lower, threshold_upper) @@ -95,18 +164,18 @@ def batch_score( ) flags.append(flag) - return scores, flags + return FlagsAndScores(flags=flags, scores=scores) def stream_score( self, - ts_value: float, - mp_dist: float, + ts_streamed: np.float64, + mp_dist_streamed: np.float64, + ts_history: npt.NDArray[np.float64], + mp_dist_history: npt.NDArray[np.float64], sensitivity: Sensitivities, direction: Directions, window_size: int, - ts_baseline: npt.NDArray[np.float64], - mp_dist_baseline: npt.NDArray[np.float64], - ) -> tuple: + ) -> FlagsAndScores: """ Scores anomalies by computing the distance of the relevant MP distance from quartiles. 
This approach is not swayed by extreme values in MP distances. It also converts the score to a flag with a more meaningful interpretation of score. @@ -131,7 +200,7 @@ def stream_score( * "anomaly_higher_confidence" - indicating anomaly with a higher threshold """ # Stumpy returns inf for the first timeseries[0:window_size - 2] entries. We just need to ignore those before scoring. - mp_dist_baseline_finite = mp_dist_baseline[np.isfinite(mp_dist_baseline)] + mp_dist_baseline_finite = mp_dist_history[np.isfinite(mp_dist_history)] # Compute the quantiles for two different threshold levels [Q1, Q3] = np.quantile(mp_dist_baseline_finite, [0.25, 0.75]) @@ -143,13 +212,18 @@ def stream_score( threshold_upper = Q3 + (1.5 * IQR_L) # Compute score and anomaly flags - score = 0.0 if np.isnan(mp_dist) or np.isinf(mp_dist) else mp_dist - threshold_upper - flag = self._to_flag(mp_dist, threshold_lower, threshold_upper) + score = ( + 0.0 + if np.isnan(mp_dist_streamed) or np.isinf(mp_dist_streamed) + else mp_dist_streamed - threshold_upper + ) + flag = self._to_flag(mp_dist_streamed, threshold_lower, threshold_upper) # anomaly identified. apply logic to check for peak and trough - flag = self._adjust_flag_for_vicinity(ts_value, flag, ts_baseline[-2 * window_size :]) - return [score], [flag] + flag = self._adjust_flag_for_vicinity(ts_streamed, flag, ts_history[-2 * window_size :]) + + return FlagsAndScores(flags=[flag], scores=[score]) - def _to_flag(self, mp_dist: float, threshold_lower: float, threshold_upper: float): + def _to_flag(self, mp_dist: np.float64, threshold_lower: float, threshold_upper: float): if np.isnan(mp_dist): return "none" if mp_dist < threshold_lower: @@ -159,7 +233,7 @@ def _to_flag(self, mp_dist: float, threshold_lower: float, threshold_upper: floa return "anomaly_higher_confidence" def _adjust_flag_for_vicinity( - self, ts_value: float, flag: AnomalyFlags, context: npt.NDArray[np.float64] + self, ts_value: np.float64, flag: AnomalyFlags, context: npt.NDArray[np.float64] ) -> AnomalyFlags: """ This method adjusts the severity of a detected anomaly based on the underlying time step's proximity to peaks and troughs. @@ -188,3 +262,51 @@ def _adjust_flag_for_vicinity( # else: # flag = "anomaly_higher_confidence" return flag + + +class MPCascadingScorer(MPScorer): + """ + This class combines the results of the LowVarianceScorer and the MPIQRScorer. 
+ """ + + scorers: list[MPScorer] = Field( + [LowVarianceScorer(), MPIQRScorer()], description="The list of scorers to cascade" + ) + + def batch_score( + self, + ts: npt.NDArray[np.float64], + mp_dist: npt.NDArray[np.float64], + sensitivity: Sensitivities, + direction: Directions, + window_size: int, + ) -> Optional[FlagsAndScores]: + for scorer in self.scorers: + flags_and_scores = scorer.batch_score(ts, mp_dist, sensitivity, direction, window_size) + if flags_and_scores is not None: + return flags_and_scores + return None + + def stream_score( + self, + ts_streamed: np.float64, + mp_dist_streamed: np.float64, + ts_history: npt.NDArray[np.float64], + mp_dist_history: npt.NDArray[np.float64], + sensitivity: Sensitivities, + direction: Directions, + window_size: int, + ) -> Optional[FlagsAndScores]: + for scorer in self.scorers: + flags_and_scores = scorer.stream_score( + ts_streamed, + mp_dist_streamed, + ts_history, + mp_dist_history, + sensitivity, + direction, + window_size, + ) + if flags_and_scores is not None: + return flags_and_scores + return None diff --git a/src/seer/anomaly_detection/detectors/normalizers.py b/src/seer/anomaly_detection/detectors/normalizers.py index ea6506546..d20aff78d 100644 --- a/src/seer/anomaly_detection/detectors/normalizers.py +++ b/src/seer/anomaly_detection/detectors/normalizers.py @@ -18,4 +18,9 @@ def normalize(self, array: npt.NDArray) -> npt.NDArray: class MinMaxNormalizer(Normalizer): def normalize(self, array: npt.NDArray) -> npt.NDArray: """Applies min-max normalization to input array""" + if array.var() == 0: + if array[0] == 0: + return array + else: + return np.full_like(array, 1.0) return (array - np.min(array)) / (np.max(array) - np.min(array)) diff --git a/src/seer/anomaly_detection/detectors/window_size_selectors.py b/src/seer/anomaly_detection/detectors/window_size_selectors.py index f2d83539d..ac5b607b6 100644 --- a/src/seer/anomaly_detection/detectors/window_size_selectors.py +++ b/src/seer/anomaly_detection/detectors/window_size_selectors.py @@ -3,9 +3,11 @@ import numpy as np import numpy.typing as npt import pandas as pd +import sentry_sdk from pydantic import BaseModel, Field from seer.anomaly_detection.detectors.normalizers import MinMaxNormalizer, Normalizer +from seer.tags import AnomalyDetectionTags class WindowSizeSelector(BaseModel, abc.ABC): @@ -84,9 +86,13 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int: The time series as a seaquence of float values. 
""" + if time_series.var() == 0: + sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1) + sentry_sdk.set_tag(AnomalyDetectionTags.LOW_VARIANCE_TS, 1) + return 3 time_series = self.normalizer.normalize(time_series) - ts_mean = np.mean(time_series) - ts_std = np.std(time_series) + ts_mean = time_series.mean() + ts_std = time_series.std() ts_min_max = np.max(time_series) - np.min(time_series) lbound = self.lbound @@ -119,7 +125,9 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int: exp += 1 if not found_window: - raise Exception("Search for optimal window failed.") + # raise Exception("Search for optimal window failed.") + sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1) + return 3 lbound = max(lbound, 2 ** (exp - 1)) @@ -136,5 +144,9 @@ def optimal_window_size(self, time_series: npt.NDArray[np.float64]) -> int: ubound = window_size - 1 else: break - - return 2 * lbound + window_size = 2 * lbound + if window_size >= len(time_series): + sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 1) + return 3 + sentry_sdk.set_tag(AnomalyDetectionTags.WINDOW_SEARCH_FAILED, 0) + return window_size diff --git a/src/seer/tags.py b/src/seer/tags.py new file mode 100644 index 000000000..f40c8ae4b --- /dev/null +++ b/src/seer/tags.py @@ -0,0 +1,14 @@ +from enum import StrEnum + + +class AnomalyDetectionTags(StrEnum): + ALERT_ID = "alert_id" + MODE = "mode" + LOW_VARIANCE_TS = "low_variance_ts" + WINDOW_SEARCH_FAILED = "window_search_failed" + + +class AnomalyDetectionModes(StrEnum): + STREAMING_ALERT = "streaming.alert" + STREAMING_TS_WITH_HISTORY = "streaming.ts_with_history" + BATCH_TS_FULL = "batch.ts_full" diff --git a/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py b/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py index f0aad12a0..652ca25aa 100644 --- a/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py +++ b/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py @@ -3,14 +3,21 @@ import numpy as np +from seer.anomaly_detection.detectors import ( + FlagsAndScores, + MPCascadingScorer, + MPConfig, + MPUtils, + SuSSWindowSizeSelector, +) from seer.anomaly_detection.detectors.anomaly_detectors import ( MPBatchAnomalyDetector, MPStreamAnomalyDetector, ) -from seer.anomaly_detection.detectors.mp_config import MPConfig from seer.anomaly_detection.models import MPTimeSeriesAnomalies from seer.anomaly_detection.models.external import AnomalyDetectionConfig from seer.anomaly_detection.models.timeseries import TimeSeries +from seer.exceptions import ServerError from tests.seer.anomaly_detection.test_utils import convert_synthetic_ts @@ -34,9 +41,9 @@ def test_compute_matrix_profile(self, mock_stump): # Mock to return dummy values mock_stump.return_value = np.array([1, 2, 3, 4]) self.scorer.batch_score = MagicMock( - return_value=( - [0.1, 6.5, 4.8, 0.2], - ["none", "anomaly_higher_confidence", "anomaly_higher_confidence", "none"], + return_value=FlagsAndScores( + scores=[0.1, 6.5, 4.8, 0.2], + flags=["none", "anomaly_higher_confidence", "anomaly_higher_confidence", "none"], ) ) @@ -76,6 +83,27 @@ def test_compute_matrix_profile(self, mock_stump): self.ws_selector.optimal_window_size.assert_called_once() self.mp_utils.get_mp_dist_from_mp.assert_called_once() + def test_invalid_window_size(self): + + timeseries, mp_dists, window_sizes = convert_synthetic_ts( + "tests/seer/anomaly_detection/test_data/synthetic_series", as_ts_datatype=False + ) + + ts_values, _, _ = timeseries[0], mp_dists[0], 
diff --git a/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py b/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py
index f0aad12a0..652ca25aa 100644
--- a/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py
+++ b/tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py
@@ -3,14 +3,21 @@
 
 import numpy as np
 
+from seer.anomaly_detection.detectors import (
+    FlagsAndScores,
+    MPCascadingScorer,
+    MPConfig,
+    MPUtils,
+    SuSSWindowSizeSelector,
+)
 from seer.anomaly_detection.detectors.anomaly_detectors import (
     MPBatchAnomalyDetector,
     MPStreamAnomalyDetector,
 )
-from seer.anomaly_detection.detectors.mp_config import MPConfig
 from seer.anomaly_detection.models import MPTimeSeriesAnomalies
 from seer.anomaly_detection.models.external import AnomalyDetectionConfig
 from seer.anomaly_detection.models.timeseries import TimeSeries
+from seer.exceptions import ServerError
 from tests.seer.anomaly_detection.test_utils import convert_synthetic_ts
 
 
@@ -34,9 +41,9 @@ def test_compute_matrix_profile(self, mock_stump):
         # Mock to return dummy values
         mock_stump.return_value = np.array([1, 2, 3, 4])
         self.scorer.batch_score = MagicMock(
-            return_value=(
-                [0.1, 6.5, 4.8, 0.2],
-                ["none", "anomaly_higher_confidence", "anomaly_higher_confidence", "none"],
+            return_value=FlagsAndScores(
+                scores=[0.1, 6.5, 4.8, 0.2],
+                flags=["none", "anomaly_higher_confidence", "anomaly_higher_confidence", "none"],
             )
         )
 
@@ -76,6 +83,27 @@ def test_compute_matrix_profile(self, mock_stump):
         self.ws_selector.optimal_window_size.assert_called_once()
         self.mp_utils.get_mp_dist_from_mp.assert_called_once()
 
+    def test_invalid_window_size(self):
+
+        timeseries, mp_dists, window_sizes = convert_synthetic_ts(
+            "tests/seer/anomaly_detection/test_data/synthetic_series", as_ts_datatype=False
+        )
+
+        ts_values, _, _ = timeseries[0], mp_dists[0], window_sizes[0]
+        ts = TimeSeries(timestamps=np.array([]), values=ts_values)
+
+        self.ws_selector.optimal_window_size = MagicMock(return_value=-1)
+
+        with self.assertRaises(ServerError, msg="Invalid window size"):
+            self.detector._compute_matrix_profile(
+                ts,
+                self.config,
+                ws_selector=self.ws_selector,
+                mp_config=self.mp_config,
+                scorer=self.scorer,
+                mp_utils=self.mp_utils,
+            )
+
 
 class TestMPStreamAnomalyDetector(unittest.TestCase):
@@ -92,6 +120,7 @@ def setUp(self):
         self.config = AnomalyDetectionConfig(
             time_period=15, sensitivity="low", direction="up", expected_seasonality="auto"
         )  # TODO: Placeholder values as not used in detection yet
+        self.mp_config = MPConfig(ignore_trivial=False, normalize_mp=False)
 
     @patch("stumpy.stumpi")
     @patch("seer.anomaly_detection.detectors.MPScorer")
@@ -109,7 +138,7 @@ def test_detect(self, MockMPUtils, MockMPScorer, MockStumpi):
 
         mock_utils.get_mp_dist_from_mp.return_value = np.array([0.1, 0.2])
 
-        mock_scorer.stream_score.return_value = ([0.5], ["none"])
+        mock_scorer.stream_score.return_value = FlagsAndScores(scores=[0.5], flags=["none"])
 
         anomalies = self.detector.detect(self.timeseries, self.config, mock_scorer, mock_utils)
 
@@ -123,3 +152,117 @@ def test_detect(self, MockMPUtils, MockMPScorer, MockStumpi):
         assert len(anomalies.matrix_profile) == 3
         mock_scorer.stream_score.assert_called()
         mock_stream.update.assert_called()
+
+    def _detect_anomalies(self, history_ts, stream_ts):
+        batch_detector = MPBatchAnomalyDetector()
+        history_ts_timestamps = np.arange(1.0, len(history_ts) + 1)
+        stream_ts_timestamps = np.array(list(range(1, len(stream_ts) + 1))) + len(history_ts)
+        batch_anomalies = batch_detector._compute_matrix_profile(
+            timeseries=TimeSeries(timestamps=history_ts_timestamps, values=np.array(history_ts)),
+            config=self.config,
+            ws_selector=SuSSWindowSizeSelector(),
+            mp_config=self.mp_config,
+            scorer=MPCascadingScorer(),
+            mp_utils=MPUtils(),
+        )
+        stream_detector = MPStreamAnomalyDetector(
+            base_timestamps=np.array(history_ts_timestamps),
+            base_values=np.array(history_ts),
+            base_mp=batch_anomalies.matrix_profile,
+            window_size=batch_anomalies.window_size,
+        )
+        stream_anomalies = stream_detector.detect(
+            timeseries=TimeSeries(timestamps=stream_ts_timestamps, values=np.array(stream_ts)),
+            config=self.config,
+            scorer=MPCascadingScorer(),
+            mp_utils=MPUtils(),
+        )
+        return batch_anomalies, stream_anomalies
+
+    def test_stream_detect_spiked_history_spiked_stream_long_ts(self):
+        history_ts = [0.5] * 200
+        history_ts[-115] = 1.0
+        stream_ts = [0.5, 0.5, 1.2, *[0.5] * 10]
+        expected_stream_flags = [
+            "none",
+            "none",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "none",
+            "none",
+            "none",
+            "none",
+            "none",
+            "none",
+            "none",
+        ]
+        history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts)
+        assert history_anomalies.window_size == 90
+        assert stream_anomalies.flags == expected_stream_flags
+
+    def test_stream_detect_spiked_history_spiked_stream(self):
+        history_ts = [0.5] * 20
+        history_ts[-15] = 1.0  # Spiked history
+        stream_ts = [0.5, 0.5, 1.0, *[0.5] * 10]  # Spiked stream
+        expected_stream_flags = [
+            "none",
+            "none",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "none",
+            "none",
+            "none",
+            "none",
+        ]
+        history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts)
+        assert history_anomalies.window_size == 3
+        assert stream_anomalies.flags == expected_stream_flags
+
+    def test_stream_detect_flat_history_flat_stream(self):
+        history_ts = [0.5] * 200  # Flat history
+        stream_ts = [0.5] * 10  # Flat stream
+        expected_stream_flags = ["none"] * len(stream_ts)
+
+        history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts)
+        assert history_anomalies.window_size == 3
+        assert stream_anomalies.flags == expected_stream_flags
+
+    def test_stream_detect_flat_history_spiked_stream(self):
+        history_ts = [0.5] * 200  # Flat history
+        stream_ts = [0.5, 0.5, 1.0, *[0.5] * 10]  # Spiked stream
+        expected_stream_flags = [
+            "none",
+            "none",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "anomaly_higher_confidence",
+            "none",
+            "none",
+            "none",
+            "none",
+        ]
+
+        history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts)
+        assert history_anomalies.window_size == 3
+        assert stream_anomalies.flags == expected_stream_flags
+
+    def test_stream_detect_spiked_history_flat_stream(self):
+        history_ts = [0.5] * 200
+        history_ts[-15] = 1.0  # Spiked history
+        stream_ts = [0.5] * 10  # Flat stream
+        expected_stream_flags = ["none"] * 10
+
+        history_anomalies, stream_anomalies = self._detect_anomalies(history_ts, stream_ts)
+        assert history_anomalies.window_size == 132
+        assert stream_anomalies.flags == expected_stream_flags
diff --git a/tests/seer/anomaly_detection/detectors/test_mp_scorers.py b/tests/seer/anomaly_detection/detectors/test_mp_scorers.py
index a9289adb1..0c8539960 100644
--- a/tests/seer/anomaly_detection/detectors/test_mp_scorers.py
+++ b/tests/seer/anomaly_detection/detectors/test_mp_scorers.py
@@ -1,13 +1,15 @@
 import unittest
 
-from seer.anomaly_detection.detectors.mp_scorers import MPIRQScorer
+import numpy as np
+
+from seer.anomaly_detection.detectors.mp_scorers import MPCascadingScorer, MPScorer
 from tests.seer.anomaly_detection.test_utils import convert_synthetic_ts
 
 
 class TestMPScorers(unittest.TestCase):
     def setUp(self):
-        self.scorer = MPIRQScorer()
+        self.scorer = MPCascadingScorer()
 
     def test_batch_score_synthetic_data(self):
 
@@ -29,9 +31,11 @@ def test_batch_score_synthetic_data(self):
             expected_types, timeseries, mp_dists, window_sizes, window_starts, window_ends
         ):
 
-            _, actual_flags = self.scorer.batch_score(
+            flags_and_scores = self.scorer.batch_score(
                 ts, mp_dist, sensitivity, direction, window_size
             )
+            assert flags_and_scores is not None
+            actual_flags = flags_and_scores.flags
 
             # Calculate percentage of anomaly flags in given range
             num_anomalies_detected = 0
@@ -63,14 +67,46 @@ def test_stream_score(self):
             test_ts_val = ts_baseline[-1] * multiplier
             test_mp_dist = mp_dist_baseline[-1] * abs(multiplier)
 
-            _, flag = self.scorer.stream_score(
+            flags_and_scores = self.scorer.stream_score(
                 test_ts_val,
                 test_mp_dist,
+                ts_baseline,
+                mp_dist_baseline,
                 sensitivity,
                 direction,
                 window_size,
-                ts_baseline,
-                mp_dist_baseline,
             )
+            assert flags_and_scores is not None
+            actual_flags = flags_and_scores.flags
+
+            assert actual_flags[0] == expected_flags[i]
 
-            assert flag[0] == expected_flags[i]
+    def test_cascading_scorer_failed_case(self):
+        class DummyScorer(MPScorer):
+            def batch_score(self, *args, **kwargs):
+                return None
+
+            def stream_score(self, *args, **kwargs):
+                return None
+
+        scorer = MPCascadingScorer(scorers=[DummyScorer(), DummyScorer()])
+
+        flags_and_scores = scorer.batch_score(
+            np.arange(1.0, 10),
+            np.arange(1.0, 10),
+            sensitivity="high",
+            direction="both",
+            window_size=3,
+        )
+        assert flags_and_scores is None
+
+        flags_and_scores = scorer.stream_score(
+            np.arange(1.0, 10),
+            np.arange(1.0, 10),
+            np.arange(1.0, 3),
+            np.arange(1.0, 3),
+            sensitivity="high",
+            direction="both",
+            window_size=3,
+        )
+        assert flags_and_scores is None
diff --git a/tests/seer/anomaly_detection/detectors/test_nomalizers.py b/tests/seer/anomaly_detection/detectors/test_nomalizers.py
index 194643b62..c039faab2 100644
--- a/tests/seer/anomaly_detection/detectors/test_nomalizers.py
+++ b/tests/seer/anomaly_detection/detectors/test_nomalizers.py
@@ -18,8 +18,14 @@ def test_normalize_standard_array(self):
         np.testing.assert_equal(result, expected)
 
     def test_normalize_identical_values(self):
-        array = np.array([3, 3, 3, 3])
-        expected = np.array([np.nan, np.nan, np.nan, np.nan])
+        array = np.array([3] * 4)
+        expected = np.array([1.0] * 4)
+        result = self.normalizer.normalize(array)
+        np.testing.assert_equal(result, expected)
+
+    def test_normalize_zeroes(self):
+        array = np.array([0] * 4)
+        expected = np.array([0.0] * 4)
         result = self.normalizer.normalize(array)
         np.testing.assert_equal(result, expected)
 
@@ -29,6 +35,6 @@ def test_empty_array(self):
 
     def test_single_element_array(self):
         array = np.array([10])
-        expected = np.array([np.nan])
+        expected = np.array([1.0])
         result = self.normalizer.normalize(array)
         np.testing.assert_equal(result, expected)
diff --git a/tests/seer/anomaly_detection/detectors/test_window_size_selectors.py b/tests/seer/anomaly_detection/detectors/test_window_size_selectors.py
index e0d51447e..4e34c2e15 100644
--- a/tests/seer/anomaly_detection/detectors/test_window_size_selectors.py
+++ b/tests/seer/anomaly_detection/detectors/test_window_size_selectors.py
@@ -13,8 +13,8 @@ def setUp(self):
 
     def test_optimal_window_size_constant_series(self):
         ts = np.array([5.0] * 700)
-        with self.assertRaises(Exception, msg="Search for optimal window failed."):
-            self.selector.optimal_window_size(ts)
+        window_size = self.selector.optimal_window_size(ts)
+        assert window_size == 3
 
     def test_optimal_window_size_linear_series(self):
         ts = np.linspace(1, 100, 100)
@@ -25,8 +25,8 @@ def test_optimal_window_size_linear_series(self):
 
     def test_optimal_window_size_short_series(self):
         ts = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
-        with self.assertRaises(Exception, msg="Search for optimal window failed."):
-            self.selector.optimal_window_size(ts)
+        window_size = self.selector.optimal_window_size(ts)
+        assert window_size == 3
 
     def test_optimal_window_size(self):
 
diff --git a/tests/seer/anomaly_detection/test_anomaly_detection.py b/tests/seer/anomaly_detection/test_anomaly_detection.py
index b94fc873d..1e8d9b426 100644
--- a/tests/seer/anomaly_detection/test_anomaly_detection.py
+++ b/tests/seer/anomaly_detection/test_anomaly_detection.py
@@ -3,11 +3,14 @@
 
 import numpy as np
 
+from seer.anomaly_detection.accessors import AlertDataAccessor
 from seer.anomaly_detection.anomaly_detection import AnomalyDetection
 from seer.anomaly_detection.models import DynamicAlert, MPTimeSeries
 from seer.anomaly_detection.models.external import (
     AlertInSeer,
     AnomalyDetectionConfig,
+    DeleteAlertDataRequest,
+    DeleteAlertDataResponse,
     DetectAnomaliesRequest,
     DetectAnomaliesResponse,
     StoreDataRequest,
@@ -16,6 +19,7 @@
     TimeSeriesWithHistory,
 )
 from seer.anomaly_detection.models.timeseries_anomalies import MPTimeSeriesAnomalies
+from seer.exceptions import ClientError
 from tests.seer.anomaly_detection.test_utils import convert_synthetic_ts
 
 
@@ -156,3 +160,45 @@ def test_detect_anomalies_combo(self):
         assert isinstance(response.timeseries, list)
         assert len(response.timeseries) == n
         assert isinstance(response.timeseries[0], TimeSeriesPoint)
+
+    def test_delete_alert_data_success(self):
+        class MockAlertDataAccessor(AlertDataAccessor):
+            def delete_alert_data(self, external_alert_id: int):
+                assert external_alert_id == 1
+                return DeleteAlertDataResponse(success=True)
+
+            def query(self, *args, **kwargs) -> DynamicAlert | None:
+                return NotImplemented
+
+            def save_alert(self, *args, **kwargs):
+                return NotImplemented
+
+            def save_timepoint(self, *args, **kwargs):
+                return NotImplemented
+
+        request = DeleteAlertDataRequest(organization_id=0, project_id=0, alert=AlertInSeer(id=1))
+        response = AnomalyDetection().delete_alert_data(
+            request=request, alert_data_accessor=MockAlertDataAccessor()
+        )
+        assert response == DeleteAlertDataResponse(success=True)
+
+    def test_delete_alert_data_failure(self):
+        class MockAlertDataAccessor(AlertDataAccessor):
+            def delete_alert_data(self, external_alert_id: int):
+                raise ClientError(f"Alert id {external_alert_id} not found")
+
+            def query(self, *args, **kwargs) -> DynamicAlert | None:
+                return NotImplemented
+
+            def save_alert(self, *args, **kwargs):
+                return NotImplemented
+
+            def save_timepoint(self, *args, **kwargs):
+                return NotImplemented
+
+        request = DeleteAlertDataRequest(organization_id=0, project_id=0, alert=AlertInSeer(id=1))
+        with self.assertRaises(ClientError) as e:
+            AnomalyDetection().delete_alert_data(
+                request=request, alert_data_accessor=MockAlertDataAccessor()
+            )
+        assert "Alert id 1 not found" in str(e.exception)
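As a worked illustration of the 1.5 x IQR fence that MPIQRScorer applies to the finite matrix profile distances (numbers invented; only the upper fence is shown, since the lower-threshold lines are elided by the hunk context above):

    import numpy as np

    mp_dist = np.array([0.10, 0.20, 0.20, 0.30, 2.50])
    [Q1, Q3] = np.quantile(mp_dist, [0.25, 0.75])  # Q1 = 0.2, Q3 = 0.3
    IQR = Q3 - Q1                                  # 0.1
    threshold_upper = Q3 + (1.5 * IQR)             # 0.45
    print(mp_dist > threshold_upper)               # only 2.5 exceeds the fence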