feat: add a way of running EAP queries async with a thread pool (#6352)
This is a performance optimization that removes most of the overhead of
running multiple ClickHouse queries per request by executing them
concurrently on a shared thread pool.

Benchmarked against `SNUBA_SETTINGS=test pytest -vv
tests/web/rpc/test_timeseries_api.py::TestTimeSeriesApi::test_query_caching`

before:
`took 688ms to run timeseries_query`

after:
`took 102ms to run timeseries_query`

The benchmarked request doesn't hit the query cache at all and issues ~25
ClickHouse queries.
colin-sentry authored Sep 30, 2024
1 parent f464819 commit 517e298
Showing 3 changed files with 144 additions and 109 deletions.
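
At a high level, the change replaces sequential per-bucket query execution with fan-out onto a shared thread pool. A minimal, self-contained sketch of that pattern (the names and pool size below are illustrative, not Snuba's real API):

```python
# Sketch of the pattern this commit adopts; run_bucket_query, NUM_BUCKETS and
# max_workers are placeholders (the real pool is sized by CLICKHOUSE_MAX_POOL_SIZE).
import time
from concurrent.futures import ThreadPoolExecutor

NUM_BUCKETS = 25
executor = ThreadPoolExecutor(max_workers=10)


def run_bucket_query(bucket: int) -> dict:
    # Stand-in for one blocking ClickHouse round trip.
    time.sleep(0.02)
    return {"bucket": bucket, "agg0": float(bucket)}


# Fan all bucket queries out to the pool, then block on the futures.
futures = [executor.submit(run_bucket_query, b) for b in range(NUM_BUCKETS)]
results = [f.result(timeout=60) for f in futures]
# Wall time is roughly one round trip (plus queueing), not ~25 of them.
```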
84 changes: 84 additions & 0 deletions snuba/web/rpc/common/eap_execute.py
@@ -0,0 +1,84 @@
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Literal, MutableMapping, Optional

from snuba import settings
from snuba.attribution import AppID
from snuba.attribution.attribution_info import AttributionInfo
from snuba.datasets.entities.entity_key import EntityKey
from snuba.datasets.entities.factory import get_entity
from snuba.datasets.pluggable_dataset import PluggableDataset
from snuba.query.data_source.simple import Entity
from snuba.query.logical import Query
from snuba.query.query_settings import HTTPQuerySettings
from snuba.request import Request
from snuba.utils.metrics.timer import Timer
from snuba.web import QueryResult
from snuba.web.query import run_query

eap_executor = ThreadPoolExecutor(max_workers=settings.CLICKHOUSE_MAX_POOL_SIZE)


def run_eap_query(
    dataset: Literal["eap_spans", "spans_str_attrs", "spans_num_attrs"],
    query: Query,
    original_body: dict[str, Any],
    referrer: str,
    organization_id: int,
    parent_api: str,
    timer: Timer,
    clickhouse_settings: Optional[MutableMapping[str, Any]] = None,
) -> QueryResult:
    entity = Entity(
        key=EntityKey("eap_spans"),
        schema=get_entity(EntityKey("eap_spans")).get_data_model(),
        sample=None,
    )
    query_settings = HTTPQuerySettings()
    if clickhouse_settings is not None:
        query_settings.set_clickhouse_settings(clickhouse_settings)

    query.set_from_clause(entity)

    request = Request(
        id=str(uuid.uuid4()),
        original_body=original_body,
        query=query,
        query_settings=query_settings,
        attribution_info=AttributionInfo(
            referrer=referrer,
            team="eap",
            feature="eap",
            tenant_ids={
                "organization_id": organization_id,
                "referrer": referrer,
            },
            app_id=AppID("eap"),
            parent_api=parent_api,
        ),
    )

    return run_query(PluggableDataset(name=dataset, all_entities=[]), request, timer)


def run_eap_query_async(
    dataset: Literal["eap_spans", "spans_str_attrs", "spans_num_attrs"],
    query: Query,
    original_body: dict[str, Any],
    referrer: str,
    organization_id: int,
    parent_api: str,
    timer: Timer,
    clickhouse_settings: Optional[MutableMapping[str, Any]] = None,
) -> Future[QueryResult]:
    return eap_executor.submit(
        run_eap_query,
        dataset=dataset,
        query=query,
        original_body=original_body,
        referrer=referrer,
        organization_id=organization_id,
        parent_api=parent_api,
        timer=timer,
        clickhouse_settings=clickhouse_settings,
    )
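
A hedged usage sketch of the helper above, mirroring how the timeseries endpoint calls it in the next file. `build_query`, `buckets`, the timer name, and the literal organization id are placeholders, and the whole thing assumes a running Snuba/ClickHouse, so treat it as a shape reference rather than something runnable end-to-end:

```python
# Sketch only: build_query(), buckets and the literal values are placeholders;
# the real call site is in timeseries.py below.
from snuba.query.logical import Query
from snuba.utils.metrics.timer import Timer
from snuba.web.rpc.common.eap_execute import run_eap_query_async


def build_query(start_ts: int, end_ts: int) -> Query:
    raise NotImplementedError("placeholder for a real logical Query")


buckets = [(0, 3600), (3600, 7200)]  # hypothetical (start_ts, end_ts) pairs

futures = [
    run_eap_query_async(
        dataset="eap_spans",
        query=build_query(start_ts, end_ts),
        original_body={},
        referrer="eap.timeseries",
        organization_id=1,
        parent_api="eap.timeseries",
        timer=Timer("timeseries_query"),
        clickhouse_settings={"use_query_cache": "true"},
    )
    for start_ts, end_ts in buckets
]
# Each future resolves to a QueryResult; .result(60) bounds the wait at 60s.
rows = [future.result(60).result["data"] for future in futures]
```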
155 changes: 55 additions & 100 deletions snuba/web/rpc/v1alpha/timeseries/timeseries.py
@@ -1,53 +1,30 @@
import itertools
import time
import uuid
from typing import Any, Iterable
from typing import Any, Iterable, MutableMapping, Optional

from google.protobuf.json_format import MessageToDict
from sentry_protos.snuba.v1alpha.endpoint_aggregate_bucket_pb2 import (
AggregateBucketRequest,
AggregateBucketResponse,
)

from snuba.attribution.appid import AppID
from snuba.attribution.attribution_info import AttributionInfo
from snuba.datasets.entities.entity_key import EntityKey
from snuba.datasets.entities.factory import get_entity
from snuba.datasets.pluggable_dataset import PluggableDataset
from snuba.query import SelectedExpression
from snuba.query.data_source.simple import Entity
from snuba.query.dsl import and_cond
from snuba.query.logical import Query
from snuba.query.query_settings import HTTPQuerySettings
from snuba.request import Request as SnubaRequest
from snuba.utils.metrics.timer import Timer
from snuba.web.query import run_query
from snuba.web.rpc.common.common import (
project_id_and_org_conditions,
timestamp_in_range_condition,
trace_item_filters_to_expression,
treeify_or_and_conditions,
)
from snuba.web.rpc.common.eap_execute import run_eap_query_async
from snuba.web.rpc.v1alpha.timeseries import aggregate_functions

EIGHT_HOUR_GRANULARITY = 60 * 60 * 8
ONE_HOUR_GRANULARITY = 60 * 60


class UnmergedTimeseriesQuerierResult:
def __init__(self, start_ts: int, end_ts: int, aggregate_results: list[Any]):
self.start_ts = start_ts
self.end_ts = end_ts
self.raw_aggregate_results = aggregate_results


class TimeseriesQuerierResult:
def __init__(self, start_ts: int, end_ts: int, aggregate_results: list[float]):
self.start_ts = start_ts
self.end_ts = end_ts
self.aggregate_results = aggregate_results


class TimeseriesQuerier:
def __init__(self, request: AggregateBucketRequest, timer: Timer):
self.start_ts = request.meta.start_timestamp.seconds
@@ -66,17 +43,11 @@ def __init__(self, request: AggregateBucketRequest, timer: Timer):
self.referrer = request.meta.referrer
self.organization_id = request.meta.organization_id

def aggregate_bucket_request(
def create_clickhouse_query(
self, start_ts: int, end_ts: int, bucket_size_secs: int
) -> SnubaRequest:
entity = Entity(
key=EntityKey("eap_spans"),
schema=get_entity(EntityKey("eap_spans")).get_data_model(),
sample=None,
)

) -> Query:
query = Query(
from_clause=entity,
from_clause=None,
selected_columns=[
SelectedExpression(
name=f"agg{i}", expression=self.aggregates[i].expression
@@ -89,54 +60,26 @@ def aggregate_bucket_request(
),
)
treeify_or_and_conditions(query)
settings = HTTPQuerySettings()
# we don't want to cache the "last bucket", we'll never get cache hits on it
if (end_ts - start_ts) % bucket_size_secs == 0:
clickhouse_settings = {
"use_query_cache": "true",
"query_cache_ttl": 60 * 5, # 5 minutes
}
# store things in the query cache long-term if they are >4 hours old
if end_ts < time.time() - 4 * 60 * 60:
clickhouse_settings["query_cache_ttl"] = (
90 * 24 * 60 * 60
) # store this query cache entry for 90 days
settings.set_clickhouse_settings(clickhouse_settings)

return SnubaRequest(
id=str(uuid.uuid4()),
original_body=self.original_body,
query=query,
query_settings=settings,
attribution_info=AttributionInfo(
referrer=self.referrer,
team="eap",
feature="eap",
tenant_ids={
"organization_id": self.organization_id,
"referrer": self.referrer,
},
app_id=AppID("eap"),
parent_api="eap_timeseries",
),
)
return query

def execute(
self, start_ts: int, end_ts: int, bucket_size_secs: int
) -> UnmergedTimeseriesQuerierResult:
data = run_query(
dataset=PluggableDataset(name="eap", all_entities=[]),
request=self.aggregate_bucket_request(start_ts, end_ts, bucket_size_secs),
timer=self.timer,
).result["data"]

return UnmergedTimeseriesQuerierResult(
start_ts=start_ts,
end_ts=end_ts,
aggregate_results=list(
data[0][f"agg{agg_idx}"] for agg_idx in range(len(self.aggregates))
),
)
@staticmethod
def get_clickhouse_settings(
start_ts: int, is_full_bucket: bool, bucket_size_secs: int
) -> Optional[MutableMapping[str, Any]]:
# we don't want to cache the "last bucket", we'll never get cache hits on it
if not is_full_bucket:
return None

clickhouse_settings: MutableMapping[str, Any] = {
"use_query_cache": "true",
"query_cache_ttl": 60 * 5, # 5 minutes
}
# store things in the query cache long-term if they are >4 hours old
if start_ts + bucket_size_secs < time.time() - 4 * 60 * 60:
clickhouse_settings["query_cache_ttl"] = (
90 * 24 * 60 * 60
) # store this query cache entry for 90 days
return clickhouse_settings

def get_request_granularity(self) -> int:
if (
@@ -150,24 +93,20 @@ def get_request_granularity(self) -> int:

def merge_results(
self,
unmerged_results: Iterable[UnmergedTimeseriesQuerierResult],
unmerged_results: Iterable[list[Any]],
request_granularity: int,
) -> Iterable[TimeseriesQuerierResult]:
) -> Iterable[list[float]]:
# if we fulfilled a "1 day of data" request with 6 x 4 hour blocks, we need to merge those 6 back into
# one big bucket to send back to the UI
number_of_results_to_merge = self.granularity_secs // request_granularity
while True:
chunk = list(itertools.islice(unmerged_results, number_of_results_to_merge))
if not chunk:
return None
yield TimeseriesQuerierResult(
start_ts=chunk[0].start_ts,
end_ts=chunk[-1].end_ts,
aggregate_results=[
agg.merge(x.raw_aggregate_results[agg_idx] for x in chunk)
for agg_idx, agg in enumerate(self.aggregates)
],
)
yield [
agg.merge(x[agg_idx] for x in chunk)
for agg_idx, agg in enumerate(self.aggregates)
]

def run(self) -> AggregateBucketResponse:
# if you request one day of data, we'd ideally like to split that up into 6 requests of 4 hours of data
@@ -177,12 +116,30 @@ def run(self) -> AggregateBucketResponse:
# into one big response (if necessary)
request_granularity = self.get_request_granularity()

all_results: Iterable[UnmergedTimeseriesQuerierResult] = (
self.execute(
start_ts,
min(start_ts + request_granularity, self.end_ts),
request_granularity,
)
all_results: Iterable[list[Any]] = (
[
run_eap_query_async(
dataset="eap_spans",
query=self.create_clickhouse_query(
start_ts,
min(start_ts + request_granularity, self.end_ts),
request_granularity,
),
clickhouse_settings=self.get_clickhouse_settings(
start_ts,
start_ts + request_granularity < self.end_ts,
request_granularity,
),
referrer="eap.timeseries",
organization_id=self.organization_id,
parent_api="eap.timeseries",
timer=self.timer,
original_body=self.original_body,
)
.result(60)
.result["data"][0][f"agg{agg_idx}"]
for agg_idx in range(len(self.aggregates))
]
for start_ts in range(
self.rounded_start_ts, self.end_ts, request_granularity
)
@@ -191,9 +148,7 @@
merged_results = self.merge_results(all_results, request_granularity)

# TODO: allow multiple aggregates once proto is done
return AggregateBucketResponse(
result=[float(r.aggregate_results[0]) for r in merged_results]
)
return AggregateBucketResponse(result=[float(r[0]) for r in merged_results])


def timeseries_query(
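To make the new control flow concrete, here is a small self-contained sketch of the split / cache / merge logic implemented above; `sum` stands in for the real aggregate merge functions, the timestamps are invented, and the helper names are illustrative rather than Snuba's:

```python
# Illustration of the split / cache / merge logic in TimeseriesQuerier.
import itertools
import time
from typing import Optional

GRANULARITY_SECS = 24 * 60 * 60    # what the caller asked for: daily buckets
REQUEST_GRANULARITY = 8 * 60 * 60  # what we actually query: 8-hour sub-buckets
start, end = 1_700_000_000, 1_700_000_000 + GRANULARITY_SECS


def clickhouse_settings_for(
    start_ts: int, is_full_bucket: bool, bucket_size_secs: int
) -> Optional[dict]:
    if not is_full_bucket:  # the last bucket is never cached; no cache hits expected
        return None
    settings = {"use_query_cache": "true", "query_cache_ttl": 5 * 60}
    if start_ts + bucket_size_secs < time.time() - 4 * 60 * 60:
        settings["query_cache_ttl"] = 90 * 24 * 60 * 60  # old buckets are immutable
    return settings


sub_results: list[list[float]] = []
for bucket_start in range(start, end, REQUEST_GRANULARITY):
    is_full = bucket_start + REQUEST_GRANULARITY < end  # mirrors run() above
    _settings = clickhouse_settings_for(bucket_start, is_full, REQUEST_GRANULARITY)
    sub_results.append([1.0])  # pretend each sub-query returned one aggregate

# merge 3 x 8-hour sub-results back into each daily bucket, as merge_results does
n = GRANULARITY_SECS // REQUEST_GRANULARITY
it = iter(sub_results)
merged = []
while True:
    chunk = list(itertools.islice(it, n))
    if not chunk:
        break
    merged.append([sum(x[i] for x in chunk) for i in range(len(chunk[0]))])

print(merged)  # [[3.0]] -> one daily value merged from three 8-hour sub-queries
```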
14 changes: 5 additions & 9 deletions tests/web/rpc/v1alpha/test_timeseries_api.py
@@ -14,7 +14,7 @@

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.web.query import run_query
from snuba.web.rpc.common.eap_execute import run_eap_query
from snuba.web.rpc.v1alpha.timeseries.timeseries import timeseries_query
from tests.base import BaseApiTest
from tests.helpers import write_raw_unprocessed_events
@@ -327,8 +327,8 @@ def test_query_caching(
expected_result: list[float],
) -> None:
with patch(
"snuba.web.rpc.v1alpha.timeseries.timeseries.run_query",
side_effect=run_query,
"snuba.web.rpc.common.eap_execute.run_eap_query",
side_effect=run_eap_query,
) as mocked_run_query:
# this test does a daily aggregate on a week + 9 hours of data.
# that is, the user is visiting the page at 09:01 GMT, so the 8 buckets returned would be:
@@ -364,17 +364,13 @@

# we expect all buckets except the last one to be cached
assert list(
call[1]["request"].query_settings.get_clickhouse_settings()[
"use_query_cache"
]
call[1]["clickhouse_settings"]["use_query_cache"]
for call in mocked_run_query.call_args_list[:-1]
) == ["true"] * (3 * 7 + 1)

# we expect all buckets >4 hours old to have a long TTL
assert list(
call[1]["request"].query_settings.get_clickhouse_settings()[
"query_cache_ttl"
]
call[1]["clickhouse_settings"]["query_cache_ttl"]
for call in mocked_run_query.call_args_list[:-2]
) == [90 * 24 * 60 * 60] * (3 * 7)

