fix: EAP timeseries order, add weight to calculations (#6302)
The old tests used the same measurement value for every span, which masked a bug
where the results were not ordered by time.

This makes the tests considerably more robust and implements sampling weight in the
aggregate functions.

---------

Co-authored-by: William Mak <william@wmak.io>
colin-sentry and wmak authored Sep 17, 2024
1 parent d7fa544 commit 069f0eb
Showing 2 changed files with 115 additions and 29 deletions.
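
As background for the weighting change in the diff below, a minimal sketch in plain Python (not part of the commit; the values are hypothetical): each stored span carries a sampling weight, the inverse of its sample rate per the test fixture's comment, so scaling by that weight estimates sums, counts, and averages over the full, unsampled population.

# Sketch only: weighted estimators over sampled spans (hypothetical values).
spans = [
    {"value": 60.0, "weight": 100},  # kept at a 1% sample rate, stands in for ~100 spans
    {"value": 61.0, "weight": 1},
    {"value": 62.0, "weight": 1},
]

weighted_sum = sum(s["value"] * s["weight"] for s in spans)   # estimated total value
weighted_count = sum(s["weight"] for s in spans)              # estimated number of spans
weighted_avg = weighted_sum / weighted_count                  # estimated mean

print(weighted_sum, weighted_count, round(weighted_avg, 2))   # 6123.0 102 60.03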
47 changes: 37 additions & 10 deletions snuba/web/rpc/timeseries.py
@@ -5,24 +5,26 @@
AggregateBucketRequest,
AggregateBucketResponse,
)
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey

from snuba.attribution.appid import AppID
from snuba.attribution.attribution_info import AttributionInfo
from snuba.datasets.entities.entity_key import EntityKey
from snuba.datasets.entities.factory import get_entity
from snuba.datasets.pluggable_dataset import PluggableDataset
from snuba.query import SelectedExpression
from snuba.query import OrderBy, OrderByDirection, SelectedExpression
from snuba.query.data_source.simple import Entity
from snuba.query.dsl import CurriedFunctions as cf
from snuba.query.dsl import Functions as f
from snuba.query.dsl import column
from snuba.query.dsl import column, literal
from snuba.query.expressions import Expression
from snuba.query.logical import Query
from snuba.query.query_settings import HTTPQuerySettings
from snuba.request import Request as SnubaRequest
from snuba.utils.metrics.timer import Timer
from snuba.web.query import run_query
from snuba.web.rpc.common import (
NORMALIZED_COLUMNS,
attribute_key_to_expression,
base_conditions_and,
trace_item_filters_to_expression,
@@ -34,19 +36,43 @@
def _get_aggregate_func(
request: AggregateBucketRequest,
) -> Expression:
key_col = attribute_key_to_expression(request.key)
key_expr = attribute_key_to_expression(request.key)
exists_condition: Expression = literal(True)
if request.key.name not in NORMALIZED_COLUMNS:
if request.key.type == AttributeKey.TYPE_STRING:
exists_condition = f.mapContains(
column("attr_str"), literal(request.key.name)
)
else:
exists_condition = f.mapContains(
column("attr_num"), literal(request.key.name)
)
sampling_weight_expr = column("sampling_weight_2")
sign_expr = column("sign")
sampling_weight_times_sign = f.multiply(sampling_weight_expr, sign_expr)

if request.aggregate == AggregateBucketRequest.FUNCTION_SUM:
return f.sum(key_col, alias="sum")
if request.aggregate == AggregateBucketRequest.FUNCTION_AVERAGE:
return f.avg(key_col, alias="avg")
return f.sum(f.multiply(key_expr, sampling_weight_times_sign), alias="sum")
if request.aggregate == AggregateBucketRequest.FUNCTION_COUNT:
return f.count(key_col, alias="count")
return f.sumIf(sampling_weight_times_sign, exists_condition, alias="count")
if request.aggregate == AggregateBucketRequest.FUNCTION_AVERAGE:
return f.divide(
f.sum(f.multiply(key_expr, sampling_weight_times_sign)),
f.sumIf(sampling_weight_times_sign, exists_condition, alias="count"),
alias="avg",
)
if request.aggregate == AggregateBucketRequest.FUNCTION_P50:
return cf.quantile(0.5)(key_col, alias="p50")
return cf.quantileTDigestWeighted(0.5)(
key_expr, sampling_weight_expr, alias="p50"
)
if request.aggregate == AggregateBucketRequest.FUNCTION_P95:
return cf.quantile(0.95)(key_col, alias="p90")
return cf.quantileTDigestWeighted(0.95)(
key_expr, sampling_weight_expr, alias="p95"
)
if request.aggregate == AggregateBucketRequest.FUNCTION_P99:
return cf.quantile(0.99)(key_col, alias="p95")
return cf.quantileTDigestWeighted(0.99)(
key_expr, sampling_weight_expr, alias="p99"
)

raise BadSnubaRPCRequestException(
f"Aggregate {request.aggregate} had an unknown or unset type"
@@ -72,6 +98,7 @@ def _build_query(request: AggregateBucketRequest) -> Query:
),
granularity=request.granularity_secs,
groupby=[column("time")],
order_by=[OrderBy(direction=OrderByDirection.ASC, expression=column("time"))],
)
treeify_or_and_conditions(res)
return res
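
The added order_by matters because the RPC response is consumed as a flat array of bucket values (see the tests below): without an explicit ORDER BY, the grouped buckets can come back in any order. A tiny sketch of that assumption, with hypothetical rows:

# Hypothetical (time, value) rows as an unordered GROUP BY might return them.
rows = [(1726531200, 2.0), (1726530900, 1.0), (1726531500, 3.0)]

# Flattening into a per-bucket array is only meaningful once rows are sorted by time.
result = [value for _time, value in sorted(rows)]
print(result)  # [1.0, 2.0, 3.0]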
97 changes: 78 additions & 19 deletions tests/web/rpc/test_timeseries_api.py
@@ -18,7 +18,8 @@
from tests.helpers import write_raw_unprocessed_events


def gen_message(dt: datetime) -> Mapping[str, Any]:
def gen_message(dt: datetime, msg_index: int) -> Mapping[str, Any]:
dt = dt - timedelta(hours=1) + timedelta(minutes=msg_index)
return {
"description": "/api/0/relays/projectconfigs/",
"duration_ms": 152,
@@ -42,7 +43,12 @@ def gen_message(dt: datetime) -> Mapping[str, Any]:
},
"measurements": {
"num_of_spans": {"value": 50.0},
"eap.measurement": {"value": 420},
"eap.measurement": {"value": msg_index},
"client_sample_rate": {
"value": 0.01
if msg_index % 10 == 0
else 1 # every 10th span should be 100x upscaled
},
},
"organization_id": 1,
"origin": "auto.http.django",
@@ -83,7 +89,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]:
"location": random.choice(["mobile", "frontend", "backend"]),
},
"trace_id": uuid.uuid4().hex,
"start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)),
"start_timestamp_ms": int(dt.timestamp()) * 1000,
"start_timestamp_precise": dt.timestamp(),
"end_timestamp_precise": dt.timestamp() + 1,
}
@@ -97,8 +103,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]:
@pytest.fixture(autouse=True)
def setup_teardown(clickhouse_db: None, redis_db: None) -> None:
spans_storage = get_storage(StorageKey("eap_spans"))
start = BASE_TIME
messages = [gen_message(start - timedelta(minutes=i)) for i in range(120)]
messages = [gen_message(BASE_TIME, i) for i in range(120)]
write_raw_unprocessed_events(spans_storage, messages) # type: ignore


@@ -126,40 +131,94 @@ def test_basic(self) -> None:
)
assert response.status_code == 200

def test_with_data(self, setup_teardown: Any) -> None:
ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
def test_sum(self, setup_teardown: Any) -> None:
message = AggregateBucketRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(seconds=hour_ago),
end_timestamp=ts,
start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 30)),
),
key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT),
aggregate=AggregateBucketRequest.FUNCTION_AVERAGE,
granularity_secs=1,
aggregate=AggregateBucketRequest.FUNCTION_SUM,
granularity_secs=300,
)
response = timeseries_query(message)
assert response.result == [420 for _ in range(60)]
# spans in the query window have measurement = 60, 61, ..., 89;
# every 10th one (60, 70, 80) was sampled at 1% and carries weight 100, the rest carry weight 1
# the 300s granularity puts five spans into the same bucket,
# and the 30 minute interval means there should be 6 buckets
# (the query starts exactly 1 hour after the data starts)
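# e.g. the first bucket: 60 * 100 + 61 + 62 + 63 + 64 = 6000 + 250 = 6250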
expected_results = [
60 * 100 + 61 + 62 + 63 + 64,
65 + 66 + 67 + 68 + 69,
70 * 100 + 71 + 72 + 73 + 74,
75 + 76 + 77 + 78 + 79,
80 * 100 + 81 + 82 + 83 + 84,
85 + 86 + 87 + 88 + 89,
]
assert response.result == expected_results

def test_quantiles(self, setup_teardown: Any) -> None:
ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
message = AggregateBucketRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(seconds=hour_ago),
end_timestamp=ts,
start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 60)),
),
key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT),
aggregate=AggregateBucketRequest.FUNCTION_P99,
granularity_secs=1,
granularity_secs=60 * 15,
)
response = timeseries_query(message)
# spans have measurement = 0, 1, 2, ...; the query window starts at measurement 60
# and the 15 minute granularity puts 15 spans into each bucket,
# so the P99 of each bucket is roughly the maximum of its 15 values.
# T-Digest is approximate, so these numbers can be +- 3 or so.
expected_results = pytest.approx(
[
60 + 15 * 0 + 14,
60 + 15 * 1 + 14,
60 + 15 * 2 + 14,
60 + 15 * 3 + 14,
],
rel=3,
)
print(response.result)
print(expected_results)
assert response.result == expected_results

def test_average(self, setup_teardown: Any) -> None:
message = AggregateBucketRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 30)),
),
key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT),
aggregate=AggregateBucketRequest.FUNCTION_AVERAGE,
granularity_secs=300,
)
response = timeseries_query(message)
assert response.result == [420 for _ in range(60)]
# spans in the query window have measurement = 60, 61, ..., 89;
# every 10th one (60, 70, 80) was sampled at 1% and carries weight 100, the rest carry weight 1
# the 300s granularity puts five spans into the same bucket,
# and the 30 minute interval means there should be 6 buckets
# (the query starts exactly 1 hour after the data starts)
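# e.g. the first bucket: (60 * 100 + 61 + 62 + 63 + 64) / (100 + 1 + 1 + 1 + 1) = 6250 / 104, about 60.1,
# noticeably lower than the unweighted mean of 62 because the heavily weighted span dominates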
expected_results = pytest.approx(
[
(60 * 100 + 61 + 62 + 63 + 64) / 104,
(65 + 66 + 67 + 68 + 69) / 5,
(70 * 100 + 71 + 72 + 73 + 74) / 104,
(75 + 76 + 77 + 78 + 79) / 5,
(80 * 100 + 81 + 82 + 83 + 84) / 104,
(85 + 86 + 87 + 88 + 89) / 5,
]
)
assert response.result == expected_results
