From 05355c427aa49d8dca69c312755bf6df41a8468b Mon Sep 17 00:00:00 2001
From: Riccardo Busetti
Date: Wed, 9 Aug 2023 13:02:30 +0200
Subject: [PATCH] ref(on-demand): Improve query builder integration (#54441)

---
 src/sentry/search/events/builder/discover.py |   2 +
 src/sentry/search/events/builder/metrics.py  | 197 ++++++++-------
 .../search/events/builder/test_metrics.py    | 229 ++++++++++++++++--
 3 files changed, 320 insertions(+), 108 deletions(-)

diff --git a/src/sentry/search/events/builder/discover.py b/src/sentry/search/events/builder/discover.py
index 3f6cb175fd52a5..7f5efacaff5164 100644
--- a/src/sentry/search/events/builder/discover.py
+++ b/src/sentry/search/events/builder/discover.py
@@ -241,6 +241,7 @@ def __init__(
         self.on_demand_metrics_enabled = on_demand_metrics_enabled
         self.auto_fields = auto_fields
         self.query = query
+        self.selected_columns = selected_columns
         self.groupby_columns = groupby_columns
         self.functions_acl = set() if functions_acl is None else functions_acl
         self.equation_config = {} if equation_config is None else equation_config
@@ -315,6 +316,7 @@ def get_default_converter(self) -> Callable[[event_search.SearchFilter], Optiona
     def resolve_time_conditions(self) -> None:
         if self.skip_time_conditions:
             return
+        # start/end are required so that we can run a query in a reasonable amount of time
         if self.params.start is None or self.params.end is None:
             raise InvalidSearchQuery("Cannot query without a valid date range")
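The only functional change in discover.py is that the builder now remembers `selected_columns`, which lets the metrics subclass derive its on-demand state lazily instead of eagerly in `__init__`. A minimal sketch of that pattern, using the stdlib `functools.cached_property` in place of the Django equivalent the patch imports (the `BuilderSketch` and `first_column` names are illustrative, not Sentry code):

```python
from functools import cached_property
from typing import List, Optional


class BuilderSketch:
    def __init__(self, selected_columns: Optional[List[str]] = None) -> None:
        # Mirrors the discover.py change: keep the raw selected columns on the instance.
        self.selected_columns = selected_columns or []

    @cached_property
    def first_column(self) -> Optional[str]:
        # Evaluated once on first access, then memoized on the instance.
        return self.selected_columns[0] if self.selected_columns else None


builder = BuilderSketch(selected_columns=["count()"])
assert builder.first_column == "count()"
```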
diff --git a/src/sentry/search/events/builder/metrics.py b/src/sentry/search/events/builder/metrics.py
index 4023725218975f..6b65841f2afbe7 100644
--- a/src/sentry/search/events/builder/metrics.py
+++ b/src/sentry/search/events/builder/metrics.py
@@ -3,6 +3,7 @@
 from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Union

 import sentry_sdk
+from django.utils.functional import cached_property
 from snuba_sdk import (
     AliasedExpression,
     And,
@@ -83,15 +84,6 @@ def __init__(
         if granularity is not None:
             self._granularity = granularity

-        self._on_demand_spec = self._resolve_on_demand_spec(
-            dataset,
-            kwargs.get("selected_columns", []),
-            kwargs.get("query", ""),
-            kwargs.get("on_demand_metrics_enabled", False),
-        )
-
-        self.use_on_demand_metrics = self._on_demand_spec is not None
-
         super().__init__(
             # TODO: defaulting to Metrics for now so I don't have to update incidents tests. Should be
             # PerformanceMetrics.
@@ -107,33 +99,33 @@ def __init__(
             raise InvalidSearchQuery("Organization id required to create a metrics query")
         self.organization_id: int = org_id

-    def _resolve_on_demand_spec(
-        self,
-        dataset: Optional[Dataset],
-        selected_cols: List[Optional[str]],
-        query: str,
-        on_demand_metrics_enabled: bool,
-    ) -> Optional[OndemandMetricSpec]:
-        if not on_demand_metrics_enabled:
+    @cached_property
+    def _on_demand_metric_spec(self) -> Optional[OndemandMetricSpec]:
+        if not self.on_demand_metrics_enabled:
             return None

-        field = selected_cols[0] if selected_cols else None
+        field = self.selected_columns[0] if self.selected_columns else None
         if not field:
             return None

-        if not is_on_demand_metric_query(dataset, field, query):
+        if self.query is None:
+            return None
+
+        if not is_on_demand_metric_query(self.dataset, field, self.query):
             return None

         try:
-            return OndemandMetricSpec(field, query)
+            return OndemandMetricSpec(field, self.query)
         except Exception as e:
             sentry_sdk.capture_exception(e)
             return None

-    def _get_on_demand_metrics_query(self, snuba_query: Query) -> Optional[MetricsQuery]:
-        spec = self._on_demand_spec
+    def _get_metrics_query_from_on_demand_spec(
+        self, spec: OndemandMetricSpec, require_time_range: bool = True
+    ) -> MetricsQuery:
+        if self.params.organization is None:
+            raise InvalidSearchQuery("An on demand metrics query requires an organization")

-        # TimeseriesQueryBuilder specific parameters
         if isinstance(self, TimeseriesMetricQueryBuilder):
             limit = Limit(1)
             alias = "count"
@@ -143,7 +135,19 @@ def _get_on_demand_metrics_query(self, snuba_query: Query) -> Optional[MetricsQu
             alias = spec.mri
             include_series = False

-        granularity = snuba_query.granularity or self.resolve_granularity()
+        # Since the query builder is quite convoluted, we first try to get start and end from the validated
+        # parameters; if they are None, it may be that `skip_time_conditions` was True, in which case we check
+        # whether start and end were supplied directly in the constructor.
+        start = self.start or self.params.start
+        end = self.end or self.params.end
+
+        # The time range can be required or not, since the query generated by the builder can either be used to
+        # execute the query on its own (requiring a time range) or to get the snql code necessary to create a
+        # query subscription from the outside.
+        if require_time_range and (start is None or end is None):
+            raise InvalidSearchQuery(
+                "The on demand metric query requires a time range to be executed"
+            )

         return MetricsQuery(
             select=[MetricField(spec.op, spec.mri, alias=alias)],
@@ -156,13 +160,13 @@ def _get_on_demand_metrics_query(self, snuba_query: Query) -> Optional[MetricsQu
             ],
             limit=limit,
             offset=self.offset,
-            granularity=granularity,
+            granularity=self.granularity,
             is_alerts_query=True,
             org_id=self.params.organization.id,
             project_ids=[p.id for p in self.params.projects],
             include_series=include_series,
-            start=self.params.start,
-            end=self.params.end,
+            start=start,
+            end=end,
         )

     def validate_aggregate_arguments(self) -> None:
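The comments above describe a two-level fallback for the time range plus an optional hard requirement. A self-contained sketch of that logic under the same assumptions, with plain arguments in place of builder state (`resolve_time_range` is a hypothetical name, not part of the patch):

```python
from datetime import datetime
from typing import Optional, Tuple


def resolve_time_range(
    builder_start: Optional[datetime],
    builder_end: Optional[datetime],
    params_start: Optional[datetime],
    params_end: Optional[datetime],
    require_time_range: bool,
) -> Tuple[Optional[datetime], Optional[datetime]]:
    # Prefer values set directly on the builder, then fall back to the validated params.
    start = builder_start or params_start
    end = builder_end or params_end
    # Only fail when a range is required but neither source provided one.
    if require_time_range and (start is None or end is None):
        raise ValueError("The on demand metric query requires a time range to be executed")
    return start, end


# A subscription-style caller may omit the range; a direct execution may not.
assert resolve_time_range(None, None, None, None, require_time_range=False) == (None, None)
```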
@@ -191,29 +195,36 @@ def resolve_query(
         equations: Optional[List[str]] = None,
         orderby: Optional[List[str]] = None,
     ) -> None:
+        # Resolutions that we must always perform, irrespective of on-demand.
         with sentry_sdk.start_span(op="QueryBuilder", description="resolve_time_conditions"):
             # Has to be done early, since other conditions depend on start and end
             self.resolve_time_conditions()
-        with sentry_sdk.start_span(op="QueryBuilder", description="resolve_conditions"):
-            self.where, self.having = self.resolve_conditions(
-                query, use_aggregate_conditions=use_aggregate_conditions
-            )
         with sentry_sdk.start_span(op="QueryBuilder", description="resolve_granularity"):
             # Needs to happen before params and after time conditions since granularity can change start&end
             self.granularity = self.resolve_granularity()
-        with sentry_sdk.start_span(op="QueryBuilder", description="resolve_params"):
-            # params depends on parse_query, and conditions being resolved first since there may be projects in conditions
-            self.where += self.resolve_params()
-        with sentry_sdk.start_span(op="QueryBuilder", description="resolve_columns"):
-            self.columns = self.resolve_select(selected_columns, equations)
-        with sentry_sdk.start_span(op="QueryBuilder", description="resolve_orderby"):
-            self.orderby = self.resolve_orderby(orderby)
-        with sentry_sdk.start_span(op="QueryBuilder", description="resolve_groupby"):
-            self.groupby = self.resolve_groupby(groupby_columns)
+
+        # Resolutions that we perform only when the query is not on-demand. The reasoning is that building an
+        # on-demand query requires only a time interval and granularity; all the other fields are computed
+        # automatically given the OndemandMetricSpec.
+        if not self._on_demand_metric_spec:
+            with sentry_sdk.start_span(op="QueryBuilder", description="resolve_conditions"):
+                self.where, self.having = self.resolve_conditions(
+                    query, use_aggregate_conditions=use_aggregate_conditions
+                )
+            with sentry_sdk.start_span(op="QueryBuilder", description="resolve_params"):
+                # params depends on parse_query, and conditions being resolved first since there may be projects
+                # in conditions
+                self.where += self.resolve_params()
+            with sentry_sdk.start_span(op="QueryBuilder", description="resolve_columns"):
+                self.columns = self.resolve_select(selected_columns, equations)
+            with sentry_sdk.start_span(op="QueryBuilder", description="resolve_orderby"):
+                self.orderby = self.resolve_orderby(orderby)
+            with sentry_sdk.start_span(op="QueryBuilder", description="resolve_groupby"):
+                self.groupby = self.resolve_groupby(groupby_columns)

         if len(self.metric_ids) > 0 and not self.use_metrics_layer:
             self.where.append(
-                # Metric id is intentionally sorted so we create consistent queries here both for testing & caching
+                # Metric id is intentionally sorted, so we create consistent queries here both for testing & caching.
                 Condition(Column("metric_id"), Op.IN, sorted(self.metric_ids))
             )
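The sorting comment above is about determinism: two equal sets of metric ids, collected in different orders, must render to the same IN clause so that the generated query text compares, and therefore caches, identically. For example:

```python
# Equal sets of metric ids, discovered in different orders, serialize identically once sorted.
ids_a = {9, 1, 5}
ids_b = {5, 9, 1}
assert sorted(ids_a) == sorted(ids_b) == [1, 5, 9]
```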
@@ -223,7 +234,7 @@ def resolve_column_name(self, col: str) -> str:
         col = tag_match.group("tag") if tag_match else col

         # on-demand metrics require metrics layer behavior
-        if self.use_metrics_layer or self.use_on_demand_metrics:
+        if self.use_metrics_layer or self._on_demand_metric_spec:
             if col in ["project_id", "timestamp"]:
                 return col
             # TODO: update resolve params so this isn't needed
@@ -591,7 +602,7 @@ def _environment_filter_converter(self, search_filter: SearchFilter) -> Optional
         else:
             return env_conditions[0]

-    def get_metrics_layer_snql_query(self) -> Request:
+    def get_metrics_layer_snql_query(self) -> Query:
         """
         This method returns the metrics layer snql of the query being fed into the transformer and then into the
         metrics layer.
@@ -602,7 +613,6 @@ def get_metrics_layer_snql_query(self) -> Request:
         This dialect should NEVER be used outside of the transformer as it will create problems if parsed by the
         snuba SDK.
         """
-
         if not self.use_metrics_layer and not self.on_demand_metrics_enabled:
             # The reasoning for this error is that if "use_metrics_layer" is false, the MQB will not generate the
             # snql dialect explained below, as there is no need for that because it will directly generate normal
             # snql that can be returned via "get_snql_query".
             raise InvalidSearchQuery("Cannot get metrics layer snql query when use_metrics_layer is false")
@@ -613,38 +623,31 @@ def get_metrics_layer_snql_query(self) -> Request:
         self.validate_orderby_clause()

         prefix = "generic_" if self.dataset is Dataset.PerformanceMetrics else ""
-        return Request(
-            dataset=self.dataset.value,
-            app_id="default",
-            query=Query(
-                match=Entity(f"{prefix}metrics_distributions", sample=self.sample_rate),
-                # Metrics doesn't support columns in the select, and instead expects them in the groupby
-                select=self.aggregates
-                + [
-                    # Team key transaction is a special case sigh
-                    col
-                    for col in self.columns
-                    if isinstance(col, Function) and col.function == "team_key_transaction"
-                ],
-                array_join=self.array_join,
-                where=self.where,
-                having=self.having,
-                groupby=self.groupby,
-                orderby=self.orderby,
-                limit=self.limit,
-                offset=self.offset,
-                limitby=self.limitby,
-                granularity=self.granularity,
-            ),
-            flags=Flags(turbo=self.turbo),
-            tenant_ids=self.tenant_ids,
+        return Query(
+            match=Entity(f"{prefix}metrics_distributions", sample=self.sample_rate),
+            # Metrics doesn't support columns in the select, and instead expects them in the groupby
+            select=self.aggregates
+            + [
+                # Team key transaction is a special case sigh
+                col
+                for col in self.columns
+                if isinstance(col, Function) and col.function == "team_key_transaction"
+            ],
+            array_join=self.array_join,
+            where=self.where,
+            having=self.having,
+            groupby=self.groupby,
+            orderby=self.orderby,
+            limit=self.limit,
+            offset=self.offset,
+            limitby=self.limitby,
+            granularity=self.granularity,
         )
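With this change `get_metrics_layer_snql_query` returns a bare snql `Query`; any caller that needs to execute it is now responsible for wrapping it in a `Request`, in the same shape the reworked `get_snql_query` below uses. A sketch of such a wrapper (`to_request` is a hypothetical helper, not part of the patch):

```python
from snuba_sdk import Flags, Request


def to_request(builder) -> Request:
    # `builder` stands for any metrics query builder exposing the attributes below.
    return Request(
        dataset=builder.dataset.value,
        app_id="default",
        query=builder.get_metrics_layer_snql_query(),
        flags=Flags(turbo=builder.turbo),
        tenant_ids=builder.tenant_ids,
    )
```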
""" - if self.use_metrics_layer: # The reasoning for this error is because if "use_metrics_layer" is true, the snql built within MQB will # be a slight variation of snql that is understood only by the "mqb_query_transformer" thus we don't @@ -816,8 +819,7 @@ def validate_orderby_clause(self) -> None: raise IncompatibleMetricsQuery("Can't orderby tags") def run_query(self, referrer: str, use_cache: bool = False) -> Any: - - if self.use_metrics_layer or self.use_on_demand_metrics: + if self.use_metrics_layer or self._on_demand_metric_spec: from sentry.snuba.metrics.datasource import get_series from sentry.snuba.metrics.mqb_query_transformer import ( transform_mqb_query_to_metrics_query, @@ -825,13 +827,16 @@ def run_query(self, referrer: str, use_cache: bool = False) -> Any: try: with sentry_sdk.start_span(op="metric_layer", description="transform_query"): - snuba_query = self.get_metrics_layer_snql_query() - if self.use_on_demand_metrics: - metrics_query = self._get_on_demand_metrics_query(snuba_query.query) + if self._on_demand_metric_spec: + metrics_query = self._get_metrics_query_from_on_demand_spec( + spec=self._on_demand_metric_spec, require_time_range=True + ) else: + intermediate_query = self.get_metrics_layer_snql_query() metrics_query = transform_mqb_query_to_metrics_query( - snuba_query.query, self.is_alerts_query + intermediate_query, self.is_alerts_query ) + with sentry_sdk.start_span(op="metric_layer", description="run_query"): metrics_data = get_series( projects=self.params.projects, @@ -1016,20 +1021,20 @@ def get_snql_query(self) -> Request: we are going to import the purposfully hidden SnubaQueryBuilder which is a component that takes a MetricsQuery and returns one or more equivalent snql query(ies). """ - - if self.use_metrics_layer or self.use_on_demand_metrics: + if self.use_metrics_layer or self._on_demand_metric_spec: from sentry.snuba.metrics import SnubaQueryBuilder from sentry.snuba.metrics.mqb_query_transformer import ( transform_mqb_query_to_metrics_query, ) - snuba_request = self.get_metrics_layer_snql_query() - - if self.use_on_demand_metrics: - metrics_query = self._get_on_demand_metrics_query(snuba_request.query) + if self._on_demand_metric_spec: + metrics_query = self._get_metrics_query_from_on_demand_spec( + spec=self._on_demand_metric_spec, require_time_range=False + ) else: + intermediate_query = self.get_metrics_layer_snql_query() metrics_query = transform_mqb_query_to_metrics_query( - snuba_request.query, is_alerts_query=self.is_alerts_query + intermediate_query, is_alerts_query=self.is_alerts_query ) snuba_queries, _ = SnubaQueryBuilder( @@ -1042,14 +1047,21 @@ def get_snql_query(self) -> Request: # If we have zero or more than one queries resulting from the supplied query, we want to generate # an error as we don't support this case. raise IncompatibleMetricsQuery( - "The metrics layer generated zero or multiple queries from the supplied query, only a single query is supported" + "The metrics layer generated zero or multiple queries from the supplied query, only a single " + "query is supported" ) # We take only the first query, supposing a single query is generated. 
@@ -1247,19 +1259,20 @@ def get_snql_query(self) -> List[Request]:
         return queries

     def run_query(self, referrer: str, use_cache: bool = False) -> Any:
-
-        if self.use_metrics_layer or self.use_on_demand_metrics:
+        if self.use_metrics_layer or self._on_demand_metric_spec:
             from sentry.snuba.metrics.datasource import get_series
             from sentry.snuba.metrics.mqb_query_transformer import (
                 transform_mqb_query_to_metrics_query,
             )

-            snuba_query = self.get_snql_query()[0].query
             try:
                 with sentry_sdk.start_span(op="metric_layer", description="transform_query"):
-                    if self.use_on_demand_metrics:
-                        metrics_query = self._get_on_demand_metrics_query(snuba_query)
+                    if self._on_demand_metric_spec:
+                        metrics_query = self._get_metrics_query_from_on_demand_spec(
+                            spec=self._on_demand_metric_spec, require_time_range=True
+                        )
                     elif self.use_metrics_layer:
+                        snuba_query = self.get_snql_query()[0].query
                         metrics_query = transform_mqb_query_to_metrics_query(
                             snuba_query, self.is_alerts_query
                         )
diff --git a/tests/sentry/search/events/builder/test_metrics.py b/tests/sentry/search/events/builder/test_metrics.py
index 99754da321a7ff..36a896a1392d8f 100644
--- a/tests/sentry/search/events/builder/test_metrics.py
+++ b/tests/sentry/search/events/builder/test_metrics.py
@@ -21,7 +21,9 @@
 from sentry.sentry_metrics.use_case_id_registry import UseCaseID
 from sentry.sentry_metrics.utils import resolve_tag_value
 from sentry.snuba.dataset import Dataset
-from sentry.snuba.metrics.extraction import QUERY_HASH_KEY
+from sentry.snuba.metrics import TransactionMRI
+from sentry.snuba.metrics.extraction import QUERY_HASH_KEY, OndemandMetricSpec
+from sentry.snuba.metrics.naming_layer import TransactionMetricKey
 from sentry.testutils.cases import MetricsEnhancedPerformanceTestCase

 pytestmark = pytest.mark.sentry_metrics
@@ -1953,36 +1955,104 @@ def test_no_error_if_aggregates_disallowed_but_no_aggregates_included(self):
             allow_metric_aggregates=False,
         )

-    def test_on_demand_metrics(self):
+    def test_run_query_with_on_demand_count(self):
+        field = "count()"
+        query = "transaction.duration:>0"
+        spec = OndemandMetricSpec(field=field, query=query)
+
+        for hour in range(0, 5):
+            self.store_transaction_metric(
+                value=hour * 100,
+                metric=TransactionMetricKey.COUNT_ON_DEMAND.value,
+                internal_metric=TransactionMRI.COUNT_ON_DEMAND.value,
+                entity="metrics_counters",
+                tags={"query_hash": spec.query_hash()},
+                timestamp=self.start + datetime.timedelta(hours=hour),
+            )
+
+        query = TimeseriesMetricQueryBuilder(
+            self.params,
+            dataset=Dataset.PerformanceMetrics,
+            interval=3600,
+            query=query,
+            selected_columns=[field],
+            on_demand_metrics_enabled=True,
+        )
+        result = query.run_query("test_query")
+        assert result["data"][:5] == [
+            {
+                "time": self.start.isoformat(),
+                "count": 0.0,
+            },
+            {
+                "time": (self.start + datetime.timedelta(hours=1)).isoformat(),
+                "count": 100.0,
+            },
+            {
+                "time": (self.start + datetime.timedelta(hours=2)).isoformat(),
+                "count": 200.0,
+            },
+            {
+                "time": (self.start + datetime.timedelta(hours=3)).isoformat(),
+                "count": 300.0,
+            },
+            {
+                "time": (self.start + datetime.timedelta(hours=4)).isoformat(),
+                "count": 400.0,
+            },
+        ]
+        self.assertCountEqual(
+            result["meta"],
+            [
+                {"name": "time", "type": "DateTime('Universal')"},
+                {"name": "count", "type": "Float64"},
+            ],
+        )
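The test above works because the rows it stores carry the same `query_hash` tag that the builder later filters on; the only property it relies on is that hashing the search query is deterministic. An illustrative stand-in (Sentry's `OndemandMetricSpec.query_hash()` uses its own hashing, not this one):

```python
import hashlib


def toy_query_hash(query: str) -> str:
    # Any stable digest of the query string would do for the tag matching to work.
    return hashlib.sha1(query.encode("utf-8")).hexdigest()[:8]


assert toy_query_hash("transaction.duration:>0") == toy_query_hash("transaction.duration:>0")
```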
{"name": "time", "type": "DateTime('Universal')"}, + {"name": "count", "type": "Float64"}, + ], + ) + + def test_run_query_with_on_demand_distribution(self): + field = "p75(measurements.fp)" + query = "transaction.duration:>0" + spec = OndemandMetricSpec(field=field, query=query) + + for hour in range(0, 5): + self.store_transaction_metric( + value=hour * 100, + metric=TransactionMetricKey.DIST_ON_DEMAND.value, + internal_metric=TransactionMRI.DIST_ON_DEMAND.value, + entity="metrics_distributions", + tags={"query_hash": spec.query_hash()}, + timestamp=self.start + datetime.timedelta(hours=hour), + ) + query = TimeseriesMetricQueryBuilder( self.params, dataset=Dataset.PerformanceMetrics, interval=3600, - query="transaction.duration:>0", - selected_columns=["count()"], + query=query, + selected_columns=[field], on_demand_metrics_enabled=True, ) result = query.run_query("test_query") assert result["data"][:5] == [ { "time": self.start.isoformat(), - "count": 0, + "count": 0.0, }, { "time": (self.start + datetime.timedelta(hours=1)).isoformat(), - "count": 0, + "count": 100.0, }, { "time": (self.start + datetime.timedelta(hours=2)).isoformat(), - "count": 0, + "count": 200.0, }, { "time": (self.start + datetime.timedelta(hours=3)).isoformat(), - "count": 0, + "count": 300.0, }, { "time": (self.start + datetime.timedelta(hours=4)).isoformat(), - "count": 0, + "count": 400.0, }, ] self.assertCountEqual( @@ -2099,35 +2169,110 @@ def test_query_normal_distribution(self): class AlertMetricsQueryBuilderTest(MetricBuilderBaseTest): - def test_run_on_demand_query(self): + def test_run_query_with_on_demand_distribution(self): + field = "p75(measurements.fp)" + query = "transaction.duration:>=100" + spec = OndemandMetricSpec(field=field, query=query) + + self.store_transaction_metric( + value=200, + metric=TransactionMetricKey.DIST_ON_DEMAND.value, + internal_metric=TransactionMRI.DIST_ON_DEMAND.value, + entity="metrics_distributions", + tags={"query_hash": spec.query_hash()}, + timestamp=self.start + datetime.timedelta(minutes=15), + ) + query = AlertMetricsQueryBuilder( self.params, use_metrics_layer=False, granularity=3600, - query="transaction.duration:>=100", + query=query, dataset=Dataset.PerformanceMetrics, - selected_columns=["p75(measurements.fp)"], + selected_columns=[field], on_demand_metrics_enabled=True, + skip_time_conditions=False, ) result = query.run_query("test_query") - assert len(result["data"]) == 1 - + assert result["data"] == [{"d:transactions/on_demand@none": 200.0}] meta = result["meta"] - assert len(meta) == 1 assert meta[0]["name"] == "d:transactions/on_demand@none" - def test_get_snql_query(self): + def test_run_query_with_on_demand_count(self): + field = "count(measurements.fp)" + query = "transaction.duration:>=100" + spec = OndemandMetricSpec(field=field, query=query) + + self.store_transaction_metric( + value=100, + metric=TransactionMetricKey.COUNT_ON_DEMAND.value, + internal_metric=TransactionMRI.COUNT_ON_DEMAND.value, + entity="metrics_counters", + tags={"query_hash": spec.query_hash()}, + timestamp=self.start + datetime.timedelta(minutes=15), + ) + query = AlertMetricsQueryBuilder( self.params, use_metrics_layer=False, granularity=3600, + query=query, + dataset=Dataset.PerformanceMetrics, + selected_columns=[field], + on_demand_metrics_enabled=True, + skip_time_conditions=False, + ) + + result = query.run_query("test_query") + + assert result["data"] == [{"c:transactions/on_demand@none": 100.0}] + meta = result["meta"] + assert len(meta) == 1 + assert meta[0]["name"] 
== "c:transactions/on_demand@none" + + def test_get_run_query_with_on_demand_count_and_time_range_required_and_not_supplied(self): + params = { + "organization_id": self.organization.id, + "project_id": self.projects, + } + + query = AlertMetricsQueryBuilder( + params, + use_metrics_layer=False, + granularity=3600, + query="transaction.duration:>=100", + dataset=Dataset.PerformanceMetrics, + selected_columns=["count(transaction.duration)"], + on_demand_metrics_enabled=True, + # We set here the skipping of conditions, since this is true for alert subscriptions, but we want to verify + # whether our secondary error barrier works. + skip_time_conditions=True, + ) + + with pytest.raises(IncompatibleMetricsQuery): + query.run_query("test_query") + + def test_get_snql_query_with_on_demand_distribution_and_time_range_not_required_and_not_supplied( + self, + ): + params = { + "organization_id": self.organization.id, + "project_id": self.projects, + } + query = AlertMetricsQueryBuilder( + params, + use_metrics_layer=False, + granularity=3600, query="transaction.duration:>=100", dataset=Dataset.PerformanceMetrics, selected_columns=["p75(measurements.fp)"], on_demand_metrics_enabled=True, + # We want to test the snql generation when a time range is not supplied, which is the case for alert + # subscriptions. + skip_time_conditions=True, ) snql_request = query.get_snql_query() @@ -2170,3 +2315,55 @@ def test_get_snql_query(self): ) assert query_hash_clause in snql_query.where + + def test_get_snql_query_with_on_demand_count_and_time_range_required_and_supplied(self): + query = AlertMetricsQueryBuilder( + self.params, + use_metrics_layer=False, + granularity=3600, + query="transaction.duration:>=100", + dataset=Dataset.PerformanceMetrics, + selected_columns=["count(transaction.duration)"], + on_demand_metrics_enabled=True, + # We want to test the snql generation when a time range is supplied. + skip_time_conditions=False, + ) + + snql_request = query.get_snql_query() + assert snql_request.dataset == "generic_metrics" + snql_query = snql_request.query + self.assertCountEqual( + [ + Function( + "sumIf", + [ + Column("value"), + Function( + "equals", + [ + Column("metric_id"), + indexer.resolve( + UseCaseID.TRANSACTIONS, + None, + "c:transactions/on_demand@none", + ), + ], + ), + ], + "c:transactions/on_demand@none", + ) + ], + snql_query.select, + ) + + query_hash_index = indexer.resolve(UseCaseID.TRANSACTIONS, None, QUERY_HASH_KEY) + + start_time_clause = Condition(lhs=Column(name="timestamp"), op=Op.GTE, rhs=self.start) + end_time_clause = Condition(lhs=Column(name="timestamp"), op=Op.LT, rhs=self.end) + query_hash_clause = Condition( + lhs=Column(name=f"tags_raw[{query_hash_index}]"), op=Op.EQ, rhs="88f3eb66" + ) + + assert start_time_clause in snql_query.where + assert end_time_clause in snql_query.where + assert query_hash_clause in snql_query.where