diff --git a/snuba/web/rpc/common/eap_execute.py b/snuba/web/rpc/common/eap_execute.py
new file mode 100644
index 0000000000..e9eff6d9e8
--- /dev/null
+++ b/snuba/web/rpc/common/eap_execute.py
@@ -0,0 +1,84 @@
+import uuid
+from concurrent.futures import Future, ThreadPoolExecutor
+from typing import Any, Literal, MutableMapping, Optional
+
+from snuba import settings
+from snuba.attribution import AppID
+from snuba.attribution.attribution_info import AttributionInfo
+from snuba.datasets.entities.entity_key import EntityKey
+from snuba.datasets.entities.factory import get_entity
+from snuba.datasets.pluggable_dataset import PluggableDataset
+from snuba.query.data_source.simple import Entity
+from snuba.query.logical import Query
+from snuba.query.query_settings import HTTPQuerySettings
+from snuba.request import Request
+from snuba.utils.metrics.timer import Timer
+from snuba.web import QueryResult
+from snuba.web.query import run_query
+
+eap_executor = ThreadPoolExecutor(max_workers=settings.CLICKHOUSE_MAX_POOL_SIZE)
+
+
+def run_eap_query(
+    dataset: Literal["eap_spans", "spans_str_attrs", "spans_num_attrs"],
+    query: Query,
+    original_body: dict[str, Any],
+    referrer: str,
+    organization_id: int,
+    parent_api: str,
+    timer: Timer,
+    clickhouse_settings: Optional[MutableMapping[str, Any]] = None,
+) -> QueryResult:
+    entity = Entity(
+        key=EntityKey("eap_spans"),
+        schema=get_entity(EntityKey("eap_spans")).get_data_model(),
+        sample=None,
+    )
+    query_settings = HTTPQuerySettings()
+    if clickhouse_settings is not None:
+        query_settings.set_clickhouse_settings(clickhouse_settings)
+
+    query.set_from_clause(entity)
+
+    request = Request(
+        id=str(uuid.uuid4()),
+        original_body=original_body,
+        query=query,
+        query_settings=query_settings,
+        attribution_info=AttributionInfo(
+            referrer=referrer,
+            team="eap",
+            feature="eap",
+            tenant_ids={
+                "organization_id": organization_id,
+                "referrer": referrer,
+            },
+            app_id=AppID("eap"),
+            parent_api=parent_api,
+        ),
+    )
+
+    return run_query(PluggableDataset(name=dataset, all_entities=[]), request, timer)
+
+
+def run_eap_query_async(
+    dataset: Literal["eap_spans", "spans_str_attrs", "spans_num_attrs"],
+    query: Query,
+    original_body: dict[str, Any],
+    referrer: str,
+    organization_id: int,
+    parent_api: str,
+    timer: Timer,
+    clickhouse_settings: Optional[MutableMapping[str, Any]] = None,
+) -> Future[QueryResult]:
+    return eap_executor.submit(
+        run_eap_query,
+        dataset=dataset,
+        query=query,
+        original_body=original_body,
+        referrer=referrer,
+        organization_id=organization_id,
+        parent_api=parent_api,
+        timer=timer,
+        clickhouse_settings=clickhouse_settings,
+    )
diff --git a/snuba/web/rpc/v1alpha/timeseries/timeseries.py b/snuba/web/rpc/v1alpha/timeseries/timeseries.py
index 30a89f0f6f..2882324bbe 100644
--- a/snuba/web/rpc/v1alpha/timeseries/timeseries.py
+++ b/snuba/web/rpc/v1alpha/timeseries/timeseries.py
@@ -1,7 +1,6 @@
 import itertools
 import time
-import uuid
-from typing import Any, Iterable
+from typing import Any, Iterable, MutableMapping, Optional
 
 from google.protobuf.json_format import MessageToDict
 from sentry_protos.snuba.v1alpha.endpoint_aggregate_bucket_pb2 import (
@@ -9,45 +8,23 @@
     AggregateBucketResponse,
 )
 
-from snuba.attribution.appid import AppID
-from snuba.attribution.attribution_info import AttributionInfo
-from snuba.datasets.entities.entity_key import EntityKey
-from snuba.datasets.entities.factory import get_entity
-from snuba.datasets.pluggable_dataset import PluggableDataset
 from snuba.query import SelectedExpression
-from snuba.query.data_source.simple import Entity
 from snuba.query.dsl import and_cond
 from snuba.query.logical import Query
-from snuba.query.query_settings import HTTPQuerySettings
-from snuba.request import Request as SnubaRequest
 from snuba.utils.metrics.timer import Timer
-from snuba.web.query import run_query
 from snuba.web.rpc.common.common import (
     project_id_and_org_conditions,
     timestamp_in_range_condition,
     trace_item_filters_to_expression,
     treeify_or_and_conditions,
 )
+from snuba.web.rpc.common.eap_execute import run_eap_query_async
 from snuba.web.rpc.v1alpha.timeseries import aggregate_functions
 
 EIGHT_HOUR_GRANULARITY = 60 * 60 * 8
 ONE_HOUR_GRANULARITY = 60 * 60
 
 
-class UnmergedTimeseriesQuerierResult:
-    def __init__(self, start_ts: int, end_ts: int, aggregate_results: list[Any]):
-        self.start_ts = start_ts
-        self.end_ts = end_ts
-        self.raw_aggregate_results = aggregate_results
-
-
-class TimeseriesQuerierResult:
-    def __init__(self, start_ts: int, end_ts: int, aggregate_results: list[float]):
-        self.start_ts = start_ts
-        self.end_ts = end_ts
-        self.aggregate_results = aggregate_results
-
-
 class TimeseriesQuerier:
     def __init__(self, request: AggregateBucketRequest, timer: Timer):
         self.start_ts = request.meta.start_timestamp.seconds
@@ -66,17 +43,11 @@ def __init__(self, request: AggregateBucketRequest, timer: Timer):
         self.referrer = request.meta.referrer
         self.organization_id = request.meta.organization_id
 
-    def aggregate_bucket_request(
+    def create_clickhouse_query(
         self, start_ts: int, end_ts: int, bucket_size_secs: int
-    ) -> SnubaRequest:
-        entity = Entity(
-            key=EntityKey("eap_spans"),
-            schema=get_entity(EntityKey("eap_spans")).get_data_model(),
-            sample=None,
-        )
-
+    ) -> Query:
         query = Query(
-            from_clause=entity,
+            from_clause=None,
             selected_columns=[
                 SelectedExpression(
                     name=f"agg{i}", expression=self.aggregates[i].expression
@@ -89,54 +60,26 @@ def aggregate_bucket_request(
             ),
         )
         treeify_or_and_conditions(query)
-        settings = HTTPQuerySettings()
-        # we don't want to cache the "last bucket", we'll never get cache hits on it
-        if (end_ts - start_ts) % bucket_size_secs == 0:
-            clickhouse_settings = {
-                "use_query_cache": "true",
-                "query_cache_ttl": 60 * 5,  # 5 minutes
-            }
-            # store things in the query cache long-term if they are >4 hours old
-            if end_ts < time.time() - 4 * 60 * 60:
-                clickhouse_settings["query_cache_ttl"] = (
-                    90 * 24 * 60 * 60
-                )  # store this query cache entry for 90 days
-            settings.set_clickhouse_settings(clickhouse_settings)
-
-        return SnubaRequest(
-            id=str(uuid.uuid4()),
-            original_body=self.original_body,
-            query=query,
-            query_settings=settings,
-            attribution_info=AttributionInfo(
-                referrer=self.referrer,
-                team="eap",
-                feature="eap",
-                tenant_ids={
-                    "organization_id": self.organization_id,
-                    "referrer": self.referrer,
-                },
-                app_id=AppID("eap"),
-                parent_api="eap_timeseries",
-            ),
-        )
+        return query
 
-    def execute(
-        self, start_ts: int, end_ts: int, bucket_size_secs: int
-    ) -> UnmergedTimeseriesQuerierResult:
-        data = run_query(
-            dataset=PluggableDataset(name="eap", all_entities=[]),
-            request=self.aggregate_bucket_request(start_ts, end_ts, bucket_size_secs),
-            timer=self.timer,
-        ).result["data"]
-
-        return UnmergedTimeseriesQuerierResult(
-            start_ts=start_ts,
-            end_ts=end_ts,
-            aggregate_results=list(
-                data[0][f"agg{agg_idx}"] for agg_idx in range(len(self.aggregates))
-            ),
-        )
+    @staticmethod
+    def get_clickhouse_settings(
+        start_ts: int, is_full_bucket: bool, bucket_size_secs: int
+    ) -> Optional[MutableMapping[str, Any]]:
+        # we don't want to cache the "last bucket", we'll never get cache hits on it
+        if not is_full_bucket:
+            return None
+
+        clickhouse_settings: MutableMapping[str, Any] = {
+            "use_query_cache": "true",
+            "query_cache_ttl": 60 * 5,  # 5 minutes
+        }
+        # store things in the query cache long-term if they are >4 hours old
+        if start_ts + bucket_size_secs < time.time() - 4 * 60 * 60:
+            clickhouse_settings["query_cache_ttl"] = (
+                90 * 24 * 60 * 60
+            )  # store this query cache entry for 90 days
+        return clickhouse_settings
 
     def get_request_granularity(self) -> int:
         if (
@@ -150,9 +93,9 @@ def get_request_granularity(self) -> int:
 
     def merge_results(
         self,
-        unmerged_results: Iterable[UnmergedTimeseriesQuerierResult],
+        unmerged_results: Iterable[list[Any]],
         request_granularity: int,
-    ) -> Iterable[TimeseriesQuerierResult]:
+    ) -> Iterable[list[float]]:
         # if we fulfilled a "1 day of data" request with 6 x 4 hour blocks, we need to merge those 6 back into
         # one big bucket to send back to the UI
         number_of_results_to_merge = self.granularity_secs // request_granularity
@@ -160,14 +103,10 @@ def merge_results(
             chunk = list(itertools.islice(unmerged_results, number_of_results_to_merge))
             if not chunk:
                 return None
-            yield TimeseriesQuerierResult(
-                start_ts=chunk[0].start_ts,
-                end_ts=chunk[-1].end_ts,
-                aggregate_results=[
-                    agg.merge(x.raw_aggregate_results[agg_idx] for x in chunk)
-                    for agg_idx, agg in enumerate(self.aggregates)
-                ],
-            )
+            yield [
+                agg.merge(x[agg_idx] for x in chunk)
+                for agg_idx, agg in enumerate(self.aggregates)
+            ]
 
     def run(self) -> AggregateBucketResponse:
         # if you request one day of data, we'd ideally like to split that up into 6 requests of 4 hours of data
@@ -177,12 +116,30 @@ def run(self) -> AggregateBucketResponse:
         # into one big response (if necessary)
         request_granularity = self.get_request_granularity()
 
-        all_results: Iterable[UnmergedTimeseriesQuerierResult] = (
-            self.execute(
-                start_ts,
-                min(start_ts + request_granularity, self.end_ts),
-                request_granularity,
-            )
+        all_results: Iterable[list[Any]] = (
+            [
+                run_eap_query_async(
+                    dataset="eap_spans",
+                    query=self.create_clickhouse_query(
+                        start_ts,
+                        min(start_ts + request_granularity, self.end_ts),
+                        request_granularity,
+                    ),
+                    clickhouse_settings=self.get_clickhouse_settings(
+                        start_ts,
+                        start_ts + request_granularity < self.end_ts,
+                        request_granularity,
+                    ),
+                    referrer="eap.timeseries",
+                    organization_id=self.organization_id,
+                    parent_api="eap.timeseries",
+                    timer=self.timer,
+                    original_body=self.original_body,
+                )
+                .result(60)
+                .result["data"][0][f"agg{agg_idx}"]
+                for agg_idx in range(len(self.aggregates))
+            ]
             for start_ts in range(
                 self.rounded_start_ts, self.end_ts, request_granularity
             )
@@ -191,9 +148,7 @@ def run(self) -> AggregateBucketResponse:
         merged_results = self.merge_results(all_results, request_granularity)
 
         # TODO: allow multiple aggregates once proto is done
-        return AggregateBucketResponse(
-            result=[float(r.aggregate_results[0]) for r in merged_results]
-        )
+        return AggregateBucketResponse(result=[float(r[0]) for r in merged_results])
 
 
 def timeseries_query(
diff --git a/tests/web/rpc/v1alpha/test_timeseries_api.py b/tests/web/rpc/v1alpha/test_timeseries_api.py
index 5a91b1c334..454a53d844 100644
--- a/tests/web/rpc/v1alpha/test_timeseries_api.py
+++ b/tests/web/rpc/v1alpha/test_timeseries_api.py
@@ -14,7 +14,7 @@
 
 from snuba.datasets.storages.factory import get_storage
 from snuba.datasets.storages.storage_key import StorageKey
-from snuba.web.query import run_query
+from snuba.web.rpc.common.eap_execute import run_eap_query
 from snuba.web.rpc.v1alpha.timeseries.timeseries import timeseries_query
 from tests.base import BaseApiTest
 from tests.helpers import write_raw_unprocessed_events
@@ -327,8 +327,8 @@ def test_query_caching(
         expected_result: list[float],
     ) -> None:
         with patch(
-            "snuba.web.rpc.v1alpha.timeseries.timeseries.run_query",
-            side_effect=run_query,
+            "snuba.web.rpc.common.eap_execute.run_eap_query",
+            side_effect=run_eap_query,
         ) as mocked_run_query:
             # this test does a daily aggregate on a week + 9 hours of data.
             # that is, the user is visiting the page at 09:01 GMT, so the 8 buckets returned would be:
@@ -364,17 +364,13 @@ def test_query_caching(
 
             # we expect all buckets except the last one to be cached
            assert list(
-                call[1]["request"].query_settings.get_clickhouse_settings()[
-                    "use_query_cache"
-                ]
+                call[1]["clickhouse_settings"]["use_query_cache"]
                 for call in mocked_run_query.call_args_list[:-1]
             ) == ["true"] * (3 * 7 + 1)
 
             # we expect all buckets >4 hours old to have a long TTL
            assert list(
-                call[1]["request"].query_settings.get_clickhouse_settings()[
-                    "query_cache_ttl"
-                ]
+                call[1]["clickhouse_settings"]["query_cache_ttl"]
                 for call in mocked_run_query.call_args_list[:-2]
             ) == [90 * 24 * 60 * 60] * (3 * 7)
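
Usage sketch (not part of the patch): the snippet below illustrates the fan-out/collect pattern the new run_eap_query_async helper enables, outside of the timeseries endpoint. It only assumes the keyword arguments defined in eap_execute.py above; build_bucket_query, bucket_starts, and the 60-second timeout are hypothetical stand-ins for illustration.

# Minimal sketch, assuming a caller that already knows how to build per-bucket
# Query objects. build_bucket_query and bucket_starts are hypothetical.
from snuba.utils.metrics.timer import Timer
from snuba.web.rpc.common.eap_execute import run_eap_query_async

timer = Timer("eap_timeseries_example")

# Submit one ClickHouse query per bucket; each call returns a
# concurrent.futures.Future[QueryResult] backed by the shared eap_executor pool.
futures = [
    run_eap_query_async(
        dataset="eap_spans",
        query=build_bucket_query(start_ts),  # hypothetical helper
        original_body={},
        referrer="eap.timeseries",
        organization_id=1,
        parent_api="eap.timeseries",
        timer=timer,
        clickhouse_settings={"use_query_cache": "true"},
    )
    for start_ts in bucket_starts  # hypothetical list of bucket start timestamps
]

# Block only when the data is needed, with a per-future timeout, and read rows
# from QueryResult.result["data"] as the timeseries endpoint does in the diff.
rows_per_bucket = [future.result(60).result["data"] for future in futures]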