Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(on-demand): Improve query builder integration #54441

Merged
merged 8 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/sentry/search/events/builder/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ def __init__(
self.on_demand_metrics_enabled = on_demand_metrics_enabled
self.auto_fields = auto_fields
self.query = query
self.selected_columns = selected_columns
self.groupby_columns = groupby_columns
self.functions_acl = set() if functions_acl is None else functions_acl
self.equation_config = {} if equation_config is None else equation_config
Expand Down Expand Up @@ -315,6 +316,7 @@ def get_default_converter(self) -> Callable[[event_search.SearchFilter], Optiona
def resolve_time_conditions(self) -> None:
if self.skip_time_conditions:
return

# start/end are required so that we can run a query in a reasonable amount of time
if self.params.start is None or self.params.end is None:
raise InvalidSearchQuery("Cannot query without a valid date range")
Expand Down
197 changes: 105 additions & 92 deletions src/sentry/search/events/builder/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Union

import sentry_sdk
from django.utils.functional import cached_property
from snuba_sdk import (
AliasedExpression,
And,
Expand Down Expand Up @@ -83,15 +84,6 @@ def __init__(
if granularity is not None:
self._granularity = granularity

self._on_demand_spec = self._resolve_on_demand_spec(
dataset,
kwargs.get("selected_columns", []),
kwargs.get("query", ""),
kwargs.get("on_demand_metrics_enabled", False),
)

self.use_on_demand_metrics = self._on_demand_spec is not None

super().__init__(
# TODO: defaulting to Metrics for now so I don't have to update incidents tests. Should be
# PerformanceMetrics
Expand All @@ -107,33 +99,33 @@ def __init__(
raise InvalidSearchQuery("Organization id required to create a metrics query")
self.organization_id: int = org_id

def _resolve_on_demand_spec(
self,
dataset: Optional[Dataset],
selected_cols: List[Optional[str]],
query: str,
on_demand_metrics_enabled: bool,
) -> Optional[OndemandMetricSpec]:
if not on_demand_metrics_enabled:
@cached_property
def _on_demand_metric_spec(self) -> Optional[OndemandMetricSpec]:
if not self.on_demand_metrics_enabled:
return None

field = selected_cols[0] if selected_cols else None
field = self.selected_columns[0] if self.selected_columns else None
if not field:
return None

if not is_on_demand_metric_query(dataset, field, query):
if self.query is None:
return None

if not is_on_demand_metric_query(self.dataset, field, self.query):
return None

try:
return OndemandMetricSpec(field, query)
return OndemandMetricSpec(field, self.query)
except Exception as e:
sentry_sdk.capture_exception(e)
return None

def _get_on_demand_metrics_query(self, snuba_query: Query) -> Optional[MetricsQuery]:
spec = self._on_demand_spec
def _get_metrics_query_from_on_demand_spec(
self, spec: OndemandMetricSpec, require_time_range: bool = True
) -> MetricsQuery:
if self.params.organization is None:
raise InvalidSearchQuery("An on demand metrics query requires an organization")

# TimeseriesQueryBuilder specific parameters
if isinstance(self, TimeseriesMetricQueryBuilder):
limit = Limit(1)
alias = "count"
Expand All @@ -143,7 +135,19 @@ def _get_on_demand_metrics_query(self, snuba_query: Query) -> Optional[MetricsQu
alias = spec.mri
include_series = False

granularity = snuba_query.granularity or self.resolve_granularity()
# Since the query builder is very convoluted, we first try to get the start and end from the validated
# parameters but in case it's none it can be that the `skip_time_conditions` was True, thus in that case we
# try to see if start and end were supplied directly in the constructor.
start = self.start or self.params.start
end = self.end or self.params.end

# The time range can be required or not, since the query generated by the builder can either be used to execute
# the query on its own (requiring a time range) or it can be used to get the snql code necessary to create a
# query subscription from the outside.
if require_time_range and (start is None or end is None):
raise InvalidSearchQuery(
"The on demand metric query requires a time range to be executed"
)

return MetricsQuery(
select=[MetricField(spec.op, spec.mri, alias=alias)],
Expand All @@ -156,13 +160,13 @@ def _get_on_demand_metrics_query(self, snuba_query: Query) -> Optional[MetricsQu
],
limit=limit,
offset=self.offset,
granularity=granularity,
granularity=self.granularity,
is_alerts_query=True,
org_id=self.params.organization.id,
project_ids=[p.id for p in self.params.projects],
include_series=include_series,
start=self.params.start,
end=self.params.end,
start=start,
end=end,
)

def validate_aggregate_arguments(self) -> None:
Expand Down Expand Up @@ -191,29 +195,36 @@ def resolve_query(
equations: Optional[List[str]] = None,
orderby: Optional[List[str]] = None,
) -> None:
# Resolutions that we always must perform, irrespectively of on demand.
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_time_conditions"):
# Has to be done early, since other conditions depend on start and end
self.resolve_time_conditions()
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_conditions"):
self.where, self.having = self.resolve_conditions(
query, use_aggregate_conditions=use_aggregate_conditions
)
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_granularity"):
# Needs to happen before params and after time conditions since granularity can change start&end
self.granularity = self.resolve_granularity()
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_params"):
# params depends on parse_query, and conditions being resolved first since there may be projects in conditions
self.where += self.resolve_params()
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_columns"):
self.columns = self.resolve_select(selected_columns, equations)
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_orderby"):
self.orderby = self.resolve_orderby(orderby)
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_groupby"):
self.groupby = self.resolve_groupby(groupby_columns)

# Resolutions that we will perform only in case the query is not on demand. The reasoning for this is that
# for building an on demand query we only require a time interval and granularity. All the other fields are
# automatically computed given the OndemandMetricSpec.
if not self._on_demand_metric_spec:
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_conditions"):
self.where, self.having = self.resolve_conditions(
query, use_aggregate_conditions=use_aggregate_conditions
)
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_params"):
# params depends on parse_query, and conditions being resolved first since there may be projects
# in conditions
self.where += self.resolve_params()
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_columns"):
self.columns = self.resolve_select(selected_columns, equations)
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_orderby"):
self.orderby = self.resolve_orderby(orderby)
with sentry_sdk.start_span(op="QueryBuilder", description="resolve_groupby"):
self.groupby = self.resolve_groupby(groupby_columns)

if len(self.metric_ids) > 0 and not self.use_metrics_layer:
self.where.append(
# Metric id is intentionally sorted so we create consistent queries here both for testing & caching
# Metric id is intentionally sorted, so we create consistent queries here both for testing & caching.
Condition(Column("metric_id"), Op.IN, sorted(self.metric_ids))
)

Expand All @@ -223,7 +234,7 @@ def resolve_column_name(self, col: str) -> str:
col = tag_match.group("tag") if tag_match else col

# on-demand metrics require metrics layer behavior
if self.use_metrics_layer or self.use_on_demand_metrics:
if self.use_metrics_layer or self._on_demand_metric_spec:
if col in ["project_id", "timestamp"]:
return col
# TODO: update resolve params so this isn't needed
Expand Down Expand Up @@ -591,7 +602,7 @@ def _environment_filter_converter(self, search_filter: SearchFilter) -> Optional
else:
return env_conditions[0]

def get_metrics_layer_snql_query(self) -> Request:
def get_metrics_layer_snql_query(self) -> Query:
"""
This method returns the metrics layer snql of the query being fed into the transformer and then into the metrics
layer.
Expand All @@ -602,7 +613,6 @@ def get_metrics_layer_snql_query(self) -> Request:
This dialect should NEVER be used outside of the transformer as it will create problems if parsed by the
snuba SDK.
"""

if not self.use_metrics_layer and not self.on_demand_metrics_enabled:
# The reasoning for this error is because if "use_metrics_layer" is false, the MQB will not generate the
# snql dialect explained below as there is not need for that because it will directly generate normal snql
Expand All @@ -613,38 +623,31 @@ def get_metrics_layer_snql_query(self) -> Request:
self.validate_orderby_clause()

prefix = "generic_" if self.dataset is Dataset.PerformanceMetrics else ""
return Request(
dataset=self.dataset.value,
app_id="default",
query=Query(
match=Entity(f"{prefix}metrics_distributions", sample=self.sample_rate),
# Metrics doesn't support columns in the select, and instead expects them in the groupby
select=self.aggregates
+ [
# Team key transaction is a special case sigh
col
for col in self.columns
if isinstance(col, Function) and col.function == "team_key_transaction"
],
array_join=self.array_join,
where=self.where,
having=self.having,
groupby=self.groupby,
orderby=self.orderby,
limit=self.limit,
offset=self.offset,
limitby=self.limitby,
granularity=self.granularity,
),
flags=Flags(turbo=self.turbo),
tenant_ids=self.tenant_ids,
return Query(
match=Entity(f"{prefix}metrics_distributions", sample=self.sample_rate),
# Metrics doesn't support columns in the select, and instead expects them in the groupby
select=self.aggregates
+ [
# Team key transaction is a special case sigh
col
for col in self.columns
if isinstance(col, Function) and col.function == "team_key_transaction"
],
array_join=self.array_join,
where=self.where,
having=self.having,
groupby=self.groupby,
orderby=self.orderby,
limit=self.limit,
offset=self.offset,
limitby=self.limitby,
granularity=self.granularity,
)

def get_snql_query(self) -> Request:
"""
This method returns the normal snql of the query being built for execution.
"""

if self.use_metrics_layer:
# The reasoning for this error is because if "use_metrics_layer" is true, the snql built within MQB will
# be a slight variation of snql that is understood only by the "mqb_query_transformer" thus we don't
Expand Down Expand Up @@ -816,22 +819,24 @@ def validate_orderby_clause(self) -> None:
raise IncompatibleMetricsQuery("Can't orderby tags")

def run_query(self, referrer: str, use_cache: bool = False) -> Any:

if self.use_metrics_layer or self.use_on_demand_metrics:
if self.use_metrics_layer or self._on_demand_metric_spec:
from sentry.snuba.metrics.datasource import get_series
from sentry.snuba.metrics.mqb_query_transformer import (
transform_mqb_query_to_metrics_query,
)

try:
with sentry_sdk.start_span(op="metric_layer", description="transform_query"):
snuba_query = self.get_metrics_layer_snql_query()
if self.use_on_demand_metrics:
metrics_query = self._get_on_demand_metrics_query(snuba_query.query)
if self._on_demand_metric_spec:
metrics_query = self._get_metrics_query_from_on_demand_spec(
spec=self._on_demand_metric_spec, require_time_range=True
)
else:
intermediate_query = self.get_metrics_layer_snql_query()
metrics_query = transform_mqb_query_to_metrics_query(
snuba_query.query, self.is_alerts_query
intermediate_query, self.is_alerts_query
)

with sentry_sdk.start_span(op="metric_layer", description="run_query"):
metrics_data = get_series(
projects=self.params.projects,
Expand Down Expand Up @@ -1016,20 +1021,20 @@ def get_snql_query(self) -> Request:
we are going to import the purposfully hidden SnubaQueryBuilder which is a component that takes a MetricsQuery
and returns one or more equivalent snql query(ies).
"""

if self.use_metrics_layer or self.use_on_demand_metrics:
if self.use_metrics_layer or self._on_demand_metric_spec:
from sentry.snuba.metrics import SnubaQueryBuilder
from sentry.snuba.metrics.mqb_query_transformer import (
transform_mqb_query_to_metrics_query,
)

snuba_request = self.get_metrics_layer_snql_query()

if self.use_on_demand_metrics:
metrics_query = self._get_on_demand_metrics_query(snuba_request.query)
if self._on_demand_metric_spec:
metrics_query = self._get_metrics_query_from_on_demand_spec(
spec=self._on_demand_metric_spec, require_time_range=False
)
else:
intermediate_query = self.get_metrics_layer_snql_query()
metrics_query = transform_mqb_query_to_metrics_query(
snuba_request.query, is_alerts_query=self.is_alerts_query
intermediate_query, is_alerts_query=self.is_alerts_query
)

snuba_queries, _ = SnubaQueryBuilder(
Expand All @@ -1042,14 +1047,21 @@ def get_snql_query(self) -> Request:
# If we have zero or more than one queries resulting from the supplied query, we want to generate
# an error as we don't support this case.
raise IncompatibleMetricsQuery(
"The metrics layer generated zero or multiple queries from the supplied query, only a single query is supported"
"The metrics layer generated zero or multiple queries from the supplied query, only a single "
"query is supported"
)

# We take only the first query, supposing a single query is generated.
entity = list(snuba_queries.keys())[0]
snuba_request.query = snuba_queries[entity]["totals"]

return snuba_request
query = snuba_queries[entity]["totals"]

return Request(
dataset=self.dataset.value,
app_id="default",
query=query,
flags=Flags(turbo=self.turbo),
tenant_ids=self.tenant_ids,
)

return super().get_snql_query()

Expand Down Expand Up @@ -1247,19 +1259,20 @@ def get_snql_query(self) -> List[Request]:
return queries

def run_query(self, referrer: str, use_cache: bool = False) -> Any:

if self.use_metrics_layer or self.use_on_demand_metrics:
if self.use_metrics_layer or self._on_demand_metric_spec:
from sentry.snuba.metrics.datasource import get_series
from sentry.snuba.metrics.mqb_query_transformer import (
transform_mqb_query_to_metrics_query,
)

snuba_query = self.get_snql_query()[0].query
try:
with sentry_sdk.start_span(op="metric_layer", description="transform_query"):
if self.use_on_demand_metrics:
metrics_query = self._get_on_demand_metrics_query(snuba_query)
if self._on_demand_metric_spec:
metrics_query = self._get_metrics_query_from_on_demand_spec(
spec=self._on_demand_metric_spec, require_time_range=True
)
elif self.use_metrics_layer:
snuba_query = self.get_snql_query()[0].query
metrics_query = transform_mqb_query_to_metrics_query(
snuba_query, self.is_alerts_query
)
Expand Down
Loading
Loading