diff --git a/snuba/environment.py b/snuba/environment.py index 93f9c4026d..ca1a496438 100644 --- a/snuba/environment.py +++ b/snuba/environment.py @@ -87,6 +87,12 @@ def setup_sentry() -> None: release=os.getenv("SNUBA_RELEASE"), traces_sample_rate=settings.SENTRY_TRACE_SAMPLE_RATE, profiles_sample_rate=settings.SNUBA_PROFILES_SAMPLE_RATE, + _experiments={ + # Turns on the metrics module + "enable_metrics": True, + # Enables sending of code locations for metrics + "metric_code_locations": True, + }, ) from snuba.utils.profiler import run_ondemand_profiler diff --git a/snuba/utils/metrics/backends/abstract.py b/snuba/utils/metrics/backends/abstract.py index ee7037d5e4..fbe6a6761b 100644 --- a/snuba/utils/metrics/backends/abstract.py +++ b/snuba/utils/metrics/backends/abstract.py @@ -11,7 +11,11 @@ class MetricsBackend(ABC): @abstractmethod def increment( - self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float] = 1, + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: """ Increment a counter metric. These increments can also be @@ -27,7 +31,11 @@ def increment( @abstractmethod def gauge( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: """ Emit a metric that is the authoritative value for a quantity at a point in time @@ -40,7 +48,11 @@ def gauge( @abstractmethod def timing( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: """ Emit a metric for the timing performance of an operation. @@ -51,6 +63,23 @@ def timing( """ raise NotImplementedError + @abstractmethod + def distribution( + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, + ) -> None: + """ + Emit a metric for the performance of an operation. + + Example: + + metrics.distribution("request.size", request_size_in_bytes) + """ + raise NotImplementedError + @abstractmethod def events( self, diff --git a/snuba/utils/metrics/backends/datadog.py b/snuba/utils/metrics/backends/datadog.py index da8aff4ba2..1bd05692e9 100644 --- a/snuba/utils/metrics/backends/datadog.py +++ b/snuba/utils/metrics/backends/datadog.py @@ -47,7 +47,11 @@ def __normalize_tags(self, tags: Optional[Tags]) -> Optional[Sequence[str]]: return [f"{key}:{value.replace('|', '_')}" for key, value in tags.items()] def increment( - self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float] = 1, + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: self.__client.increment( name, @@ -57,7 +61,11 @@ def increment( ) def gauge( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: self.__client.gauge( name, @@ -67,7 +75,11 @@ def gauge( ) def timing( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: self.__client.timing( name, @@ -76,6 +88,20 @@ def timing( sample_rate=self.__sample_rates.get(name, 1.0), ) + def distribution( + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, + ) -> None: + self.__client.distribution( + name, + value, + tags=self.__normalize_tags(tags), + sample_rate=self.__sample_rates.get(name, 1.0), + ) + def events( self, title: str, diff --git a/snuba/utils/metrics/backends/dualwrite.py b/snuba/utils/metrics/backends/dualwrite.py new file mode 100644 index 0000000000..c69a2457cd --- /dev/null +++ b/snuba/utils/metrics/backends/dualwrite.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import random + +from snuba import settings, state +from snuba.utils.metrics.backends.abstract import MetricsBackend +from snuba.utils.metrics.backends.datadog import DatadogMetricsBackend +from snuba.utils.metrics.backends.sentry import SentryMetricsBackend +from snuba.utils.metrics.types import Tags + + +class SentryDatadogMetricsBackend(MetricsBackend): + """ + A metrics backend that records metrics to Sentry and Datadog. + """ + + def __init__( + self, datadog: DatadogMetricsBackend, sentry: SentryMetricsBackend + ) -> None: + self.datadog = datadog + self.sentry = sentry + + def _use_sentry(self) -> bool: + if state.get_config("use_sentry_metrics", "0") == "1": + return bool(random.random() < settings.DDM_METRICS_SAMPLE_RATE) + return False + + def increment( + self, + name: str, + value: int | float = 1, + tags: Tags | None = None, + unit: str | None = None, + ) -> None: + self.datadog.increment(name, value, tags, unit) + if self._use_sentry(): + self.sentry.increment(name, value, tags, unit) + + def gauge( + self, + name: str, + value: int | float, + tags: Tags | None = None, + unit: str | None = None, + ) -> None: + self.datadog.gauge(name, value, tags, unit) + if self._use_sentry(): + self.sentry.gauge(name, value, tags, unit) + + def timing( + self, + name: str, + value: int | float, + tags: Tags | None = None, + unit: str | None = None, + ) -> None: + self.datadog.timing(name, value, tags, unit) + if self._use_sentry(): + self.sentry.timing(name, value, tags, unit) + + def distribution( + self, + name: str, + value: int | float, + tags: Tags | None = None, + unit: str | None = None, + ) -> None: + self.datadog.distribution(name, value, tags, unit) + if self._use_sentry(): + self.sentry.distribution(name, value, tags, unit) + + def events( + self, + title: str, + text: str, + alert_type: str, + priority: str, + tags: Tags | None = None, + ) -> None: + self.datadog.events(title, text, alert_type, priority, tags) diff --git a/snuba/utils/metrics/backends/dummy.py b/snuba/utils/metrics/backends/dummy.py index 57aa89ff7b..c3d3ada1b9 100644 --- a/snuba/utils/metrics/backends/dummy.py +++ b/snuba/utils/metrics/backends/dummy.py @@ -26,7 +26,11 @@ def __validate_tags(self, tags: Tags) -> None: assert isinstance(v, str) def increment( - self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float] = 1, + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: if self.__strict: assert isinstance(name, str) @@ -35,7 +39,11 @@ def increment( self.__validate_tags(tags) def gauge( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: if self.__strict: assert isinstance(name, str) @@ -44,7 +52,24 @@ def gauge( self.__validate_tags(tags) def timing( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, + ) -> None: + if self.__strict: + assert isinstance(name, str) + assert isinstance(value, (int, float)) + if tags is not None: + self.__validate_tags(tags) + + def distribution( + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: if self.__strict: assert isinstance(name, str) diff --git a/snuba/utils/metrics/backends/sentry.py b/snuba/utils/metrics/backends/sentry.py new file mode 100644 index 0000000000..42d0b3c88e --- /dev/null +++ b/snuba/utils/metrics/backends/sentry.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from sentry_sdk import metrics + +from snuba.utils.metrics.backends.abstract import MetricsBackend +from snuba.utils.metrics.types import Tags + + +class SentryMetricsBackend(MetricsBackend): + """ + A metrics backend that records metrics to Sentry. + """ + + def __init__(self) -> None: + return None # Sentry doesn't require any setup + + def increment( + self, + name: str, + value: int | float = 1, + tags: Tags | None = None, + unit: str | None = None, + ) -> None: + metrics.incr(name, value, unit or "none", tags) + + def gauge( + self, + name: str, + value: int | float, + tags: Tags | None = None, + unit: str | None = None, + ) -> None: + metrics.gauge(name, value, unit or "none", tags) + + def timing( + self, + name: str, + value: int | float, + tags: Tags | None = None, + unit: str | None = None, + ) -> None: + # The Sentry SDK has strict typing on the unit, so it doesn't allow passing arbitrary units + metrics.timing(name, value, unit or "millisecond", tags) # type: ignore + + def distribution( + self, + name: str, + value: int | float, + tags: Tags | None = None, + unit: str | None = None, + ) -> None: + metrics.distribution(name, value, unit or "none", tags) + + def events( + self, + title: str, + text: str, + alert_type: str, + priority: str, + tags: Tags | None = None, + ) -> None: + return None # Sentry doesn't support events diff --git a/snuba/utils/metrics/backends/testing.py b/snuba/utils/metrics/backends/testing.py index 1b64cbd8f8..861bc8152f 100644 --- a/snuba/utils/metrics/backends/testing.py +++ b/snuba/utils/metrics/backends/testing.py @@ -11,6 +11,7 @@ class RecordedMetricCall: value: int | float tags: Tags + unit: str | None @dataclass(frozen=True) @@ -29,7 +30,11 @@ class RecordedEventCall: def record_metric_call( - mtype: str, name: str, value: int | float, tags: Optional[Tags] + mtype: str, + name: str, + value: int | float, + tags: Optional[Tags], + unit: Optional[str] = None, ) -> None: if mtype not in RECORDED_METRIC_CALLS: RECORDED_METRIC_CALLS[mtype] = {} @@ -39,7 +44,7 @@ def record_metric_call( if tags is None: tags = {} - RECORDED_METRIC_CALLS[mtype][name].append(RecordedMetricCall(value, tags)) + RECORDED_METRIC_CALLS[mtype][name].append(RecordedMetricCall(value, tags, unit)) def record_event_call( @@ -87,7 +92,11 @@ def __validate_tags(self, tags: Tags) -> None: assert isinstance(v, str) def increment( - self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float] = 1, + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: record_metric_call("increment", name, value, tags) if self.__strict: @@ -97,7 +106,11 @@ def increment( self.__validate_tags(tags) def gauge( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: record_metric_call("gauge", name, value, tags) if self.__strict: @@ -107,7 +120,11 @@ def gauge( self.__validate_tags(tags) def timing( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: record_metric_call("timing", name, value, tags) if self.__strict: @@ -116,6 +133,20 @@ def timing( if tags is not None: self.__validate_tags(tags) + def distribution( + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, + ) -> None: + record_metric_call("distribution", name, value, tags) + if self.__strict: + assert isinstance(name, str) + assert isinstance(value, (int, float)) + if tags is not None: + self.__validate_tags(tags) + def events( self, title: str, diff --git a/snuba/utils/metrics/util.py b/snuba/utils/metrics/util.py index b27511f9c7..7687c059ec 100644 --- a/snuba/utils/metrics/util.py +++ b/snuba/utils/metrics/util.py @@ -38,18 +38,23 @@ def create_metrics( from datadog import DogStatsd from snuba.utils.metrics.backends.datadog import DatadogMetricsBackend - - return DatadogMetricsBackend( - partial( - DogStatsd, - host=host, - port=port, - namespace=prefix, - constant_tags=[f"{key}:{value}" for key, value in tags.items()] - if tags is not None - else None, + from snuba.utils.metrics.backends.dualwrite import SentryDatadogMetricsBackend + from snuba.utils.metrics.backends.sentry import SentryMetricsBackend + + return SentryDatadogMetricsBackend( + DatadogMetricsBackend( + partial( + DogStatsd, + host=host, + port=port, + namespace=prefix, + constant_tags=[f"{key}:{value}" for key, value in tags.items()] + if tags is not None + else None, + ), + sample_rates, ), - sample_rates, + SentryMetricsBackend(), ) diff --git a/snuba/utils/metrics/wrapper.py b/snuba/utils/metrics/wrapper.py index fcbbd359e2..23563e4794 100644 --- a/snuba/utils/metrics/wrapper.py +++ b/snuba/utils/metrics/wrapper.py @@ -30,21 +30,48 @@ def __merge_tags(self, tags: Optional[Tags]) -> Optional[Tags]: return {**tags, **self.__tags} def increment( - self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float] = 1, + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: self.__backend.increment( - self.__merge_name(name), value, self.__merge_tags(tags) + self.__merge_name(name), value, self.__merge_tags(tags), unit ) def gauge( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: - self.__backend.gauge(self.__merge_name(name), value, self.__merge_tags(tags)) + self.__backend.gauge( + self.__merge_name(name), value, self.__merge_tags(tags), unit + ) def timing( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: - self.__backend.timing(self.__merge_name(name), value, self.__merge_tags(tags)) + self.__backend.timing( + self.__merge_name(name), value, self.__merge_tags(tags), unit + ) + + def distribution( + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, + ) -> None: + self.__backend.distribution( + self.__merge_name(name), value, self.__merge_tags(tags), unit + ) def events( self, diff --git a/tests/backends/metrics.py b/tests/backends/metrics.py index 0fa2de41fd..843ccdda60 100644 --- a/tests/backends/metrics.py +++ b/tests/backends/metrics.py @@ -8,18 +8,28 @@ class Increment(NamedTuple): name: str value: Union[int, float] tags: Optional[Tags] + unit: Optional[str] = None class Gauge(NamedTuple): name: str value: Union[int, float] tags: Optional[Tags] + unit: Optional[str] = None class Timing(NamedTuple): name: str value: Union[int, float] tags: Optional[Tags] + unit: Optional[str] = None + + +class Distribution(NamedTuple): + name: str + value: Union[int, float] + tags: Optional[Tags] + unit: Optional[str] = None class Events(NamedTuple): @@ -40,22 +50,45 @@ class TestingMetricsBackend(MetricsBackend): # TODO: This might make sense to extend the dummy metrics backend. def __init__(self) -> None: - self.calls: MutableSequence[Union[Increment, Gauge, Timing, Events]] = [] + self.calls: MutableSequence[ + Union[Increment, Gauge, Timing, Distribution, Events] + ] = [] def increment( - self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float] = 1, + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: - self.calls.append(Increment(name, value, tags)) + self.calls.append(Increment(name, value, tags, unit)) def gauge( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: - self.calls.append(Gauge(name, value, tags)) + self.calls.append(Gauge(name, value, tags, unit)) def timing( - self, name: str, value: Union[int, float], tags: Optional[Tags] = None + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, + ) -> None: + self.calls.append(Timing(name, value, tags, unit)) + + def distribution( + self, + name: str, + value: Union[int, float], + tags: Optional[Tags] = None, + unit: Optional[str] = None, ) -> None: - self.calls.append(Timing(name, value, tags)) + self.calls.append(Distribution(name, value, tags, unit)) def events( self, diff --git a/tests/subscriptions/test_executor_consumer.py b/tests/subscriptions/test_executor_consumer.py index 20d118e39d..70384735b9 100644 --- a/tests/subscriptions/test_executor_consumer.py +++ b/tests/subscriptions/test_executor_consumer.py @@ -425,7 +425,6 @@ def test_max_concurrent_queries( total_concurrent_queries: int, expected_max_concurrent_queries: int, ) -> None: - calculated = calculate_max_concurrent_queries( assigned_partition_count, total_partition_count, total_concurrent_queries )