diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 3c5f822556..beb953664e 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -30,7 +30,7 @@ jobs: force_orphan: true - name: Archive Docs - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: docs path: docs/build diff --git a/CHANGELOG.md b/CHANGELOG.md index b20e80ed1d..6f3fa97afd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,36 @@ # Changelog +## 24.9.0 + +### Various fixes & improvements + +- Update migrations list command to show migrations that no longer exist in the codebase (#6299) by @davidtsuk +- metric(consumer): Add a metric to track the size of individual spans (#6300) by @ayirr7 +- feat(rpc): Update tags list rpc (#6301) by @Zylphrex +- feat(eap): add virtual column support (#6292) by @volokluev +- tweak(eap): Allow more memory usage for eap spans (#6298) by @volokluev +- ref(doc): add documentation for the ReadinessState enum (#6295) by @viglia +- feat(eap): Start ingesting data into sample_weight_2 column (#6290) by @colin-sentry +- Update docker entrypoint to run heaptrack (#6273) by @ayirr7 +- fix(eap): Switch to sampling_weight_2 in entity (#6287) by @colin-sentry +- bug(query): Run entity validators in composite query pipeline (#6285) by @enochtangg +- feat(eap): make mapContains work with EAP dataset (#6284) by @colin-sentry +- feat(job-runner): create a new `snuba jobs` command (#6281) by @xurui-c +- feat(eap): Shard meta tables by trace ID (#6286) by @colin-sentry +- fix(eap): Make span_id be returned as a string correctly (#6283) by @colin-sentry +- feat(job-runner): scaffolding for job manifest testing (#6282) by @onewland +- bug(admin): Fix invalid query error alerting in snuba admin (#6280) by @enochtangg +- Fixing Snuba Admin trace UI error. (#6278) by @nachivrn +- feat(eap): Add a processor that allows you to do mapKeys on attr_str (#6277) by @colin-sentry +- cleanup(capman): remove legacy table rate limits (#6274) by @volokluev +- Fixing Snuba Admin trace UI error. 
(#6276) by @nachivrn +- hackweek(snuba-admin): MQL query tool (#6235) by @enochtangg +- feat(eap): Endpoint to get the tags available for a project (#6270) by @colin-sentry +- feat(sudo): issue slack notifications when sudo mode is used (#6271) by @volokluev +- chore(eap): Add entities and storages for EAP span meta tables (#6269) by @colin-sentry + +_Plus 60 more_ + ## 24.8.0 ### Various fixes & improvements diff --git a/CODEOWNERS b/CODEOWNERS index 65a1109146..a18aa3643d 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -16,6 +16,7 @@ # EAP-related /snuba/web/rpc @getsentry/events-analytics-platform @getsentry/owners-snuba +/tests/web/rpc @getsentry/events-analytics-platform @getsentry/owners-snuba /snuba/snuba_migrations/events_analytics_platform @getsentry/events-analytics-platform @getsentry/owners-snuba /rust_snuba/src/processors/eap_spans.rs @getsentry/events-analytics-platform diff --git a/Dockerfile b/Dockerfile index f882c5a07f..0fe9af0e03 100644 --- a/Dockerfile +++ b/Dockerfile @@ -30,6 +30,8 @@ RUN set -ex; \ runtimeDeps=' \ curl \ libjemalloc2 \ + gdb \ + heaptrack \ '; \ apt-get update; \ apt-get install -y $buildDeps $runtimeDeps --no-install-recommends; \ diff --git a/docker_entrypoint.sh b/docker_entrypoint.sh index 68ce582619..26041c4d6a 100755 --- a/docker_entrypoint.sh +++ b/docker_entrypoint.sh @@ -18,4 +18,8 @@ else printf "\n${help_result}" fi +if [ -n "${ENABLE_HEAPTRACK:-}" ]; then + set -- heaptrack "$@" +fi + exec "$@" diff --git a/docs/source/conf.py b/docs/source/conf.py index 993e13ed5e..06531023f6 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -7,7 +7,7 @@ copyright = "2021, Sentry Team and Contributors" author = "Sentry Team and Contributors" -release = "24.9.0.dev0" +release = "24.10.0.dev0" # -- General configuration --------------------------------------------------- diff --git a/requirements.txt b/requirements.txt index 3723e817ee..58c57916d9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,7 +26,7 @@ pytest-watch==4.2.0 python-dateutil==2.8.2 python-rapidjson==1.8 redis==4.3.4 -sentry-arroyo==2.17.1 +sentry-arroyo==2.17.6 sentry-kafka-schemas==0.1.106 sentry-redis-tools==0.3.0 sentry-relay==0.8.44 @@ -45,4 +45,4 @@ sqlparse==0.4.2 google-api-python-client==2.88.0 sentry-usage-accountant==0.0.10 freezegun==1.2.2 -sentry-protos==0.1.16 +sentry-protos==0.1.21 diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index 620e3027cb..b0b0025916 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -2852,8 +2852,8 @@ dependencies = [ [[package]] name = "rust_arroyo" -version = "2.17.4" -source = "git+https://github.com/getsentry/arroyo#b15d5467a4958b18ba50175e468e4b70535cb669" +version = "2.17.6" +source = "git+https://github.com/getsentry/arroyo#084c60732a11006a70e6ae56f0ae1cdf8a6dd7d8" dependencies = [ "chrono", "coarsetime", diff --git a/rust_snuba/bin/python_processor_infinite.rs b/rust_snuba/bin/python_processor_infinite.rs index 8aaf285836..19f1cf7908 100644 --- a/rust_snuba/bin/python_processor_infinite.rs +++ b/rust_snuba/bin/python_processor_infinite.rs @@ -20,9 +20,9 @@ fn main() { let output2 = output.clone(); let step = RunTask::new( - move |_| { + move |message| { output2.fetch_add(1, Ordering::Relaxed); - Ok(()) + Ok(message) }, step, ); diff --git a/rust_snuba/src/processors/eap_spans.rs b/rust_snuba/src/processors/eap_spans.rs index 795c85cbdb..a78181e4a6 100644 --- a/rust_snuba/src/processors/eap_spans.rs +++ b/rust_snuba/src/processors/eap_spans.rs @@ -52,7 +52,8 @@ struct EAPSpan { name: String, //aka 
description sampling_factor: f64, - sampling_weight: f64, + sampling_weight: f64, //remove eventually + sampling_weight_2: u64, sign: u8, //1 for additions, -1 for deletions - for this worker it should be 1 #( @@ -101,7 +102,8 @@ impl From for EAPSpan { retention_days: from.retention_days, name: from.description.unwrap_or_default(), - sampling_weight: 1., + sampling_weight: 1., //remove eventually + sampling_weight_2: 1, sampling_factor: 1., sign: 1, @@ -153,6 +155,7 @@ impl From for EAPSpan { if k == "client_sample_rate" && v.value != 0.0 { res.sampling_factor = v.value; res.sampling_weight = 1.0 / v.value; + res.sampling_weight_2 = (1.0 / v.value) as u64; } else { insert_num(k.clone(), v.value); } @@ -217,6 +220,9 @@ mod tests { "measurements": { "num_of_spans": { "value": 50.0 + }, + "client_sample_rate": { + "value": 0.01 } }, "organization_id": 1, diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap index 7ab2b7f26d..05eac7c04f 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap @@ -19,8 +19,9 @@ expression: span "exclusive_time_ms": 0.228, "retention_days": 90, "name": "/api/0/relays/projectconfigs/", - "sampling_factor": 1.0, - "sampling_weight": 1.0, + "sampling_factor": 0.01, + "sampling_weight": 100.0, + "sampling_weight_2": 100, "sign": 1, "attr_str_0": { "relay_protocol_version": "3", diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-spans-EAPSpansMessageProcessor-snuba-spans__1__basic_span.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-spans-EAPSpansMessageProcessor-snuba-spans__1__basic_span.json.snap index 3b8fdaf3a4..64a32b2761 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-spans-EAPSpansMessageProcessor-snuba-spans__1__basic_span.json.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-spans-EAPSpansMessageProcessor-snuba-spans__1__basic_span.json.snap @@ -52,6 +52,7 @@ expression: snapshot_payload "retention_days": 90, "sampling_factor": 1.0, "sampling_weight": 1.0, + "sampling_weight_2": 1, "segment_id": 16045690984833335023, "segment_name": "/organizations/:orgId/issues/", "service": "1", diff --git a/rust_snuba/src/processors/spans.rs b/rust_snuba/src/processors/spans.rs index 49a1f7e398..eefe8a11a1 100644 --- a/rust_snuba/src/processors/spans.rs +++ b/rust_snuba/src/processors/spans.rs @@ -3,6 +3,7 @@ use std::str::FromStr; use anyhow::Context; use chrono::DateTime; +use rust_arroyo::timer; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -22,6 +23,8 @@ pub fn process_message( let payload_bytes = payload.payload().context("Expected payload")?; let msg: FromSpanMessage = serde_json::from_slice(payload_bytes)?; + timer!("spans.messages.size", payload_bytes.len() as f64); + let origin_timestamp = DateTime::from_timestamp(msg.received as i64, 0); let mut span: Span = msg.try_into()?; diff --git a/setup.py b/setup.py index 245e9bb196..2f4325659c 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import find_packages, setup -VERSION = "24.9.0.dev0" +VERSION = "24.10.0.dev0" def get_requirements() -> Sequence[str]: diff --git 
a/snuba/admin/clickhouse/migration_checks.py b/snuba/admin/clickhouse/migration_checks.py index 0439d56956..8fdd9e495b 100644 --- a/snuba/admin/clickhouse/migration_checks.py +++ b/snuba/admin/clickhouse/migration_checks.py @@ -100,7 +100,7 @@ def __init__( ).get_migrations() migration_statuses = {} - for migration_id, status, _ in migrations: + for migration_id, status, _, _ in migrations: migration_statuses[migration_id] = { "migration_id": migration_id, "status": status, diff --git a/snuba/admin/notifications/slack/utils.py b/snuba/admin/notifications/slack/utils.py index 037f0036c4..ae817eb2e2 100644 --- a/snuba/admin/notifications/slack/utils.py +++ b/snuba/admin/notifications/slack/utils.py @@ -1,3 +1,4 @@ +import os from typing import Any, Dict, List, Optional, Union from snuba import settings @@ -95,12 +96,13 @@ def build_context( user: str, timestamp: str, action: AuditLogAction ) -> Dict[str, Union[str, List[Dict[str, str]]]]: url = f"{settings.ADMIN_URL}/#auditlog" + environ = os.environ.get("SENTRY_ENVIRONMENT") or "unknown environment" return { "type": "context", "elements": [ { "type": "mrkdwn", - "text": f"{action.value} at *<{url}|{timestamp}>* by *<{user}>*", + "text": f"{action.value} at *<{url}|{timestamp}>* by *<{user}>* in *<{environ}>*", } ], } diff --git a/snuba/admin/views.py b/snuba/admin/views.py index 91136e1646..9942b55419 100644 --- a/snuba/admin/views.py +++ b/snuba/admin/views.py @@ -208,7 +208,7 @@ def migrations_groups_list(group: str) -> Response: "status": status.value, "blocking": blocking, } - for migration_id, status, blocking in runner_group_migrations + for migration_id, status, blocking, _ in runner_group_migrations ] ), 200, diff --git a/snuba/cli/migrations.py b/snuba/cli/migrations.py index 95d180cfaa..bacb266dbe 100644 --- a/snuba/cli/migrations.py +++ b/snuba/cli/migrations.py @@ -37,10 +37,10 @@ def list() -> None: setup_logging() check_clickhouse_connections(CLUSTERS) runner = Runner() - for group, group_migrations in runner.show_all(): + for group, group_migrations in runner.show_all(include_nonexistent=True): readiness_state = get_group_readiness_state(group) click.echo(f"{group.value} (readiness_state: {readiness_state.value})") - for migration_id, status, blocking in group_migrations: + for migration_id, status, blocking, existing in group_migrations: symbol = { Status.COMPLETED: "X", Status.NOT_STARTED: " ", @@ -53,7 +53,11 @@ def list() -> None: if status != Status.COMPLETED and blocking: blocking_text = " (blocking)" - click.echo(f"[{symbol}] {migration_id}{in_progress_text}{blocking_text}") + existing_text = "" if existing else " (this migration no longer exists)" + + click.echo( + f"[{symbol}] {migration_id}{in_progress_text}{blocking_text}{existing_text}" + ) click.echo() diff --git a/snuba/clickhouse/http.py b/snuba/clickhouse/http.py index 5c91105efe..9a1d35f89b 100644 --- a/snuba/clickhouse/http.py +++ b/snuba/clickhouse/http.py @@ -353,7 +353,9 @@ def write(self, values: Iterable[bytes]) -> None: batch.append(value) batch.close() - batch_join_timeout = state.get_config("http_batch_join_timeout", 10) + batch_join_timeout = state.get_config( + "http_batch_join_timeout", settings.BATCH_JOIN_TIMEOUT + ) # IMPORTANT: Please read the docstring of this method if you ever decide to remove the # timeout argument from the join method. 
batch.join(timeout=batch_join_timeout) diff --git a/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml index 6c08867f91..b10c58d583 100644 --- a/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml @@ -23,6 +23,7 @@ schema: { name: name, type: String }, { name: sampling_factor, type: Float, args: { size: 64 } }, { name: sampling_weight, type: Float, args: { size: 64 } }, + { name: sampling_weight_2, type: UInt, args: { size: 64 } }, { name: sign, type: Int, args: { size: 8 } }, { name: attr_str, type: Map, args: { key: { type: String }, value: { type: String } } }, { name: attr_num, type: Map, args: { key: { type: String }, value: { type: Float, args: { size: 64 } } } }, @@ -39,6 +40,7 @@ storages: from_col_name: timestamp to_table_name: null to_col_name: _sort_timestamp + subscriptables: - mapper: SubscriptableHashBucketMapper args: diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml index 4d4245a959..ed2d22011d 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/eap_spans.yaml @@ -29,6 +29,7 @@ schema: { name: name, type: String }, { name: sampling_factor, type: Float, args: { size: 64 } }, { name: sampling_weight, type: Float, args: { size: 64 } }, + { name: sampling_weight_2, type: UInt, args: { size: 64 } }, { name: sign, type: Int, args: { size: 8 } }, { name: attr_str_0, type: Map, args: { key: { type: String }, value: { type: String } } }, { name: attr_str_1, type: Map, args: { key: { type: String }, value: { type: String } } }, @@ -113,6 +114,12 @@ query_processors: prewhere_candidates: [span_id, trace_id, segment_name] - processor: TupleUnaliaser + - processor: ClickhouseSettingsOverride + args: + settings: + max_memory_usage: 5000000000 + max_rows_to_group_by: 1000000 + group_by_overflow_mode: any mandatory_condition_checkers: - condition: OrgIdEnforcer diff --git a/snuba/datasets/readiness_state.py b/snuba/datasets/readiness_state.py index ca4bc96f1d..ada34b7f62 100644 --- a/snuba/datasets/readiness_state.py +++ b/snuba/datasets/readiness_state.py @@ -4,6 +4,26 @@ class ReadinessState(Enum): + """ + Readiness states are essentially feature flags for snuba datasets. + The readiness state defines whether or not a dataset is made available + in specific sentry environments. 
+ Currently, sentry environments include the following: + * local/CI + * SaaS + * S4S + * Self-Hosted + * Single-Tenant + + The following is a list of readiness states and the environments + they map to: + * limited -> local/CI + * experimental -> local/CI, S4S + * partial -> local/CI, SaaS, S4S + * deprecate -> local/CI, Self-Hosted + * complete -> local/CI, SaaS, S4S, Self-Hosted, Single-Tenant + """ + LIMITED = "limited" DEPRECATE = "deprecate" PARTIAL = "partial" diff --git a/snuba/manual_jobs/run_manifest.json b/snuba/manual_jobs/job_manifest.json similarity index 100% rename from snuba/manual_jobs/run_manifest.json rename to snuba/manual_jobs/job_manifest.json diff --git a/snuba/migrations/runner.py b/snuba/migrations/runner.py index 445e5f4ee6..82a970a3b6 100644 --- a/snuba/migrations/runner.py +++ b/snuba/migrations/runner.py @@ -1,3 +1,4 @@ +from collections import defaultdict from datetime import datetime from functools import partial from typing import List, Mapping, MutableMapping, NamedTuple, Optional, Sequence, Tuple @@ -64,6 +65,7 @@ class MigrationDetails(NamedTuple): migration_id: str status: Status blocking: bool + exists: bool class Runner: @@ -133,7 +135,7 @@ def force_overwrite_status( ) def show_all( - self, groups: Optional[Sequence[str]] = None + self, groups: Optional[Sequence[str]] = None, include_nonexistent: bool = False ) -> List[Tuple[MigrationGroup, List[MigrationDetails]]]: """ Returns the list of migrations and their statuses for each group. @@ -148,6 +150,9 @@ def show_all( migration_groups = get_active_migration_groups() migration_status = self._get_migration_status(migration_groups) + clickhouse_group_migrations = defaultdict(set) + for group, migration_id in migration_status.keys(): + clickhouse_group_migrations[group].add(migration_id) def get_status(migration_key: MigrationKey) -> Status: return migration_status.get(migration_key, Status.NOT_STARTED) @@ -156,15 +161,31 @@ def get_status(migration_key: MigrationKey) -> Status: group_migrations: List[MigrationDetails] = [] group_loader = get_group_loader(group) - for migration_id in group_loader.get_migrations(): + migration_ids = group_loader.get_migrations() + for migration_id in migration_ids: migration_key = MigrationKey(group, migration_id) migration = group_loader.load_migration(migration_id) group_migrations.append( MigrationDetails( - migration_id, get_status(migration_key), migration.blocking + migration_id, + get_status(migration_key), + migration.blocking, + True, ) ) + if include_nonexistent: + non_existing_migrations = clickhouse_group_migrations.get( + group, set() + ).difference(set(migration_ids)) + for migration_id in non_existing_migrations: + migration_key = MigrationKey(group, migration_id) + group_migrations.append( + MigrationDetails( + migration_id, get_status(migration_key), False, False + ) + ) + migrations.append((group, group_migrations)) return migrations diff --git a/snuba/pipeline/stages/query_processing.py b/snuba/pipeline/stages/query_processing.py index 393f7da83b..7b2fd4311c 100644 --- a/snuba/pipeline/stages/query_processing.py +++ b/snuba/pipeline/stages/query_processing.py @@ -34,10 +34,10 @@ def _process_data( if translated_storage_query: return translated_storage_query + run_entity_validators(cast(EntityQuery, query), pipe_input.query_settings) if isinstance(query, LogicalQuery) and isinstance( query.get_from_clause(), Entity ): - run_entity_validators(cast(EntityQuery, query), pipe_input.query_settings) return run_entity_processing_executor(query, 
pipe_input.query_settings) elif isinstance(query, CompositeQuery): # if we were not able to translate the storage query earlier and we got to this point, this is diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index a9e0b298fb..f681b4dd99 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -242,6 +242,7 @@ class RedisClusters(TypedDict): DISCARD_OLD_EVENTS = True CLICKHOUSE_HTTP_CHUNK_SIZE = 8192 HTTP_WRITER_BUFFER_SIZE = 1 +BATCH_JOIN_TIMEOUT = os.environ.get("BATCH_JOIN_TIMEOUT", 10) # Retention related settings ENFORCE_RETENTION: bool = False diff --git a/snuba/settings/settings_test.py b/snuba/settings/settings_test.py index c59bb3b37e..c39978f00e 100644 --- a/snuba/settings/settings_test.py +++ b/snuba/settings/settings_test.py @@ -39,11 +39,6 @@ # Set enforce retention to true for tests ENFORCE_RETENTION = True -# Ignore optimize job cut off time for tests -OPTIMIZE_JOB_CUTOFF_TIME = 24 - -OPTIMIZE_PARALLEL_MAX_JITTER_MINUTES = 0 - ADMIN_ALLOWED_PROD_PROJECTS = [1, 11276] REDIS_CLUSTERS = { diff --git a/snuba/subscriptions/scheduler_consumer.py b/snuba/subscriptions/scheduler_consumer.py index 794b3fe6c6..cda3021cbe 100644 --- a/snuba/subscriptions/scheduler_consumer.py +++ b/snuba/subscriptions/scheduler_consumer.py @@ -210,6 +210,10 @@ def close(self, timeout: Optional[float] = None) -> None: def closed(self) -> bool: return self.__consumer.closed + @property + def member_id(self) -> str: + return self.__consumer.member_id + class SchedulerBuilder: def __init__( diff --git a/snuba/web/rpc/__init__.py b/snuba/web/rpc/__init__.py new file mode 100644 index 0000000000..6034257542 --- /dev/null +++ b/snuba/web/rpc/__init__.py @@ -0,0 +1,32 @@ +from typing import Any, Callable, Mapping, Tuple + +from google.protobuf.message import Message as ProtobufMessage +from sentry_protos.snuba.v1alpha.endpoint_aggregate_bucket_pb2 import ( + AggregateBucketRequest, +) +from sentry_protos.snuba.v1alpha.endpoint_span_samples_pb2 import SpanSamplesRequest +from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( + AttributeValuesRequest, + TraceItemAttributesRequest, +) + +from snuba.utils.metrics.timer import Timer +from snuba.web.rpc.span_samples import span_samples_query +from snuba.web.rpc.timeseries import timeseries_query +from snuba.web.rpc.trace_item_attribute_list import trace_item_attribute_list_query +from snuba.web.rpc.trace_item_attribute_values import trace_item_attribute_values_query + +ALL_RPCS: Mapping[ + str, Tuple[Callable[[Any, Timer], ProtobufMessage], type[ProtobufMessage]] +] = { + "AggregateBucketRequest": (timeseries_query, AggregateBucketRequest), + "SpanSamplesRequest": (span_samples_query, SpanSamplesRequest), + "TraceItemAttributesRequest": ( + trace_item_attribute_list_query, + TraceItemAttributesRequest, + ), + "AttributeValuesRequest": ( + trace_item_attribute_values_query, + AttributeValuesRequest, + ), +} diff --git a/snuba/web/rpc/common.py b/snuba/web/rpc/common.py index 53c273119f..f230736907 100644 --- a/snuba/web/rpc/common.py +++ b/snuba/web/rpc/common.py @@ -1,7 +1,10 @@ -from typing import Final, Mapping, Set +from typing import Final, Mapping, Sequence, Set from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta -from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey +from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import ( + AttributeKey, + VirtualColumnContext, +) from sentry_protos.snuba.v1alpha.trace_item_filter_pb2 import ( ComparisonFilter, 
TraceItemFilter, @@ -130,6 +133,73 @@ def attribute_key_to_expression(attr_key: AttributeKey) -> Expression: ) +def apply_virtual_columns( + query: Query, virtual_column_contexts: Sequence[VirtualColumnContext] +) -> None: + """Injects virtual column mappings into the clickhouse query. Works with NORMALIZED_COLUMNS on the table or + dynamic columns in attr_str + + attr_num not supported because mapping on floats is a bad idea + + Example: + + SELECT + project_name AS `project_name`, + attr_str['release'] AS `release`, + attr_str['sentry.sdk.name'] AS `sentry.sdk.name`, + ... rest of query + + contexts: + [ {from_column_name: project_id, to_column_name: project_name, value_map: {1: "sentry", 2: "snuba"}} ] + + + Query will be transformed into: + + SELECT + -- see the project name column transformed and the value mapping injected + transform( CAST( project_id, 'String'), array( '1', '2'), array( 'sentry', 'snuba'), 'unknown') AS `project_name`, + -- + attr_str['release'] AS `release`, + attr_str['sentry.sdk.name'] AS `sentry.sdk.name`, + ... rest of query + + """ + + if not virtual_column_contexts: + return + + mapped_column_to_context = {c.to_column_name: c for c in virtual_column_contexts} + + def transform_expressions(expression: Expression) -> Expression: + # virtual columns will show up as `attr_str[virtual_column_name]` or `attr_num[virtual_column_name]` + if not isinstance(expression, SubscriptableReference): + return expression + + if expression.column.column_name != "attr_str": + return expression + context = mapped_column_to_context.get(str(expression.key.value)) + if context: + attribute_expression = attribute_key_to_expression( + AttributeKey( + name=context.from_column_name, + type=NORMALIZED_COLUMNS.get( + context.from_column_name, AttributeKey.TYPE_STRING + ), + ) + ) + return f.transform( + f.CAST(attribute_expression, "String"), + literals_array(None, [literal(k) for k in context.value_map.keys()]), + literals_array(None, [literal(v) for v in context.value_map.values()]), + literal("unknown"), + alias=context.to_column_name, + ) + + return expression + + query.transform_expressions(transform_expressions) + + def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression: """ Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024") diff --git a/snuba/web/rpc/span_samples.py b/snuba/web/rpc/span_samples.py index 7d5ebf6da6..8f5db267a5 100644 --- a/snuba/web/rpc/span_samples.py +++ b/snuba/web/rpc/span_samples.py @@ -25,6 +25,7 @@ from snuba.utils.metrics.timer import Timer from snuba.web.query import run_query from snuba.web.rpc.common import ( + apply_virtual_columns, attribute_key_to_expression, base_conditions_and, trace_item_filters_to_expression, @@ -71,6 +72,7 @@ def _build_query(request: SpanSamplesRequest) -> Query: limit=request.limit, ) treeify_or_and_conditions(res) + apply_virtual_columns(res, request.virtual_column_contexts) return res diff --git a/snuba/web/rpc/tags_list.py b/snuba/web/rpc/tags_list.py deleted file mode 100644 index 2febde7de0..0000000000 --- a/snuba/web/rpc/tags_list.py +++ /dev/null @@ -1,73 +0,0 @@ -from datetime import datetime -from typing import List, Optional - -from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( - TagsListRequest, - TagsListResponse, -) - -from snuba.clickhouse.formatter.nodes import FormattedQuery, StringNode -from snuba.datasets.schemas.tables import TableSource -from snuba.datasets.storages.factory import get_storage -from 
snuba.datasets.storages.storage_key import StorageKey -from snuba.utils.metrics.timer import Timer -from snuba.web.rpc.exceptions import BadSnubaRPCRequestException - - -def tags_list_query( - request: TagsListRequest, _timer: Optional[Timer] = None -) -> TagsListResponse: - str_storage = get_storage(StorageKey("spans_str_attrs")) - num_storage = get_storage(StorageKey("spans_num_attrs")) - - str_data_source = str_storage.get_schema().get_data_source() - assert isinstance(str_data_source, TableSource) - num_data_source = num_storage.get_schema().get_data_source() - assert isinstance(num_data_source, TableSource) - - if request.limit > 1000: - raise BadSnubaRPCRequestException("Limit can be at most 1000") - - start_timestamp = datetime.utcfromtimestamp(request.meta.start_timestamp.seconds) - if start_timestamp.day >= datetime.utcnow().day and start_timestamp.hour != 0: - raise BadSnubaRPCRequestException( - "Tags' timestamps are stored per-day, you probably want to set start_timestamp to UTC 00:00 today or a time yesterday." - ) - - query = f""" -SELECT * FROM ( - SELECT DISTINCT attr_key, 'str' as type, timestamp - FROM {str_data_source.get_table_name()} - WHERE organization_id={request.meta.organization_id} - AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)}) - AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds}) - - UNION ALL - - SELECT DISTINCT attr_key, 'num' as type, timestamp - FROM {num_data_source.get_table_name()} - WHERE organization_id={request.meta.organization_id} - AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)}) - AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds}) -) -ORDER BY attr_key -LIMIT {request.limit} OFFSET {request.offset} -""" - - cluster = str_storage.get_cluster() - reader = cluster.get_reader() - result = reader.execute(FormattedQuery([StringNode(query)])) - - tags: List[TagsListResponse.Tag] = [] - for row in result.get("data", []): - tags.append( - TagsListResponse.Tag( - name=row["attr_key"], - type={ - "str": TagsListResponse.TYPE_STRING, - "num": TagsListResponse.TYPE_NUMBER, - }[row["type"]], - ) - ) - - return TagsListResponse(tags=tags) diff --git a/snuba/web/rpc/timeseries.py b/snuba/web/rpc/timeseries.py index 7e63822f65..6c0f63c5e9 100644 --- a/snuba/web/rpc/timeseries.py +++ b/snuba/web/rpc/timeseries.py @@ -5,17 +5,18 @@ AggregateBucketRequest, AggregateBucketResponse, ) +from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey from snuba.attribution.appid import AppID from snuba.attribution.attribution_info import AttributionInfo from snuba.datasets.entities.entity_key import EntityKey from snuba.datasets.entities.factory import get_entity from snuba.datasets.pluggable_dataset import PluggableDataset -from snuba.query import SelectedExpression +from snuba.query import OrderBy, OrderByDirection, SelectedExpression from snuba.query.data_source.simple import Entity from snuba.query.dsl import CurriedFunctions as cf from snuba.query.dsl import Functions as f -from snuba.query.dsl import column +from snuba.query.dsl import column, literal from snuba.query.expressions import Expression from snuba.query.logical import Query from snuba.query.query_settings import HTTPQuerySettings @@ -23,6 +24,7 @@ from snuba.utils.metrics.timer import Timer from snuba.web.query import run_query from snuba.web.rpc.common 
import ( + NORMALIZED_COLUMNS, attribute_key_to_expression, base_conditions_and, trace_item_filters_to_expression, @@ -34,19 +36,43 @@ def _get_aggregate_func( request: AggregateBucketRequest, ) -> Expression: - key_col = attribute_key_to_expression(request.key) + key_expr = attribute_key_to_expression(request.key) + exists_condition: Expression = literal(True) + if request.key.name not in NORMALIZED_COLUMNS: + if request.key.type == AttributeKey.TYPE_STRING: + exists_condition = f.mapContains( + column("attr_str"), literal(request.key.name) + ) + else: + exists_condition = f.mapContains( + column("attr_num"), literal(request.key.name) + ) + sampling_weight_expr = column("sampling_weight_2") + sign_expr = column("sign") + sampling_weight_times_sign = f.multiply(sampling_weight_expr, sign_expr) + if request.aggregate == AggregateBucketRequest.FUNCTION_SUM: - return f.sum(key_col, alias="sum") - if request.aggregate == AggregateBucketRequest.FUNCTION_AVERAGE: - return f.avg(key_col, alias="avg") + return f.sum(f.multiply(key_expr, sampling_weight_times_sign), alias="sum") if request.aggregate == AggregateBucketRequest.FUNCTION_COUNT: - return f.count(key_col, alias="count") + return f.sumIf(sampling_weight_times_sign, exists_condition, alias="count") + if request.aggregate == AggregateBucketRequest.FUNCTION_AVERAGE: + return f.divide( + f.sum(f.multiply(key_expr, sampling_weight_times_sign)), + f.sumIf(sampling_weight_times_sign, exists_condition, alias="count"), + alias="avg", + ) if request.aggregate == AggregateBucketRequest.FUNCTION_P50: - return cf.quantile(0.5)(key_col, alias="p50") + return cf.quantileTDigestWeighted(0.5)( + key_expr, sampling_weight_expr, alias="p50" + ) if request.aggregate == AggregateBucketRequest.FUNCTION_P95: - return cf.quantile(0.95)(key_col, alias="p90") + return cf.quantileTDigestWeighted(0.95)( + key_expr, sampling_weight_expr, alias="p95" + ) if request.aggregate == AggregateBucketRequest.FUNCTION_P99: - return cf.quantile(0.99)(key_col, alias="p95") + return cf.quantileTDigestWeighted(0.99)( + key_expr, sampling_weight_expr, alias="p99" + ) raise BadSnubaRPCRequestException( f"Aggregate {request.aggregate} had an unknown or unset type" @@ -72,6 +98,7 @@ def _build_query(request: AggregateBucketRequest) -> Query: ), granularity=request.granularity_secs, groupby=[column("time")], + order_by=[OrderBy(direction=OrderByDirection.ASC, expression=column("time"))], ) treeify_or_and_conditions(res) return res diff --git a/snuba/web/rpc/trace_item_attribute_list.py b/snuba/web/rpc/trace_item_attribute_list.py new file mode 100644 index 0000000000..4ba75235a0 --- /dev/null +++ b/snuba/web/rpc/trace_item_attribute_list.py @@ -0,0 +1,68 @@ +from datetime import datetime +from typing import List, Optional + +from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( + TraceItemAttributesRequest, + TraceItemAttributesResponse, +) +from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey + +from snuba.clickhouse.formatter.nodes import FormattedQuery, StringNode +from snuba.datasets.schemas.tables import TableSource +from snuba.datasets.storages.factory import get_storage +from snuba.datasets.storages.storage_key import StorageKey +from snuba.utils.metrics.timer import Timer +from snuba.web.rpc.exceptions import BadSnubaRPCRequestException + + +def trace_item_attribute_list_query( + request: TraceItemAttributesRequest, _timer: Optional[Timer] = None +) -> TraceItemAttributesResponse: + if request.type == AttributeKey.Type.TYPE_STRING: + storage 
= get_storage(StorageKey("spans_str_attrs")) + elif request.type == AttributeKey.Type.TYPE_FLOAT: + storage = get_storage(StorageKey("spans_num_attrs")) + else: + return TraceItemAttributesResponse(tags=[]) + + data_source = storage.get_schema().get_data_source() + assert isinstance(data_source, TableSource) + + if request.limit > 1000: + raise BadSnubaRPCRequestException("Limit can be at most 1000") + + # this table stores timestamp as toStartOfDay(x) in UTC, so if you request 4PM - 8PM on a specific day, nada + start_timestamp = datetime.utcfromtimestamp(request.meta.start_timestamp.seconds) + end_timestamp = datetime.utcfromtimestamp(request.meta.end_timestamp.seconds) + if start_timestamp.day == end_timestamp.day: + start_timestamp = start_timestamp.replace( + day=start_timestamp.day - 1, hour=0, minute=0, second=0, microsecond=0 + ) + end_timestamp = end_timestamp.replace(day=end_timestamp.day + 1) + request.meta.start_timestamp.seconds = int(start_timestamp.timestamp()) + request.meta.end_timestamp.seconds = int(end_timestamp.timestamp()) + + query = f""" +SELECT DISTINCT attr_key, timestamp +FROM {data_source.get_table_name()} +WHERE organization_id={request.meta.organization_id} +AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)}) +AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds}) +ORDER BY attr_key +LIMIT {request.limit} OFFSET {request.offset} +""" + + cluster = storage.get_cluster() + reader = cluster.get_reader() + result = reader.execute(FormattedQuery([StringNode(query)])) + + tags: List[TraceItemAttributesResponse.Tag] = [] + for row in result.get("data", []): + tags.append( + TraceItemAttributesResponse.Tag( + name=row["attr_key"], + type=request.type, + ) + ) + + return TraceItemAttributesResponse(tags=tags) diff --git a/snuba/web/rpc/trace_item_attribute_values.py b/snuba/web/rpc/trace_item_attribute_values.py new file mode 100644 index 0000000000..640937c0ea --- /dev/null +++ b/snuba/web/rpc/trace_item_attribute_values.py @@ -0,0 +1,115 @@ +import uuid +from datetime import datetime + +from google.protobuf.json_format import MessageToDict +from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import ( + AttributeValuesRequest, + AttributeValuesResponse, +) + +from snuba.attribution.appid import AppID +from snuba.attribution.attribution_info import AttributionInfo +from snuba.datasets.entities.entity_key import EntityKey +from snuba.datasets.entities.factory import get_entity +from snuba.datasets.pluggable_dataset import PluggableDataset +from snuba.query import OrderBy, OrderByDirection, SelectedExpression +from snuba.query.data_source.simple import Entity +from snuba.query.dsl import Functions as f +from snuba.query.dsl import column, literal, literals_array +from snuba.query.logical import Query +from snuba.query.query_settings import HTTPQuerySettings +from snuba.request import Request as SnubaRequest +from snuba.utils.metrics.timer import Timer +from snuba.web.query import run_query +from snuba.web.rpc.common import base_conditions_and, treeify_or_and_conditions +from snuba.web.rpc.exceptions import BadSnubaRPCRequestException + + +def _build_query(request: AttributeValuesRequest) -> Query: + if request.limit > 1000: + raise BadSnubaRPCRequestException("Limit can be at most 1000") + + entity = Entity( + key=EntityKey("spans_str_attrs"), + schema=get_entity(EntityKey("spans_str_attrs")).get_data_model(), + sample=None, + ) + + # this table stores 
timestamp as toStartOfDay(x) in UTC, so if you request 4PM - 8PM on a specific day, nada + start_timestamp = datetime.utcfromtimestamp(request.meta.start_timestamp.seconds) + end_timestamp = datetime.utcfromtimestamp(request.meta.end_timestamp.seconds) + if start_timestamp.day == end_timestamp.day: + start_timestamp = start_timestamp.replace( + day=start_timestamp.day - 1, hour=0, minute=0, second=0, microsecond=0 + ) + end_timestamp = end_timestamp.replace(day=end_timestamp.day + 1) + request.meta.start_timestamp.seconds = int(start_timestamp.timestamp()) + request.meta.end_timestamp.seconds = int(end_timestamp.timestamp()) + + res = Query( + from_clause=entity, + selected_columns=[ + SelectedExpression( + name="attr_value", + expression=f.distinct(column("attr_value", alias="attr_value")), + ), + ], + condition=base_conditions_and( + request.meta, + f.equals(column("attr_key"), literal(request.name)), + # multiSearchAny has special treatment with ngram bloom filters + # https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#functions-support + f.multiSearchAny( + column("attr_value"), + literals_array(None, [literal(request.value_substring_match)]), + ), + ), + order_by=[ + OrderBy( + direction=OrderByDirection.ASC, expression=column("organization_id") + ), + OrderBy(direction=OrderByDirection.ASC, expression=column("attr_key")), + OrderBy(direction=OrderByDirection.ASC, expression=column("attr_value")), + ], + limit=request.limit, + offset=request.offset, + ) + treeify_or_and_conditions(res) + return res + + +def _build_snuba_request( + request: AttributeValuesRequest, +) -> SnubaRequest: + return SnubaRequest( + id=str(uuid.uuid4()), + original_body=MessageToDict(request), + query=_build_query(request), + query_settings=HTTPQuerySettings(), + attribution_info=AttributionInfo( + referrer=request.meta.referrer, + team="eap", + feature="eap", + tenant_ids={ + "organization_id": request.meta.organization_id, + "referrer": request.meta.referrer, + }, + app_id=AppID("eap"), + parent_api="trace_item_values", + ), + ) + + +def trace_item_attribute_values_query( + request: AttributeValuesRequest, timer: Timer | None = None +) -> AttributeValuesResponse: + timer = timer or Timer("trace_item_values") + snuba_request = _build_snuba_request(request) + res = run_query( + dataset=PluggableDataset(name="eap", all_entities=[]), + request=snuba_request, + timer=timer, + ) + return AttributeValuesResponse( + values=[r["attr_value"] for r in res.result.get("data", [])] + ) diff --git a/snuba/web/views.py b/snuba/web/views.py index 088a9f9e5d..1861f28937 100644 --- a/snuba/web/views.py +++ b/snuba/web/views.py @@ -35,12 +35,6 @@ render_template, ) from flask import request as http_request -from google.protobuf.message import Message as ProtobufMessage -from sentry_protos.snuba.v1alpha.endpoint_aggregate_bucket_pb2 import ( - AggregateBucketRequest, -) -from sentry_protos.snuba.v1alpha.endpoint_span_samples_pb2 import SpanSamplesRequest -from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import TagsListRequest from werkzeug import Response as WerkzeugResponse from werkzeug.exceptions import InternalServerError @@ -81,10 +75,8 @@ from snuba.web.converters import DatasetConverter, EntityConverter, StorageConverter from snuba.web.delete_query import DeletesNotEnabledError, delete_from_storage from snuba.web.query import parse_and_run_query +from snuba.web.rpc import ALL_RPCS from snuba.web.rpc.exceptions import BadSnubaRPCRequestException -from snuba.web.rpc.span_samples import 
span_samples_query as span_samples_query -from snuba.web.rpc.tags_list import tags_list_query -from snuba.web.rpc.timeseries import timeseries_query as timeseries_query from snuba.writer import BatchWriterEncoderWrapper, WriterTableRow logger = logging.getLogger("snuba.api") @@ -283,15 +275,8 @@ def unqualified_query_view(*, timer: Timer) -> Union[Response, str, WerkzeugResp @application.route("/rpc/", methods=["POST"]) @util.time_request("timeseries") def rpc(*, name: str, timer: Timer) -> Response: - rpcs: Mapping[ - str, Tuple[Callable[[Any, Timer], ProtobufMessage], type[ProtobufMessage]] - ] = { - "AggregateBucketRequest": (timeseries_query, AggregateBucketRequest), - "SpanSamplesRequest": (span_samples_query, SpanSamplesRequest), - "TagsListRequest": (tags_list_query, TagsListRequest), - } try: - endpoint, req_class = rpcs[name] + endpoint, req_class = ALL_RPCS[name] req = req_class() req.ParseFromString(http_request.data) diff --git a/tests/admin/clickhouse_migrations/test_migration_checks.py b/tests/admin/clickhouse_migrations/test_migration_checks.py index 7ad0790759..198ef35dca 100644 --- a/tests/admin/clickhouse_migrations/test_migration_checks.py +++ b/tests/admin/clickhouse_migrations/test_migration_checks.py @@ -29,9 +29,9 @@ def group_loader() -> GroupLoader: RUN_MIGRATIONS: Sequence[MigrationDetails] = [ - MigrationDetails("0001", Status.COMPLETED, True), - MigrationDetails("0002", Status.NOT_STARTED, True), - MigrationDetails("0003", Status.NOT_STARTED, True), + MigrationDetails("0001", Status.COMPLETED, True, True), + MigrationDetails("0002", Status.NOT_STARTED, True, True), + MigrationDetails("0003", Status.NOT_STARTED, True, True), ] @@ -62,9 +62,9 @@ def test_status_checker_run( REVERSE_MIGRATIONS: Sequence[MigrationDetails] = [ - MigrationDetails("0001", Status.COMPLETED, True), - MigrationDetails("0002", Status.IN_PROGRESS, True), - MigrationDetails("0003", Status.NOT_STARTED, True), + MigrationDetails("0001", Status.COMPLETED, True, True), + MigrationDetails("0002", Status.IN_PROGRESS, True, True), + MigrationDetails("0003", Status.NOT_STARTED, True, True), ] @@ -155,7 +155,10 @@ def test_run_migration_checks_and_policies( mock_policy = Mock() checker = mock_checker() mock_runner.show_all.return_value = [ - (MigrationGroup("events"), [MigrationDetails("0001", Status.COMPLETED, True)]) + ( + MigrationGroup("events"), + [MigrationDetails("0001", Status.COMPLETED, True, True)], + ) ] mock_policy.can_run.return_value = policy_result[0] diff --git a/tests/migrations/test_runner.py b/tests/migrations/test_runner.py index 44d096f014..6b1360dcf5 100644 --- a/tests/migrations/test_runner.py +++ b/tests/migrations/test_runner.py @@ -110,6 +110,26 @@ def test_show_all_for_groups() -> None: assert all([migration.status == Status.COMPLETED for migration in migrations]) +@pytest.mark.clickhouse_db +def test_show_all_nonexistent_migration() -> None: + runner = Runner() + assert all( + [ + migration.status == Status.NOT_STARTED + for (_, group_migrations) in runner.show_all() + for migration in group_migrations + ] + ) + runner.run_all(force=True) + assert all( + [ + migration.status == Status.COMPLETED + for (_, group_migrations) in runner.show_all() + for migration in group_migrations + ] + ) + + @pytest.mark.clickhouse_db def test_run_migration() -> None: runner = Runner() diff --git a/tests/pipeline/test_entity_processing_stage_composite.py b/tests/pipeline/test_entity_processing_stage_composite.py index 376e13a5d4..6eda556057 100644 --- 
a/tests/pipeline/test_entity_processing_stage_composite.py +++ b/tests/pipeline/test_entity_processing_stage_composite.py @@ -29,6 +29,7 @@ JoinType, ) from snuba.query.data_source.simple import Entity, Table +from snuba.query.exceptions import ValidationException from snuba.query.expressions import ( Column, FunctionCall, @@ -456,3 +457,92 @@ def test_composite( .data ) assert actual == expected + + +TEST_CASES_INVALID = [ + pytest.param( + CompositeQuery( + from_clause=JoinClause( + left_node=IndividualNode( + alias="err", + data_source=events_ent, + ), + right_node=IndividualNode( + alias="groups", + data_source=groups_ent, + ), + keys=[ + JoinCondition( + left=JoinConditionExpression("err", "group_id"), + right=JoinConditionExpression("groups", "id"), + ) + ], + join_type=JoinType.INNER, + ), + selected_columns=[ + SelectedExpression( + "f_release", + FunctionCall( + "f_release", + "f", + (Column(None, "err", "release"),), + ), + ), + SelectedExpression( + "_snuba_right", + Column("_snuba_right", "groups", "status"), + ), + ], + condition=binary_condition( + BooleanFunctions.AND, + binary_condition( + ConditionFunctions.EQ, + Column(None, "err", "project_id"), + Literal(None, 1), + ), + binary_condition( + BooleanFunctions.AND, + binary_condition( + ConditionFunctions.GTE, + Column(None, "err", "timestamp"), + Literal(None, datetime(2020, 1, 1, 12, 0)), + ), + binary_condition( + ConditionFunctions.GTE, + Column(None, "err", "foo"), + Literal(None, 1), + ), + ), + ), + ), + ValidationException, + id="Join query with missing column", + ), +] + + +@pytest.mark.parametrize("logical_query, expected_error", TEST_CASES_INVALID) +@pytest.mark.clickhouse_db +def test_invalid_composite( + logical_query: CompositeQuery[Entity], + expected_error: Exception, +) -> None: + request = Request( + id="", + original_body={"query": "placeholder"}, + query=cast(LogicalQuery, logical_query), + query_settings=HTTPQuerySettings(), + attribution_info=AttributionInfo( + get_app_id("blah"), {"tenant_type": "tenant_id"}, "blah", None, None, None + ), + ) + actual = EntityProcessingStage().execute( + QueryPipelineResult( + data=request, + query_settings=request.query_settings, + timer=Timer("test"), + error=None, + ) + ) + assert actual.error and not actual.data + assert isinstance(type(actual.error), type(expected_error)) diff --git a/tests/web/rpc/test_span_samples.py b/tests/web/rpc/test_span_samples.py index a933d54daf..1e70c69ceb 100644 --- a/tests/web/rpc/test_span_samples.py +++ b/tests/web/rpc/test_span_samples.py @@ -10,6 +10,7 @@ from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import ( AttributeKey, AttributeValue, + VirtualColumnContext, ) from sentry_protos.snuba.v1alpha.trace_item_filter_pb2 import ( ComparisonFilter, @@ -24,6 +25,8 @@ from tests.base import BaseApiTest from tests.helpers import write_raw_unprocessed_events +_RELEASE_TAG = "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b" + def gen_message(dt: datetime) -> Mapping[str, Any]: return { @@ -34,7 +37,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: "is_segment": True, "data": { "sentry.environment": "development", - "sentry.release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", + "sentry.release": _RELEASE_TAG, "thread.name": "uWSGIWorker1Core0", "thread.id": "8522009600", "sentry.segment.name": "/api/0/relays/projectconfigs/", @@ -62,7 +65,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: "environment": "development", "op": "http.server", "platform": "python", - "release": 
"backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", + "release": _RELEASE_TAG, "sdk.name": "sentry.python.django", "sdk.version": "2.7.0", "status": "ok", @@ -166,7 +169,6 @@ def test_with_data(self, setup_teardown: Any) -> None: ) ], limit=61, - attribute_key_transform_context=None, ) response = span_samples_query(message) assert [ @@ -222,7 +224,6 @@ def test_booleans_and_number_compares(self, setup_teardown: Any) -> None: ) ], limit=61, - attribute_key_transform_context=None, ) response = span_samples_query(message) assert [ @@ -231,3 +232,103 @@ def test_booleans_and_number_compares(self, setup_teardown: Any) -> None: ) for x in response.span_samples ] == [{"is_segment": True, "span_id": "123456781234567d"} for _ in range(60)] + + def test_with_virtual_columns(self, setup_teardown: Any) -> None: + ts = Timestamp(seconds=int(BASE_TIME.timestamp())) + hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + message = SpanSamplesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + ), + filter=TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="category") + ) + ), + keys=[ + AttributeKey(type=AttributeKey.TYPE_STRING, name="project_name"), + AttributeKey(type=AttributeKey.TYPE_STRING, name="release_version"), + AttributeKey(type=AttributeKey.TYPE_STRING, name="sentry.sdk.name"), + ], + order_by=[ + SpanSamplesRequest.OrderBy( + key=AttributeKey( + type=AttributeKey.TYPE_STRING, name="project_name" + ), + ) + ], + limit=61, + virtual_column_contexts=[ + VirtualColumnContext( + from_column_name="project_id", + to_column_name="project_name", + value_map={"1": "sentry", "2": "snuba"}, + ), + VirtualColumnContext( + from_column_name="release", + to_column_name="release_version", + value_map={_RELEASE_TAG: "4.2.0.69"}, + ), + ], + ) + response = span_samples_query(message) + assert [ + dict((k, x.results[k].val_str) for k in x.results) + for x in response.span_samples + ] == [ + { + "project_name": "sentry", + "sentry.sdk.name": "sentry.python.django", + "release_version": "4.2.0.69", + } + for _ in range(60) + ] + + def test_order_by_virtual_columns(self, setup_teardown: Any) -> None: + ts = Timestamp(seconds=int(BASE_TIME.timestamp())) + hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + message = SpanSamplesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + ), + filter=TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="category") + ) + ), + keys=[ + AttributeKey(type=AttributeKey.TYPE_STRING, name="special_color"), + ], + order_by=[ + SpanSamplesRequest.OrderBy( + key=AttributeKey( + type=AttributeKey.TYPE_STRING, name="special_color" + ) + ) + ], + limit=61, + virtual_column_contexts=[ + VirtualColumnContext( + from_column_name="color", + to_column_name="special_color", + value_map={"red": "1", "green": "2", "blue": "3"}, + ), + ], + ) + response = span_samples_query(message) + result_dicts = [ + dict((k, x.results[k].val_str) for k in x.results) + for x in response.span_samples + ] + colors = [d["special_color"] for d in result_dicts] + assert sorted(colors) == colors diff --git a/tests/web/rpc/test_timeseries_api.py b/tests/web/rpc/test_timeseries_api.py index 
d86e7e7ec3..d5093c9641 100644 --- a/tests/web/rpc/test_timeseries_api.py +++ b/tests/web/rpc/test_timeseries_api.py @@ -18,7 +18,8 @@ from tests.helpers import write_raw_unprocessed_events -def gen_message(dt: datetime) -> Mapping[str, Any]: +def gen_message(dt: datetime, msg_index: int) -> Mapping[str, Any]: + dt = dt - timedelta(hours=1) + timedelta(minutes=msg_index) return { "description": "/api/0/relays/projectconfigs/", "duration_ms": 152, @@ -42,7 +43,12 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: }, "measurements": { "num_of_spans": {"value": 50.0}, - "eap.measurement": {"value": 420}, + "eap.measurement": {"value": msg_index}, + "client_sample_rate": { + "value": 0.01 + if msg_index % 10 == 0 + else 1 # every 10th span should be 100x upscaled + }, }, "organization_id": 1, "origin": "auto.http.django", @@ -83,7 +89,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: "location": random.choice(["mobile", "frontend", "backend"]), }, "trace_id": uuid.uuid4().hex, - "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)), + "start_timestamp_ms": int(dt.timestamp()) * 1000, "start_timestamp_precise": dt.timestamp(), "end_timestamp_precise": dt.timestamp() + 1, } @@ -97,8 +103,7 @@ def gen_message(dt: datetime) -> Mapping[str, Any]: @pytest.fixture(autouse=True) def setup_teardown(clickhouse_db: None, redis_db: None) -> None: spans_storage = get_storage(StorageKey("eap_spans")) - start = BASE_TIME - messages = [gen_message(start - timedelta(minutes=i)) for i in range(120)] + messages = [gen_message(BASE_TIME, i) for i in range(120)] write_raw_unprocessed_events(spans_storage, messages) # type: ignore @@ -126,40 +131,94 @@ def test_basic(self) -> None: ) assert response.status_code == 200 - def test_with_data(self, setup_teardown: Any) -> None: - ts = Timestamp(seconds=int(BASE_TIME.timestamp())) - hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + def test_sum(self, setup_teardown: Any) -> None: message = AggregateBucketRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, cogs_category="something", referrer="something", - start_timestamp=Timestamp(seconds=hour_ago), - end_timestamp=ts, + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 30)), ), key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT), - aggregate=AggregateBucketRequest.FUNCTION_AVERAGE, - granularity_secs=1, + aggregate=AggregateBucketRequest.FUNCTION_SUM, + granularity_secs=300, ) response = timeseries_query(message) - assert response.result == [420 for _ in range(60)] + # spans have (measurement, sample rate) = (0, 100), (10, 1), ..., (100, 100) + # granularity puts five spans into the same bucket + # whole interval is 30 minutes, so there should be 6 buckets + # and our start time is exactly 1 hour after data stops + expected_results = [ + 60 * 100 + 61 + 62 + 63 + 64, + 65 + 66 + 67 + 68 + 69, + 70 * 100 + 71 + 72 + 73 + 74, + 75 + 76 + 77 + 78 + 79, + 80 * 100 + 81 + 82 + 83 + 84, + 85 + 86 + 87 + 88 + 89, + ] + assert response.result == expected_results def test_quantiles(self, setup_teardown: Any) -> None: - ts = Timestamp(seconds=int(BASE_TIME.timestamp())) - hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) message = AggregateBucketRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, cogs_category="something", referrer="something", - start_timestamp=Timestamp(seconds=hour_ago), - end_timestamp=ts, + 
                 start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
+                end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 60)),
             ),
             key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT),
             aggregate=AggregateBucketRequest.FUNCTION_P99,
-            granularity_secs=1,
+            granularity_secs=60 * 15,
+        )
+        response = timeseries_query(message)
+        # spans have measurement = 0, 1, 2, ...
+        # for us, starts at 60, and granularity puts 15 spans into each bucket
+        # and the P99 of 15 spans is just the maximum of the 15.
+        # T-Digest is approximate, so these numbers can be +- 3 or so.
+        expected_results = pytest.approx(
+            [
+                60 + 15 * 0 + 14,
+                60 + 15 * 1 + 14,
+                60 + 15 * 2 + 14,
+                60 + 15 * 3 + 14,
+            ],
+            rel=3,
+        )
+        print(response.result)
+        print(expected_results)
+        assert response.result == expected_results
+
+    def test_average(self, setup_teardown: Any) -> None:
+        message = AggregateBucketRequest(
+            meta=RequestMeta(
+                project_ids=[1, 2, 3],
+                organization_id=1,
+                cogs_category="something",
+                referrer="something",
+                start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
+                end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 30)),
+            ),
+            key=AttributeKey(name="eap.measurement", type=AttributeKey.TYPE_FLOAT),
+            aggregate=AggregateBucketRequest.FUNCTION_AVERAGE,
+            granularity_secs=300,
         )
         response = timeseries_query(message)
-        assert response.result == [420 for _ in range(60)]
+        # spans have (measurement, sample rate) = (0, 100), (10, 1), ..., (100, 100)
+        # granularity puts five spans into the same bucket
+        # whole interval is 30 minutes, so there should be 6 buckets
+        # and our start time is exactly 1 hour after data stops
+        expected_results = pytest.approx(
+            [
+                (60 * 100 + 61 + 62 + 63 + 64) / 104,
+                (65 + 66 + 67 + 68 + 69) / 5,
+                (70 * 100 + 71 + 72 + 73 + 74) / 104,
+                (75 + 76 + 77 + 78 + 79) / 5,
+                (80 * 100 + 81 + 82 + 83 + 84) / 104,
+                (85 + 86 + 87 + 88 + 89) / 5,
+            ]
+        )
+        assert response.result == expected_results
diff --git a/tests/web/rpc/test_tags_list.py b/tests/web/rpc/test_trace_item_attribute_list.py
similarity index 64%
rename from tests/web/rpc/test_tags_list.py
rename to tests/web/rpc/test_trace_item_attribute_list.py
index bb826ac7ba..ecb0846e70 100644
--- a/tests/web/rpc/test_tags_list.py
+++ b/tests/web/rpc/test_trace_item_attribute_list.py
@@ -5,14 +5,15 @@
 import pytest
 from google.protobuf.timestamp_pb2 import Timestamp
 from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import (
-    TagsListRequest,
-    TagsListResponse,
+    TraceItemAttributesRequest,
+    TraceItemAttributesResponse,
 )
 from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta
+from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey

 from snuba.datasets.storages.factory import get_storage
 from snuba.datasets.storages.storage_key import StorageKey
-from snuba.web.rpc.tags_list import tags_list_query
+from snuba.web.rpc.trace_item_attribute_list import trace_item_attribute_list_query
 from tests.base import BaseApiTest
 from tests.helpers import write_raw_unprocessed_events

@@ -63,11 +64,11 @@ def setup_teardown(clickhouse_db: None, redis_db: None) -> None:

 @pytest.mark.clickhouse_db
 @pytest.mark.redis_db
-class TestTagsList(BaseApiTest):
+class TestTraceItemAttributes(BaseApiTest):
     def test_basic(self) -> None:
         ts = Timestamp()
         ts.GetCurrentTime()
-        message = TagsListRequest(
+        message = TraceItemAttributesRequest(
             meta=RequestMeta(
                 project_ids=[1, 2, 3],
                 organization_id=1,
@@ -98,12 +99,12 @@ def test_basic(self) -> None:
             offset=20,
         )
         response = self.app.post(
-            "/rpc/TagsListRequest", data=message.SerializeToString()
+            "/rpc/TraceItemAttributesRequest", data=message.SerializeToString()
         )
         assert response.status_code == 200

-    def test_simple_case(self, setup_teardown: Any) -> None:
-        message = TagsListRequest(
+    def test_simple_case_str(self, setup_teardown: Any) -> None:
+        message = TraceItemAttributesRequest(
             meta=RequestMeta(
                 project_ids=[1, 2, 3],
                 organization_id=1,
@@ -132,17 +133,58 @@ def test_simple_case(self, setup_teardown: Any) -> None:
             ),
             limit=10,
             offset=0,
+            type=AttributeKey.Type.TYPE_STRING,
         )
-        response = tags_list_query(message)
+        response = trace_item_attribute_list_query(message)
         assert response.tags == [
-            TagsListResponse.Tag(
-                name=f"a_tag_{i:03}", type=TagsListResponse.TYPE_STRING
+            TraceItemAttributesResponse.Tag(
+                name=f"a_tag_{i:03}", type=AttributeKey.Type.TYPE_STRING
+            )
+            for i in range(0, 10)
+        ]
+
+    def test_simple_case_float(self, setup_teardown: Any) -> None:
+        message = TraceItemAttributesRequest(
+            meta=RequestMeta(
+                project_ids=[1, 2, 3],
+                organization_id=1,
+                cogs_category="something",
+                referrer="something",
+                start_timestamp=Timestamp(
+                    seconds=int(
+                        datetime(
+                            year=BASE_TIME.year,
+                            month=BASE_TIME.month,
+                            day=BASE_TIME.day - 1,
+                            tzinfo=UTC,
+                        ).timestamp()
+                    )
+                ),
+                end_timestamp=Timestamp(
+                    seconds=int(
+                        datetime(
+                            year=BASE_TIME.year,
+                            month=BASE_TIME.month,
+                            day=BASE_TIME.day + 1,
+                            tzinfo=UTC,
+                        ).timestamp()
+                    )
+                ),
+            ),
+            limit=10,
+            offset=0,
+            type=AttributeKey.Type.TYPE_FLOAT,
+        )
+        response = trace_item_attribute_list_query(message)
+        assert response.tags == [
+            TraceItemAttributesResponse.Tag(
+                name=f"b_measurement_{i:03}", type=AttributeKey.Type.TYPE_FLOAT
             )
             for i in range(0, 10)
         ]

     def test_with_offset(self, setup_teardown: Any) -> None:
-        message = TagsListRequest(
+        message = TraceItemAttributesRequest(
             meta=RequestMeta(
                 project_ids=[1, 2, 3],
                 organization_id=1,
@@ -170,21 +212,24 @@ def test_with_offset(self, setup_teardown: Any) -> None:
                 ),
             ),
             limit=5,
-            offset=29,
+            offset=10,
+            type=AttributeKey.Type.TYPE_FLOAT,
         )
-        response = tags_list_query(message)
+        response = trace_item_attribute_list_query(message)
         assert response.tags == [
-            TagsListResponse.Tag(name="a_tag_029", type=TagsListResponse.TYPE_STRING),
-            TagsListResponse.Tag(
-                name="b_measurement_000", type=TagsListResponse.TYPE_NUMBER
+            TraceItemAttributesResponse.Tag(
+                name="b_measurement_010", type=AttributeKey.Type.TYPE_FLOAT
+            ),
+            TraceItemAttributesResponse.Tag(
+                name="b_measurement_011", type=AttributeKey.Type.TYPE_FLOAT
             ),
-            TagsListResponse.Tag(
-                name="b_measurement_001", type=TagsListResponse.TYPE_NUMBER
+            TraceItemAttributesResponse.Tag(
+                name="b_measurement_012", type=AttributeKey.Type.TYPE_FLOAT
             ),
-            TagsListResponse.Tag(
-                name="b_measurement_002", type=TagsListResponse.TYPE_NUMBER
+            TraceItemAttributesResponse.Tag(
+                name="b_measurement_013", type=AttributeKey.Type.TYPE_FLOAT
             ),
-            TagsListResponse.Tag(
-                name="b_measurement_003", type=TagsListResponse.TYPE_NUMBER
+            TraceItemAttributesResponse.Tag(
+                name="b_measurement_014", type=AttributeKey.Type.TYPE_FLOAT
             ),
         ]
diff --git a/tests/web/rpc/test_trace_item_attribute_values.py b/tests/web/rpc/test_trace_item_attribute_values.py
new file mode 100644
index 0000000000..3f400219be
--- /dev/null
+++ b/tests/web/rpc/test_trace_item_attribute_values.py
@@ -0,0 +1,127 @@
+import uuid
+from datetime import UTC, datetime, timedelta
+from typing import Any, Mapping
+
+import pytest
+from google.protobuf.timestamp_pb2 import Timestamp
+from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import AttributeValuesRequest
+from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta
+
+from snuba.datasets.storages.factory import get_storage
+from snuba.datasets.storages.storage_key import StorageKey
+from snuba.web.rpc.trace_item_attribute_values import trace_item_attribute_values_query
+from tests.base import BaseApiTest
+from tests.helpers import write_raw_unprocessed_events
+
+BASE_TIME = datetime.utcnow().replace(minute=0, second=0, microsecond=0) - timedelta(
+    minutes=180
+)
+COMMON_META = RequestMeta(
+    project_ids=[1, 2, 3],
+    organization_id=1,
+    cogs_category="something",
+    referrer="something",
+    start_timestamp=Timestamp(
+        seconds=int(
+            datetime(
+                year=BASE_TIME.year,
+                month=BASE_TIME.month,
+                day=BASE_TIME.day,
+                tzinfo=UTC,
+            ).timestamp()
+        )
+    ),
+    end_timestamp=Timestamp(
+        seconds=int(
+            datetime(
+                year=BASE_TIME.year,
+                month=BASE_TIME.month,
+                day=BASE_TIME.day + 1,
+                tzinfo=UTC,
+            ).timestamp()
+        )
+    ),
+)
+
+
+def gen_message(tags: Mapping[str, str]) -> Mapping[str, Any]:
+    return {
+        "description": "/api/0/relays/projectconfigs/",
+        "duration_ms": 152,
+        "event_id": "d826225de75d42d6b2f01b957d51f18f",
+        "exclusive_time_ms": 0.228,
+        "is_segment": True,
+        "data": {},
+        "measurements": {},
+        "organization_id": 1,
+        "origin": "auto.http.django",
+        "project_id": 1,
+        "received": 1721319572.877828,
+        "retention_days": 90,
+        "segment_id": "8873a98879faf06d",
+        "sentry_tags": {},
+        "span_id": uuid.uuid4().hex,
+        "tags": tags,
+        "trace_id": uuid.uuid4().hex,
+        "start_timestamp_ms": int(BASE_TIME.timestamp() * 1000),
+        "start_timestamp_precise": BASE_TIME.timestamp(),
+        "end_timestamp_precise": BASE_TIME.timestamp() + 1,
+    }
+
+
+@pytest.fixture(autouse=True)
+def setup_teardown(clickhouse_db: None, redis_db: None) -> None:
+    spans_storage = get_storage(StorageKey("eap_spans"))
+    messages = [
+        gen_message({"tag1": "herp", "tag2": "herp"}),
+        gen_message({"tag1": "herpderp", "tag2": "herp"}),
+        gen_message({"tag1": "durp", "tag3": "herp"}),
+        gen_message({"tag1": "blah", "tag2": "herp"}),
+        gen_message({"tag1": "derpderp", "tag2": "derp"}),
+        gen_message({"tag2": "hehe"}),
+        gen_message({"tag1": "some_last_value"}),
+    ]
+    write_raw_unprocessed_events(spans_storage, messages)  # type: ignore
+
+
+@pytest.mark.clickhouse_db
+@pytest.mark.redis_db
+class TestTraceItemAttributes(BaseApiTest):
+    def test_basic(self) -> None:
+        ts = Timestamp()
+        ts.GetCurrentTime()
+        message = AttributeValuesRequest(
+            meta=COMMON_META,
+            name="tag1",
+            limit=10,
+            offset=20,
+        )
+        response = self.app.post(
+            "/rpc/AttributeValuesRequest", data=message.SerializeToString()
+        )
+        assert response.status_code == 200
+
+    def test_simple_case(self, setup_teardown: Any) -> None:
+        message = AttributeValuesRequest(meta=COMMON_META, limit=5, name="tag1")
+        response = trace_item_attribute_values_query(message)
+        assert response.values == ["blah", "derpderp", "durp", "herp", "herpderp"]
+
+    def test_offset(self, setup_teardown: Any) -> None:
+        message = AttributeValuesRequest(
+            meta=COMMON_META, limit=5, offset=1, name="tag1"
+        )
+        response = trace_item_attribute_values_query(message)
+        assert response.values == [
+            "derpderp",
+            "durp",
+            "herp",
+            "herpderp",
+            "some_last_value",
+        ]
+
+    def test_with_value_filter(self, setup_teardown: Any) -> None:
+        message = AttributeValuesRequest(
+            meta=COMMON_META, limit=5, name="tag1", value_substring_match="erp"
+        )
+        response = trace_item_attribute_values_query(message)
+        assert response.values == ["derpderp", "herp", "herpderp"]