From 851f577c9ccb738cddecb83ed2f95f816e2b27a5 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Wed, 11 Sep 2024 10:38:10 -0400 Subject: [PATCH 01/23] bug(query): Run entity validators in composite query pipeline (#6285) This PR contains a fix for this sentry [issue](https://sentry.sentry.io/issues/5825035976/?project=300688&query=is%3Aunresolved%20issue.priority%3A%5Bhigh%2C%20medium%5D&referrer=issue-stream&statsPeriod=1h&stream_index=4&utc=true). In the composite query pipeline, snuba is not running entity validators. As a result, join queries with missing columns (e.g. mql formula queries with unresolved tag keys) are reaching ClickHouse and affecting our SLO. Instead, the entity validator should catch these invalid queries and fail accordingly. --- snuba/pipeline/stages/query_processing.py | 2 +- .../test_entity_processing_stage_composite.py | 90 +++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/snuba/pipeline/stages/query_processing.py b/snuba/pipeline/stages/query_processing.py index 393f7da83b..7b2fd4311c 100644 --- a/snuba/pipeline/stages/query_processing.py +++ b/snuba/pipeline/stages/query_processing.py @@ -34,10 +34,10 @@ def _process_data( if translated_storage_query: return translated_storage_query + run_entity_validators(cast(EntityQuery, query), pipe_input.query_settings) if isinstance(query, LogicalQuery) and isinstance( query.get_from_clause(), Entity ): - run_entity_validators(cast(EntityQuery, query), pipe_input.query_settings) return run_entity_processing_executor(query, pipe_input.query_settings) elif isinstance(query, CompositeQuery): # if we were not able to translate the storage query earlier and we got to this point, this is diff --git a/tests/pipeline/test_entity_processing_stage_composite.py b/tests/pipeline/test_entity_processing_stage_composite.py index 376e13a5d4..6eda556057 100644 --- a/tests/pipeline/test_entity_processing_stage_composite.py +++ b/tests/pipeline/test_entity_processing_stage_composite.py @@ -29,6 +29,7 @@ JoinType, ) from snuba.query.data_source.simple import Entity, Table +from snuba.query.exceptions import ValidationException from snuba.query.expressions import ( Column, FunctionCall, @@ -456,3 +457,92 @@ def test_composite( .data ) assert actual == expected + + +TEST_CASES_INVALID = [ + pytest.param( + CompositeQuery( + from_clause=JoinClause( + left_node=IndividualNode( + alias="err", + data_source=events_ent, + ), + right_node=IndividualNode( + alias="groups", + data_source=groups_ent, + ), + keys=[ + JoinCondition( + left=JoinConditionExpression("err", "group_id"), + right=JoinConditionExpression("groups", "id"), + ) + ], + join_type=JoinType.INNER, + ), + selected_columns=[ + SelectedExpression( + "f_release", + FunctionCall( + "f_release", + "f", + (Column(None, "err", "release"),), + ), + ), + SelectedExpression( + "_snuba_right", + Column("_snuba_right", "groups", "status"), + ), + ], + condition=binary_condition( + BooleanFunctions.AND, + binary_condition( + ConditionFunctions.EQ, + Column(None, "err", "project_id"), + Literal(None, 1), + ), + binary_condition( + BooleanFunctions.AND, + binary_condition( + ConditionFunctions.GTE, + Column(None, "err", "timestamp"), + Literal(None, datetime(2020, 1, 1, 12, 0)), + ), + binary_condition( + ConditionFunctions.GTE, + Column(None, "err", "foo"), + Literal(None, 1), + ), + ), + ), + ), + ValidationException, + id="Join query with missing column", + ), +] + + +@pytest.mark.parametrize("logical_query, expected_error", TEST_CASES_INVALID) 
+@pytest.mark.clickhouse_db +def test_invalid_composite( + logical_query: CompositeQuery[Entity], + expected_error: Exception, +) -> None: + request = Request( + id="", + original_body={"query": "placeholder"}, + query=cast(LogicalQuery, logical_query), + query_settings=HTTPQuerySettings(), + attribution_info=AttributionInfo( + get_app_id("blah"), {"tenant_type": "tenant_id"}, "blah", None, None, None + ), + ) + actual = EntityProcessingStage().execute( + QueryPipelineResult( + data=request, + query_settings=request.query_settings, + timer=Timer("test"), + error=None, + ) + ) + assert actual.error and not actual.data + assert isinstance(type(actual.error), type(expected_error)) From e7fe504332c7d118eb6baf508b2dd5653a44ec8d Mon Sep 17 00:00:00 2001 From: colin-sentry <161344340+colin-sentry@users.noreply.github.com> Date: Wed, 11 Sep 2024 13:27:51 -0400 Subject: [PATCH 02/23] fix(eap): Switch to sampling_weight_2 in entity (#6287) Evan added this in https://github.com/getsentry/snuba/pull/6190, make it queryable via the entity with a rename --- .../events_analytics_platform/entities/eap_spans.yaml | 8 +++++++- .../events_analytics_platform/storages/eap_spans.yaml | 1 + 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml index 6c08867f91..a30887d44c 100644 --- a/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml @@ -22,7 +22,7 @@ schema: { name: retention_days, type: UInt, args: { size: 16 } }, { name: name, type: String }, { name: sampling_factor, type: Float, args: { size: 64 } }, - { name: sampling_weight, type: Float, args: { size: 64 } }, + { name: sampling_weight, type: UInt, args: { size: 64 } }, { name: sign, type: Int, args: { size: 8 } }, { name: attr_str, type: Map, args: { key: { type: String }, value: { type: String } } }, { name: attr_num, type: Map, args: { key: { type: String }, value: { type: Float, args: { size: 64 } } } }, @@ -39,6 +39,12 @@ storages: from_col_name: timestamp to_table_name: null to_col_name: _sort_timestamp + - mapper: ColumnToColumn + args: + from_table_name: null + from_col_name: sampling_weight + to_table_name: null + to_col_name: sampling_weight_2 subscriptables: - mapper: SubscriptableHashBucketMapper args: diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml index 4d4245a959..39a4a97f31 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml @@ -29,6 +29,7 @@ schema: { name: name, type: String }, { name: sampling_factor, type: Float, args: { size: 64 } }, { name: sampling_weight, type: Float, args: { size: 64 } }, + { name: sampling_weight_2, type: UInt, args: { size: 64 } }, { name: sign, type: Int, args: { size: 8 } }, { name: attr_str_0, type: Map, args: { key: { type: String }, value: { type: String } } }, { name: attr_str_1, type: Map, args: { key: { type: String }, value: { type: String } } }, From 4c9ce40ef0e92a1085290a81f5566dfb2d2424fe Mon Sep 17 00:00:00 2001 From: Riya Chakraborty <47572810+ayirr7@users.noreply.github.com> Date: Wed, 11 Sep 2024 14:36:48 -0700 Subject: [PATCH 03/23] Update docker entrypoint to run heaptrack 
(#6273) Updating this to run heaptrack if the corresponding environment variable is set. Otherwise just run the Snuba process by default. --------- Co-authored-by: anthony sottile <103459774+asottile-sentry@users.noreply.github.com> --- Dockerfile | 2 ++ docker_entrypoint.sh | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/Dockerfile b/Dockerfile index f882c5a07f..0fe9af0e03 100644 --- a/Dockerfile +++ b/Dockerfile @@ -30,6 +30,8 @@ RUN set -ex; \ runtimeDeps=' \ curl \ libjemalloc2 \ + gdb \ + heaptrack \ '; \ apt-get update; \ apt-get install -y $buildDeps $runtimeDeps --no-install-recommends; \ diff --git a/docker_entrypoint.sh b/docker_entrypoint.sh index 68ce582619..26041c4d6a 100755 --- a/docker_entrypoint.sh +++ b/docker_entrypoint.sh @@ -18,4 +18,8 @@ else printf "\n${help_result}" fi +if [ -n "${ENABLE_HEAPTRACK:-}" ]; then + set -- heaptrack "$@" +fi + exec "$@" From 81a535973dc52bc2ad5cca69afadf5df8885183d Mon Sep 17 00:00:00 2001 From: colin-sentry <161344340+colin-sentry@users.noreply.github.com> Date: Wed, 11 Sep 2024 18:12:42 -0400 Subject: [PATCH 04/23] feat(eap): Start ingesting data into sample_weight_2 column (#6290) This migration was added, but the consumer was never set up to insert into it. --- rust_snuba/src/processors/eap_spans.rs | 10 ++++++++-- ...a__processors__eap_spans__tests__serialization.snap | 5 +++-- ...ssageProcessor-snuba-spans__1__basic_span.json.snap | 1 + .../events_analytics_platform/entities/eap_spans.yaml | 10 +++------- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/rust_snuba/src/processors/eap_spans.rs b/rust_snuba/src/processors/eap_spans.rs index 795c85cbdb..a78181e4a6 100644 --- a/rust_snuba/src/processors/eap_spans.rs +++ b/rust_snuba/src/processors/eap_spans.rs @@ -52,7 +52,8 @@ struct EAPSpan { name: String, //aka description sampling_factor: f64, - sampling_weight: f64, + sampling_weight: f64, //remove eventually + sampling_weight_2: u64, sign: u8, //1 for additions, -1 for deletions - for this worker it should be 1 #( @@ -101,7 +102,8 @@ impl From for EAPSpan { retention_days: from.retention_days, name: from.description.unwrap_or_default(), - sampling_weight: 1., + sampling_weight: 1., //remove eventually + sampling_weight_2: 1, sampling_factor: 1., sign: 1, @@ -153,6 +155,7 @@ impl From for EAPSpan { if k == "client_sample_rate" && v.value != 0.0 { res.sampling_factor = v.value; res.sampling_weight = 1.0 / v.value; + res.sampling_weight_2 = (1.0 / v.value) as u64; } else { insert_num(k.clone(), v.value); } @@ -217,6 +220,9 @@ mod tests { "measurements": { "num_of_spans": { "value": 50.0 + }, + "client_sample_rate": { + "value": 0.01 } }, "organization_id": 1, diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap index 7ab2b7f26d..05eac7c04f 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap @@ -19,8 +19,9 @@ expression: span "exclusive_time_ms": 0.228, "retention_days": 90, "name": "/api/0/relays/projectconfigs/", - "sampling_factor": 1.0, - "sampling_weight": 1.0, + "sampling_factor": 0.01, + "sampling_weight": 100.0, + "sampling_weight_2": 100, "sign": 1, "attr_str_0": { "relay_protocol_version": "3", diff --git 
a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-spans-EAPSpansMessageProcessor-snuba-spans__1__basic_span.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-spans-EAPSpansMessageProcessor-snuba-spans__1__basic_span.json.snap index 3b8fdaf3a4..64a32b2761 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-spans-EAPSpansMessageProcessor-snuba-spans__1__basic_span.json.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-spans-EAPSpansMessageProcessor-snuba-spans__1__basic_span.json.snap @@ -52,6 +52,7 @@ expression: snapshot_payload "retention_days": 90, "sampling_factor": 1.0, "sampling_weight": 1.0, + "sampling_weight_2": 1, "segment_id": 16045690984833335023, "segment_name": "/organizations/:orgId/issues/", "service": "1", diff --git a/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml index a30887d44c..b10c58d583 100644 --- a/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml @@ -22,7 +22,8 @@ schema: { name: retention_days, type: UInt, args: { size: 16 } }, { name: name, type: String }, { name: sampling_factor, type: Float, args: { size: 64 } }, - { name: sampling_weight, type: UInt, args: { size: 64 } }, + { name: sampling_weight, type: Float, args: { size: 64 } }, + { name: sampling_weight_2, type: UInt, args: { size: 64 } }, { name: sign, type: Int, args: { size: 8 } }, { name: attr_str, type: Map, args: { key: { type: String }, value: { type: String } } }, { name: attr_num, type: Map, args: { key: { type: String }, value: { type: Float, args: { size: 64 } } } }, @@ -39,12 +40,7 @@ storages: from_col_name: timestamp to_table_name: null to_col_name: _sort_timestamp - - mapper: ColumnToColumn - args: - from_table_name: null - from_col_name: sampling_weight - to_table_name: null - to_col_name: sampling_weight_2 + subscriptables: - mapper: SubscriptableHashBucketMapper args: From bdfe450485651e9f56ac45519730d64b925b523e Mon Sep 17 00:00:00 2001 From: viglia Date: Thu, 12 Sep 2024 17:29:03 +0200 Subject: [PATCH 05/23] ref(doc): add documentation for the ReadinessState enum (#6295) Adding some doc for the `ReadinessState` _Enum_ class to explain its use and the meaning of each of its members. --- snuba/datasets/readiness_state.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/snuba/datasets/readiness_state.py b/snuba/datasets/readiness_state.py index ca4bc96f1d..ada34b7f62 100644 --- a/snuba/datasets/readiness_state.py +++ b/snuba/datasets/readiness_state.py @@ -4,6 +4,26 @@ class ReadinessState(Enum): + """ + Readiness states are essentially feature flags for snuba datasets. + The readiness state defines whether or not a dataset is made available + in specific sentry environments. 
+ Currently, sentry environments include the following: + * local/CI + * SaaS + * S4S + * Self-Hosted + * Single-Tenant + + The following is a list of readiness states and the environments + they map to: + * limited -> local/CI + * experimental -> local/CI, S4S + * partial -> local/CI, SaaS, S4S + * deprecate -> local/CI, Self-Hosted + * complete -> local/CI, SaaS, S4S, Self-Hosted, Single-Tenant + """ + LIMITED = "limited" DEPRECATE = "deprecate" PARTIAL = "partial" From 6c680fecbd0dab60caaa243ad1c45a69643f5fcd Mon Sep 17 00:00:00 2001 From: volokluev <3169433+volokluev@users.noreply.github.com> Date: Thu, 12 Sep 2024 13:38:12 -0700 Subject: [PATCH 06/23] tweak(eap): Allow more memory usage for eap spans (#6298) --- .../events_analytics_platform/storages/eap_spans.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml index 39a4a97f31..ed2d22011d 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml @@ -114,6 +114,12 @@ query_processors: prewhere_candidates: [span_id, trace_id, segment_name] - processor: TupleUnaliaser + - processor: ClickhouseSettingsOverride + args: + settings: + max_memory_usage: 5000000000 + max_rows_to_group_by: 1000000 + group_by_overflow_mode: any mandatory_condition_checkers: - condition: OrgIdEnforcer From 08928b9391714d7de8ad4993317d31606e4f7388 Mon Sep 17 00:00:00 2001 From: volokluev <3169433+volokluev@users.noreply.github.com> Date: Thu, 12 Sep 2024 16:17:05 -0700 Subject: [PATCH 07/23] feat(eap): add virtual column support (#6292) Add support for virtual columns to allow for sorting by project name or semver release (as opposed to release id). --- requirements.txt | 2 +- snuba/web/rpc/common.py | 74 +++++++++++++++++++- snuba/web/rpc/span_samples.py | 2 + tests/web/rpc/test_span_samples.py | 109 +++++++++++++++++++++++++++-- 4 files changed, 180 insertions(+), 7 deletions(-) diff --git a/requirements.txt b/requirements.txt index 3723e817ee..01ec162c8f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,4 +45,4 @@ sqlparse==0.4.2 google-api-python-client==2.88.0 sentry-usage-accountant==0.0.10 freezegun==1.2.2 -sentry-protos==0.1.16 +sentry-protos==0.1.20 diff --git a/snuba/web/rpc/common.py b/snuba/web/rpc/common.py index 53c273119f..f230736907 100644 --- a/snuba/web/rpc/common.py +++ b/snuba/web/rpc/common.py @@ -1,7 +1,10 @@ -from typing import Final, Mapping, Set +from typing import Final, Mapping, Sequence, Set from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta -from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey +from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import ( + AttributeKey, + VirtualColumnContext, +) from sentry_protos.snuba.v1alpha.trace_item_filter_pb2 import ( ComparisonFilter, TraceItemFilter, @@ -130,6 +133,73 @@ def attribute_key_to_expression(attr_key: AttributeKey) -> Expression: ) +def apply_virtual_columns( + query: Query, virtual_column_contexts: Sequence[VirtualColumnContext] +) -> None: + """Injects virtual column mappings into the clickhouse query. 
Works with NORMALIZED_COLUMNS on the table or + dynamic columns in attr_str + + attr_num not supported because mapping on floats is a bad idea + + Example: + + SELECT + project_name AS `project_name`, + attr_str['release'] AS `release`, + attr_str['sentry.sdk.name'] AS `sentry.sdk.name`, + ... rest of query + + contexts: + [ {from_column_name: project_id, to_column_name: project_name, value_map: {1: "sentry", 2: "snuba"}} ] + + + Query will be transformed into: + + SELECT + -- see the project name column transformed and the value mapping injected + transform( CAST( project_id, 'String'), array( '1', '2'), array( 'sentry', 'snuba'), 'unknown') AS `project_name`, + -- + attr_str['release'] AS `release`, + attr_str['sentry.sdk.name'] AS `sentry.sdk.name`, + ... rest of query + + """ + + if not virtual_column_contexts: + return + + mapped_column_to_context = {c.to_column_name: c for c in virtual_column_contexts} + + def transform_expressions(expression: Expression) -> Expression: + # virtual columns will show up as `attr_str[virtual_column_name]` or `attr_num[virtual_column_name]` + if not isinstance(expression, SubscriptableReference): + return expression + + if expression.column.column_name != "attr_str": + return expression + context = mapped_column_to_context.get(str(expression.key.value)) + if context: + attribute_expression = attribute_key_to_expression( + AttributeKey( + name=context.from_column_name, + type=NORMALIZED_COLUMNS.get( + context.from_column_name, AttributeKey.TYPE_STRING + ), + ) + ) + return f.transform( + f.CAST(attribute_expression, "String"), + literals_array(None, [literal(k) for k in context.value_map.keys()]), + literals_array(None, [literal(v) for v in context.value_map.values()]), + literal("unknown"), + alias=context.to_column_name, + ) + + return expression + + query.transform_expressions(transform_expressions) + + def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression: """ Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024") diff --git a/snuba/web/rpc/span_samples.py b/snuba/web/rpc/span_samples.py index 7d5ebf6da6..4051932ce8 100644 --- a/snuba/web/rpc/span_samples.py +++ b/snuba/web/rpc/span_samples.py @@ -25,6 +25,7 @@ from snuba.utils.metrics.timer import Timer from snuba.web.query import run_query from snuba.web.rpc.common import ( + apply_virtual_columns, attribute_key_to_expression, base_conditions_and, trace_item_filters_to_expression, @@ -71,6 +72,7 @@ def _build_query(request: SpanSamplesRequest) -> Query: limit=request.limit, ) treeify_or_and_conditions(res) + apply_virtual_columns(res, request.virtual_column_context) return res diff --git a/tests/web/rpc/test_span_samples.py b/tests/web/rpc/test_span_samples.py index a933d54daf..6c19a1773b 100644 --- a/tests/web/rpc/test_span_samples.py +++ b/tests/web/rpc/test_span_samples.py @@ -10,6 +10,7 @@ from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import ( AttributeKey, AttributeValue, + VirtualColumnContext, ) from sentry_protos.snuba.v1alpha.trace_item_filter_pb2 import ( ComparisonFilter, @@ -24,6 +25,8 @@ from tests.base import BaseApiTest from tests.helpers import write_raw_unprocessed_events +_RELEASE_TAG = "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b" + def gen_message(dt: datetime) -> Mapping[str, Any]: return { @@ -34,7 +37,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: "is_segment": True, "data": { "sentry.environment": "development", - "sentry.release": 
"backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", + "sentry.release": _RELEASE_TAG, "thread.name": "uWSGIWorker1Core0", "thread.id": "8522009600", "sentry.segment.name": "/api/0/relays/projectconfigs/", @@ -62,7 +65,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: "environment": "development", "op": "http.server", "platform": "python", - "release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", + "release": _RELEASE_TAG, "sdk.name": "sentry.python.django", "sdk.version": "2.7.0", "status": "ok", @@ -166,7 +169,6 @@ def test_with_data(self, setup_teardown: Any) -> None: ) ], limit=61, - attribute_key_transform_context=None, ) response = span_samples_query(message) assert [ @@ -222,7 +224,6 @@ def test_booleans_and_number_compares(self, setup_teardown: Any) -> None: ) ], limit=61, - attribute_key_transform_context=None, ) response = span_samples_query(message) assert [ @@ -231,3 +232,103 @@ def test_booleans_and_number_compares(self, setup_teardown: Any) -> None: ) for x in response.span_samples ] == [{"is_segment": True, "span_id": "123456781234567d"} for _ in range(60)] + + def test_with_virtual_columns(self, setup_teardown: Any) -> None: + ts = Timestamp(seconds=int(BASE_TIME.timestamp())) + hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + message = SpanSamplesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + ), + filter=TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="category") + ) + ), + keys=[ + AttributeKey(type=AttributeKey.TYPE_STRING, name="project_name"), + AttributeKey(type=AttributeKey.TYPE_STRING, name="release_version"), + AttributeKey(type=AttributeKey.TYPE_STRING, name="sentry.sdk.name"), + ], + order_by=[ + SpanSamplesRequest.OrderBy( + key=AttributeKey( + type=AttributeKey.TYPE_STRING, name="project_name" + ), + ) + ], + limit=61, + virtual_column_context=[ + VirtualColumnContext( + from_column_name="project_id", + to_column_name="project_name", + value_map={"1": "sentry", "2": "snuba"}, + ), + VirtualColumnContext( + from_column_name="release", + to_column_name="release_version", + value_map={_RELEASE_TAG: "4.2.0.69"}, + ), + ], + ) + response = span_samples_query(message) + assert [ + dict((k, x.results[k].val_str) for k in x.results) + for x in response.span_samples + ] == [ + { + "project_name": "sentry", + "sentry.sdk.name": "sentry.python.django", + "release_version": "4.2.0.69", + } + for _ in range(60) + ] + + def test_order_by_virtual_columns(self, setup_teardown: Any) -> None: + ts = Timestamp(seconds=int(BASE_TIME.timestamp())) + hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + message = SpanSamplesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + ), + filter=TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="category") + ) + ), + keys=[ + AttributeKey(type=AttributeKey.TYPE_STRING, name="special_color"), + ], + order_by=[ + SpanSamplesRequest.OrderBy( + key=AttributeKey( + type=AttributeKey.TYPE_STRING, name="special_color" + ) + ) + ], + limit=61, + virtual_column_context=[ + VirtualColumnContext( + from_column_name="color", + to_column_name="special_color", + value_map={"red": "1", "green": "2", 
"blue": "3"}, + ), + ], + ) + response = span_samples_query(message) + result_dicts = [ + dict((k, x.results[k].val_str) for k in x.results) + for x in response.span_samples + ] + colors = [d["special_color"] for d in result_dicts] + assert sorted(colors) == colors From b43bf24bbb8117df8a2e4355118455b5d7a48503 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Fri, 13 Sep 2024 13:07:03 -0400 Subject: [PATCH 08/23] feat(rpc): Update tags list rpc (#6301) sentry-protos 0.1.21 changes the definition a bit, updating the endpoint to accept it. --- requirements.txt | 2 +- snuba/web/rpc/span_samples.py | 2 +- snuba/web/rpc/tags_list.py | 58 +++++++++----------- snuba/web/views.py | 6 ++- tests/web/rpc/test_span_samples.py | 4 +- tests/web/rpc/test_tags_list.py | 85 +++++++++++++++++++++++------- 6 files changed, 97 insertions(+), 60 deletions(-) diff --git a/requirements.txt b/requirements.txt index 01ec162c8f..d45afa1c07 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,4 +45,4 @@ sqlparse==0.4.2 google-api-python-client==2.88.0 sentry-usage-accountant==0.0.10 freezegun==1.2.2 -sentry-protos==0.1.20 +sentry-protos==0.1.21 diff --git a/snuba/web/rpc/span_samples.py b/snuba/web/rpc/span_samples.py index 4051932ce8..8f5db267a5 100644 --- a/snuba/web/rpc/span_samples.py +++ b/snuba/web/rpc/span_samples.py @@ -72,7 +72,7 @@ def _build_query(request: SpanSamplesRequest) -> Query: limit=request.limit, ) treeify_or_and_conditions(res) - apply_virtual_columns(res, request.virtual_column_context) + apply_virtual_columns(res, request.virtual_column_contexts) return res diff --git a/snuba/web/rpc/tags_list.py b/snuba/web/rpc/tags_list.py index 2febde7de0..e0dbb81421 100644 --- a/snuba/web/rpc/tags_list.py +++ b/snuba/web/rpc/tags_list.py @@ -2,9 +2,10 @@ from typing import List, Optional from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( - TagsListRequest, - TagsListResponse, + TraceItemAttributesRequest, + TraceItemAttributesResponse, ) +from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey from snuba.clickhouse.formatter.nodes import FormattedQuery, StringNode from snuba.datasets.schemas.tables import TableSource @@ -15,15 +16,17 @@ def tags_list_query( - request: TagsListRequest, _timer: Optional[Timer] = None -) -> TagsListResponse: - str_storage = get_storage(StorageKey("spans_str_attrs")) - num_storage = get_storage(StorageKey("spans_num_attrs")) - - str_data_source = str_storage.get_schema().get_data_source() - assert isinstance(str_data_source, TableSource) - num_data_source = num_storage.get_schema().get_data_source() - assert isinstance(num_data_source, TableSource) + request: TraceItemAttributesRequest, _timer: Optional[Timer] = None +) -> TraceItemAttributesResponse: + if request.type == AttributeKey.Type.TYPE_STRING: + storage = get_storage(StorageKey("spans_str_attrs")) + elif request.type == AttributeKey.Type.TYPE_FLOAT: + storage = get_storage(StorageKey("spans_num_attrs")) + else: + return TraceItemAttributesResponse(tags=[]) + + data_source = storage.get_schema().get_data_source() + assert isinstance(data_source, TableSource) if request.limit > 1000: raise BadSnubaRPCRequestException("Limit can be at most 1000") @@ -35,39 +38,26 @@ def tags_list_query( ) query = f""" -SELECT * FROM ( - SELECT DISTINCT attr_key, 'str' as type, timestamp - FROM {str_data_source.get_table_name()} - WHERE organization_id={request.meta.organization_id} - AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)}) - AND timestamp BETWEEN 
fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds}) - - UNION ALL - - SELECT DISTINCT attr_key, 'num' as type, timestamp - FROM {num_data_source.get_table_name()} - WHERE organization_id={request.meta.organization_id} - AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)}) - AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds}) -) +SELECT DISTINCT attr_key, timestamp +FROM {data_source.get_table_name()} +WHERE organization_id={request.meta.organization_id} +AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)}) +AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds}) ORDER BY attr_key LIMIT {request.limit} OFFSET {request.offset} """ - cluster = str_storage.get_cluster() + cluster = storage.get_cluster() reader = cluster.get_reader() result = reader.execute(FormattedQuery([StringNode(query)])) - tags: List[TagsListResponse.Tag] = [] + tags: List[TraceItemAttributesResponse.Tag] = [] for row in result.get("data", []): tags.append( - TagsListResponse.Tag( + TraceItemAttributesResponse.Tag( name=row["attr_key"], - type={ - "str": TagsListResponse.TYPE_STRING, - "num": TagsListResponse.TYPE_NUMBER, - }[row["type"]], + type=request.type, ) ) - return TagsListResponse(tags=tags) + return TraceItemAttributesResponse(tags=tags) diff --git a/snuba/web/views.py b/snuba/web/views.py index 088a9f9e5d..720f700123 100644 --- a/snuba/web/views.py +++ b/snuba/web/views.py @@ -40,7 +40,9 @@ AggregateBucketRequest, ) from sentry_protos.snuba.v1alpha.endpoint_span_samples_pb2 import SpanSamplesRequest -from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import TagsListRequest +from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( + TraceItemAttributesRequest, +) from werkzeug import Response as WerkzeugResponse from werkzeug.exceptions import InternalServerError @@ -288,7 +290,7 @@ def rpc(*, name: str, timer: Timer) -> Response: ] = { "AggregateBucketRequest": (timeseries_query, AggregateBucketRequest), "SpanSamplesRequest": (span_samples_query, SpanSamplesRequest), - "TagsListRequest": (tags_list_query, TagsListRequest), + "TraceItemAttributesRequest": (tags_list_query, TraceItemAttributesRequest), } try: endpoint, req_class = rpcs[name] diff --git a/tests/web/rpc/test_span_samples.py b/tests/web/rpc/test_span_samples.py index 6c19a1773b..1e70c69ceb 100644 --- a/tests/web/rpc/test_span_samples.py +++ b/tests/web/rpc/test_span_samples.py @@ -263,7 +263,7 @@ def test_with_virtual_columns(self, setup_teardown: Any) -> None: ) ], limit=61, - virtual_column_context=[ + virtual_column_contexts=[ VirtualColumnContext( from_column_name="project_id", to_column_name="project_name", @@ -317,7 +317,7 @@ def test_order_by_virtual_columns(self, setup_teardown: Any) -> None: ) ], limit=61, - virtual_column_context=[ + virtual_column_contexts=[ VirtualColumnContext( from_column_name="color", to_column_name="special_color", diff --git a/tests/web/rpc/test_tags_list.py b/tests/web/rpc/test_tags_list.py index bb826ac7ba..ff353fb9af 100644 --- a/tests/web/rpc/test_tags_list.py +++ b/tests/web/rpc/test_tags_list.py @@ -5,10 +5,11 @@ import pytest from google.protobuf.timestamp_pb2 import Timestamp from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( - TagsListRequest, - TagsListResponse, + TraceItemAttributesRequest, + 
TraceItemAttributesResponse, ) from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta +from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey from snuba.datasets.storages.factory import get_storage from snuba.datasets.storages.storage_key import StorageKey @@ -63,11 +64,11 @@ def setup_teardown(clickhouse_db: None, redis_db: None) -> None: @pytest.mark.clickhouse_db @pytest.mark.redis_db -class TestTagsList(BaseApiTest): +class TestTraceItemAttributes(BaseApiTest): def test_basic(self) -> None: ts = Timestamp() ts.GetCurrentTime() - message = TagsListRequest( + message = TraceItemAttributesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, @@ -98,12 +99,12 @@ def test_basic(self) -> None: offset=20, ) response = self.app.post( - "/rpc/TagsListRequest", data=message.SerializeToString() + "/rpc/TraceItemAttributesRequest", data=message.SerializeToString() ) assert response.status_code == 200 - def test_simple_case(self, setup_teardown: Any) -> None: - message = TagsListRequest( + def test_simple_case_str(self, setup_teardown: Any) -> None: + message = TraceItemAttributesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, @@ -132,17 +133,58 @@ def test_simple_case(self, setup_teardown: Any) -> None: ), limit=10, offset=0, + type=AttributeKey.Type.TYPE_STRING, ) response = tags_list_query(message) assert response.tags == [ - TagsListResponse.Tag( - name=f"a_tag_{i:03}", type=TagsListResponse.TYPE_STRING + TraceItemAttributesResponse.Tag( + name=f"a_tag_{i:03}", type=AttributeKey.Type.TYPE_STRING + ) + for i in range(0, 10) + ] + + def test_simple_case_float(self, setup_teardown: Any) -> None: + message = TraceItemAttributesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp( + seconds=int( + datetime( + year=BASE_TIME.year, + month=BASE_TIME.month, + day=BASE_TIME.day - 1, + tzinfo=UTC, + ).timestamp() + ) + ), + end_timestamp=Timestamp( + seconds=int( + datetime( + year=BASE_TIME.year, + month=BASE_TIME.month, + day=BASE_TIME.day + 1, + tzinfo=UTC, + ).timestamp() + ) + ), + ), + limit=10, + offset=0, + type=AttributeKey.Type.TYPE_FLOAT, + ) + response = tags_list_query(message) + assert response.tags == [ + TraceItemAttributesResponse.Tag( + name=f"b_measurement_{i:03}", type=AttributeKey.Type.TYPE_FLOAT ) for i in range(0, 10) ] def test_with_offset(self, setup_teardown: Any) -> None: - message = TagsListRequest( + message = TraceItemAttributesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, @@ -170,21 +212,24 @@ def test_with_offset(self, setup_teardown: Any) -> None: ), ), limit=5, - offset=29, + offset=10, + type=AttributeKey.Type.TYPE_FLOAT, ) response = tags_list_query(message) assert response.tags == [ - TagsListResponse.Tag(name="a_tag_029", type=TagsListResponse.TYPE_STRING), - TagsListResponse.Tag( - name="b_measurement_000", type=TagsListResponse.TYPE_NUMBER + TraceItemAttributesResponse.Tag( + name="b_measurement_010", type=AttributeKey.Type.TYPE_FLOAT + ), + TraceItemAttributesResponse.Tag( + name="b_measurement_011", type=AttributeKey.Type.TYPE_FLOAT ), - TagsListResponse.Tag( - name="b_measurement_001", type=TagsListResponse.TYPE_NUMBER + TraceItemAttributesResponse.Tag( + name="b_measurement_012", type=AttributeKey.Type.TYPE_FLOAT ), - TagsListResponse.Tag( - name="b_measurement_002", type=TagsListResponse.TYPE_NUMBER + TraceItemAttributesResponse.Tag( + name="b_measurement_013", 
type=AttributeKey.Type.TYPE_FLOAT ), - TagsListResponse.Tag( - name="b_measurement_003", type=TagsListResponse.TYPE_NUMBER + TraceItemAttributesResponse.Tag( + name="b_measurement_014", type=AttributeKey.Type.TYPE_FLOAT ), ] From d789447ee79e4fd64b120d9859cbe17530bfd165 Mon Sep 17 00:00:00 2001 From: Riya Chakraborty <47572810+ayirr7@users.noreply.github.com> Date: Fri, 13 Sep 2024 10:45:53 -0700 Subject: [PATCH 09/23] metric(consumer): Add a metric to track the size of individual spans (#6300) We only have this metric for generic metrics but it would be useful to have for spans to potentially help understand memory profiles of the spans consumer. --- rust_snuba/src/processors/spans.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust_snuba/src/processors/spans.rs b/rust_snuba/src/processors/spans.rs index 49a1f7e398..eefe8a11a1 100644 --- a/rust_snuba/src/processors/spans.rs +++ b/rust_snuba/src/processors/spans.rs @@ -3,6 +3,7 @@ use std::str::FromStr; use anyhow::Context; use chrono::DateTime; +use rust_arroyo::timer; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -22,6 +23,8 @@ pub fn process_message( let payload_bytes = payload.payload().context("Expected payload")?; let msg: FromSpanMessage = serde_json::from_slice(payload_bytes)?; + timer!("spans.messages.size", payload_bytes.len() as f64); + let origin_timestamp = DateTime::from_timestamp(msg.received as i64, 0); let mut span: Span = msg.try_into()?; From 68b4ebd4e9eda0e57e164b0e0b20007b9cfee1c2 Mon Sep 17 00:00:00 2001 From: davidtsuk <132949946+davidtsuk@users.noreply.github.com> Date: Fri, 13 Sep 2024 11:52:52 -0700 Subject: [PATCH 10/23] Update migrations list command to show migrations that no longer exist in the codebase (#6299) Fixes #2159 Running `snuba migrations list` will now show migrations that are in clickhouse but no longer in the codebase (e.g. because snuba was downgraded). # Testing Deleted migration in codebase to check that it shows up in the list. 
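For illustration only (hypothetical group and migration IDs, not output captured for this PR), an applied migration that has since been deleted from the codebase would be rendered roughly like this:

```
events (readiness_state: complete)
[X] 0001_events_initial
[X] 0002_some_old_migration (this migration no longer exists)
```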
image --- snuba/admin/clickhouse/migration_checks.py | 2 +- snuba/admin/views.py | 2 +- snuba/cli/migrations.py | 10 ++++--- snuba/migrations/runner.py | 27 ++++++++++++++++--- .../test_migration_checks.py | 17 +++++++----- tests/migrations/test_runner.py | 20 ++++++++++++++ 6 files changed, 63 insertions(+), 15 deletions(-) diff --git a/snuba/admin/clickhouse/migration_checks.py b/snuba/admin/clickhouse/migration_checks.py index 0439d56956..8fdd9e495b 100644 --- a/snuba/admin/clickhouse/migration_checks.py +++ b/snuba/admin/clickhouse/migration_checks.py @@ -100,7 +100,7 @@ def __init__( ).get_migrations() migration_statuses = {} - for migration_id, status, _ in migrations: + for migration_id, status, _, _ in migrations: migration_statuses[migration_id] = { "migration_id": migration_id, "status": status, diff --git a/snuba/admin/views.py b/snuba/admin/views.py index 91136e1646..9942b55419 100644 --- a/snuba/admin/views.py +++ b/snuba/admin/views.py @@ -208,7 +208,7 @@ def migrations_groups_list(group: str) -> Response: "status": status.value, "blocking": blocking, } - for migration_id, status, blocking in runner_group_migrations + for migration_id, status, blocking, _ in runner_group_migrations ] ), 200, diff --git a/snuba/cli/migrations.py b/snuba/cli/migrations.py index 95d180cfaa..bacb266dbe 100644 --- a/snuba/cli/migrations.py +++ b/snuba/cli/migrations.py @@ -37,10 +37,10 @@ def list() -> None: setup_logging() check_clickhouse_connections(CLUSTERS) runner = Runner() - for group, group_migrations in runner.show_all(): + for group, group_migrations in runner.show_all(include_nonexistent=True): readiness_state = get_group_readiness_state(group) click.echo(f"{group.value} (readiness_state: {readiness_state.value})") - for migration_id, status, blocking in group_migrations: + for migration_id, status, blocking, existing in group_migrations: symbol = { Status.COMPLETED: "X", Status.NOT_STARTED: " ", @@ -53,7 +53,11 @@ def list() -> None: if status != Status.COMPLETED and blocking: blocking_text = " (blocking)" - click.echo(f"[{symbol}] {migration_id}{in_progress_text}{blocking_text}") + existing_text = "" if existing else " (this migration no longer exists)" + + click.echo( + f"[{symbol}] {migration_id}{in_progress_text}{blocking_text}{existing_text}" + ) click.echo() diff --git a/snuba/migrations/runner.py b/snuba/migrations/runner.py index 445e5f4ee6..82a970a3b6 100644 --- a/snuba/migrations/runner.py +++ b/snuba/migrations/runner.py @@ -1,3 +1,4 @@ +from collections import defaultdict from datetime import datetime from functools import partial from typing import List, Mapping, MutableMapping, NamedTuple, Optional, Sequence, Tuple @@ -64,6 +65,7 @@ class MigrationDetails(NamedTuple): migration_id: str status: Status blocking: bool + exists: bool class Runner: @@ -133,7 +135,7 @@ def force_overwrite_status( ) def show_all( - self, groups: Optional[Sequence[str]] = None + self, groups: Optional[Sequence[str]] = None, include_nonexistent: bool = False ) -> List[Tuple[MigrationGroup, List[MigrationDetails]]]: """ Returns the list of migrations and their statuses for each group. 
@@ -148,6 +150,9 @@ def show_all( migration_groups = get_active_migration_groups() migration_status = self._get_migration_status(migration_groups) + clickhouse_group_migrations = defaultdict(set) + for group, migration_id in migration_status.keys(): + clickhouse_group_migrations[group].add(migration_id) def get_status(migration_key: MigrationKey) -> Status: return migration_status.get(migration_key, Status.NOT_STARTED) @@ -156,15 +161,31 @@ def get_status(migration_key: MigrationKey) -> Status: group_migrations: List[MigrationDetails] = [] group_loader = get_group_loader(group) - for migration_id in group_loader.get_migrations(): + migration_ids = group_loader.get_migrations() + for migration_id in migration_ids: migration_key = MigrationKey(group, migration_id) migration = group_loader.load_migration(migration_id) group_migrations.append( MigrationDetails( - migration_id, get_status(migration_key), migration.blocking + migration_id, + get_status(migration_key), + migration.blocking, + True, ) ) + if include_nonexistent: + non_existing_migrations = clickhouse_group_migrations.get( + group, set() + ).difference(set(migration_ids)) + for migration_id in non_existing_migrations: + migration_key = MigrationKey(group, migration_id) + group_migrations.append( + MigrationDetails( + migration_id, get_status(migration_key), False, False + ) + ) + migrations.append((group, group_migrations)) return migrations diff --git a/tests/admin/clickhouse_migrations/test_migration_checks.py b/tests/admin/clickhouse_migrations/test_migration_checks.py index 7ad0790759..198ef35dca 100644 --- a/tests/admin/clickhouse_migrations/test_migration_checks.py +++ b/tests/admin/clickhouse_migrations/test_migration_checks.py @@ -29,9 +29,9 @@ def group_loader() -> GroupLoader: RUN_MIGRATIONS: Sequence[MigrationDetails] = [ - MigrationDetails("0001", Status.COMPLETED, True), - MigrationDetails("0002", Status.NOT_STARTED, True), - MigrationDetails("0003", Status.NOT_STARTED, True), + MigrationDetails("0001", Status.COMPLETED, True, True), + MigrationDetails("0002", Status.NOT_STARTED, True, True), + MigrationDetails("0003", Status.NOT_STARTED, True, True), ] @@ -62,9 +62,9 @@ def test_status_checker_run( REVERSE_MIGRATIONS: Sequence[MigrationDetails] = [ - MigrationDetails("0001", Status.COMPLETED, True), - MigrationDetails("0002", Status.IN_PROGRESS, True), - MigrationDetails("0003", Status.NOT_STARTED, True), + MigrationDetails("0001", Status.COMPLETED, True, True), + MigrationDetails("0002", Status.IN_PROGRESS, True, True), + MigrationDetails("0003", Status.NOT_STARTED, True, True), ] @@ -155,7 +155,10 @@ def test_run_migration_checks_and_policies( mock_policy = Mock() checker = mock_checker() mock_runner.show_all.return_value = [ - (MigrationGroup("events"), [MigrationDetails("0001", Status.COMPLETED, True)]) + ( + MigrationGroup("events"), + [MigrationDetails("0001", Status.COMPLETED, True, True)], + ) ] mock_policy.can_run.return_value = policy_result[0] diff --git a/tests/migrations/test_runner.py b/tests/migrations/test_runner.py index 44d096f014..6b1360dcf5 100644 --- a/tests/migrations/test_runner.py +++ b/tests/migrations/test_runner.py @@ -110,6 +110,26 @@ def test_show_all_for_groups() -> None: assert all([migration.status == Status.COMPLETED for migration in migrations]) +@pytest.mark.clickhouse_db +def test_show_all_nonexistent_migration() -> None: + runner = Runner() + assert all( + [ + migration.status == Status.NOT_STARTED + for (_, group_migrations) in runner.show_all() + for migration in group_migrations 
+ ] + ) + runner.run_all(force=True) + assert all( + [ + migration.status == Status.COMPLETED + for (_, group_migrations) in runner.show_all() + for migration in group_migrations + ] + ) + + @pytest.mark.clickhouse_db def test_run_migration() -> None: runner = Runner() From 530bd23d39ca27a2f1c6e5856c1128f607118bd4 Mon Sep 17 00:00:00 2001 From: getsentry-bot Date: Sun, 15 Sep 2024 17:05:10 +0000 Subject: [PATCH 11/23] release: 24.9.0 --- CHANGELOG.md | 31 +++++++++++++++++++++++++++++++ docs/source/conf.py | 2 +- setup.py | 2 +- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b20e80ed1d..6f3fa97afd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,36 @@ # Changelog +## 24.9.0 + +### Various fixes & improvements + +- Update migrations list command to show migrations that no longer exist in the codebase (#6299) by @davidtsuk +- metric(consumer): Add a metric to track the size of individual spans (#6300) by @ayirr7 +- feat(rpc): Update tags list rpc (#6301) by @Zylphrex +- feat(eap): add virtual column support (#6292) by @volokluev +- tweak(eap): Allow more memory usage for eap spans (#6298) by @volokluev +- ref(doc): add documentation for the ReadinessState enum (#6295) by @viglia +- feat(eap): Start ingesting data into sample_weight_2 column (#6290) by @colin-sentry +- Update docker entrypoint to run heaptrack (#6273) by @ayirr7 +- fix(eap): Switch to sampling_weight_2 in entity (#6287) by @colin-sentry +- bug(query): Run entity validators in composite query pipeline (#6285) by @enochtangg +- feat(eap): make mapContains work with EAP dataset (#6284) by @colin-sentry +- feat(job-runner): create a new `snuba jobs` command (#6281) by @xurui-c +- feat(eap): Shard meta tables by trace ID (#6286) by @colin-sentry +- fix(eap): Make span_id be returned as a string correctly (#6283) by @colin-sentry +- feat(job-runner): scaffolding for job manifest testing (#6282) by @onewland +- bug(admin): Fix invalid query error alerting in snuba admin (#6280) by @enochtangg +- Fixing Snuba Admin trace UI error. (#6278) by @nachivrn +- feat(eap): Add a processor that allows you to do mapKeys on attr_str (#6277) by @colin-sentry +- cleanup(capman): remove legacy table rate limits (#6274) by @volokluev +- Fixing Snuba Admin trace UI error. 
(#6276) by @nachivrn +- hackweek(snuba-admin): MQL query tool (#6235) by @enochtangg +- feat(eap): Endpoint to get the tags available for a project (#6270) by @colin-sentry +- feat(sudo): issue slack notifications when sudo mode is used (#6271) by @volokluev +- chore(eap): Add entities and storages for EAP span meta tables (#6269) by @colin-sentry + +_Plus 60 more_ + ## 24.8.0 ### Various fixes & improvements diff --git a/docs/source/conf.py b/docs/source/conf.py index 993e13ed5e..be9c625c30 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -7,7 +7,7 @@ copyright = "2021, Sentry Team and Contributors" author = "Sentry Team and Contributors" -release = "24.9.0.dev0" +release = "24.9.0" # -- General configuration --------------------------------------------------- diff --git a/setup.py b/setup.py index 245e9bb196..c45faef3f9 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import find_packages, setup -VERSION = "24.9.0.dev0" +VERSION = "24.9.0" def get_requirements() -> Sequence[str]: From acbd3e0ac02270ced8b01ea8a53ac71b9dc88fc0 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 16 Sep 2024 17:34:18 +0200 Subject: [PATCH 12/23] fix(admin): Print SENTRY_ENVIRONMENT to slack (#6307) The slack messages already contain the admin URL which identifies the region, but let's print it explicitly as well. SENTRY_ENVIRONMENT corresponds to the region. --- snuba/admin/notifications/slack/utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/snuba/admin/notifications/slack/utils.py b/snuba/admin/notifications/slack/utils.py index 037f0036c4..ae817eb2e2 100644 --- a/snuba/admin/notifications/slack/utils.py +++ b/snuba/admin/notifications/slack/utils.py @@ -1,3 +1,4 @@ +import os from typing import Any, Dict, List, Optional, Union from snuba import settings @@ -95,12 +96,13 @@ def build_context( user: str, timestamp: str, action: AuditLogAction ) -> Dict[str, Union[str, List[Dict[str, str]]]]: url = f"{settings.ADMIN_URL}/#auditlog" + environ = os.environ.get("SENTRY_ENVIRONMENT") or "unknown environment" return { "type": "context", "elements": [ { "type": "mrkdwn", - "text": f"{action.value} at *<{url}|{timestamp}>* by *<{user}>*", + "text": f"{action.value} at *<{url}|{timestamp}>* by *<{user}>* in *<{environ}>*", } ], } From 10350a1c51671cdd54edf62bfb1f6d7d2e3ec15d Mon Sep 17 00:00:00 2001 From: getsentry-bot Date: Mon, 16 Sep 2024 22:04:04 +0000 Subject: [PATCH 13/23] meta: Bump new development version --- docs/source/conf.py | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index be9c625c30..06531023f6 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -7,7 +7,7 @@ copyright = "2021, Sentry Team and Contributors" author = "Sentry Team and Contributors" -release = "24.9.0" +release = "24.10.0.dev0" # -- General configuration --------------------------------------------------- diff --git a/setup.py b/setup.py index c45faef3f9..2f4325659c 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import find_packages, setup -VERSION = "24.9.0" +VERSION = "24.10.0.dev0" def get_requirements() -> Sequence[str]: From c0e26e3bae0c99ba2b9bc5664dcce3d3503c706d Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Mon, 16 Sep 2024 15:28:31 -0700 Subject: [PATCH 14/23] fix(job-runner): update job manifest filename to match ops repo (#6310) I used a different filename in our repository that seeds the jobs manifest with real data. 
I'm updating this filename to match it because we like it better. --- snuba/manual_jobs/{run_manifest.json => job_manifest.json} | 0 snuba/manual_jobs/manifest_reader.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename snuba/manual_jobs/{run_manifest.json => job_manifest.json} (100%) diff --git a/snuba/manual_jobs/run_manifest.json b/snuba/manual_jobs/job_manifest.json similarity index 100% rename from snuba/manual_jobs/run_manifest.json rename to snuba/manual_jobs/job_manifest.json diff --git a/snuba/manual_jobs/manifest_reader.py b/snuba/manual_jobs/manifest_reader.py index 9756f0c5d2..ace21e548b 100644 --- a/snuba/manual_jobs/manifest_reader.py +++ b/snuba/manual_jobs/manifest_reader.py @@ -15,4 +15,4 @@ def read(filename: str) -> Sequence[Any]: def read_jobs_manifest() -> Sequence[Any]: - return _ManifestReader.read("run_manifest.json") + return _ManifestReader.read("job_manifest.json") From 757753486e60b73de0b7a9e8ff2551c0c1a1409a Mon Sep 17 00:00:00 2001 From: Onkar Deshpande Date: Mon, 16 Sep 2024 16:14:35 -0700 Subject: [PATCH 15/23] Specify UTC timezone argument in datetime.now() and fix unit tests (#6279) While running unit tests locally using `make test`, I saw these failures: ``` =========================================================================================================== short test summary info =========================================================================================================== FAILED tests/clickhouse/optimize/test_optimize.py::test_optimize_partitions_raises_exception_with_cutoff_time - Failed: DID NOT RAISE FAILED tests/clickhouse/optimize/test_optimize_scheduler.py::test_get_next_schedule[non parallel] - assert OptimizationSchedule(partitions_groups=[["(90,'2022-06-27')",\n "(90,'2022-06-20')",\n ... FAILED tests/clickhouse/optimize/test_optimize_scheduler.py::test_get_next_schedule[parallel before final cutoff] - assert OptimizationSchedule(partitions_groups=[["(90,'2022-03-28')",\n "(90,'202... FAILED tests/clickhouse/optimize/test_optimize_scheduler.py::test_get_next_schedule_raises_exception - Failed: DID NOT RAISE FAILED tests/subscriptions/test_scheduler_consumer.py::test_tick_time_shift - AssertionError: assert Tick(partition=0, offsets=Interval(lower=0, upper=1), timestamps=Interval(lower=86400.0, upper=172800.0)) == Tick(partition=0, offsets=... ========================================================================== 5 failed, 2601 passed, 4 skipped, 2 deselected, 1 xfailed, 6 xpassed in 430.69s (0:07:10) ========================================================================== ``` The reason for failure is that some of these tests use [time_machine's context manager](https://github.com/adamchainz/time-machine?tab=readme-ov-file#context-manager), which expects the time in UTC. When these tests are run locally in PST, there is a difference of 7 hours that never triggers the cutoff time expiry. ``` >>> from datetime import datetime, timezone >>> datetime.now() datetime.datetime(2024, 9, 9, 11, 51, 15, 279890) >>> datetime.now(timezone.utc) datetime.datetime(2024, 9, 9, 18, 51, 26, 40580, tzinfo=datetime.timezone.utc) ``` The fix is to use `datetime`'s `timezone.utc` instead of `datetime.now()`. `Datetime` also used to have `datetime.utcnow()` but it has been deprecated. Note: I also fixed some non-test code, so I would appreciate reviews from experts in those areas. 
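As a minimal, self-contained sketch of the pattern this change applies (illustrative only, not code from this PR):

```python
from datetime import datetime, timezone

# Naive "now" inherits the machine's local timezone (PST in the failures above),
# so it can sit hours away from a UTC-based cutoff produced under time_machine.
naive_now = datetime.now()

# Timezone-aware "now" is unambiguous and compares correctly against UTC cutoffs.
utc_now = datetime.now(timezone.utc)

cutoff = utc_now.replace(hour=23, minute=0, second=0, microsecond=0)
print(utc_now > cutoff)                  # consistent regardless of the local timezone
print(naive_now.tzinfo, utc_now.tzinfo)  # None UTC
```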
--- snuba/clickhouse/optimize/optimize.py | 4 ++-- snuba/clickhouse/optimize/optimize_scheduler.py | 10 +++++----- snuba/settings/settings_test.py | 2 +- tests/clickhouse/optimize/test_optimize_scheduler.py | 4 ++-- tests/clickhouse/optimize/test_optimize_tracker.py | 10 +++++----- tests/subscriptions/test_scheduler_consumer.py | 9 ++++++--- 6 files changed, 21 insertions(+), 18 deletions(-) diff --git a/snuba/clickhouse/optimize/optimize.py b/snuba/clickhouse/optimize/optimize.py index 35076d77a0..6efb417a1d 100644 --- a/snuba/clickhouse/optimize/optimize.py +++ b/snuba/clickhouse/optimize/optimize.py @@ -5,7 +5,7 @@ import time from collections import deque from concurrent.futures import Future, ThreadPoolExecutor -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Any, Mapping, Optional, Sequence import structlog @@ -347,7 +347,7 @@ def optimize_partitions( """ for partition in partitions: - if cutoff_time is not None and datetime.now() > cutoff_time: + if cutoff_time is not None and datetime.now(timezone.utc) > cutoff_time: logger.info( f"Optimize job is running past provided cutoff time" f" {cutoff_time}. Cancelling.", diff --git a/snuba/clickhouse/optimize/optimize_scheduler.py b/snuba/clickhouse/optimize/optimize_scheduler.py index 9e8c18b90a..9c07326181 100644 --- a/snuba/clickhouse/optimize/optimize_scheduler.py +++ b/snuba/clickhouse/optimize/optimize_scheduler.py @@ -1,6 +1,6 @@ import re from dataclasses import dataclass -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import MutableSequence, Sequence from snuba import settings @@ -42,9 +42,9 @@ class OptimizeScheduler: def __init__(self, default_parallel_threads: int) -> None: self.__default_parallel_threads = default_parallel_threads - self.__last_midnight = (datetime.now() + timedelta(minutes=10)).replace( - hour=0, minute=0, second=0, microsecond=0 - ) + self.__last_midnight = ( + datetime.now(timezone.utc) + timedelta(minutes=10) + ).replace(hour=0, minute=0, second=0, microsecond=0) self.__parallel_start_time = self.__last_midnight + timedelta( hours=settings.PARALLEL_OPTIMIZE_JOB_START_TIME ) @@ -95,7 +95,7 @@ def get_next_schedule(self, partitions: Sequence[str]) -> OptimizationSchedule: reached. 
""" num_threads = get_num_threads(self.__default_parallel_threads) - current_time = datetime.now() + current_time = datetime.now(timezone.utc) if current_time >= self.__full_job_end_time: raise OptimizedSchedulerTimeout( f"Optimize job cutoff time exceeded " diff --git a/snuba/settings/settings_test.py b/snuba/settings/settings_test.py index c59bb3b37e..35acec35ae 100644 --- a/snuba/settings/settings_test.py +++ b/snuba/settings/settings_test.py @@ -40,7 +40,7 @@ ENFORCE_RETENTION = True # Ignore optimize job cut off time for tests -OPTIMIZE_JOB_CUTOFF_TIME = 24 +OPTIMIZE_JOB_CUTOFF_TIME = 23 OPTIMIZE_PARALLEL_MAX_JITTER_MINUTES = 0 diff --git a/tests/clickhouse/optimize/test_optimize_scheduler.py b/tests/clickhouse/optimize/test_optimize_scheduler.py index d68947cef3..6375adf290 100644 --- a/tests/clickhouse/optimize/test_optimize_scheduler.py +++ b/tests/clickhouse/optimize/test_optimize_scheduler.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Sequence import pytest @@ -132,7 +132,7 @@ def test_subdivide_partitions( ) -last_midnight = (datetime.now() + timedelta(minutes=10)).replace( +last_midnight = (datetime.now(timezone.utc) + timedelta(minutes=10)).replace( hour=0, minute=0, second=0, microsecond=0 ) diff --git a/tests/clickhouse/optimize/test_optimize_tracker.py b/tests/clickhouse/optimize/test_optimize_tracker.py index c5796dc19a..faf47cad2e 100644 --- a/tests/clickhouse/optimize/test_optimize_tracker.py +++ b/tests/clickhouse/optimize/test_optimize_tracker.py @@ -1,6 +1,6 @@ import time import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Optional, Set from unittest.mock import call, patch @@ -136,11 +136,11 @@ def write_error_message(writable_storage: WritableTableStorage, time: int) -> No for week in range(0, 4): write_error_message( writable_storage=storage, - time=int((datetime.now() - timedelta(weeks=week)).timestamp()), + time=int((datetime.now(timezone.utc) - timedelta(weeks=week)).timestamp()), ) write_error_message( writable_storage=storage, - time=int((datetime.now() - timedelta(weeks=week)).timestamp()), + time=int((datetime.now(timezone.utc) - timedelta(weeks=week)).timestamp()), ) partitions = optimize.get_partitions_to_optimize( @@ -227,11 +227,11 @@ def write_error_message(writable_storage: WritableTableStorage, time: int) -> No for week in range(0, 4): write_error_message( writable_storage=storage, - time=int((datetime.now() - timedelta(weeks=week)).timestamp()), + time=int((datetime.now(timezone.utc) - timedelta(weeks=week)).timestamp()), ) write_error_message( writable_storage=storage, - time=int((datetime.now() - timedelta(weeks=week)).timestamp()), + time=int((datetime.now(timezone.utc) - timedelta(weeks=week)).timestamp()), ) partitions = optimize.get_partitions_to_optimize( diff --git a/tests/subscriptions/test_scheduler_consumer.py b/tests/subscriptions/test_scheduler_consumer.py index ecb7ad21df..8432657a4c 100644 --- a/tests/subscriptions/test_scheduler_consumer.py +++ b/tests/subscriptions/test_scheduler_consumer.py @@ -2,7 +2,7 @@ import logging import time import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Any, Mapping, Optional from unittest import mock @@ -148,7 +148,10 @@ def test_tick_time_shift() -> None: assert tick.time_shift(timedelta(hours=24).total_seconds()) == Tick( partition, offsets, - Interval(datetime(1970, 1, 
2).timestamp(), datetime(1970, 1, 3).timestamp()), + Interval( + datetime(1970, 1, 2, tzinfo=timezone.utc).timestamp(), + datetime(1970, 1, 3, tzinfo=timezone.utc).timestamp(), + ), ) @@ -437,7 +440,7 @@ def test_invalid_commit_log_message(caplog: Any) -> None: "orig_message_ts", ) - now = datetime.now() + now = datetime.now(timezone.utc) def _assignment_callback(offsets: Mapping[Partition, int]) -> None: assert inner_consumer.tell() == {partition: 0} From f506b7d38eb17a1ff309d6d3c2b370cdb294b691 Mon Sep 17 00:00:00 2001 From: Onkar Deshpande Date: Mon, 16 Sep 2024 17:10:46 -0700 Subject: [PATCH 16/23] Update github @actions/upload-artifact to v4 (#6311) After a PR is merged, the github actions is failing in `docs.yml` - https://github.com/getsentry/snuba/actions/workflows/docs.yml. The failure is because we are using deprecated upload artifact v2 instead of the latest and greatest [v4](https://github.com/actions/upload-artifact?tab=readme-ov-file#v4---whats-new). --- .github/workflows/docs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 3c5f822556..beb953664e 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -30,7 +30,7 @@ jobs: force_orphan: true - name: Archive Docs - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: docs path: docs/build From 6b5efd64bb903f88b153f1814e91a52f826dc77b Mon Sep 17 00:00:00 2001 From: Onkar Deshpande Date: Mon, 16 Sep 2024 18:47:13 -0700 Subject: [PATCH 17/23] =?UTF-8?q?Revert=20"Specify=20UTC=20timezone=20argu?= =?UTF-8?q?ment=20in=20datetime.now()=20and=20fix=20unit=20=E2=80=A6=20(#6?= =?UTF-8?q?312)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …tests (#6279)" This reverts commit 757753486e60b73de0b7a9e8ff2551c0c1a1409a. Surprisingly, the commit runs fine locally and even in the CI but I am seeing these failures: - https://github.com/getsentry/snuba/actions/runs/10893514288/job/30228865217 - https://github.com/getsentry/snuba/actions/runs/10894057864/job/30230460999 Reverting this commit to unblock deploy-snuba-s4s. --- snuba/clickhouse/optimize/optimize.py | 4 ++-- snuba/clickhouse/optimize/optimize_scheduler.py | 10 +++++----- snuba/settings/settings_test.py | 2 +- tests/clickhouse/optimize/test_optimize_scheduler.py | 4 ++-- tests/clickhouse/optimize/test_optimize_tracker.py | 10 +++++----- tests/subscriptions/test_scheduler_consumer.py | 9 +++------ 6 files changed, 18 insertions(+), 21 deletions(-) diff --git a/snuba/clickhouse/optimize/optimize.py b/snuba/clickhouse/optimize/optimize.py index 6efb417a1d..35076d77a0 100644 --- a/snuba/clickhouse/optimize/optimize.py +++ b/snuba/clickhouse/optimize/optimize.py @@ -5,7 +5,7 @@ import time from collections import deque from concurrent.futures import Future, ThreadPoolExecutor -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from typing import Any, Mapping, Optional, Sequence import structlog @@ -347,7 +347,7 @@ def optimize_partitions( """ for partition in partitions: - if cutoff_time is not None and datetime.now(timezone.utc) > cutoff_time: + if cutoff_time is not None and datetime.now() > cutoff_time: logger.info( f"Optimize job is running past provided cutoff time" f" {cutoff_time}. 
Cancelling.", diff --git a/snuba/clickhouse/optimize/optimize_scheduler.py b/snuba/clickhouse/optimize/optimize_scheduler.py index 9c07326181..9e8c18b90a 100644 --- a/snuba/clickhouse/optimize/optimize_scheduler.py +++ b/snuba/clickhouse/optimize/optimize_scheduler.py @@ -1,6 +1,6 @@ import re from dataclasses import dataclass -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from typing import MutableSequence, Sequence from snuba import settings @@ -42,9 +42,9 @@ class OptimizeScheduler: def __init__(self, default_parallel_threads: int) -> None: self.__default_parallel_threads = default_parallel_threads - self.__last_midnight = ( - datetime.now(timezone.utc) + timedelta(minutes=10) - ).replace(hour=0, minute=0, second=0, microsecond=0) + self.__last_midnight = (datetime.now() + timedelta(minutes=10)).replace( + hour=0, minute=0, second=0, microsecond=0 + ) self.__parallel_start_time = self.__last_midnight + timedelta( hours=settings.PARALLEL_OPTIMIZE_JOB_START_TIME ) @@ -95,7 +95,7 @@ def get_next_schedule(self, partitions: Sequence[str]) -> OptimizationSchedule: reached. """ num_threads = get_num_threads(self.__default_parallel_threads) - current_time = datetime.now(timezone.utc) + current_time = datetime.now() if current_time >= self.__full_job_end_time: raise OptimizedSchedulerTimeout( f"Optimize job cutoff time exceeded " diff --git a/snuba/settings/settings_test.py b/snuba/settings/settings_test.py index 35acec35ae..c59bb3b37e 100644 --- a/snuba/settings/settings_test.py +++ b/snuba/settings/settings_test.py @@ -40,7 +40,7 @@ ENFORCE_RETENTION = True # Ignore optimize job cut off time for tests -OPTIMIZE_JOB_CUTOFF_TIME = 23 +OPTIMIZE_JOB_CUTOFF_TIME = 24 OPTIMIZE_PARALLEL_MAX_JITTER_MINUTES = 0 diff --git a/tests/clickhouse/optimize/test_optimize_scheduler.py b/tests/clickhouse/optimize/test_optimize_scheduler.py index 6375adf290..d68947cef3 100644 --- a/tests/clickhouse/optimize/test_optimize_scheduler.py +++ b/tests/clickhouse/optimize/test_optimize_scheduler.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from typing import Sequence import pytest @@ -132,7 +132,7 @@ def test_subdivide_partitions( ) -last_midnight = (datetime.now(timezone.utc) + timedelta(minutes=10)).replace( +last_midnight = (datetime.now() + timedelta(minutes=10)).replace( hour=0, minute=0, second=0, microsecond=0 ) diff --git a/tests/clickhouse/optimize/test_optimize_tracker.py b/tests/clickhouse/optimize/test_optimize_tracker.py index faf47cad2e..c5796dc19a 100644 --- a/tests/clickhouse/optimize/test_optimize_tracker.py +++ b/tests/clickhouse/optimize/test_optimize_tracker.py @@ -1,6 +1,6 @@ import time import uuid -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from typing import Optional, Set from unittest.mock import call, patch @@ -136,11 +136,11 @@ def write_error_message(writable_storage: WritableTableStorage, time: int) -> No for week in range(0, 4): write_error_message( writable_storage=storage, - time=int((datetime.now(timezone.utc) - timedelta(weeks=week)).timestamp()), + time=int((datetime.now() - timedelta(weeks=week)).timestamp()), ) write_error_message( writable_storage=storage, - time=int((datetime.now(timezone.utc) - timedelta(weeks=week)).timestamp()), + time=int((datetime.now() - timedelta(weeks=week)).timestamp()), ) partitions = optimize.get_partitions_to_optimize( @@ -227,11 +227,11 @@ def write_error_message(writable_storage: 
WritableTableStorage, time: int) -> No for week in range(0, 4): write_error_message( writable_storage=storage, - time=int((datetime.now(timezone.utc) - timedelta(weeks=week)).timestamp()), + time=int((datetime.now() - timedelta(weeks=week)).timestamp()), ) write_error_message( writable_storage=storage, - time=int((datetime.now(timezone.utc) - timedelta(weeks=week)).timestamp()), + time=int((datetime.now() - timedelta(weeks=week)).timestamp()), ) partitions = optimize.get_partitions_to_optimize( diff --git a/tests/subscriptions/test_scheduler_consumer.py b/tests/subscriptions/test_scheduler_consumer.py index 8432657a4c..ecb7ad21df 100644 --- a/tests/subscriptions/test_scheduler_consumer.py +++ b/tests/subscriptions/test_scheduler_consumer.py @@ -2,7 +2,7 @@ import logging import time import uuid -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from typing import Any, Mapping, Optional from unittest import mock @@ -148,10 +148,7 @@ def test_tick_time_shift() -> None: assert tick.time_shift(timedelta(hours=24).total_seconds()) == Tick( partition, offsets, - Interval( - datetime(1970, 1, 2, tzinfo=timezone.utc).timestamp(), - datetime(1970, 1, 3, tzinfo=timezone.utc).timestamp(), - ), + Interval(datetime(1970, 1, 2).timestamp(), datetime(1970, 1, 3).timestamp()), ) @@ -440,7 +437,7 @@ def test_invalid_commit_log_message(caplog: Any) -> None: "orig_message_ts", ) - now = datetime.now(timezone.utc) + now = datetime.now() def _assignment_callback(offsets: Mapping[Partition, int]) -> None: assert inner_consumer.tell() == {partition: 0} From e113f9106eadf37cfeeb6e9e044cefaa8f7a9bf1 Mon Sep 17 00:00:00 2001 From: Onkar Deshpande Date: Tue, 17 Sep 2024 08:28:02 -0700 Subject: [PATCH 18/23] Cleanup some settings from settings_test.py (#6313) During the investigation of https://github.com/getsentry/snuba/pull/6279, I found that: - `OPTIMIZE_JOB_CUTOFF_TIME` is overridden to `24` and it was causing test failures. - `OPTIMIZE_PARALLEL_MAX_JITTER_MINUTES` does not seem to be used anywhere in the code. With the change, less no. of local tests fail. The failure number is non-zero because https://github.com/getsentry/snuba/pull/6279 was reverted yesterday . I would like to verify if this small change passes through the CI and stays stable in prod. 
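
For context, a minimal sketch of what the override interacts with. This assumes, based on the scheduler code touched in the diffs above, that the job end time is derived roughly as `last_midnight + timedelta(hours=OPTIMIZE_JOB_CUTOFF_TIME)`; the `full_job_end_time` helper below is illustrative only, not the real method.

```python
from datetime import datetime, timedelta

OPTIMIZE_JOB_CUTOFF_TIME = 24  # the test override removed by this commit


def full_job_end_time(cutoff_hours: int) -> datetime:
    # Mirrors OptimizeScheduler.__init__ as seen in the diffs above:
    # "last midnight" is computed from now() with a 10-minute skew, and the
    # cutoff (in hours) is assumed to be added on top of it.
    last_midnight = (datetime.now() + timedelta(minutes=10)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return last_midnight + timedelta(hours=cutoff_hours)


# With the 24-hour override, the cutoff lands exactly on the next midnight, so
# the scheduler's `current_time >= full_job_end_time` check sits right on a day
# boundary and becomes sensitive to which clock (naive local vs. UTC) produced
# each timestamp; that is the edge #6279 and its revert traded back and forth.
# Dropping the override avoids pinning tests to that boundary at all.
print(full_job_end_time(OPTIMIZE_JOB_CUTOFF_TIME))
```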
--- snuba/settings/settings_test.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/snuba/settings/settings_test.py b/snuba/settings/settings_test.py index c59bb3b37e..c39978f00e 100644 --- a/snuba/settings/settings_test.py +++ b/snuba/settings/settings_test.py @@ -39,11 +39,6 @@ # Set enforce retention to true for tests ENFORCE_RETENTION = True -# Ignore optimize job cut off time for tests -OPTIMIZE_JOB_CUTOFF_TIME = 24 - -OPTIMIZE_PARALLEL_MAX_JITTER_MINUTES = 0 - ADMIN_ALLOWED_PROD_PROJECTS = [1, 11276] REDIS_CLUSTERS = { From 40bf5501ddefb90d437722da29048d47c4c207ff Mon Sep 17 00:00:00 2001 From: Riya Chakraborty <47572810+ayirr7@users.noreply.github.com> Date: Tue, 17 Sep 2024 09:32:35 -0700 Subject: [PATCH 19/23] Bump Arroyo to 2.17.6 (#6289) Bumped the Python requirements, not sure if anything needs to be changed on the Rust side because of https://github.com/getsentry/snuba/blob/master/rust_snuba/Cargo.toml#L34 --------- Co-authored-by: Markus Unterwaditzer --- requirements.txt | 2 +- rust_snuba/Cargo.lock | 4 ++-- rust_snuba/bin/python_processor_infinite.rs | 4 ++-- snuba/subscriptions/scheduler_consumer.py | 4 ++++ 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/requirements.txt b/requirements.txt index d45afa1c07..58c57916d9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,7 +26,7 @@ pytest-watch==4.2.0 python-dateutil==2.8.2 python-rapidjson==1.8 redis==4.3.4 -sentry-arroyo==2.17.1 +sentry-arroyo==2.17.6 sentry-kafka-schemas==0.1.106 sentry-redis-tools==0.3.0 sentry-relay==0.8.44 diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index 620e3027cb..b0b0025916 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -2852,8 +2852,8 @@ dependencies = [ [[package]] name = "rust_arroyo" -version = "2.17.4" -source = "git+https://github.com/getsentry/arroyo#b15d5467a4958b18ba50175e468e4b70535cb669" +version = "2.17.6" +source = "git+https://github.com/getsentry/arroyo#084c60732a11006a70e6ae56f0ae1cdf8a6dd7d8" dependencies = [ "chrono", "coarsetime", diff --git a/rust_snuba/bin/python_processor_infinite.rs b/rust_snuba/bin/python_processor_infinite.rs index 8aaf285836..19f1cf7908 100644 --- a/rust_snuba/bin/python_processor_infinite.rs +++ b/rust_snuba/bin/python_processor_infinite.rs @@ -20,9 +20,9 @@ fn main() { let output2 = output.clone(); let step = RunTask::new( - move |_| { + move |message| { output2.fetch_add(1, Ordering::Relaxed); - Ok(()) + Ok(message) }, step, ); diff --git a/snuba/subscriptions/scheduler_consumer.py b/snuba/subscriptions/scheduler_consumer.py index 794b3fe6c6..cda3021cbe 100644 --- a/snuba/subscriptions/scheduler_consumer.py +++ b/snuba/subscriptions/scheduler_consumer.py @@ -210,6 +210,10 @@ def close(self, timeout: Optional[float] = None) -> None: def closed(self) -> bool: return self.__consumer.closed + @property + def member_id(self) -> str: + return self.__consumer.member_id + class SchedulerBuilder: def __init__( From d7fa544af717197309ed39f4f9fb3e85b2227d47 Mon Sep 17 00:00:00 2001 From: colin-sentry <161344340+colin-sentry@users.noreply.github.com> Date: Tue, 17 Sep 2024 12:50:13 -0400 Subject: [PATCH 20/23] fix: make tests folder as owned by EAP (#6315) --- CODEOWNERS | 1 + 1 file changed, 1 insertion(+) diff --git a/CODEOWNERS b/CODEOWNERS index 65a1109146..a18aa3643d 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -16,6 +16,7 @@ # EAP-related /snuba/web/rpc @getsentry/events-analytics-platform @getsentry/owners-snuba +/tests/web/rpc @getsentry/events-analytics-platform @getsentry/owners-snuba 
/snuba/snuba_migrations/events_analytics_platform @getsentry/events-analytics-platform @getsentry/owners-snuba /rust_snuba/src/processors/eap_spans.rs @getsentry/events-analytics-platform From 069f0eba3378fd29a7959a9fd7b093fd227a388e Mon Sep 17 00:00:00 2001 From: colin-sentry <161344340+colin-sentry@users.noreply.github.com> Date: Tue, 17 Sep 2024 12:51:07 -0400 Subject: [PATCH 21/23] fix: EAP timeseries order, add weight to calculations (#6302) The old tests used the same number a bunch of times, so there was a bug where the results were not ordered by time. This makes the tests a lot more robust and implements weight in the aggregate functions --------- Co-authored-by: William Mak --- snuba/web/rpc/timeseries.py | 47 +++++++++++--- tests/web/rpc/test_timeseries_api.py | 97 ++++++++++++++++++++++------ 2 files changed, 115 insertions(+), 29 deletions(-) diff --git a/snuba/web/rpc/timeseries.py b/snuba/web/rpc/timeseries.py index 7e63822f65..6c0f63c5e9 100644 --- a/snuba/web/rpc/timeseries.py +++ b/snuba/web/rpc/timeseries.py @@ -5,17 +5,18 @@ AggregateBucketRequest, AggregateBucketResponse, ) +from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey from snuba.attribution.appid import AppID from snuba.attribution.attribution_info import AttributionInfo from snuba.datasets.entities.entity_key import EntityKey from snuba.datasets.entities.factory import get_entity from snuba.datasets.pluggable_dataset import PluggableDataset -from snuba.query import SelectedExpression +from snuba.query import OrderBy, OrderByDirection, SelectedExpression from snuba.query.data_source.simple import Entity from snuba.query.dsl import CurriedFunctions as cf from snuba.query.dsl import Functions as f -from snuba.query.dsl import column +from snuba.query.dsl import column, literal from snuba.query.expressions import Expression from snuba.query.logical import Query from snuba.query.query_settings import HTTPQuerySettings @@ -23,6 +24,7 @@ from snuba.utils.metrics.timer import Timer from snuba.web.query import run_query from snuba.web.rpc.common import ( + NORMALIZED_COLUMNS, attribute_key_to_expression, base_conditions_and, trace_item_filters_to_expression, @@ -34,19 +36,43 @@ def _get_aggregate_func( request: AggregateBucketRequest, ) -> Expression: - key_col = attribute_key_to_expression(request.key) + key_expr = attribute_key_to_expression(request.key) + exists_condition: Expression = literal(True) + if request.key.name not in NORMALIZED_COLUMNS: + if request.key.type == AttributeKey.TYPE_STRING: + exists_condition = f.mapContains( + column("attr_str"), literal(request.key.name) + ) + else: + exists_condition = f.mapContains( + column("attr_num"), literal(request.key.name) + ) + sampling_weight_expr = column("sampling_weight_2") + sign_expr = column("sign") + sampling_weight_times_sign = f.multiply(sampling_weight_expr, sign_expr) + if request.aggregate == AggregateBucketRequest.FUNCTION_SUM: - return f.sum(key_col, alias="sum") - if request.aggregate == AggregateBucketRequest.FUNCTION_AVERAGE: - return f.avg(key_col, alias="avg") + return f.sum(f.multiply(key_expr, sampling_weight_times_sign), alias="sum") if request.aggregate == AggregateBucketRequest.FUNCTION_COUNT: - return f.count(key_col, alias="count") + return f.sumIf(sampling_weight_times_sign, exists_condition, alias="count") + if request.aggregate == AggregateBucketRequest.FUNCTION_AVERAGE: + return f.divide( + f.sum(f.multiply(key_expr, sampling_weight_times_sign)), + f.sumIf(sampling_weight_times_sign, exists_condition, 
alias="count"), + alias="avg", + ) if request.aggregate == AggregateBucketRequest.FUNCTION_P50: - return cf.quantile(0.5)(key_col, alias="p50") + return cf.quantileTDigestWeighted(0.5)( + key_expr, sampling_weight_expr, alias="p50" + ) if request.aggregate == AggregateBucketRequest.FUNCTION_P95: - return cf.quantile(0.95)(key_col, alias="p90") + return cf.quantileTDigestWeighted(0.95)( + key_expr, sampling_weight_expr, alias="p95" + ) if request.aggregate == AggregateBucketRequest.FUNCTION_P99: - return cf.quantile(0.99)(key_col, alias="p95") + return cf.quantileTDigestWeighted(0.99)( + key_expr, sampling_weight_expr, alias="p99" + ) raise BadSnubaRPCRequestException( f"Aggregate {request.aggregate} had an unknown or unset type" @@ -72,6 +98,7 @@ def _build_query(request: AggregateBucketRequest) -> Query: ), granularity=request.granularity_secs, groupby=[column("time")], + order_by=[OrderBy(direction=OrderByDirection.ASC, expression=column("time"))], ) treeify_or_and_conditions(res) return res diff --git a/tests/web/rpc/test_timeseries_api.py b/tests/web/rpc/test_timeseries_api.py index d86e7e7ec3..d5093c9641 100644 --- a/tests/web/rpc/test_timeseries_api.py +++ b/tests/web/rpc/test_timeseries_api.py @@ -18,7 +18,8 @@ from tests.helpers import write_raw_unprocessed_events -def gen_message(dt: datetime) -> Mapping[str, Any]: +def gen_message(dt: datetime, msg_index: int) -> Mapping[str, Any]: + dt = dt - timedelta(hours=1) + timedelta(minutes=msg_index) return { "description": "/api/0/relays/projectconfigs/", "duration_ms": 152, @@ -42,7 +43,12 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: }, "measurements": { "num_of_spans": {"value": 50.0}, - "eap.measurement": {"value": 420}, + "eap.measurement": {"value": msg_index}, + "client_sample_rate": { + "value": 0.01 + if msg_index % 10 == 0 + else 1 # every 10th span should be 100x upscaled + }, }, "organization_id": 1, "origin": "auto.http.django", @@ -83,7 +89,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: "location": random.choice(["mobile", "frontend", "backend"]), }, "trace_id": uuid.uuid4().hex, - "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)), + "start_timestamp_ms": int(dt.timestamp()) * 1000, "start_timestamp_precise": dt.timestamp(), "end_timestamp_precise": dt.timestamp() + 1, } @@ -97,8 +103,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: @pytest.fixture(autouse=True) def setup_teardown(clickhouse_db: None, redis_db: None) -> None: spans_storage = get_storage(StorageKey("eap_spans")) - start = BASE_TIME - messages = [gen_message(start - timedelta(minutes=i)) for i in range(120)] + messages = [gen_message(BASE_TIME, i) for i in range(120)] write_raw_unprocessed_events(spans_storage, messages) # type: ignore @@ -126,40 +131,94 @@ def test_basic(self) -> None: ) assert response.status_code == 200 - def test_with_data(self, setup_teardown: Any) -> None: - ts = Timestamp(seconds=int(BASE_TIME.timestamp())) - hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + def test_sum(self, setup_teardown: Any) -> None: message = AggregateBucketRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, cogs_category="something", referrer="something", - start_timestamp=Timestamp(seconds=hour_ago), - end_timestamp=ts, + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 30)), ), key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT), - 
aggregate=AggregateBucketRequest.FUNCTION_AVERAGE, - granularity_secs=1, + aggregate=AggregateBucketRequest.FUNCTION_SUM, + granularity_secs=300, ) response = timeseries_query(message) - assert response.result == [420 for _ in range(60)] + # spans have (measurement, sample rate) = (0, 100), (10, 1), ..., (100, 100) + # granularity puts five spans into the same bucket + # whole interval is 30 minutes, so there should be 6 buckets + # and our start time is exactly 1 hour after data stops + expected_results = [ + 60 * 100 + 61 + 62 + 63 + 64, + 65 + 66 + 67 + 68 + 69, + 70 * 100 + 71 + 72 + 73 + 74, + 75 + 76 + 77 + 78 + 79, + 80 * 100 + 81 + 82 + 83 + 84, + 85 + 86 + 87 + 88 + 89, + ] + assert response.result == expected_results def test_quantiles(self, setup_teardown: Any) -> None: - ts = Timestamp(seconds=int(BASE_TIME.timestamp())) - hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) message = AggregateBucketRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, cogs_category="something", referrer="something", - start_timestamp=Timestamp(seconds=hour_ago), - end_timestamp=ts, + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 60)), ), key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT), aggregate=AggregateBucketRequest.FUNCTION_P99, - granularity_secs=1, + granularity_secs=60 * 15, + ) + response = timeseries_query(message) + # spans have measurement = 0, 1, 2, ... + # for us, starts at 60, and granularity puts 15 spans into each bucket + # and the P99 of 15 spans is just the maximum of the 15. + # T-Digest is approximate, so these numbers can be +- 3 or so. + expected_results = pytest.approx( + [ + 60 + 15 * 0 + 14, + 60 + 15 * 1 + 14, + 60 + 15 * 2 + 14, + 60 + 15 * 3 + 14, + ], + rel=3, + ) + print(response.result) + print(expected_results) + assert response.result == expected_results + + def test_average(self, setup_teardown: Any) -> None: + message = AggregateBucketRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 30)), + ), + key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT), + aggregate=AggregateBucketRequest.FUNCTION_AVERAGE, + granularity_secs=300, ) response = timeseries_query(message) - assert response.result == [420 for _ in range(60)] + # spans have (measurement, sample rate) = (0, 100), (10, 1), ..., (100, 100) + # granularity puts five spans into the same bucket + # whole interval is 30 minutes, so there should be 6 buckets + # and our start time is exactly 1 hour after data stops + expected_results = pytest.approx( + [ + (60 * 100 + 61 + 62 + 63 + 64) / 104, + (65 + 66 + 67 + 68 + 69) / 5, + (70 * 100 + 71 + 72 + 73 + 74) / 104, + (75 + 76 + 77 + 78 + 79) / 5, + (80 * 100 + 81 + 82 + 83 + 84) / 104, + (85 + 86 + 87 + 88 + 89) / 5, + ] + ) + assert response.result == expected_results From 29438a7ef2a8ae5bc25f821ca98a9a691f462d52 Mon Sep 17 00:00:00 2001 From: colin-sentry <161344340+colin-sentry@users.noreply.github.com> Date: Tue, 17 Sep 2024 13:57:17 -0400 Subject: [PATCH 22/23] feat(eap): implement backend for "what tag values exist" RPC (#6304) If you have spans with tags like "hello": "world" and "hello": "blah", you'd like to be able to say 'what values are possible for key="hello"?' 
This endpoint implements that functionality. --- snuba/web/rpc/__init__.py | 32 +++++ ...s_list.py => trace_item_attribute_list.py} | 13 +- snuba/web/rpc/trace_item_attribute_values.py | 115 ++++++++++++++++ snuba/web/views.py | 21 +-- ...t.py => test_trace_item_attribute_list.py} | 8 +- .../rpc/test_trace_item_attribute_values.py | 127 ++++++++++++++++++ 6 files changed, 289 insertions(+), 27 deletions(-) create mode 100644 snuba/web/rpc/__init__.py rename snuba/web/rpc/{tags_list.py => trace_item_attribute_list.py} (77%) create mode 100644 snuba/web/rpc/trace_item_attribute_values.py rename tests/web/rpc/{test_tags_list.py => test_trace_item_attribute_list.py} (96%) create mode 100644 tests/web/rpc/test_trace_item_attribute_values.py diff --git a/snuba/web/rpc/__init__.py b/snuba/web/rpc/__init__.py new file mode 100644 index 0000000000..6034257542 --- /dev/null +++ b/snuba/web/rpc/__init__.py @@ -0,0 +1,32 @@ +from typing import Any, Callable, Mapping, Tuple + +from google.protobuf.message import Message as ProtobufMessage +from sentry_protos.snuba.v1alpha.endpoint_aggregate_bucket_pb2 import ( + AggregateBucketRequest, +) +from sentry_protos.snuba.v1alpha.endpoint_span_samples_pb2 import SpanSamplesRequest +from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( + AttributeValuesRequest, + TraceItemAttributesRequest, +) + +from snuba.utils.metrics.timer import Timer +from snuba.web.rpc.span_samples import span_samples_query +from snuba.web.rpc.timeseries import timeseries_query +from snuba.web.rpc.trace_item_attribute_list import trace_item_attribute_list_query +from snuba.web.rpc.trace_item_attribute_values import trace_item_attribute_values_query + +ALL_RPCS: Mapping[ + str, Tuple[Callable[[Any, Timer], ProtobufMessage], type[ProtobufMessage]] +] = { + "AggregateBucketRequest": (timeseries_query, AggregateBucketRequest), + "SpanSamplesRequest": (span_samples_query, SpanSamplesRequest), + "TraceItemAttributesRequest": ( + trace_item_attribute_list_query, + TraceItemAttributesRequest, + ), + "AttributeValuesRequest": ( + trace_item_attribute_values_query, + AttributeValuesRequest, + ), +} diff --git a/snuba/web/rpc/tags_list.py b/snuba/web/rpc/trace_item_attribute_list.py similarity index 77% rename from snuba/web/rpc/tags_list.py rename to snuba/web/rpc/trace_item_attribute_list.py index e0dbb81421..4ba75235a0 100644 --- a/snuba/web/rpc/tags_list.py +++ b/snuba/web/rpc/trace_item_attribute_list.py @@ -15,7 +15,7 @@ from snuba.web.rpc.exceptions import BadSnubaRPCRequestException -def tags_list_query( +def trace_item_attribute_list_query( request: TraceItemAttributesRequest, _timer: Optional[Timer] = None ) -> TraceItemAttributesResponse: if request.type == AttributeKey.Type.TYPE_STRING: @@ -31,11 +31,16 @@ def tags_list_query( if request.limit > 1000: raise BadSnubaRPCRequestException("Limit can be at most 1000") + # this table stores timestamp as toStartOfDay(x) in UTC, so if you request 4PM - 8PM on a specific day, nada start_timestamp = datetime.utcfromtimestamp(request.meta.start_timestamp.seconds) - if start_timestamp.day >= datetime.utcnow().day and start_timestamp.hour != 0: - raise BadSnubaRPCRequestException( - "Tags' timestamps are stored per-day, you probably want to set start_timestamp to UTC 00:00 today or a time yesterday." 
+ end_timestamp = datetime.utcfromtimestamp(request.meta.end_timestamp.seconds) + if start_timestamp.day == end_timestamp.day: + start_timestamp = start_timestamp.replace( + day=start_timestamp.day - 1, hour=0, minute=0, second=0, microsecond=0 ) + end_timestamp = end_timestamp.replace(day=end_timestamp.day + 1) + request.meta.start_timestamp.seconds = int(start_timestamp.timestamp()) + request.meta.end_timestamp.seconds = int(end_timestamp.timestamp()) query = f""" SELECT DISTINCT attr_key, timestamp diff --git a/snuba/web/rpc/trace_item_attribute_values.py b/snuba/web/rpc/trace_item_attribute_values.py new file mode 100644 index 0000000000..640937c0ea --- /dev/null +++ b/snuba/web/rpc/trace_item_attribute_values.py @@ -0,0 +1,115 @@ +import uuid +from datetime import datetime + +from google.protobuf.json_format import MessageToDict +from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( + AttributeValuesRequest, + AttributeValuesResponse, +) + +from snuba.attribution.appid import AppID +from snuba.attribution.attribution_info import AttributionInfo +from snuba.datasets.entities.entity_key import EntityKey +from snuba.datasets.entities.factory import get_entity +from snuba.datasets.pluggable_dataset import PluggableDataset +from snuba.query import OrderBy, OrderByDirection, SelectedExpression +from snuba.query.data_source.simple import Entity +from snuba.query.dsl import Functions as f +from snuba.query.dsl import column, literal, literals_array +from snuba.query.logical import Query +from snuba.query.query_settings import HTTPQuerySettings +from snuba.request import Request as SnubaRequest +from snuba.utils.metrics.timer import Timer +from snuba.web.query import run_query +from snuba.web.rpc.common import base_conditions_and, treeify_or_and_conditions +from snuba.web.rpc.exceptions import BadSnubaRPCRequestException + + +def _build_query(request: AttributeValuesRequest) -> Query: + if request.limit > 1000: + raise BadSnubaRPCRequestException("Limit can be at most 1000") + + entity = Entity( + key=EntityKey("spans_str_attrs"), + schema=get_entity(EntityKey("spans_str_attrs")).get_data_model(), + sample=None, + ) + + # this table stores timestamp as toStartOfDay(x) in UTC, so if you request 4PM - 8PM on a specific day, nada + start_timestamp = datetime.utcfromtimestamp(request.meta.start_timestamp.seconds) + end_timestamp = datetime.utcfromtimestamp(request.meta.end_timestamp.seconds) + if start_timestamp.day == end_timestamp.day: + start_timestamp = start_timestamp.replace( + day=start_timestamp.day - 1, hour=0, minute=0, second=0, microsecond=0 + ) + end_timestamp = end_timestamp.replace(day=end_timestamp.day + 1) + request.meta.start_timestamp.seconds = int(start_timestamp.timestamp()) + request.meta.end_timestamp.seconds = int(end_timestamp.timestamp()) + + res = Query( + from_clause=entity, + selected_columns=[ + SelectedExpression( + name="attr_value", + expression=f.distinct(column("attr_value", alias="attr_value")), + ), + ], + condition=base_conditions_and( + request.meta, + f.equals(column("attr_key"), literal(request.name)), + # multiSearchAny has special treatment with ngram bloom filters + # https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#functions-support + f.multiSearchAny( + column("attr_value"), + literals_array(None, [literal(request.value_substring_match)]), + ), + ), + order_by=[ + OrderBy( + direction=OrderByDirection.ASC, expression=column("organization_id") + ), + OrderBy(direction=OrderByDirection.ASC, 
expression=column("attr_key")), + OrderBy(direction=OrderByDirection.ASC, expression=column("attr_value")), + ], + limit=request.limit, + offset=request.offset, + ) + treeify_or_and_conditions(res) + return res + + +def _build_snuba_request( + request: AttributeValuesRequest, +) -> SnubaRequest: + return SnubaRequest( + id=str(uuid.uuid4()), + original_body=MessageToDict(request), + query=_build_query(request), + query_settings=HTTPQuerySettings(), + attribution_info=AttributionInfo( + referrer=request.meta.referrer, + team="eap", + feature="eap", + tenant_ids={ + "organization_id": request.meta.organization_id, + "referrer": request.meta.referrer, + }, + app_id=AppID("eap"), + parent_api="trace_item_values", + ), + ) + + +def trace_item_attribute_values_query( + request: AttributeValuesRequest, timer: Timer | None = None +) -> AttributeValuesResponse: + timer = timer or Timer("trace_item_values") + snuba_request = _build_snuba_request(request) + res = run_query( + dataset=PluggableDataset(name="eap", all_entities=[]), + request=snuba_request, + timer=timer, + ) + return AttributeValuesResponse( + values=[r["attr_value"] for r in res.result.get("data", [])] + ) diff --git a/snuba/web/views.py b/snuba/web/views.py index 720f700123..1861f28937 100644 --- a/snuba/web/views.py +++ b/snuba/web/views.py @@ -35,14 +35,6 @@ render_template, ) from flask import request as http_request -from google.protobuf.message import Message as ProtobufMessage -from sentry_protos.snuba.v1alpha.endpoint_aggregate_bucket_pb2 import ( - AggregateBucketRequest, -) -from sentry_protos.snuba.v1alpha.endpoint_span_samples_pb2 import SpanSamplesRequest -from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( - TraceItemAttributesRequest, -) from werkzeug import Response as WerkzeugResponse from werkzeug.exceptions import InternalServerError @@ -83,10 +75,8 @@ from snuba.web.converters import DatasetConverter, EntityConverter, StorageConverter from snuba.web.delete_query import DeletesNotEnabledError, delete_from_storage from snuba.web.query import parse_and_run_query +from snuba.web.rpc import ALL_RPCS from snuba.web.rpc.exceptions import BadSnubaRPCRequestException -from snuba.web.rpc.span_samples import span_samples_query as span_samples_query -from snuba.web.rpc.tags_list import tags_list_query -from snuba.web.rpc.timeseries import timeseries_query as timeseries_query from snuba.writer import BatchWriterEncoderWrapper, WriterTableRow logger = logging.getLogger("snuba.api") @@ -285,15 +275,8 @@ def unqualified_query_view(*, timer: Timer) -> Union[Response, str, WerkzeugResp @application.route("/rpc/", methods=["POST"]) @util.time_request("timeseries") def rpc(*, name: str, timer: Timer) -> Response: - rpcs: Mapping[ - str, Tuple[Callable[[Any, Timer], ProtobufMessage], type[ProtobufMessage]] - ] = { - "AggregateBucketRequest": (timeseries_query, AggregateBucketRequest), - "SpanSamplesRequest": (span_samples_query, SpanSamplesRequest), - "TraceItemAttributesRequest": (tags_list_query, TraceItemAttributesRequest), - } try: - endpoint, req_class = rpcs[name] + endpoint, req_class = ALL_RPCS[name] req = req_class() req.ParseFromString(http_request.data) diff --git a/tests/web/rpc/test_tags_list.py b/tests/web/rpc/test_trace_item_attribute_list.py similarity index 96% rename from tests/web/rpc/test_tags_list.py rename to tests/web/rpc/test_trace_item_attribute_list.py index ff353fb9af..ecb0846e70 100644 --- a/tests/web/rpc/test_tags_list.py +++ b/tests/web/rpc/test_trace_item_attribute_list.py @@ -13,7 +13,7 @@ 
from snuba.datasets.storages.factory import get_storage from snuba.datasets.storages.storage_key import StorageKey -from snuba.web.rpc.tags_list import tags_list_query +from snuba.web.rpc.trace_item_attribute_list import trace_item_attribute_list_query from tests.base import BaseApiTest from tests.helpers import write_raw_unprocessed_events @@ -135,7 +135,7 @@ def test_simple_case_str(self, setup_teardown: Any) -> None: offset=0, type=AttributeKey.Type.TYPE_STRING, ) - response = tags_list_query(message) + response = trace_item_attribute_list_query(message) assert response.tags == [ TraceItemAttributesResponse.Tag( name=f"a_tag_{i:03}", type=AttributeKey.Type.TYPE_STRING @@ -175,7 +175,7 @@ def test_simple_case_float(self, setup_teardown: Any) -> None: offset=0, type=AttributeKey.Type.TYPE_FLOAT, ) - response = tags_list_query(message) + response = trace_item_attribute_list_query(message) assert response.tags == [ TraceItemAttributesResponse.Tag( name=f"b_measurement_{i:03}", type=AttributeKey.Type.TYPE_FLOAT @@ -215,7 +215,7 @@ def test_with_offset(self, setup_teardown: Any) -> None: offset=10, type=AttributeKey.Type.TYPE_FLOAT, ) - response = tags_list_query(message) + response = trace_item_attribute_list_query(message) assert response.tags == [ TraceItemAttributesResponse.Tag( name="b_measurement_010", type=AttributeKey.Type.TYPE_FLOAT diff --git a/tests/web/rpc/test_trace_item_attribute_values.py b/tests/web/rpc/test_trace_item_attribute_values.py new file mode 100644 index 0000000000..3f400219be --- /dev/null +++ b/tests/web/rpc/test_trace_item_attribute_values.py @@ -0,0 +1,127 @@ +import uuid +from datetime import UTC, datetime, timedelta +from typing import Any, Mapping + +import pytest +from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import AttributeValuesRequest +from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta + +from snuba.datasets.storages.factory import get_storage +from snuba.datasets.storages.storage_key import StorageKey +from snuba.web.rpc.trace_item_attribute_values import trace_item_attribute_values_query +from tests.base import BaseApiTest +from tests.helpers import write_raw_unprocessed_events + +BASE_TIME = datetime.utcnow().replace(minute=0, second=0, microsecond=0) - timedelta( + minutes=180 +) +COMMON_META = RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp( + seconds=int( + datetime( + year=BASE_TIME.year, + month=BASE_TIME.month, + day=BASE_TIME.day, + tzinfo=UTC, + ).timestamp() + ) + ), + end_timestamp=Timestamp( + seconds=int( + datetime( + year=BASE_TIME.year, + month=BASE_TIME.month, + day=BASE_TIME.day + 1, + tzinfo=UTC, + ).timestamp() + ) + ), +) + + +def gen_message(tags: Mapping[str, str]) -> Mapping[str, Any]: + return { + "description": "/api/0/relays/projectconfigs/", + "duration_ms": 152, + "event_id": "d826225de75d42d6b2f01b957d51f18f", + "exclusive_time_ms": 0.228, + "is_segment": True, + "data": {}, + "measurements": {}, + "organization_id": 1, + "origin": "auto.http.django", + "project_id": 1, + "received": 1721319572.877828, + "retention_days": 90, + "segment_id": "8873a98879faf06d", + "sentry_tags": {}, + "span_id": uuid.uuid4().hex, + "tags": tags, + "trace_id": uuid.uuid4().hex, + "start_timestamp_ms": int(BASE_TIME.timestamp() * 1000), + "start_timestamp_precise": BASE_TIME.timestamp(), + "end_timestamp_precise": BASE_TIME.timestamp() + 1, + } + + 
+@pytest.fixture(autouse=True) +def setup_teardown(clickhouse_db: None, redis_db: None) -> None: + spans_storage = get_storage(StorageKey("eap_spans")) + messages = [ + gen_message({"tag1": "herp", "tag2": "herp"}), + gen_message({"tag1": "herpderp", "tag2": "herp"}), + gen_message({"tag1": "durp", "tag3": "herp"}), + gen_message({"tag1": "blah", "tag2": "herp"}), + gen_message({"tag1": "derpderp", "tag2": "derp"}), + gen_message({"tag2": "hehe"}), + gen_message({"tag1": "some_last_value"}), + ] + write_raw_unprocessed_events(spans_storage, messages) # type: ignore + + +@pytest.mark.clickhouse_db +@pytest.mark.redis_db +class TestTraceItemAttributes(BaseApiTest): + def test_basic(self) -> None: + ts = Timestamp() + ts.GetCurrentTime() + message = AttributeValuesRequest( + meta=COMMON_META, + name="tag1", + limit=10, + offset=20, + ) + response = self.app.post( + "/rpc/AttributeValuesRequest", data=message.SerializeToString() + ) + assert response.status_code == 200 + + def test_simple_case(self, setup_teardown: Any) -> None: + message = AttributeValuesRequest(meta=COMMON_META, limit=5, name="tag1") + response = trace_item_attribute_values_query(message) + assert response.values == ["blah", "derpderp", "durp", "herp", "herpderp"] + + def test_offset(self, setup_teardown: Any) -> None: + message = AttributeValuesRequest( + meta=COMMON_META, limit=5, offset=1, name="tag1" + ) + response = trace_item_attribute_values_query(message) + assert response.values == [ + "derpderp", + "durp", + "herp", + "herpderp", + "some_last_value", + ] + + def test_with_value_filter(self, setup_teardown: Any) -> None: + message = AttributeValuesRequest( + meta=COMMON_META, limit=5, name="tag1", value_substring_match="erp" + ) + response = trace_item_attribute_values_query(message) + assert response.values == ["derpderp", "herp", "herpderp"] From 9b2b0ab8ae7e035bfbd96f83c53b53a979909086 Mon Sep 17 00:00:00 2001 From: davidtsuk <132949946+davidtsuk@users.noreply.github.com> Date: Tue, 17 Sep 2024 11:09:40 -0700 Subject: [PATCH 23/23] Add a setting for batch_join_timeout (#6314) Closes #6306 --- snuba/clickhouse/http.py | 4 +++- snuba/settings/__init__.py | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/snuba/clickhouse/http.py b/snuba/clickhouse/http.py index 5c91105efe..9a1d35f89b 100644 --- a/snuba/clickhouse/http.py +++ b/snuba/clickhouse/http.py @@ -353,7 +353,9 @@ def write(self, values: Iterable[bytes]) -> None: batch.append(value) batch.close() - batch_join_timeout = state.get_config("http_batch_join_timeout", 10) + batch_join_timeout = state.get_config( + "http_batch_join_timeout", settings.BATCH_JOIN_TIMEOUT + ) # IMPORTANT: Please read the docstring of this method if you ever decide to remove the # timeout argument from the join method. batch.join(timeout=batch_join_timeout) diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index a9e0b298fb..f681b4dd99 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -242,6 +242,7 @@ class RedisClusters(TypedDict): DISCARD_OLD_EVENTS = True CLICKHOUSE_HTTP_CHUNK_SIZE = 8192 HTTP_WRITER_BUFFER_SIZE = 1 +BATCH_JOIN_TIMEOUT = os.environ.get("BATCH_JOIN_TIMEOUT", 10) # Retention related settings ENFORCE_RETENTION: bool = False