diff --git a/requirements.txt b/requirements.txt index 3723e817ee..d45afa1c07 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,4 +45,4 @@ sqlparse==0.4.2 google-api-python-client==2.88.0 sentry-usage-accountant==0.0.10 freezegun==1.2.2 -sentry-protos==0.1.16 +sentry-protos==0.1.21 diff --git a/snuba/web/rpc/common.py b/snuba/web/rpc/common.py index 53c273119f..f230736907 100644 --- a/snuba/web/rpc/common.py +++ b/snuba/web/rpc/common.py @@ -1,7 +1,10 @@ -from typing import Final, Mapping, Set +from typing import Final, Mapping, Sequence, Set from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta -from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey +from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import ( + AttributeKey, + VirtualColumnContext, +) from sentry_protos.snuba.v1alpha.trace_item_filter_pb2 import ( ComparisonFilter, TraceItemFilter, @@ -130,6 +133,73 @@ def attribute_key_to_expression(attr_key: AttributeKey) -> Expression: ) +def apply_virtual_columns( + query: Query, virtual_column_contexts: Sequence[VirtualColumnContext] +) -> None: + """Injects virtual column mappings into the clickhouse query. Works with NORMALIZED_COLUMNS on the table or + dynamic columns in attr_str + + attr_num not supported because mapping on floats is a bad idea + + Example: + + SELECT + project_name AS `project_name`, + attr_str['release'] AS `release`, + attr_str['sentry.sdk.name'] AS `sentry.sdk.name`, + ... rest of query + + contexts: + [ {from_column_name: project_id, to_column_name: project_name, value_map: {1: "sentry", 2: "snuba"}} ] + + + Query will be transformed into: + + SELECT + -- see the project name column transformed and the value mapping injected + transform( CAST( project_id, 'String'), array( '1', '2'), array( 'sentry', 'snuba'), 'unknown') AS `project_name`, + -- + attr_str['release'] AS `release`, + attr_str['sentry.sdk.name'] AS `sentry.sdk.name`, + ... rest of query + + """ + + if not virtual_column_contexts: + return + + mapped_column_to_context = {c.to_column_name: c for c in virtual_column_contexts} + + def transform_expressions(expression: Expression) -> Expression: + # virtual columns will show up as `attr_str[virtual_column_name]` or `attr_num[virtual_column_name]` + if not isinstance(expression, SubscriptableReference): + return expression + + if expression.column.column_name != "attr_str": + return expression + context = mapped_column_to_context.get(str(expression.key.value)) + if context: + attribute_expression = attribute_key_to_expression( + AttributeKey( + name=context.from_column_name, + type=NORMALIZED_COLUMNS.get( + context.from_column_name, AttributeKey.TYPE_STRING + ), + ) + ) + return f.transform( + f.CAST(attribute_expression, "String"), + literals_array(None, [literal(k) for k in context.value_map.keys()]), + literals_array(None, [literal(v) for v in context.value_map.values()]), + literal("unknown"), + alias=context.to_column_name, + ) + + return expression + + query.transform_expressions(transform_expressions) + + def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression: """ Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024") diff --git a/snuba/web/rpc/span_samples.py b/snuba/web/rpc/span_samples.py index 7d5ebf6da6..8f5db267a5 100644 --- a/snuba/web/rpc/span_samples.py +++ b/snuba/web/rpc/span_samples.py @@ -25,6 +25,7 @@ from snuba.utils.metrics.timer import Timer from snuba.web.query import run_query from snuba.web.rpc.common import ( + apply_virtual_columns, attribute_key_to_expression, base_conditions_and, trace_item_filters_to_expression, @@ -71,6 +72,7 @@ def _build_query(request: SpanSamplesRequest) -> Query: limit=request.limit, ) treeify_or_and_conditions(res) + apply_virtual_columns(res, request.virtual_column_contexts) return res diff --git a/snuba/web/rpc/tags_list.py b/snuba/web/rpc/tags_list.py index 2febde7de0..e0dbb81421 100644 --- a/snuba/web/rpc/tags_list.py +++ b/snuba/web/rpc/tags_list.py @@ -2,9 +2,10 @@ from typing import List, Optional from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( - TagsListRequest, - TagsListResponse, + TraceItemAttributesRequest, + TraceItemAttributesResponse, ) +from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey from snuba.clickhouse.formatter.nodes import FormattedQuery, StringNode from snuba.datasets.schemas.tables import TableSource @@ -15,15 +16,17 @@ def tags_list_query( - request: TagsListRequest, _timer: Optional[Timer] = None -) -> TagsListResponse: - str_storage = get_storage(StorageKey("spans_str_attrs")) - num_storage = get_storage(StorageKey("spans_num_attrs")) - - str_data_source = str_storage.get_schema().get_data_source() - assert isinstance(str_data_source, TableSource) - num_data_source = num_storage.get_schema().get_data_source() - assert isinstance(num_data_source, TableSource) + request: TraceItemAttributesRequest, _timer: Optional[Timer] = None +) -> TraceItemAttributesResponse: + if request.type == AttributeKey.Type.TYPE_STRING: + storage = get_storage(StorageKey("spans_str_attrs")) + elif request.type == AttributeKey.Type.TYPE_FLOAT: + storage = get_storage(StorageKey("spans_num_attrs")) + else: + return TraceItemAttributesResponse(tags=[]) + + data_source = storage.get_schema().get_data_source() + assert isinstance(data_source, TableSource) if request.limit > 1000: raise BadSnubaRPCRequestException("Limit can be at most 1000") @@ -35,39 +38,26 @@ def tags_list_query( ) query = f""" -SELECT * FROM ( - SELECT DISTINCT attr_key, 'str' as type, timestamp - FROM {str_data_source.get_table_name()} - WHERE organization_id={request.meta.organization_id} - AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)}) - AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds}) - - UNION ALL - - SELECT DISTINCT attr_key, 'num' as type, timestamp - FROM {num_data_source.get_table_name()} - WHERE organization_id={request.meta.organization_id} - AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)}) - AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds}) -) +SELECT DISTINCT attr_key, timestamp +FROM {data_source.get_table_name()} +WHERE organization_id={request.meta.organization_id} +AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)}) +AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds}) ORDER BY attr_key LIMIT {request.limit} OFFSET {request.offset} """ - cluster = str_storage.get_cluster() + cluster = storage.get_cluster() reader = cluster.get_reader() result = reader.execute(FormattedQuery([StringNode(query)])) - tags: List[TagsListResponse.Tag] = [] + tags: List[TraceItemAttributesResponse.Tag] = [] for row in result.get("data", []): tags.append( - TagsListResponse.Tag( + TraceItemAttributesResponse.Tag( name=row["attr_key"], - type={ - "str": TagsListResponse.TYPE_STRING, - "num": TagsListResponse.TYPE_NUMBER, - }[row["type"]], + type=request.type, ) ) - return TagsListResponse(tags=tags) + return TraceItemAttributesResponse(tags=tags) diff --git a/snuba/web/views.py b/snuba/web/views.py index 088a9f9e5d..720f700123 100644 --- a/snuba/web/views.py +++ b/snuba/web/views.py @@ -40,7 +40,9 @@ AggregateBucketRequest, ) from sentry_protos.snuba.v1alpha.endpoint_span_samples_pb2 import SpanSamplesRequest -from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import TagsListRequest +from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( + TraceItemAttributesRequest, +) from werkzeug import Response as WerkzeugResponse from werkzeug.exceptions import InternalServerError @@ -288,7 +290,7 @@ def rpc(*, name: str, timer: Timer) -> Response: ] = { "AggregateBucketRequest": (timeseries_query, AggregateBucketRequest), "SpanSamplesRequest": (span_samples_query, SpanSamplesRequest), - "TagsListRequest": (tags_list_query, TagsListRequest), + "TraceItemAttributesRequest": (tags_list_query, TraceItemAttributesRequest), } try: endpoint, req_class = rpcs[name] diff --git a/tests/web/rpc/test_span_samples.py b/tests/web/rpc/test_span_samples.py index a933d54daf..1e70c69ceb 100644 --- a/tests/web/rpc/test_span_samples.py +++ b/tests/web/rpc/test_span_samples.py @@ -10,6 +10,7 @@ from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import ( AttributeKey, AttributeValue, + VirtualColumnContext, ) from sentry_protos.snuba.v1alpha.trace_item_filter_pb2 import ( ComparisonFilter, @@ -24,6 +25,8 @@ from tests.base import BaseApiTest from tests.helpers import write_raw_unprocessed_events +_RELEASE_TAG = "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b" + def gen_message(dt: datetime) -> Mapping[str, Any]: return { @@ -34,7 +37,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: "is_segment": True, "data": { "sentry.environment": "development", - "sentry.release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", + "sentry.release": _RELEASE_TAG, "thread.name": "uWSGIWorker1Core0", "thread.id": "8522009600", "sentry.segment.name": "/api/0/relays/projectconfigs/", @@ -62,7 +65,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: "environment": "development", "op": "http.server", "platform": "python", - "release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", + "release": _RELEASE_TAG, "sdk.name": "sentry.python.django", "sdk.version": "2.7.0", "status": "ok", @@ -166,7 +169,6 @@ def test_with_data(self, setup_teardown: Any) -> None: ) ], limit=61, - attribute_key_transform_context=None, ) response = span_samples_query(message) assert [ @@ -222,7 +224,6 @@ def test_booleans_and_number_compares(self, setup_teardown: Any) -> None: ) ], limit=61, - attribute_key_transform_context=None, ) response = span_samples_query(message) assert [ @@ -231,3 +232,103 @@ def test_booleans_and_number_compares(self, setup_teardown: Any) -> None: ) for x in response.span_samples ] == [{"is_segment": True, "span_id": "123456781234567d"} for _ in range(60)] + + def test_with_virtual_columns(self, setup_teardown: Any) -> None: + ts = Timestamp(seconds=int(BASE_TIME.timestamp())) + hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + message = SpanSamplesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + ), + filter=TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="category") + ) + ), + keys=[ + AttributeKey(type=AttributeKey.TYPE_STRING, name="project_name"), + AttributeKey(type=AttributeKey.TYPE_STRING, name="release_version"), + AttributeKey(type=AttributeKey.TYPE_STRING, name="sentry.sdk.name"), + ], + order_by=[ + SpanSamplesRequest.OrderBy( + key=AttributeKey( + type=AttributeKey.TYPE_STRING, name="project_name" + ), + ) + ], + limit=61, + virtual_column_contexts=[ + VirtualColumnContext( + from_column_name="project_id", + to_column_name="project_name", + value_map={"1": "sentry", "2": "snuba"}, + ), + VirtualColumnContext( + from_column_name="release", + to_column_name="release_version", + value_map={_RELEASE_TAG: "4.2.0.69"}, + ), + ], + ) + response = span_samples_query(message) + assert [ + dict((k, x.results[k].val_str) for k in x.results) + for x in response.span_samples + ] == [ + { + "project_name": "sentry", + "sentry.sdk.name": "sentry.python.django", + "release_version": "4.2.0.69", + } + for _ in range(60) + ] + + def test_order_by_virtual_columns(self, setup_teardown: Any) -> None: + ts = Timestamp(seconds=int(BASE_TIME.timestamp())) + hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + message = SpanSamplesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + ), + filter=TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="category") + ) + ), + keys=[ + AttributeKey(type=AttributeKey.TYPE_STRING, name="special_color"), + ], + order_by=[ + SpanSamplesRequest.OrderBy( + key=AttributeKey( + type=AttributeKey.TYPE_STRING, name="special_color" + ) + ) + ], + limit=61, + virtual_column_contexts=[ + VirtualColumnContext( + from_column_name="color", + to_column_name="special_color", + value_map={"red": "1", "green": "2", "blue": "3"}, + ), + ], + ) + response = span_samples_query(message) + result_dicts = [ + dict((k, x.results[k].val_str) for k in x.results) + for x in response.span_samples + ] + colors = [d["special_color"] for d in result_dicts] + assert sorted(colors) == colors diff --git a/tests/web/rpc/test_tags_list.py b/tests/web/rpc/test_tags_list.py index bb826ac7ba..ff353fb9af 100644 --- a/tests/web/rpc/test_tags_list.py +++ b/tests/web/rpc/test_tags_list.py @@ -5,10 +5,11 @@ import pytest from google.protobuf.timestamp_pb2 import Timestamp from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( - TagsListRequest, - TagsListResponse, + TraceItemAttributesRequest, + TraceItemAttributesResponse, ) from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta +from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey from snuba.datasets.storages.factory import get_storage from snuba.datasets.storages.storage_key import StorageKey @@ -63,11 +64,11 @@ def setup_teardown(clickhouse_db: None, redis_db: None) -> None: @pytest.mark.clickhouse_db @pytest.mark.redis_db -class TestTagsList(BaseApiTest): +class TestTraceItemAttributes(BaseApiTest): def test_basic(self) -> None: ts = Timestamp() ts.GetCurrentTime() - message = TagsListRequest( + message = TraceItemAttributesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, @@ -98,12 +99,12 @@ def test_basic(self) -> None: offset=20, ) response = self.app.post( - "/rpc/TagsListRequest", data=message.SerializeToString() + "/rpc/TraceItemAttributesRequest", data=message.SerializeToString() ) assert response.status_code == 200 - def test_simple_case(self, setup_teardown: Any) -> None: - message = TagsListRequest( + def test_simple_case_str(self, setup_teardown: Any) -> None: + message = TraceItemAttributesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, @@ -132,17 +133,58 @@ def test_simple_case(self, setup_teardown: Any) -> None: ), limit=10, offset=0, + type=AttributeKey.Type.TYPE_STRING, ) response = tags_list_query(message) assert response.tags == [ - TagsListResponse.Tag( - name=f"a_tag_{i:03}", type=TagsListResponse.TYPE_STRING + TraceItemAttributesResponse.Tag( + name=f"a_tag_{i:03}", type=AttributeKey.Type.TYPE_STRING + ) + for i in range(0, 10) + ] + + def test_simple_case_float(self, setup_teardown: Any) -> None: + message = TraceItemAttributesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp( + seconds=int( + datetime( + year=BASE_TIME.year, + month=BASE_TIME.month, + day=BASE_TIME.day - 1, + tzinfo=UTC, + ).timestamp() + ) + ), + end_timestamp=Timestamp( + seconds=int( + datetime( + year=BASE_TIME.year, + month=BASE_TIME.month, + day=BASE_TIME.day + 1, + tzinfo=UTC, + ).timestamp() + ) + ), + ), + limit=10, + offset=0, + type=AttributeKey.Type.TYPE_FLOAT, + ) + response = tags_list_query(message) + assert response.tags == [ + TraceItemAttributesResponse.Tag( + name=f"b_measurement_{i:03}", type=AttributeKey.Type.TYPE_FLOAT ) for i in range(0, 10) ] def test_with_offset(self, setup_teardown: Any) -> None: - message = TagsListRequest( + message = TraceItemAttributesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, @@ -170,21 +212,24 @@ def test_with_offset(self, setup_teardown: Any) -> None: ), ), limit=5, - offset=29, + offset=10, + type=AttributeKey.Type.TYPE_FLOAT, ) response = tags_list_query(message) assert response.tags == [ - TagsListResponse.Tag(name="a_tag_029", type=TagsListResponse.TYPE_STRING), - TagsListResponse.Tag( - name="b_measurement_000", type=TagsListResponse.TYPE_NUMBER + TraceItemAttributesResponse.Tag( + name="b_measurement_010", type=AttributeKey.Type.TYPE_FLOAT + ), + TraceItemAttributesResponse.Tag( + name="b_measurement_011", type=AttributeKey.Type.TYPE_FLOAT ), - TagsListResponse.Tag( - name="b_measurement_001", type=TagsListResponse.TYPE_NUMBER + TraceItemAttributesResponse.Tag( + name="b_measurement_012", type=AttributeKey.Type.TYPE_FLOAT ), - TagsListResponse.Tag( - name="b_measurement_002", type=TagsListResponse.TYPE_NUMBER + TraceItemAttributesResponse.Tag( + name="b_measurement_013", type=AttributeKey.Type.TYPE_FLOAT ), - TagsListResponse.Tag( - name="b_measurement_003", type=TagsListResponse.TYPE_NUMBER + TraceItemAttributesResponse.Tag( + name="b_measurement_014", type=AttributeKey.Type.TYPE_FLOAT ), ]