Commit

Merge remote-tracking branch 'origin' into add-metrics-around-spans-size
ayirr7 committed Sep 13, 2024
2 parents fa1f7f4 + b43bf24 commit 3b65f44
Showing 7 changed files with 273 additions and 63 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
@@ -45,4 +45,4 @@ sqlparse==0.4.2
google-api-python-client==2.88.0
sentry-usage-accountant==0.0.10
freezegun==1.2.2
sentry-protos==0.1.16
sentry-protos==0.1.21
74 changes: 72 additions & 2 deletions snuba/web/rpc/common.py
@@ -1,7 +1,10 @@
from typing import Final, Mapping, Set
from typing import Final, Mapping, Sequence, Set

from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import (
AttributeKey,
VirtualColumnContext,
)
from sentry_protos.snuba.v1alpha.trace_item_filter_pb2 import (
ComparisonFilter,
TraceItemFilter,
@@ -130,6 +133,73 @@ def attribute_key_to_expression(attr_key: AttributeKey) -> Expression:
)


def apply_virtual_columns(
query: Query, virtual_column_contexts: Sequence[VirtualColumnContext]
) -> None:
"""Injects virtual column mappings into the clickhouse query. Works with NORMALIZED_COLUMNS on the table or
dynamic columns in attr_str. attr_num is not supported because mapping on floats is a bad idea.
Example:
SELECT
project_name AS `project_name`,
attr_str['release'] AS `release`,
attr_str['sentry.sdk.name'] AS `sentry.sdk.name`,
... rest of query
contexts:
[ {from_column_name: project_id, to_column_name: project_name, value_map: {1: "sentry", 2: "snuba"}} ]
Query will be transformed into:
SELECT
-- see the project name column transformed and the value mapping injected
transform( CAST( project_id, 'String'), array( '1', '2'), array( 'sentry', 'snuba'), 'unknown') AS `project_name`,
--
attr_str['release'] AS `release`,
attr_str['sentry.sdk.name'] AS `sentry.sdk.name`,
... rest of query
"""

if not virtual_column_contexts:
return

mapped_column_to_context = {c.to_column_name: c for c in virtual_column_contexts}

def transform_expressions(expression: Expression) -> Expression:
# virtual columns will show up as `attr_str[virtual_column_name]` or `attr_num[virtual_column_name]`
if not isinstance(expression, SubscriptableReference):
return expression

if expression.column.column_name != "attr_str":
return expression
context = mapped_column_to_context.get(str(expression.key.value))
if context:
attribute_expression = attribute_key_to_expression(
AttributeKey(
name=context.from_column_name,
type=NORMALIZED_COLUMNS.get(
context.from_column_name, AttributeKey.TYPE_STRING
),
)
)
return f.transform(
f.CAST(attribute_expression, "String"),
literals_array(None, [literal(k) for k in context.value_map.keys()]),
literals_array(None, [literal(v) for v in context.value_map.values()]),
literal("unknown"),
alias=context.to_column_name,
)

return expression

query.transform_expressions(transform_expressions)


def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression:
"""
Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024")
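
For reference, a minimal sketch of the context object apply_virtual_columns consumes, assuming sentry-protos 0.1.21 (the version pinned above). The field names and the string-to-string value_map match the tests added further down; the ids here are placeholders.

from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import VirtualColumnContext

# Maps the normalized project_id column onto a readable `project_name` column;
# ids missing from value_map fall back to the literal 'unknown' in the
# generated transform(...) expression shown in the docstring above.
context = VirtualColumnContext(
    from_column_name="project_id",
    to_column_name="project_name",
    value_map={"1": "sentry", "2": "snuba"},
)
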
2 changes: 2 additions & 0 deletions snuba/web/rpc/span_samples.py
@@ -25,6 +25,7 @@
from snuba.utils.metrics.timer import Timer
from snuba.web.query import run_query
from snuba.web.rpc.common import (
apply_virtual_columns,
attribute_key_to_expression,
base_conditions_and,
trace_item_filters_to_expression,
@@ -71,6 +72,7 @@ def _build_query(request: SpanSamplesRequest) -> Query:
limit=request.limit,
)
treeify_or_and_conditions(res)
apply_virtual_columns(res, request.virtual_column_contexts)
return res


58 changes: 24 additions & 34 deletions snuba/web/rpc/tags_list.py
@@ -2,9 +2,10 @@
from typing import List, Optional

from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import (
TagsListRequest,
TagsListResponse,
TraceItemAttributesRequest,
TraceItemAttributesResponse,
)
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey

from snuba.clickhouse.formatter.nodes import FormattedQuery, StringNode
from snuba.datasets.schemas.tables import TableSource
@@ -15,15 +16,17 @@


def tags_list_query(
request: TagsListRequest, _timer: Optional[Timer] = None
) -> TagsListResponse:
str_storage = get_storage(StorageKey("spans_str_attrs"))
num_storage = get_storage(StorageKey("spans_num_attrs"))

str_data_source = str_storage.get_schema().get_data_source()
assert isinstance(str_data_source, TableSource)
num_data_source = num_storage.get_schema().get_data_source()
assert isinstance(num_data_source, TableSource)
request: TraceItemAttributesRequest, _timer: Optional[Timer] = None
) -> TraceItemAttributesResponse:
if request.type == AttributeKey.Type.TYPE_STRING:
storage = get_storage(StorageKey("spans_str_attrs"))
elif request.type == AttributeKey.Type.TYPE_FLOAT:
storage = get_storage(StorageKey("spans_num_attrs"))
else:
return TraceItemAttributesResponse(tags=[])

data_source = storage.get_schema().get_data_source()
assert isinstance(data_source, TableSource)

if request.limit > 1000:
raise BadSnubaRPCRequestException("Limit can be at most 1000")
@@ -35,39 +38,26 @@ def tags_list_query(
)

query = f"""
SELECT * FROM (
SELECT DISTINCT attr_key, 'str' as type, timestamp
FROM {str_data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})
UNION ALL
SELECT DISTINCT attr_key, 'num' as type, timestamp
FROM {num_data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})
)
SELECT DISTINCT attr_key, timestamp
FROM {data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})
ORDER BY attr_key
LIMIT {request.limit} OFFSET {request.offset}
"""

cluster = str_storage.get_cluster()
cluster = storage.get_cluster()
reader = cluster.get_reader()
result = reader.execute(FormattedQuery([StringNode(query)]))

tags: List[TagsListResponse.Tag] = []
tags: List[TraceItemAttributesResponse.Tag] = []
for row in result.get("data", []):
tags.append(
TagsListResponse.Tag(
TraceItemAttributesResponse.Tag(
name=row["attr_key"],
type={
"str": TagsListResponse.TYPE_STRING,
"num": TagsListResponse.TYPE_NUMBER,
}[row["type"]],
type=request.type,
)
)

return TagsListResponse(tags=tags)
return TraceItemAttributesResponse(tags=tags)
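
As a usage sketch only (not part of the diff), a caller-side request for the rewritten handler might look like the following, using only the fields the handler reads above; the timestamps and ids are placeholder values.

from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import TraceItemAttributesRequest
from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey

request = TraceItemAttributesRequest(
    meta=RequestMeta(
        organization_id=1,
        project_ids=[1],
        start_timestamp=Timestamp(seconds=1726000000),  # placeholder window
        end_timestamp=Timestamp(seconds=1726003600),
    ),
    type=AttributeKey.Type.TYPE_STRING,  # routes to the spans_str_attrs storage
    limit=100,                           # must be at most 1000 per the check above
    offset=0,
)
# tags_list_query(request) returns a TraceItemAttributesResponse whose tags
# all carry this request's type.
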
6 changes: 4 additions & 2 deletions snuba/web/views.py
@@ -40,7 +40,9 @@
AggregateBucketRequest,
)
from sentry_protos.snuba.v1alpha.endpoint_span_samples_pb2 import SpanSamplesRequest
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import TagsListRequest
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import (
TraceItemAttributesRequest,
)
from werkzeug import Response as WerkzeugResponse
from werkzeug.exceptions import InternalServerError

@@ -288,7 +290,7 @@ def rpc(*, name: str, timer: Timer) -> Response:
] = {
"AggregateBucketRequest": (timeseries_query, AggregateBucketRequest),
"SpanSamplesRequest": (span_samples_query, SpanSamplesRequest),
"TagsListRequest": (tags_list_query, TagsListRequest),
"TraceItemAttributesRequest": (tags_list_query, TraceItemAttributesRequest),
}
try:
endpoint, req_class = rpcs[name]
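
A hedged sketch of the dispatch pattern the rpc() view relies on. Only the table lookup is visible in this diff; the body-decoding step below (standard protobuf ParseFromString) and the helper name dispatch are assumptions for illustration, not the file's literal code.

from typing import Any, Callable, Mapping, Tuple

def dispatch(
    rpcs: Mapping[str, Tuple[Callable[[Any], Any], type]], name: str, body: bytes
) -> Any:
    # Look up the handler and request class registered in the table above, e.g.
    # "TraceItemAttributesRequest" -> (tags_list_query, TraceItemAttributesRequest).
    endpoint, req_class = rpcs[name]
    request = req_class()
    request.ParseFromString(body)  # assumption: the POST body is a serialized protobuf
    return endpoint(request)       # hands back the matching *Response message
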
109 changes: 105 additions & 4 deletions tests/web/rpc/test_span_samples.py
@@ -10,6 +10,7 @@
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import (
AttributeKey,
AttributeValue,
VirtualColumnContext,
)
from sentry_protos.snuba.v1alpha.trace_item_filter_pb2 import (
ComparisonFilter,
@@ -24,6 +25,8 @@
from tests.base import BaseApiTest
from tests.helpers import write_raw_unprocessed_events

_RELEASE_TAG = "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b"


def gen_message(dt: datetime) -> Mapping[str, Any]:
return {
@@ -34,7 +37,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]:
"is_segment": True,
"data": {
"sentry.environment": "development",
"sentry.release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b",
"sentry.release": _RELEASE_TAG,
"thread.name": "uWSGIWorker1Core0",
"thread.id": "8522009600",
"sentry.segment.name": "/api/0/relays/projectconfigs/",
@@ -62,7 +65,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]:
"environment": "development",
"op": "http.server",
"platform": "python",
"release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b",
"release": _RELEASE_TAG,
"sdk.name": "sentry.python.django",
"sdk.version": "2.7.0",
"status": "ok",
@@ -166,7 +169,6 @@ def test_with_data(self, setup_teardown: Any) -> None:
)
],
limit=61,
attribute_key_transform_context=None,
)
response = span_samples_query(message)
assert [
@@ -222,7 +224,6 @@ def test_booleans_and_number_compares(self, setup_teardown: Any) -> None:
)
],
limit=61,
attribute_key_transform_context=None,
)
response = span_samples_query(message)
assert [
@@ -231,3 +232,103 @@ def test_booleans_and_number_compares(self, setup_teardown: Any) -> None:
)
for x in response.span_samples
] == [{"is_segment": True, "span_id": "123456781234567d"} for _ in range(60)]

def test_with_virtual_columns(self, setup_teardown: Any) -> None:
ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
message = SpanSamplesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(seconds=hour_ago),
end_timestamp=ts,
),
filter=TraceItemFilter(
exists_filter=ExistsFilter(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="category")
)
),
keys=[
AttributeKey(type=AttributeKey.TYPE_STRING, name="project_name"),
AttributeKey(type=AttributeKey.TYPE_STRING, name="release_version"),
AttributeKey(type=AttributeKey.TYPE_STRING, name="sentry.sdk.name"),
],
order_by=[
SpanSamplesRequest.OrderBy(
key=AttributeKey(
type=AttributeKey.TYPE_STRING, name="project_name"
),
)
],
limit=61,
virtual_column_contexts=[
VirtualColumnContext(
from_column_name="project_id",
to_column_name="project_name",
value_map={"1": "sentry", "2": "snuba"},
),
VirtualColumnContext(
from_column_name="release",
to_column_name="release_version",
value_map={_RELEASE_TAG: "4.2.0.69"},
),
],
)
response = span_samples_query(message)
assert [
dict((k, x.results[k].val_str) for k in x.results)
for x in response.span_samples
] == [
{
"project_name": "sentry",
"sentry.sdk.name": "sentry.python.django",
"release_version": "4.2.0.69",
}
for _ in range(60)
]

def test_order_by_virtual_columns(self, setup_teardown: Any) -> None:
ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
message = SpanSamplesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(seconds=hour_ago),
end_timestamp=ts,
),
filter=TraceItemFilter(
exists_filter=ExistsFilter(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="category")
)
),
keys=[
AttributeKey(type=AttributeKey.TYPE_STRING, name="special_color"),
],
order_by=[
SpanSamplesRequest.OrderBy(
key=AttributeKey(
type=AttributeKey.TYPE_STRING, name="special_color"
)
)
],
limit=61,
virtual_column_contexts=[
VirtualColumnContext(
from_column_name="color",
to_column_name="special_color",
value_map={"red": "1", "green": "2", "blue": "3"},
),
],
)
response = span_samples_query(message)
result_dicts = [
dict((k, x.results[k].val_str) for k in x.results)
for x in response.span_samples
]
colors = [d["special_color"] for d in result_dicts]
assert sorted(colors) == colors