From 069f0eba3378fd29a7959a9fd7b093fd227a388e Mon Sep 17 00:00:00 2001 From: colin-sentry <161344340+colin-sentry@users.noreply.github.com> Date: Tue, 17 Sep 2024 12:51:07 -0400 Subject: [PATCH] fix: EAP timeseries order, add weight to calculations (#6302) The old tests used the same number a bunch of times, so there was a bug where the results were not ordered by time. This makes the tests a lot more robust and implements weight in the aggregate functions --------- Co-authored-by: William Mak --- snuba/web/rpc/timeseries.py | 47 +++++++++++--- tests/web/rpc/test_timeseries_api.py | 97 ++++++++++++++++++++++------ 2 files changed, 115 insertions(+), 29 deletions(-) diff --git a/snuba/web/rpc/timeseries.py b/snuba/web/rpc/timeseries.py index 7e63822f65..6c0f63c5e9 100644 --- a/snuba/web/rpc/timeseries.py +++ b/snuba/web/rpc/timeseries.py @@ -5,17 +5,18 @@ AggregateBucketRequest, AggregateBucketResponse, ) +from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey from snuba.attribution.appid import AppID from snuba.attribution.attribution_info import AttributionInfo from snuba.datasets.entities.entity_key import EntityKey from snuba.datasets.entities.factory import get_entity from snuba.datasets.pluggable_dataset import PluggableDataset -from snuba.query import SelectedExpression +from snuba.query import OrderBy, OrderByDirection, SelectedExpression from snuba.query.data_source.simple import Entity from snuba.query.dsl import CurriedFunctions as cf from snuba.query.dsl import Functions as f -from snuba.query.dsl import column +from snuba.query.dsl import column, literal from snuba.query.expressions import Expression from snuba.query.logical import Query from snuba.query.query_settings import HTTPQuerySettings @@ -23,6 +24,7 @@ from snuba.utils.metrics.timer import Timer from snuba.web.query import run_query from snuba.web.rpc.common import ( + NORMALIZED_COLUMNS, attribute_key_to_expression, base_conditions_and, trace_item_filters_to_expression, @@ -34,19 +36,43 @@ def _get_aggregate_func( request: AggregateBucketRequest, ) -> Expression: - key_col = attribute_key_to_expression(request.key) + key_expr = attribute_key_to_expression(request.key) + exists_condition: Expression = literal(True) + if request.key.name not in NORMALIZED_COLUMNS: + if request.key.type == AttributeKey.TYPE_STRING: + exists_condition = f.mapContains( + column("attr_str"), literal(request.key.name) + ) + else: + exists_condition = f.mapContains( + column("attr_num"), literal(request.key.name) + ) + sampling_weight_expr = column("sampling_weight_2") + sign_expr = column("sign") + sampling_weight_times_sign = f.multiply(sampling_weight_expr, sign_expr) + if request.aggregate == AggregateBucketRequest.FUNCTION_SUM: - return f.sum(key_col, alias="sum") - if request.aggregate == AggregateBucketRequest.FUNCTION_AVERAGE: - return f.avg(key_col, alias="avg") + return f.sum(f.multiply(key_expr, sampling_weight_times_sign), alias="sum") if request.aggregate == AggregateBucketRequest.FUNCTION_COUNT: - return f.count(key_col, alias="count") + return f.sumIf(sampling_weight_times_sign, exists_condition, alias="count") + if request.aggregate == AggregateBucketRequest.FUNCTION_AVERAGE: + return f.divide( + f.sum(f.multiply(key_expr, sampling_weight_times_sign)), + f.sumIf(sampling_weight_times_sign, exists_condition, alias="count"), + alias="avg", + ) if request.aggregate == AggregateBucketRequest.FUNCTION_P50: - return cf.quantile(0.5)(key_col, alias="p50") + return cf.quantileTDigestWeighted(0.5)( + key_expr, 
sampling_weight_expr, alias="p50" + ) if request.aggregate == AggregateBucketRequest.FUNCTION_P95: - return cf.quantile(0.95)(key_col, alias="p90") + return cf.quantileTDigestWeighted(0.95)( + key_expr, sampling_weight_expr, alias="p95" + ) if request.aggregate == AggregateBucketRequest.FUNCTION_P99: - return cf.quantile(0.99)(key_col, alias="p95") + return cf.quantileTDigestWeighted(0.99)( + key_expr, sampling_weight_expr, alias="p99" + ) raise BadSnubaRPCRequestException( f"Aggregate {request.aggregate} had an unknown or unset type" @@ -72,6 +98,7 @@ def _build_query(request: AggregateBucketRequest) -> Query: ), granularity=request.granularity_secs, groupby=[column("time")], + order_by=[OrderBy(direction=OrderByDirection.ASC, expression=column("time"))], ) treeify_or_and_conditions(res) return res diff --git a/tests/web/rpc/test_timeseries_api.py b/tests/web/rpc/test_timeseries_api.py index d86e7e7ec3..d5093c9641 100644 --- a/tests/web/rpc/test_timeseries_api.py +++ b/tests/web/rpc/test_timeseries_api.py @@ -18,7 +18,8 @@ from tests.helpers import write_raw_unprocessed_events -def gen_message(dt: datetime) -> Mapping[str, Any]: +def gen_message(dt: datetime, msg_index: int) -> Mapping[str, Any]: + dt = dt - timedelta(hours=1) + timedelta(minutes=msg_index) return { "description": "/api/0/relays/projectconfigs/", "duration_ms": 152, @@ -42,7 +43,12 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: }, "measurements": { "num_of_spans": {"value": 50.0}, - "eap.measurement": {"value": 420}, + "eap.measurement": {"value": msg_index}, + "client_sample_rate": { + "value": 0.01 + if msg_index % 10 == 0 + else 1 # every 10th span should be 100x upscaled + }, }, "organization_id": 1, "origin": "auto.http.django", @@ -83,7 +89,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: "location": random.choice(["mobile", "frontend", "backend"]), }, "trace_id": uuid.uuid4().hex, - "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)), + "start_timestamp_ms": int(dt.timestamp()) * 1000, "start_timestamp_precise": dt.timestamp(), "end_timestamp_precise": dt.timestamp() + 1, } @@ -97,8 +103,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: @pytest.fixture(autouse=True) def setup_teardown(clickhouse_db: None, redis_db: None) -> None: spans_storage = get_storage(StorageKey("eap_spans")) - start = BASE_TIME - messages = [gen_message(start - timedelta(minutes=i)) for i in range(120)] + messages = [gen_message(BASE_TIME, i) for i in range(120)] write_raw_unprocessed_events(spans_storage, messages) # type: ignore @@ -126,40 +131,94 @@ def test_basic(self) -> None: ) assert response.status_code == 200 - def test_with_data(self, setup_teardown: Any) -> None: - ts = Timestamp(seconds=int(BASE_TIME.timestamp())) - hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + def test_sum(self, setup_teardown: Any) -> None: message = AggregateBucketRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, cogs_category="something", referrer="something", - start_timestamp=Timestamp(seconds=hour_ago), - end_timestamp=ts, + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 30)), ), key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT), - aggregate=AggregateBucketRequest.FUNCTION_AVERAGE, - granularity_secs=1, + aggregate=AggregateBucketRequest.FUNCTION_SUM, + granularity_secs=300, ) response = timeseries_query(message) - assert response.result == [420 for _ in range(60)] + # 
spans have (measurement, weight) = (0, 100), (1, 1), ..., (9, 1), (10, 100), (11, 1), ... (every 10th span is upscaled 100x)
+        # granularity puts five spans into the same bucket
+        # whole interval is 30 minutes, so there should be 6 buckets
+        # and our query start time is exactly 1 hour after the data starts
+        expected_results = [
+            60 * 100 + 61 + 62 + 63 + 64,
+            65 + 66 + 67 + 68 + 69,
+            70 * 100 + 71 + 72 + 73 + 74,
+            75 + 76 + 77 + 78 + 79,
+            80 * 100 + 81 + 82 + 83 + 84,
+            85 + 86 + 87 + 88 + 89,
+        ]
+        assert response.result == expected_results
 
     def test_quantiles(self, setup_teardown: Any) -> None:
-        ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
-        hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
         message = AggregateBucketRequest(
             meta=RequestMeta(
                 project_ids=[1, 2, 3],
                 organization_id=1,
                 cogs_category="something",
                 referrer="something",
-                start_timestamp=Timestamp(seconds=hour_ago),
-                end_timestamp=ts,
+                start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
+                end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 60)),
             ),
             key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT),
             aggregate=AggregateBucketRequest.FUNCTION_P99,
-            granularity_secs=1,
+            granularity_secs=60 * 15,
+        )
+        response = timeseries_query(message)
+        # spans have measurement = 0, 1, 2, ...
+        # within the query window, measurements start at 60, and granularity puts 15 spans into each bucket
+        # and the P99 of 15 spans is just the maximum of the 15.
+        # T-Digest is approximate, so these numbers can be +- 3 or so.
+        expected_results = pytest.approx(
+            [
+                60 + 15 * 0 + 14,
+                60 + 15 * 1 + 14,
+                60 + 15 * 2 + 14,
+                60 + 15 * 3 + 14,
+            ],
+            rel=3,
+        )
+        print(response.result)
+        print(expected_results)
+        assert response.result == expected_results
+
+    def test_average(self, setup_teardown: Any) -> None:
+        message = AggregateBucketRequest(
+            meta=RequestMeta(
+                project_ids=[1, 2, 3],
+                organization_id=1,
+                cogs_category="something",
+                referrer="something",
+                start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
+                end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 30)),
+            ),
+            key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT),
+            aggregate=AggregateBucketRequest.FUNCTION_AVERAGE,
+            granularity_secs=300,
         )
         response = timeseries_query(message)
-        assert response.result == [420 for _ in range(60)]
+        # spans have (measurement, weight) = (0, 100), (1, 1), ..., (9, 1), (10, 100), (11, 1), ... (every 10th span is upscaled 100x)
+        # granularity puts five spans into the same bucket
+        # whole interval is 30 minutes, so there should be 6 buckets
+        # and our query start time is exactly 1 hour after the data starts
+        expected_results = pytest.approx(
+            [
+                (60 * 100 + 61 + 62 + 63 + 64) / 104,
+                (65 + 66 + 67 + 68 + 69) / 5,
+                (70 * 100 + 71 + 72 + 73 + 74) / 104,
+                (75 + 76 + 77 + 78 + 79) / 5,
+                (80 * 100 + 81 + 82 + 83 + 84) / 104,
+                (85 + 86 + 87 + 88 + 89) / 5,
+            ]
+        )
+        assert response.result == expected_results
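
As a cross-check on the expected values above: the sketch below is illustrative only (the helper name expected_buckets is made up for this note, not code from the patch). It mirrors the test fixture (120 spans, one per minute starting an hour before BASE_TIME, measurement equal to the span index, every 10th span upscaled 100x) and re-derives the sum / count / average buckets by computing sum(value * weight), sum(weight), and their ratio per 5-minute bucket, which is the same weighting the new aggregate expressions apply via sampling_weight * sign.

def expected_buckets(start_index: int, n_buckets: int, bucket_size: int = 5):
    """Weighted sum, count and average per bucket, assuming measurement == span index."""
    sums, counts, avgs = [], [], []
    for b in range(n_buckets):
        indices = range(start_index + b * bucket_size, start_index + (b + 1) * bucket_size)
        # every 10th span is written with client_sample_rate 0.01, i.e. weight 100
        weights = [100 if i % 10 == 0 else 1 for i in indices]
        weighted_sum = sum(i * w for i, w in zip(indices, weights))
        weighted_count = sum(weights)
        sums.append(weighted_sum)
        counts.append(weighted_count)
        avgs.append(weighted_sum / weighted_count)
    return sums, counts, avgs

# The query window in test_sum / test_average starts at BASE_TIME, i.e. at span
# index 60, and covers 30 minutes at 300s granularity -> 6 buckets of 5 spans each.
sums, counts, avgs = expected_buckets(start_index=60, n_buckets=6)
assert sums[0] == 60 * 100 + 61 + 62 + 63 + 64       # 6250
assert counts[0] == 100 + 1 + 1 + 1 + 1              # 104
assert avgs[1] == (65 + 66 + 67 + 68 + 69) / 5       # bucket with no upscaled span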
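
The quantile expectations follow the same layout: test_quantiles queries 60 minutes at 15-minute granularity, so each bucket holds 15 consecutive measurements starting at 60, and P99 of 15 values sits at (or very near) the bucket maximum. A quick arithmetic check, again just an illustrative snippet rather than code from the patch:

# last measurement in each of the four 15-span buckets, starting at index 60
p99_expected = [60 + 15 * b + 14 for b in range(4)]
assert p99_expected == [74, 89, 104, 119]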