Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(anomaly_detection) Handle timeseries with very low variance #1186

Merged
merged 3 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ coverage:
trend_detection:
paths:
- 'src/seer/trend_detection/'
anomaly_detection:
paths:
- 'src/seer/anomaly_detection/'
app:
paths:
- 'src/seer/app.py'
Expand Down
13 changes: 8 additions & 5 deletions src/seer/anomaly_detection/anomaly_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
from seer.dependency_injection import inject, injected
from seer.exceptions import ClientError, ServerError
from seer.tags import AnomalyDetectionModes, AnomalyDetectionTags

anomaly_detection_module.enable()
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -226,16 +227,16 @@ def detect_anomalies(self, request: DetectAnomaliesRequest) -> DetectAnomaliesRe
Anomaly detection request that has either a complete time series or an alert reference.
"""
if isinstance(request.context, AlertInSeer):
mode = "streaming.alert"
mode = AnomalyDetectionModes.STREAMING_ALERT
elif isinstance(request.context, TimeSeriesWithHistory):
mode = "streaming.ts_with_history"
mode = AnomalyDetectionModes.STREAMING_TS_WITH_HISTORY
else:
mode = "batch.ts_full"
mode = AnomalyDetectionModes.BATCH_TS_FULL

sentry_sdk.set_tag("ad_mode", mode)
sentry_sdk.set_tag(AnomalyDetectionTags.MODE, mode)

if isinstance(request.context, AlertInSeer):
sentry_sdk.set_tag("alert_id", request.context.id)
sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.context.id)
ts, anomalies = self._online_detect(request.context, request.config)
elif isinstance(request.context, TimeSeriesWithHistory):
ts, anomalies = self._combo_detect(request.context, request.config)
Expand All @@ -255,6 +256,7 @@ def store_data(
request: StoreDataRequest
Alert information along with underlying time series data
"""
sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id)
# Ensure we have at least 7 days of data in the time series
min_len = self._min_required_timesteps(request.config.time_period)
if len(request.timeseries) < min_len:
Expand Down Expand Up @@ -302,5 +304,6 @@ def delete_alert_data(
request: DeleteAlertDataRequest
Alert to clear
"""
sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.alert.id)
alert_data_accessor.delete_alert_data(external_alert_id=request.alert.id)
return DeleteAlertDataResponse(success=True)
4 changes: 2 additions & 2 deletions src/seer/anomaly_detection/anomaly_detection_di.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from seer.anomaly_detection.accessors import AlertDataAccessor, DbAlertDataAccessor
from seer.anomaly_detection.detectors import (
MinMaxNormalizer,
MPCascadingScorer,
MPConfig,
MPIRQScorer,
MPScorer,
MPUtils,
Normalizer,
Expand All @@ -22,7 +22,7 @@ def alert_data_accessor_provider() -> AlertDataAccessor:

@anomaly_detection_module.provider
def mp_scorer_provider() -> MPScorer:
return MPIRQScorer()
return MPCascadingScorer()


@anomaly_detection_module.provider
Expand Down
6 changes: 3 additions & 3 deletions src/seer/anomaly_detection/detectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@
)

AnomalyDetector = anomaly_detectors.AnomalyDetector
DummyAnomalyDetector = anomaly_detectors.DummyAnomalyDetector
MPConfig = mp_config.MPConfig
MPBatchAnomalyDetector = anomaly_detectors.MPBatchAnomalyDetector
MPStreamAnomalyDetector = anomaly_detectors.MPStreamAnomalyDetector

WindowSizeSelector = window_size_selectors.WindowSizeSelector
SuSSWindowSizeSelector = window_size_selectors.SuSSWindowSizeSelector

FlagsAndScores = mp_scorers.FlagsAndScores
MPScorer = mp_scorers.MPScorer
MPIRQScorer = mp_scorers.MPIRQScorer
MPIRQScorer = mp_scorers.MPIQRScorer
MPCascadingScorer = mp_scorers.MPCascadingScorer

Normalizer = normalizers.Normalizer
MinMaxNormalizer = normalizers.MinMaxNormalizer
Expand Down
51 changes: 20 additions & 31 deletions src/seer/anomaly_detection/detectors/anomaly_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from seer.anomaly_detection.detectors.window_size_selectors import WindowSizeSelector
from seer.anomaly_detection.models import (
AnomalyDetectionConfig,
AnomalyFlags,
MPTimeSeriesAnomalies,
TimeSeries,
TimeSeriesAnomalies,
Expand Down Expand Up @@ -83,7 +84,6 @@ def _compute_matrix_profile(
"""
ts_values = timeseries.values # np.array([np.float64(point.value) for point in timeseries])
window_size = ws_selector.optimal_window_size(ts_values)
logger.debug(f"window_size: {window_size}")
if window_size <= 0:
# TODO: Add sentry logging of this error
raise ServerError("Invalid window size")
Expand All @@ -98,17 +98,19 @@ def _compute_matrix_profile(
# we do not normalize the matrix profile here as normalizing during stream detection later is not straightforward.
mp_dist = mp_utils.get_mp_dist_from_mp(mp, pad_to_len=len(ts_values))

scores, flags = scorer.batch_score(
ts_values,
mp_dist,
flags_and_scores = scorer.batch_score(
ts=ts_values,
mp_dist=mp_dist,
sensitivity=config.sensitivity,
direction=config.direction,
window_size=window_size,
)
if flags_and_scores is None:
raise ServerError("Failed to score the matrix profile distance")

return MPTimeSeriesAnomalies(
flags=flags,
scores=scores,
flags=flags_and_scores.flags,
scores=flags_and_scores.scores,
matrix_profile=mp,
window_size=window_size,
)
Expand Down Expand Up @@ -151,9 +153,9 @@ def detect(
)

with sentry_sdk.start_span(description="Stream compute MP"):
scores = []
flags = []
streamed_mp = []
scores: list[float] = []
flags: list[AnomalyFlags] = []
streamed_mp: list[list[float]] = []
for cur_val in timeseries.values:
# Update the stumpi stream processor with new data
stream.update(cur_val)
Expand All @@ -162,17 +164,19 @@ def detect(
cur_mp = [stream.P_[-1], stream.I_[-1], stream.left_I_[-1], -1]
streamed_mp.append(cur_mp)
mp_dist_baseline = mp_utils.get_mp_dist_from_mp(self.base_mp, pad_to_len=None)
cur_scores, cur_flags = scorer.stream_score(
ts_value=cur_val,
mp_dist=stream.P_[-1],
flags_and_scores = scorer.stream_score(
ts_streamed=cur_val,
mp_dist_streamed=stream.P_[-1],
ts_history=self.base_values,
mp_dist_history=mp_dist_baseline,
sensitivity=config.sensitivity,
direction=config.direction,
window_size=self.window_size,
ts_baseline=self.base_values,
mp_dist_baseline=mp_dist_baseline,
)
scores.extend(cur_scores)
flags.extend(cur_flags)
if flags_and_scores is None:
raise ServerError("Failed to score the matrix profile distance")
scores.extend(flags_and_scores.scores)
flags.extend(flags_and_scores.flags)

# Add new data point as well as its matrix profile to baseline
self.base_values = stream.T_
Expand All @@ -189,18 +193,3 @@ def detect(
),
window_size=self.window_size,
)


class DummyAnomalyDetector(AnomalyDetector):
"""
Dummy anomaly detector used during dev work
"""

def detect(self, timeseries: TimeSeries, config: AnomalyDetectionConfig) -> TimeSeriesAnomalies:
anomalies = MPTimeSeriesAnomalies(
flags=np.array(["none"] * len(timeseries.values)),
scores=np.array([np.float64(0.5)] * len(timeseries.values)),
matrix_profile=np.array([]),
window_size=0,
)
return anomalies
Loading
Loading