Skip to content

Commit

Permalink
feat(crons): Implement make_clock_tick_decision (#80640)
Browse files Browse the repository at this point in the history
This function returns a DecisionResult which encapsulates the
TickAnomalyDecision and AnomalyTransition values for a particular clock
tick.

In the future this logic will be run at each clock tick and the result
will later be used to decide if we can process issue occurrences in the
incident_occurrences consumer for a specific clock tick.

Part of GH-79328
  • Loading branch information
evanpurkhiser authored Nov 13, 2024
1 parent 4a43dfa commit 3a905d6
Show file tree
Hide file tree
Showing 4 changed files with 699 additions and 23 deletions.
337 changes: 336 additions & 1 deletion src/sentry/monitors/system_incidents.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
import logging
import statistics
from collections import Counter
from collections.abc import Sequence
from collections.abc import Generator, Sequence
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import StrEnum
from itertools import chain

from django.conf import settings

Expand All @@ -27,6 +30,9 @@
# This key is used to record the metric volume metric for the tick.
MONITOR_TICK_METRIC = "sentry.monitors.volume_metric:{ts}"

# This key is used to record the anomaly decision for a tick
MONITOR_TICK_DECISION = "sentry.monitors.tick_decision:{ts}"

# When fetching historic volume data to make a decision whether we have lost
# data this value will determine how many historic volume data-points we fetch
# of the window of the MONITOR_VOLUME_RETENTION. It is important to consider
Expand All @@ -42,6 +48,10 @@
# We record 30 days worth of historical data for each minute of check-ins.
MONITOR_VOLUME_RETENTION = timedelta(days=30)

# This is the number of previous ticks we will consider the tick metrics and
# tick decisions for to determine a decision about the tick being evaluated.
MONITOR_TICK_DECISION_WINDOW = 5


def update_check_in_volume(ts_list: Sequence[datetime]):
"""
Expand Down Expand Up @@ -166,6 +176,331 @@ def get_clock_tick_volume_metric(tick: datetime) -> float | None:
return None


class TickAnomalyDecision(StrEnum):
"""
This enum represents the system incident anomaly decision made for a
clock-tick. Tick transitions are represented by the AnomalyTransition.
"""

NORMAL = "normal"
"""
The tick is within expected volume levels and does not show any
abnormalities. The system is working as normal.
"""

ABNORMAL = "abnormal"
"""
The volume metrics have indicated that we've seen an abnormal number of
check-ins for this tick. We may be entering an INCIDENT state.
All abnormal tick decisions will be contiguous, and will resolve into
either NORMAL or INCIDENT.
"""

INCIDENT = "incident"
"""
The volume metrics have indicated that we are in a system incident, this
means we are not processing as many check-ins as we typically do.
Once in an incident we will transition into RECOVERING once we've detected
enough normal volume metrics.
"""

RECOVERING = "recovering"
"""
We are transitioning out of an incident. Volume metrics must remain below
abnormal levels in order for RECOVERING to transition into NORMAL.
All recovering tick decisions will be contiguous, and will resolve into
either NORMAL or back into INCIDENT.
"""

@classmethod
def from_str(cls, st: str) -> TickAnomalyDecision:
return cls[st.upper()]


class AnomalyTransition(StrEnum):
ABNORMALITY_STARTED = "abnormality_started"
"""
An abnormality has been detected during normal operations. We may
transition into a complete system incident, or the abnormality may recover
to normal.
"""

ABNORMALITY_RECOVERED = "abnormality_recovered"
"""
An abnormality has recovered back to a normal status.
"""

INCIDENT_STARTED = "incident_started"
"""
A system incident has been detected based on the historic check-in volume.
We are no longer able to reliably know that we are receving all check-ins.
"""

INCIDENT_RECOVERING = "incident_recovering"
"""
An incident has begun to recover. After this transition we will either
re-enter the incident va INCIDENT_STARTED or fully recover via
INCIDENT_RECOVERED.
"""

INCIDENT_RECOVERY_FAILED = "incident_recovery_failed"
"""
An incident failed to recover and has re-entered the incident state.
"""

INCIDENT_RECOVERED = "incident_recovered"
"""
An incident has recovered back to normal.
"""


@dataclass
class DecisionResult:
decision: TickAnomalyDecision
"""
The recorded decision made for the clock tick
"""

transition: AnomalyTransition | None = None
"""
Reflects the transition status when making a tick decision results in a
state transition. None if the decision has not changed.
"""


class Metric(StrEnum):
"""
A metric is similar to a tick decision, however it represents a decision
made on the volume metric. The metric we current consider is percent mean
deviation from historic volumes.
"""

NORMAL = "normal"
"""
The metric is below the abnormal threshold.
"""

ABNORMAL = "abnormal"
"""
The metric has surpassed the normal threshold but is still below the
incident threshold.
"""

INCIDENT = "incident"
"""
The metric has surpassed the incident threshold
"""

@staticmethod
def from_value(value: float | str | None) -> Metric:
"""
Determine an individual decision for the percentage deviation metric of a
clock tick. This only considers metrics that are negative, indicating
there's been a drop in check-in volume.
"""
# examples: -5% anomaly and -25% incident
anomaly_threshold = options.get("crons.system_incidents.pct_deviation_anomaly_threshold")
incident_threshold = options.get("crons.system_incidents.pct_deviation_incident_threshold")

# If we do not have a metric for this tick we must assume things are
# operating normally
if value is None:
return Metric.NORMAL

pct_deviation = float(value)

if pct_deviation <= incident_threshold:
return Metric.INCIDENT
if pct_deviation <= anomaly_threshold:
return Metric.ABNORMAL
return Metric.NORMAL


def make_clock_tick_decision(tick: datetime) -> DecisionResult:
"""
Given a clock tick timestamp determine based on the historic tick volume
metrics, and historic tick anomaly decisions, a DecisionResult.
This function will update previous decisions for earlier ticks detected as
ABNORMAL or RECOVERING to either NORMAL or INCIDENT.
The state transitions for tick decisions are as follows
┌───D────────────────────────────┐
┌────▼─┐ ┌────────┐ ┌────────┐ ┌┴─────────┐
│NORMAL├─A─►ABNORMAL├┬F─►INCIDENT├─C─►RECOVERING│
│ ◄─B─│ ││ │ ◄─E─┤ │
└────┬─┘ └────────┘│ └────────┘ └──────────┘
└───────────────┘
A: ABNORMALITY_STARTED
B: ABNORMALITY_RECOVERED
C: INCIDENT_RECOVERING
D: INCIDENT_RECOVERED
E: INCIDENT_RECOVERY_FAILED
F: INCIDENT_STARTED
"""
# Alias TickAnomalyDecision to improve code readability
Decision = TickAnomalyDecision

if not options.get("crons.tick_volume_anomaly_detection"):
return DecisionResult(Decision.NORMAL)

redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

tick_decision_window = options.get("crons.system_incidents.tick_decision_window")

# The clock has just ticked to the next minute. Look at the previous tick
# and decision metrics.
past_ts = tick - timedelta(minutes=1)

past_window_ts_keys = [
_make_reference_ts(past_ts - timedelta(minutes=delta))
for delta in range(0, tick_decision_window)
]

# Fetch histories for metrics and the last decision together. Window
# timestamps are reversed so the oldest metric is last.
redis_keys = chain(
(MONITOR_TICK_METRIC.format(ts=ts) for ts in reversed(past_window_ts_keys)),
(MONITOR_TICK_DECISION.format(ts=ts) for ts in [past_window_ts_keys[0]]),
)

values = redis_client.mget(redis_keys)

# Tick metrics are the first tick_decision_window values
tick_metrics = [Metric.from_value(value) for value in values[:-1]]
last_metric = tick_metrics[-1]

# The last decision is the last value fetched
if values[-1] is not None:
last_decision = Decision.from_str(values[-1])
else:
# By default the previous decision is used. If there was no previous
# decision we can only assume things are operating normally
last_decision = Decision.NORMAL

def make_decision(
decision: TickAnomalyDecision,
transition: AnomalyTransition | None = None,
) -> DecisionResult:
decision_key = MONITOR_TICK_DECISION.format(ts=_make_reference_ts(tick))
pipeline = redis_client.pipeline()
pipeline.set(decision_key, decision)
pipeline.expire(decision_key, MONITOR_VOLUME_RETENTION)
pipeline.execute()

return DecisionResult(decision, transition)

def metrics_match(metric: Metric) -> Generator[bool]:
return (d == metric for d in tick_metrics)

# A: NORMAL -> ABNORMAL
#
# If we've detected an anomaly and we're not already in an incident,
# anomalous state, or recovering, mark this tick as anomalous.
if last_decision == Decision.NORMAL and last_metric == Metric.ABNORMAL:
return make_decision(Decision.ABNORMAL, AnomalyTransition.ABNORMALITY_STARTED)

# B: ABNORMAL -> NORMAL
#
# If the previous result was anomalous check and if we have recovered and can
# backfill these decisions as normal
if last_decision == Decision.ABNORMAL and all(metrics_match(Metric.NORMAL)):
_backfill_decisions(past_ts, Decision.NORMAL, Decision.ABNORMAL)
return make_decision(Decision.NORMAL, AnomalyTransition.ABNORMALITY_RECOVERED)

# C: INCIDENT -> RECOVERING
#
# If we are actively in an incident and the most recent metric value has
# recovered to normal we can de-escalate the incident to abnormal.
if last_decision == Decision.INCIDENT and last_metric == Metric.NORMAL:
return make_decision(Decision.RECOVERING, AnomalyTransition.INCIDENT_RECOVERING)

# D: RECOVERING -> NORMAL
#
# If the previous result was recovering, check if we have recovered and can
# backfill these decisions as normal.
if last_decision == Decision.RECOVERING and all(metrics_match(Metric.NORMAL)):
_backfill_decisions(past_ts, Decision.NORMAL, Decision.RECOVERING)
return make_decision(Decision.NORMAL, AnomalyTransition.INCIDENT_RECOVERED)

# E: RECOVERING -> INCIDENT
#
# If an incident had begun recovering but we've detected a non-normal
# metric, backfill all recovery decisions to an incident decision.
if last_decision == Decision.RECOVERING and last_metric != Metric.NORMAL:
_backfill_decisions(past_ts, Decision.INCIDENT, Decision.RECOVERING)
return make_decision(Decision.INCIDENT, AnomalyTransition.INCIDENT_RECOVERY_FAILED)

# F: [NORMAL, ABNORMAL] -> INCIDENT
#
# If we're not already in an incident and the most recent metric value is
# an incident, mark this tick as an incident and backfill all abnormal
# decisions to an incident decision.
if last_decision != Decision.INCIDENT and last_metric == Metric.INCIDENT:
_backfill_decisions(past_ts, Decision.INCIDENT, Decision.ABNORMAL)
return make_decision(Decision.INCIDENT, AnomalyTransition.INCIDENT_STARTED)

# NORMAL -> NORMAL
# ABNORMAL -> ABNORMAL
# INCIDENT -> INCIDENT
# RECOVERING -> RECOVERING
#
# No decision transition. Use the previous decision
return make_decision(last_decision)


def get_clock_tick_decision(tick: datetime) -> TickAnomalyDecision | None:
"""
Retrieve the TickAnomalyDecision for a specific clock tick.
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

if value := redis_client.get(MONITOR_TICK_DECISION.format(ts=_make_reference_ts(tick))):
return TickAnomalyDecision.from_str(value)
else:
return None


def _backfill_decisions(
start: datetime,
decision: TickAnomalyDecision,
until_not: TickAnomalyDecision,
) -> None:
"""
Update historic tick decisions from `start` to `decision` until we no
longer see the `until_not` decision.
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

ts = start
updates: dict[str | bytes, str] = {}

while True:
key = MONITOR_TICK_DECISION.format(ts=_make_reference_ts(ts))

# Nothing to backfill if we don't have a decision value
value = redis_client.get(key)
if value is None:
break

# Exit the backfill once we no longer have the until_not decision
prev_decision = TickAnomalyDecision.from_str(value)
if prev_decision != until_not:
break

updates[key] = decision.value
ts = ts - timedelta(minutes=1)

# Apply decision updates
if updates:
redis_client.mset(updates)


def _make_reference_ts(ts: datetime):
"""
Produce a timestamp number with the seconds and microsecond removed
Expand Down
21 changes: 0 additions & 21 deletions src/sentry/monitors/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from dataclasses import dataclass
from datetime import datetime
from enum import StrEnum
from typing import Literal, NotRequired, TypedDict, Union

from django.utils.functional import cached_property
Expand Down Expand Up @@ -129,23 +128,3 @@ class IntervalSchedule:


ScheduleConfig = Union[CrontabSchedule, IntervalSchedule]


class TickVolumeAnomolyResult(StrEnum):
"""
This enum represents the result of comparing the minute we ticked past
with it's historic volume data. This is used to determine if we may have
consumed an anomalous number of check-ins, indicating there is an upstream
incident and we care not able to reliably report misses and time-outs.
A NORMAL result means we've considered the volume to be within the expected
volume for that minute. A ANOMALY value indicates there was a drop in
volume significant enough to consider it abnormal.
"""

NORMAL = "normal"
ABNORMAL = "abnormal"

@classmethod
def from_str(cls, st: str) -> TickVolumeAnomolyResult:
return cls[st.upper()]
Loading

0 comments on commit 3a905d6

Please sign in to comment.