diff --git a/MIGRATIONS.md b/MIGRATIONS.md index 6a89cec888..83f51398d0 100644 --- a/MIGRATIONS.md +++ b/MIGRATIONS.md @@ -105,7 +105,18 @@ The `snuba migrations` CLI tool should be used to manage migrations. ## Writing migrations -In order to add a new migration, first determine which migration group the new migration should be added to, and add an entry to that group in `migrations/groups.py` with the new migration identifier you have chosen. By convention we prefix migration IDs with a number matching the position of the migration in the group, i.e. the 4th migration in that group will be prefixed with `0004_`. Add a file which will contain the new migration at `/migrations/snuba_migrations/<group>/<migration_id>.py`. +### Auto-generating migrations +If you want to write a migration for the following common use cases, you can skip the rest of this document and have it autogenerated for you: +* Add a column to an existing table + +Instructions: +1. First locate the relevant `storage.yaml` file for the table you want to update. If you don't already know, this is essentially just the schema definition for the table. This file is located at `snuba/datasets/configuration/<dataset>/storages/<storage>.yaml`, e.g. `.../group_attributes/storages/group_attributes.yaml`. +2. Make the desired modifications to the storage.yaml. This will likely look something like adding a new column to `schema.columns`. +3. Run the command `snuba migrations generate path/to/storage.yaml`. This will generate the migration based on the modifications to the storage.yaml file. +4. You're basically done; now you just have to commit, push, and merge your migration! + +### Overview +In order to add a new migration, first determine which migration group the new migration should be added to. You must prefix migration IDs with a number matching the position of the migration in the group, e.g. the 4th migration in that group will be prefixed with `0004_`. Add a file which will contain the new migration at `/migrations/snuba_migrations/<group>/<migration_id>.py`. If you need to create a new group, add the group to `migrations.groups.MigrationGroup` and a loader to `migrations.group_loader` for the group defining the path to the directory where that group's migrations will be located. Register these to `migrations.groups._REGISTERED_MIGRATION_GROUPS` - note that the position of the group in this list determines the order in which the migrations will be executed. For a new MigrationGroup, the `readiness_state` should be set to `limited`, which means the migrations will only be automatically executed in CI and the local environment.
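As an editorial aside, the sketch below shows roughly what an "add column" migration looks like in Snuba, using the `profiler_id` column added elsewhere in this diff as the example. This is an illustration only, not the PR's actual migration file: the imports and operation signatures follow existing Snuba migrations but may differ slightly between versions, and the output of `snuba migrations generate` is authoritative.

```python
# Illustrative only: roughly what an "add column" migration looks like,
# using the profiler_id column added elsewhere in this diff as the example.
# Exact imports/signatures follow existing Snuba migrations and may vary
# between versions; the auto-generator's output is authoritative.
from typing import Sequence

from snuba.clickhouse.columns import UUID, Column
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations
from snuba.migrations.columns import MigrationModifiers as Modifiers


class Migration(migration.ClickhouseNodeMigration):
    blocking = False

    def forwards_ops(self) -> Sequence[operations.SqlOperation]:
        # Add the column to the local table first, then the distributed table.
        return [
            operations.AddColumn(
                storage_set=StorageSetKey.TRANSACTIONS,
                table_name="transactions_local",
                column=Column("profiler_id", UUID(Modifiers(nullable=True))),
                after="profile_id",
                target=operations.OperationTarget.LOCAL,
            ),
            operations.AddColumn(
                storage_set=StorageSetKey.TRANSACTIONS,
                table_name="transactions_dist",
                column=Column("profiler_id", UUID(Modifiers(nullable=True))),
                after="profile_id",
                target=operations.OperationTarget.DISTRIBUTED,
            ),
        ]

    def backwards_ops(self) -> Sequence[operations.SqlOperation]:
        # Drop in the reverse order: distributed table first, then local.
        return [
            operations.DropColumn(
                storage_set=StorageSetKey.TRANSACTIONS,
                table_name="transactions_dist",
                column_name="profiler_id",
                target=operations.OperationTarget.DISTRIBUTED,
            ),
            operations.DropColumn(
                storage_set=StorageSetKey.TRANSACTIONS,
                table_name="transactions_local",
                column_name="profiler_id",
                target=operations.OperationTarget.LOCAL,
            ),
        ]
```

A file of this general shape would live under the group's migration directory, e.g. `snuba/snuba_migrations/transactions/0xxx_add_profiler_id.py` (the number and name here are placeholders), which is what the generator writes out for you.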
diff --git a/snuba/admin/iam_policy/iam_policy.json b/snuba/admin/iam_policy/iam_policy.json index 33568ef0a7..3909a85c52 100644 --- a/snuba/admin/iam_policy/iam_policy.json +++ b/snuba/admin/iam_policy/iam_policy.json @@ -37,7 +37,8 @@ "group:team-ingest@sentry.io", "group:team-visibility@sentry.io", "group:team-performance@sentry.io", - "group:team-eng-managers@sentry.io" + "group:team-eng-managers@sentry.io", + "group:team-telemetry-experience@sentry.io" ], "role": "roles/CardinalityAnalyzer" }, diff --git a/snuba/admin/production_queries/prod_queries.py b/snuba/admin/production_queries/prod_queries.py index 8792824437..8d28eb4fa5 100644 --- a/snuba/admin/production_queries/prod_queries.py +++ b/snuba/admin/production_queries/prod_queries.py @@ -22,15 +22,16 @@ def run_snql_query(body: Dict[str, Any], user: str) -> Response: @audit_log def run_query_with_audit(query: str, user: str) -> Response: - dataset = get_dataset(body.pop("dataset")) + dataset_name = body.pop("dataset") + dataset = get_dataset(dataset_name) body["dry_run"] = True - response = dataset_query(dataset, body, Timer("admin")) + response = dataset_query(dataset_name, body, Timer("admin")) if response.status_code != 200: return response body["dry_run"] = False _validate_projects_in_query(body, dataset) - return dataset_query(dataset, body, Timer("admin")) + return dataset_query(dataset_name, body, Timer("admin")) return run_query_with_audit(body["query"], user) diff --git a/snuba/admin/views.py b/snuba/admin/views.py index a332166289..458a85fc11 100644 --- a/snuba/admin/views.py +++ b/snuba/admin/views.py @@ -58,11 +58,7 @@ load_instruction, store_instruction, ) -from snuba.datasets.factory import ( - InvalidDatasetError, - get_dataset, - get_enabled_dataset_names, -) +from snuba.datasets.factory import InvalidDatasetError, get_enabled_dataset_names from snuba.datasets.storages.factory import get_all_storage_keys, get_storage from snuba.datasets.storages.storage_key import StorageKey from snuba.migrations.connect import check_for_inactive_replicas @@ -780,9 +776,9 @@ def snuba_debug() -> Response: body = json.loads(request.data) body["debug"] = True body["dry_run"] = True + dataset_name = body.pop("dataset") try: - dataset = get_dataset(body.pop("dataset")) - response = dataset_query(dataset, body, Timer("admin")) + response = dataset_query(dataset_name, body, Timer("admin")) data = response.get_json() assert isinstance(data, dict) @@ -1026,8 +1022,7 @@ def production_snql_query() -> Response: body = json.loads(request.data) body["tenant_ids"] = {"referrer": request.referrer, "organization_id": ORG_ID} try: - ret = run_snql_query(body, g.user.email) - return ret + return run_snql_query(body, g.user.email) except InvalidQueryException as exception: return Response( json.dumps({"error": {"message": str(exception)}}, indent=4), diff --git a/snuba/cli/migrations.py b/snuba/cli/migrations.py index 359148c189..95d180cfaa 100644 --- a/snuba/cli/migrations.py +++ b/snuba/cli/migrations.py @@ -387,7 +387,7 @@ def add_node( @click.option("--name", type=str, help="optional name for the migration") def generate(storage_path: str, name: Optional[str] = None) -> None: """ - Given a path to user-modified storage.yaml definition (inside snuba/datasets/configuration/*/storages/*.py), + Given a path to user-modified storage.yaml definition (inside snuba/datasets/configuration/*/storages/*.yaml), and an optional name for the migration, generates a snuba migration based on the schema modifications to the storage.yaml. 
@@ -398,6 +398,8 @@ def generate(storage_path: str, name: Optional[str] = None) -> None: The generated migration will be written into the local directory. The user is responsible for making the commit, PR, and merging. + + see MIGRATIONS.md in the root folder for more info """ expected_pattern = r"(.+/)?snuba/datasets/configuration/.*/storages/.*\.(yml|yaml)" if not re.fullmatch(expected_pattern, storage_path): diff --git a/snuba/datasets/configuration/discover/entities/discover.yaml b/snuba/datasets/configuration/discover/entities/discover.yaml index daf619ad79..956937d7f1 100644 --- a/snuba/datasets/configuration/discover/entities/discover.yaml +++ b/snuba/datasets/configuration/discover/entities/discover.yaml @@ -256,6 +256,7 @@ schema: args: { schema_modifiers: [nullable] }, }, { name: profile_id, type: UUID, args: { schema_modifiers: [nullable] } }, + { name: profiler_id, type: UUID, args: { schema_modifiers: [nullable] } }, { name: replay_id, type: UUID, @@ -297,6 +298,7 @@ storages: - group_ids - app_start_type - profile_id + - profiler_id - mapper: ColumnToLiteral args: from_table_name: null diff --git a/snuba/datasets/configuration/discover/entities/discover_events.yaml b/snuba/datasets/configuration/discover/entities/discover_events.yaml index 0cfcfa864c..d7c8ac2c89 100644 --- a/snuba/datasets/configuration/discover/entities/discover_events.yaml +++ b/snuba/datasets/configuration/discover/entities/discover_events.yaml @@ -309,6 +309,7 @@ storages: - group_ids - app_start_type - profile_id + - profiler_id - mapper: ColumnToFunctionOnColumn args: from_table_name: null @@ -465,6 +466,7 @@ storages: - group_ids - app_start_type - profile_id + - profiler_id subscriptables: - mapper: SubscriptableMapper args: diff --git a/snuba/datasets/configuration/discover/entities/discover_transactions.yaml b/snuba/datasets/configuration/discover/entities/discover_transactions.yaml index 93d1dbec46..aef257eef3 100644 --- a/snuba/datasets/configuration/discover/entities/discover_transactions.yaml +++ b/snuba/datasets/configuration/discover/entities/discover_transactions.yaml @@ -126,6 +126,7 @@ schema: }, { name: app_start_type, type: String }, { name: profile_id, type: UUID, args: { schema_modifiers: [nullable] } }, + { name: profiler_id, type: UUID, args: { schema_modifiers: [nullable] } }, { name: replay_id, type: UUID, args: { schema_modifiers: [nullable] } } ] required_time_column: finish_ts diff --git a/snuba/datasets/configuration/transactions/entities/transactions.yaml b/snuba/datasets/configuration/transactions/entities/transactions.yaml index 88ef4a0cfc..0f035ed12c 100644 --- a/snuba/datasets/configuration/transactions/entities/transactions.yaml +++ b/snuba/datasets/configuration/transactions/entities/transactions.yaml @@ -127,6 +127,7 @@ schema: }, { name: app_start_type, type: String }, { name: profile_id, type: UUID, args: { schema_modifiers: [nullable] } }, + { name: profiler_id, type: UUID, args: { schema_modifiers: [nullable] } }, { name: replay_id, type: UUID, args: { schema_modifiers: [nullable] } }, ] diff --git a/snuba/datasets/configuration/transactions/storages/transactions.yaml b/snuba/datasets/configuration/transactions/storages/transactions.yaml index e9c1a8e59e..e7d34cef7a 100644 --- a/snuba/datasets/configuration/transactions/storages/transactions.yaml +++ b/snuba/datasets/configuration/transactions/storages/transactions.yaml @@ -157,6 +157,7 @@ schema: }, { name: app_start_type, type: String }, { name: profile_id, type: UUID, args: { schema_modifiers: [nullable] } }, + { 
name: profiler_id, type: UUID, args: { schema_modifiers: [nullable] } }, { name: replay_id, type: UUID, args: { schema_modifiers: [nullable] } }, ] local_table_name: transactions_local @@ -203,7 +204,7 @@ query_processors: trace.span_id: span_id - processor: UUIDColumnProcessor args: - columns: [event_id, trace_id, profile_id, replay_id] + columns: [event_id, trace_id, profile_id, profiler_id, replay_id] - processor: HexIntColumnProcessor args: columns: [span_id] diff --git a/snuba/datasets/processors/transactions_processor.py b/snuba/datasets/processors/transactions_processor.py index 1b3b363869..781cf501df 100644 --- a/snuba/datasets/processors/transactions_processor.py +++ b/snuba/datasets/processors/transactions_processor.py @@ -272,6 +272,10 @@ def _process_contexts_and_user( if profile_id is not None: processed["profile_id"] = str(uuid.UUID(profile_id)) + profiler_id = profile_context.get("profiler_id") + if profiler_id is not None: + processed["profiler_id"] = str(uuid.UUID(profiler_id)) + replay_context = contexts.get("replay") if replay_context is not None: replay_id = replay_context.get("replay_id") @@ -432,6 +436,7 @@ def _sanitize_contexts( # again in the context array profile_ctx = sanitized_context.get("profile", {}) profile_ctx.pop("profile_id", None) + profile_ctx.pop("profiler_id", None) replay_ctx = sanitized_context.get("replay", {}) replay_ctx.pop("replay_id", None) diff --git a/snuba/migrations/group_loader.py b/snuba/migrations/group_loader.py index 8255e7f687..0474504cdd 100644 --- a/snuba/migrations/group_loader.py +++ b/snuba/migrations/group_loader.py @@ -62,6 +62,11 @@ def get_migrations(self) -> Sequence[str]: last = None for fname in migration_filenames: if last is not None and fname[:4] == last[:4]: + """ + If this check fails in CI even though you believe the + duplicate files do not exist on your branch, it may be a stale + GitHub cache; recreating the PR or branch usually resolves it. + """ raise ValueError( f"""Duplicate migration number for the following files: {os.path.join(migration_folder,last)}.py diff --git a/snuba/migrations/operations.py b/snuba/migrations/operations.py index e0c0cfc1c6..585c21e52e 100644 --- a/snuba/migrations/operations.py +++ b/snuba/migrations/operations.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import time from abc import ABC, abstractmethod from enum import Enum from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union @@ -77,12 +78,36 @@ def execute(self) -> None: logger.info(f"Executing on {self.target.value} node: {node}") try: connection.execute(self.format_sql(), settings=self._settings) + self._block_on_mutations(connection) except Exception: logger.exception( f"Failed to execute operation on {self.storage_set}, target: {self.target}\n{self.format_sql()}\n{self._settings}" ) raise + def _block_on_mutations( + self, conn: ClickhousePool, poll_seconds: int = 5, timeout_seconds: int = 300 + ) -> None: + """ + This function blocks until all entries of system.mutations + have is_done=1. Polls system.mutations every poll_seconds. + Raises TimeoutError if mutations are still running after timeout_seconds.
+ """ + slept_so_far = 0 + while True: + is_mutating = conn.execute( + "select count(*) from system.mutations where is_done=0" + ).results != [(0,)] + if not is_mutating: + return + elif slept_so_far >= timeout_seconds: + raise TimeoutError( + f"{conn.host}:{conn.port} not finished mutating after {timeout_seconds} seconds" + ) + else: + time.sleep(poll_seconds) + slept_so_far += poll_seconds + @abstractmethod def format_sql(self) -> str: raise NotImplementedError diff --git a/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py b/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py index 0f14e35ce6..4976739ab8 100644 --- a/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py +++ b/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py @@ -48,7 +48,7 @@ DEFAULT_BYTES_THROTTLE_DIVIDER = 2 DEFAULT_THREADS_THROTTLE_DIVIDER = 2 QUOTA_UNIT = "bytes" -SUGGESTION = "scan less bytes" +SUGGESTION = "The feature, organization/project is scanning too many bytes, this usually means they are abusing that API" class BytesScannedRejectingPolicy(AllocationPolicy): diff --git a/snuba/query/allocation_policies/bytes_scanned_window_policy.py b/snuba/query/allocation_policies/bytes_scanned_window_policy.py index f914dee873..ffed62633c 100644 --- a/snuba/query/allocation_policies/bytes_scanned_window_policy.py +++ b/snuba/query/allocation_policies/bytes_scanned_window_policy.py @@ -87,7 +87,7 @@ DEFAULT_OVERRIDE_LIMIT = -1 DEFAULT_BYTES_SCANNED_LIMIT = 10000000 QUOTA_UNIT = "bytes" -SUGGESTION = "scan less bytes" +SUGGESTION = "The feature, organization/project is scanning too many bytes, this usually means they are abusing that API" class BytesScannedWindowAllocationPolicy(AllocationPolicy): diff --git a/snuba/query/allocation_policies/concurrent_rate_limit.py b/snuba/query/allocation_policies/concurrent_rate_limit.py index be96331e7e..7d7662eaa7 100644 --- a/snuba/query/allocation_policies/concurrent_rate_limit.py +++ b/snuba/query/allocation_policies/concurrent_rate_limit.py @@ -38,7 +38,7 @@ from snuba.query.allocation_policies import MAX_THRESHOLD, NO_SUGGESTION QUOTA_UNIT = "concurrent_queries" -SUGGESTION = "scan less concurrent queries" +SUGGESTION = "A customer is sending too many queries to snuba. The customer may be abusing an API or the queries may be innefficient" import typing diff --git a/snuba/query/allocation_policies/per_referrer.py b/snuba/query/allocation_policies/per_referrer.py index 54c25f625d..223ba0144f 100644 --- a/snuba/query/allocation_policies/per_referrer.py +++ b/snuba/query/allocation_policies/per_referrer.py @@ -27,7 +27,7 @@ _THREADS_THROTTLE_DIVIDER = 2 QUOTA_UNIT = "concurrent_queries" -SUGGESTION = "scan less concurrent queries" +SUGGESTION = "This feature is doing too many concurrent queries. Customers are being affected arbitrarily. 
Either means the feature is not being appropriately rate limited on the sentry side or that the queries are inefficient" class ReferrerGuardRailPolicy(BaseConcurrentRateLimitAllocationPolicy): diff --git a/snuba/query/mql/context_population.py b/snuba/query/mql/context_population.py index 74561067e8..046cdbe08d 100644 --- a/snuba/query/mql/context_population.py +++ b/snuba/query/mql/context_population.py @@ -92,7 +92,7 @@ def scope_conditions( def rollup_expressions( mql_context: MQLContext, table_name: str | None = None -) -> tuple[Expression, bool, OrderBy | None, SelectedExpression | None]: +) -> tuple[Expression, bool, OrderBy | None, SelectedExpression]: """ This function returns four values based on the rollup field in the MQL context: - granularity_condition: an expression that filters the granularity column based on the granularity in the MQL context @@ -130,26 +130,27 @@ def rollup_expressions( ) with_totals = rollup.with_totals == "True" - selected_time = None orderby = None + + prefix = "" if not table_name else f"{table_name}." + time_expression = FunctionCall( + f"{prefix}time", + "toStartOfInterval", + parameters=( + Column(None, table_name, "timestamp"), + FunctionCall( + None, + "toIntervalSecond", + (Literal(None, rollup.interval),), + ), + Literal(None, "Universal"), + ), + ) + selected_time = SelectedExpression("time", time_expression) + if rollup.interval: # If an interval is specified, then we need to group the time by that interval, # return the time in the select, and order the results by that time. - prefix = "" if not table_name else f"{table_name}." - time_expression = FunctionCall( - f"{prefix}time", - "toStartOfInterval", - parameters=( - Column(None, table_name, "timestamp"), - FunctionCall( - None, - "toIntervalSecond", - (Literal(None, rollup.interval),), - ), - Literal(None, "Universal"), - ), - ) - selected_time = SelectedExpression("time", time_expression) orderby = OrderBy(OrderByDirection.ASC, time_expression) elif rollup.orderby is not None: direction = ( diff --git a/snuba/query/mql/parser_supported_join.py b/snuba/query/mql/parser_supported_join.py index 0d2cfb068d..cf66705823 100644 --- a/snuba/query/mql/parser_supported_join.py +++ b/snuba/query/mql/parser_supported_join.py @@ -1289,7 +1289,6 @@ def populate_query_from_mql_context( assert isinstance(data_source, QueryEntity) entity_data.append((data_source.key, alias)) - selected_time_found = False for entity_key, table_alias in entity_data: time_condition = start_end_time_condition(mql_context, entity_key, table_alias) scope_condition = scope_conditions(mql_context, table_alias) @@ -1305,20 +1304,17 @@ def populate_query_from_mql_context( query.set_totals(with_totals) if orderby: query.set_ast_orderby([orderby]) + query.set_ast_selected_columns( + list(query.get_selected_columns()) + [selected_time] + ) - if selected_time: - selected_time_found = True - query.set_ast_selected_columns( - list(query.get_selected_columns()) + [selected_time] - ) - - groupby = query.get_groupby() - if groupby: - query.set_ast_groupby(list(groupby) + [selected_time.expression]) - else: - query.set_ast_groupby([selected_time.expression]) + groupby = query.get_groupby() + if groupby: + query.set_ast_groupby(list(groupby) + [selected_time.expression]) + else: + query.set_ast_groupby([selected_time.expression]) - if isinstance(query, CompositeQuery) and selected_time_found: + if isinstance(query, CompositeQuery): # If the query is grouping by time, that needs to be added to the JoinClause keys to # ensure we correctly join 
the subqueries. The column names will be the same for all the # subqueries, so we just need to map all the table aliases. diff --git a/snuba/snuba_migrations/README.md b/snuba/snuba_migrations/README.md new file mode 100644 index 0000000000..67fe51cb86 --- /dev/null +++ b/snuba/snuba_migrations/README.md @@ -0,0 +1,9 @@ +If you don't know anything about snuba migrations see `MIGRATIONS.md` in the root folder + +Each folder in here represents a migration group. see `snuba/migrations/group_loader.py` + +Each migration (ex. `events/0001_events_initial.py`) needs to follow the naming scheme `xxxx_migration_name.py` +where `xxxx` is the 0 padded migration number. Migrations are applied in order of migration number. See `snuba/migrations/group_loader.py` for more info. + +## Migration Auto-Generation +Who wants to write their own migrations by hand? Certainly not me! See `MIGRATIONS.md` to learn how you can have them generated for you. diff --git a/snuba/web/query.py b/snuba/web/query.py index 8ab2907e92..83679da91c 100644 --- a/snuba/web/query.py +++ b/snuba/web/query.py @@ -1,11 +1,14 @@ from __future__ import annotations import logging -from typing import Optional +from typing import Any, Optional -from snuba import environment +import sentry_sdk + +from snuba import environment, settings from snuba.datasets.dataset import Dataset -from snuba.datasets.factory import get_dataset_name +from snuba.datasets.factory import InvalidDatasetError, get_dataset, get_dataset_name +from snuba.datasets.pluggable_dataset import PluggableDataset from snuba.pipeline.query_pipeline import QueryPipelineResult from snuba.pipeline.stages.query_execution import ExecutionStage from snuba.pipeline.stages.query_processing import ( @@ -13,9 +16,12 @@ StorageProcessingStage, ) from snuba.query.exceptions import InvalidQueryException, QueryPlanException +from snuba.query.query_settings import HTTPQuerySettings from snuba.querylog import record_invalid_request, record_query from snuba.querylog.query_metadata import SnubaQueryMetadata, get_request_status from snuba.request import Request +from snuba.request.schema import RequestSchema +from snuba.request.validation import build_request, parse_mql_query, parse_snql_query from snuba.utils.metrics.gauge import Gauge from snuba.utils.metrics.timer import Timer from snuba.utils.metrics.util import with_span @@ -99,6 +105,58 @@ def run_query( return result +def _get_dataset(dataset_name: Optional[str]) -> Dataset: + if dataset_name: + try: + return get_dataset(dataset_name) + except InvalidDatasetError: + return PluggableDataset(name=dataset_name, all_entities=[]) + return PluggableDataset(name=settings.DEFAULT_DATASET_NAME, all_entities=[]) + + +@with_span() +def parse_and_run_query( + body: dict[str, Any], + timer: Timer, + is_mql: bool = False, + dataset_name: Optional[str] = None, + referrer: Optional[str] = None, +) -> tuple[Request, QueryResult]: + """Top level entrypoint from a raw query body to a query result + + Example: + >>> request, result = parse_and_run_query( + >>> body={ + >>> "query":"MATCH (events) SELECT event_id, group_id, project_id, timestamp WHERE timestamp >= toDateTime('2024-07-17T21:04:34') AND timestamp < toDateTime('2024-07-17T21:10:34')", + >>> "tenant_ids":{"organization_id":319976,"referrer":"Group.get_helpful"} + >>> }, + >>> timer=Timer("parse_and_run_query"), + >>> is_mql=False, + >>> ) + + Optional args: + dataset_name (str): used mainly for observability purposes + referrer (str): legacy param, you probably don't need to provide this. 
It should be in the tenant_ids of the body + """ + + with sentry_sdk.start_span(description="build_schema", op="validate"): + schema = RequestSchema.build(HTTPQuerySettings, is_mql) + + # NOTE(Volo): dataset is not necessary for queries because storages can be queried directly + # certain parts of the code still use it though, many metrics used by snuba are still tagged by + # "dataset" even though datasets don't define very much. The user is able to provide a dataset_name + # for that reason. Otherwise they are not useful. + # EXCEPT FOR DISCOVER which is a whole can of worms, but that's the one place where the dataset is useful for something + dataset = _get_dataset(dataset_name) + referrer = referrer or "" + parse_function = parse_snql_query if not is_mql else parse_mql_query + request = build_request( + body, parse_function, HTTPQuerySettings, schema, dataset, timer, referrer + ) + + return request, run_query(dataset, request, timer) + + def _set_query_final(request: Request, extra: QueryExtraData) -> None: if "final" in extra["stats"]: request.query.set_final(extra["stats"]["final"]) diff --git a/snuba/web/views.py b/snuba/web/views.py index c89eaa241f..dd658e483b 100644 --- a/snuba/web/views.py +++ b/snuba/web/views.py @@ -40,18 +40,15 @@ from snuba.datasets.entities.factory import get_entity_name from snuba.datasets.entity import Entity from snuba.datasets.entity_subscriptions.validators import InvalidSubscriptionError -from snuba.datasets.factory import InvalidDatasetError, get_dataset, get_dataset_name +from snuba.datasets.factory import InvalidDatasetError, get_dataset_name from snuba.datasets.schemas.tables import TableSchema from snuba.datasets.storage import StorageNotAvailable from snuba.query.allocation_policies import AllocationPolicyViolations from snuba.query.exceptions import InvalidQueryException, QueryPlanException from snuba.query.query_settings import HTTPQuerySettings from snuba.redis import all_redis_clients -from snuba.request import Request as SnubaRequest from snuba.request.exceptions import InvalidJsonRequestException, JsonDecodeException from snuba.request.schema import RequestSchema -from snuba.request.validation import build_request, parse_mql_query, parse_snql_query -from snuba.state import get_float_config from snuba.state.rate_limit import RateLimitExceeded from snuba.subscriptions.codecs import SubscriptionDataCodec from snuba.subscriptions.data import PartitionId @@ -68,7 +65,7 @@ from snuba.web import QueryException, QueryTooLongException from snuba.web.constants import get_http_status_for_clickhouse_error from snuba.web.converters import DatasetConverter, EntityConverter -from snuba.web.query import run_query +from snuba.web.query import parse_and_run_query from snuba.writer import BatchWriterEncoderWrapper, WriterTableRow logger = logging.getLogger("snuba.api") @@ -237,14 +234,16 @@ def parse_request_body(http_request: Request) -> Dict[str, Any]: raise JsonDecodeException(str(error)) from error -def _trace_transaction(dataset: Dataset) -> None: +def _trace_transaction(dataset_name: str) -> None: with sentry_sdk.configure_scope() as scope: if scope.span: - scope.span.set_tag("dataset", get_dataset_name(dataset)) + scope.span.set_tag("dataset", dataset_name) scope.span.set_tag("referrer", http_request.referrer) if scope.transaction: - scope.transaction = f"{scope.transaction.name}__{get_dataset_name(dataset)}__{http_request.referrer}" + scope.transaction = ( + f"{scope.transaction.name}__{dataset_name}__{http_request.referrer}" + ) 
@application.route("/query", methods=["GET", "POST"]) @@ -254,9 +253,9 @@ def unqualified_query_view(*, timer: Timer) -> Union[Response, str, WerkzeugResp return redirect(f"/{settings.DEFAULT_DATASET_NAME}/query", code=302) elif http_request.method == "POST": body = parse_request_body(http_request) - dataset = get_dataset(body.pop("dataset", settings.DEFAULT_DATASET_NAME)) - _trace_transaction(dataset) - return dataset_query(dataset, body, timer) + dataset_name = str(body.pop("dataset", settings.DEFAULT_DATASET_NAME)) + _trace_transaction(dataset_name) + return dataset_query(dataset_name, body, timer) else: assert False, "unexpected fallthrough" @@ -272,8 +271,9 @@ def snql_dataset_query_view(*, dataset: Dataset, timer: Timer) -> Union[Response ) elif http_request.method == "POST": body = parse_request_body(http_request) - _trace_transaction(dataset) - return dataset_query(dataset, body, timer) + dataset_name = get_dataset_name(dataset) + _trace_transaction(dataset_name) + return dataset_query(dataset_name, body, timer) else: assert False, "unexpected fallthrough" @@ -282,9 +282,10 @@ def snql_dataset_query_view(*, dataset: Dataset, timer: Timer) -> Union[Response @util.time_request("query", {"mql": "true"}) def mql_dataset_query_view(*, dataset: Dataset, timer: Timer) -> Union[Response, str]: if http_request.method == "POST": + dataset_name = get_dataset_name(dataset) body = parse_request_body(http_request) - _trace_transaction(dataset) - return dataset_query(dataset, body, timer, is_mql=True) + _trace_transaction(dataset_name) + return dataset_query(dataset_name, body, timer, is_mql=True) else: assert False, "unexpected fallthrough" @@ -332,19 +333,9 @@ def dump_payload(payload: MutableMapping[str, Any]) -> str: return json.dumps(sanitized_payload, default=str) -def _get_and_log_referrer(request: SnubaRequest, body: Dict[str, Any]) -> None: - metrics.increment( - "just_referrer_count", tags={"referrer": request.attribution_info.referrer} - ) - if random.random() < get_float_config("log-referrer-sample-rate", 0.001): # type: ignore - logger.info(f"Received referrer: {request.attribution_info.referrer}") - if request.attribution_info.referrer == "": - logger.info(f"Received unknown referrer from request: {request}, {body}") - - @with_span() def dataset_query( - dataset: Dataset, body: Dict[str, Any], timer: Timer, is_mql: bool = False + dataset_name: str, body: Dict[str, Any], timer: Timer, is_mql: bool = False ) -> Response: assert http_request.method == "POST" referrer = http_request.referrer or "" # mypy @@ -356,22 +347,14 @@ def dataset_query( # is detected, and then log everything if get_shutdown() or random.random() < 0.05: if get_shutdown() or check_down_file_exists(): - tags = {"dataset": get_dataset_name(dataset)} + tags = {"dataset": dataset_name} metrics.increment("post.shutdown.query", tags=tags) diff = time.time() - (shutdown_time() or 0.0) # this should never be None metrics.timing("post.shutdown.query.delay", diff, tags=tags) - - with sentry_sdk.start_span(description="build_schema", op="validate"): - schema = RequestSchema.build(HTTPQuerySettings, is_mql) - - parse_function = parse_snql_query if not is_mql else parse_mql_query - request = build_request( - body, parse_function, HTTPQuerySettings, schema, dataset, timer, referrer - ) - _get_and_log_referrer(request, body) - try: - result = run_query(dataset, request, timer) + request, result = parse_and_run_query( + body, timer, is_mql, dataset_name, referrer + ) assert result.extra["stats"] except InvalidQueryException as 
exception: details: Mapping[str, Any] diff --git a/tests/admin/test_api.py b/tests/admin/test_api.py index ca5bbe31e1..bd15c1d77f 100644 --- a/tests/admin/test_api.py +++ b/tests/admin/test_api.py @@ -331,16 +331,6 @@ def test_get_snuba_datasets(admin_api: FlaskClient) -> None: assert set(data) == set(get_enabled_dataset_names()) -@pytest.mark.redis_db -def test_snuba_debug_invalid_dataset(admin_api: FlaskClient) -> None: - response = admin_api.post( - "/snuba_debug", data=json.dumps({"dataset": "", "query": ""}) - ) - assert response.status_code == 400 - data = json.loads(response.data) - assert data["error"]["message"] == "dataset '' does not exist" - - @pytest.mark.redis_db def test_snuba_debug_invalid_query(admin_api: FlaskClient) -> None: response = admin_api.post( diff --git a/tests/datasets/test_transaction_processor.py b/tests/datasets/test_transaction_processor.py index 150e92284f..9ae424e9e1 100644 --- a/tests/datasets/test_transaction_processor.py +++ b/tests/datasets/test_transaction_processor.py @@ -45,6 +45,7 @@ class TransactionEvent: app_start_type: str = "warm" has_app_ctx: bool = True profile_id: Optional[str] = None + profiler_id: Optional[str] = None replay_id: Optional[str] = None received: Optional[float] = None @@ -55,9 +56,18 @@ def get_app_context(self) -> Optional[Mapping[str, str]]: return None def get_profile_context(self) -> Optional[Mapping[str, str]]: - if self.profile_id is None: - return None - return {"profile_id": self.profile_id} + context = {} + + if self.profile_id is not None: + context["profile_id"] = self.profile_id + + if self.profiler_id is not None: + context["profiler_id"] = self.profiler_id + + if context: + return context + + return None def get_replay_context(self) -> Optional[Mapping[str, str]]: if self.replay_id is None: @@ -280,6 +290,8 @@ def build_result(self, meta: KafkaMessageMetadata) -> Mapping[str, Any]: if self.profile_id is not None: ret["profile_id"] = str(uuid.UUID(self.profile_id)) + if self.profiler_id is not None: + ret["profiler_id"] = str(uuid.UUID(self.profiler_id)) if self.replay_id is not None: ret["replay_id"] = str(uuid.UUID(self.replay_id)) ret["tags.key"].append("replayId") @@ -332,6 +344,7 @@ def __get_transaction_event(self) -> TransactionEvent: }, transaction_source="url", profile_id="046852d24483455c8c44f0c8fbf496f9", + profiler_id="822301ff8bdb4daca920ddf2f993b1ff", replay_id="d2731f8ed8934c6fa5253e450915aa12", ) diff --git a/tests/query/parser/test_formula_mql_query.py b/tests/query/parser/test_formula_mql_query.py index 852f327d93..0a95dc594b 100644 --- a/tests/query/parser/test_formula_mql_query.py +++ b/tests/query/parser/test_formula_mql_query.py @@ -1,6 +1,7 @@ from __future__ import annotations import re +from copy import deepcopy from datetime import datetime import pytest @@ -36,14 +37,18 @@ ) -def time_expression(table_alias: str | None = None) -> FunctionCall: +def time_expression( + table_alias: str | None = None, to_interval_seconds: int | None = 60 +) -> FunctionCall: alias_prefix = f"{table_alias}." 
if table_alias else "" return FunctionCall( f"_snuba_{alias_prefix}time", "toStartOfInterval", ( Column("_snuba_timestamp", table_alias, "timestamp"), - FunctionCall(None, "toIntervalSecond", (Literal(None, 60),)), + FunctionCall( + None, "toIntervalSecond", (Literal(None, to_interval_seconds),) + ), Literal(None, "Universal"), ), ) @@ -1247,3 +1252,171 @@ def test_curried_aggregate_formula() -> None: query = parse_mql_query_new(str(query_body), mql_context, generic_metrics) eq, reason = query.equals(expected) assert eq, reason + + +def test_formula_with_totals() -> None: + mql_context_new = deepcopy(mql_context) + mql_context_new["rollup"]["with_totals"] = "True" + mql_context_new["rollup"]["interval"] = None + query_body = "sum(`d:transactions/duration@millisecond`){status_code:200} / sum(`d:transactions/duration@millisecond`)" + + expected_selected = SelectedExpression( + "aggregate_value", + divide( + FunctionCall( + None, + "sum", + (Column("_snuba_value", "d0", "value"),), + ), + FunctionCall( + None, + "sum", + (Column("_snuba_value", "d1", "value"),), + ), + "_snuba_aggregate_value", + ), + ) + + join_clause = JoinClause( + left_node=IndividualNode( + alias="d1", + data_source=from_distributions, + ), + right_node=IndividualNode( + alias="d0", + data_source=from_distributions, + ), + keys=[ + JoinCondition( + left=JoinConditionExpression(table_alias="d1", column="d1.time"), + right=JoinConditionExpression(table_alias="d0", column="d0.time"), + ) + ], + join_type=JoinType.INNER, + join_modifier=None, + ) + + tag_condition = binary_condition( + "equals", tag_column("status_code", "d0"), Literal(None, "200") + ) + metric_condition1 = metric_id_condition(123456, "d0") + metric_condition2 = metric_id_condition(123456, "d1") + formula_condition = combine_and_conditions( + condition("d0") + + condition("d1") + + [tag_condition, metric_condition1, metric_condition2] + ) + + expected = CompositeQuery( + from_clause=join_clause, + selected_columns=[ + expected_selected, + SelectedExpression( + "time", + time_expression("d1", None), + ), + SelectedExpression( + "time", + time_expression("d0", None), + ), + ], + groupby=[time_expression("d1", None), time_expression("d0", None)], + condition=formula_condition, + order_by=[], + limit=1000, + offset=0, + totals=True, + ) + + generic_metrics = get_dataset( + "generic_metrics", + ) + query = parse_mql_query_new(str(query_body), mql_context_new, generic_metrics) + eq, reason = query.equals(expected) + assert eq, reason + + +def test_formula_with_totals_and_interval() -> None: + mql_context_new = deepcopy(mql_context) + mql_context_new["rollup"]["with_totals"] = "True" + query_body = "sum(`d:transactions/duration@millisecond`){status_code:200} / sum(`d:transactions/duration@millisecond`)" + + expected_selected = SelectedExpression( + "aggregate_value", + divide( + FunctionCall( + None, + "sum", + (Column("_snuba_value", "d0", "value"),), + ), + FunctionCall( + None, + "sum", + (Column("_snuba_value", "d1", "value"),), + ), + "_snuba_aggregate_value", + ), + ) + + join_clause = JoinClause( + left_node=IndividualNode( + alias="d1", + data_source=from_distributions, + ), + right_node=IndividualNode( + alias="d0", + data_source=from_distributions, + ), + keys=[ + JoinCondition( + left=JoinConditionExpression(table_alias="d1", column="d1.time"), + right=JoinConditionExpression(table_alias="d0", column="d0.time"), + ) + ], + join_type=JoinType.INNER, + join_modifier=None, + ) + + tag_condition = binary_condition( + "equals", tag_column("status_code", 
"d0"), Literal(None, "200") + ) + metric_condition1 = metric_id_condition(123456, "d0") + metric_condition2 = metric_id_condition(123456, "d1") + formula_condition = combine_and_conditions( + condition("d0") + + condition("d1") + + [tag_condition, metric_condition1, metric_condition2] + ) + + expected = CompositeQuery( + from_clause=join_clause, + selected_columns=[ + expected_selected, + SelectedExpression( + "time", + time_expression("d1"), + ), + SelectedExpression( + "time", + time_expression("d0"), + ), + ], + groupby=[time_expression("d1"), time_expression("d0")], + condition=formula_condition, + order_by=[ + OrderBy( + direction=OrderByDirection.ASC, + expression=time_expression("d0"), + ), + ], + limit=1000, + offset=0, + totals=True, + ) + + generic_metrics = get_dataset( + "generic_metrics", + ) + query = parse_mql_query_new(str(query_body), mql_context_new, generic_metrics) + eq, reason = query.equals(expected) + assert eq, reason diff --git a/tests/query/parser/test_parser.py b/tests/query/parser/test_parser.py index eccbdb46a3..70d205330e 100644 --- a/tests/query/parser/test_parser.py +++ b/tests/query/parser/test_parser.py @@ -29,15 +29,20 @@ EntityKey.GENERIC_METRICS_DISTRIBUTIONS, get_entity(EntityKey.GENERIC_METRICS_DISTRIBUTIONS).get_data_model(), ) -time_expression = FunctionCall( - "_snuba_time", - "toStartOfInterval", - ( - Column("_snuba_timestamp", None, "timestamp"), - FunctionCall(None, "toIntervalSecond", (Literal(None, 60),)), - Literal(None, "Universal"), - ), -) + + +def time_expression(to_interval_seconds: int | None = 60) -> FunctionCall: + return FunctionCall( + "_snuba_time", + "toStartOfInterval", + ( + Column("_snuba_timestamp", None, "timestamp"), + FunctionCall( + None, "toIntervalSecond", (Literal(None, to_interval_seconds),) + ), + Literal(None, "Universal"), + ), + ) def test_mql() -> None: @@ -77,8 +82,12 @@ def test_mql() -> None: (Column("_snuba_value", None, "value"),), ), ), + SelectedExpression( + "time", + time_expression(None), + ), ], - groupby=[], + groupby=[time_expression(None)], condition=and_cond( and_cond( and_cond( @@ -183,8 +192,12 @@ def test_mql_wildcards() -> None: (Column("_snuba_value", None, "value"),), ), ), + SelectedExpression( + "time", + time_expression(None), + ), ], - groupby=[], + groupby=[time_expression(None)], condition=and_cond( and_cond( and_cond( @@ -287,8 +300,12 @@ def test_mql_negated_wildcards() -> None: (Column("_snuba_value", None, "value"),), ), ), + SelectedExpression( + "time", + time_expression(None), + ), ], - groupby=[], + groupby=[time_expression(None)], condition=and_cond( and_cond( and_cond( diff --git a/tests/test_metrics_mql_api.py b/tests/test_metrics_mql_api.py index 7bbdfd94c5..fcf69e9833 100644 --- a/tests/test_metrics_mql_api.py +++ b/tests/test_metrics_mql_api.py @@ -683,6 +683,117 @@ def test_complex_formula(self) -> None: data = json.loads(response.data) assert len(data["data"]) == 180, data + def test_formula_with_totals(self) -> None: + query = MetricsQuery( + query=Formula( + ArithmeticOperator.PLUS.value, + [ + Timeseries( + metric=Metric( + "transaction.duration", + DISTRIBUTIONS_MRI, + DISTRIBUTIONS.metric_id, + ), + aggregate="avg", + ), + Timeseries( + metric=Metric( + "transaction.duration", + DISTRIBUTIONS_MRI, + DISTRIBUTIONS.metric_id, + ), + aggregate="avg", + ), + ], + ), + start=self.start_time, + end=self.end_time, + rollup=Rollup(interval=None, totals=True, orderby=None, granularity=60), + scope=MetricsScope( + org_ids=[self.org_id], + project_ids=self.project_ids, + 
use_case_id=USE_CASE_ID, + ), + indexer_mappings={ + "transaction.duration": DISTRIBUTIONS_MRI, + DISTRIBUTIONS_MRI: DISTRIBUTIONS.metric_id, + "status_code": resolve_str("status_code"), + }, + ) + + response = self.app.post( + self.mql_route, + data=Request( + dataset=DATASET, + app_id="test", + query=query, + flags=Flags(debug=True), + tenant_ids={"referrer": "tests", "organization_id": self.org_id}, + ).serialize_mql(), + ) + + assert response.status_code == 200, response.data + data = json.loads(response.data) + assert ( + data["totals"]["aggregate_value"] == 4.0 + ) # Should be more than the number of data points + + def test_formula_with_totals_and_interval(self) -> None: + query = MetricsQuery( + query=Formula( + ArithmeticOperator.PLUS.value, + [ + Timeseries( + metric=Metric( + "transaction.duration", + DISTRIBUTIONS_MRI, + DISTRIBUTIONS.metric_id, + ), + aggregate="avg", + ), + Timeseries( + metric=Metric( + "transaction.duration", + DISTRIBUTIONS_MRI, + DISTRIBUTIONS.metric_id, + ), + aggregate="avg", + ), + ], + ), + start=self.start_time, + end=self.end_time, + rollup=Rollup(interval=60, totals=True, orderby=None, granularity=60), + scope=MetricsScope( + org_ids=[self.org_id], + project_ids=self.project_ids, + use_case_id=USE_CASE_ID, + ), + indexer_mappings={ + "transaction.duration": DISTRIBUTIONS_MRI, + DISTRIBUTIONS_MRI: DISTRIBUTIONS.metric_id, + "status_code": resolve_str("status_code"), + }, + ) + + response = self.app.post( + self.mql_route, + data=Request( + dataset=DATASET, + app_id="test", + query=query, + flags=Flags(debug=True), + tenant_ids={"referrer": "tests", "organization_id": self.org_id}, + ).serialize_mql(), + ) + + assert response.status_code == 200, response.data + data = json.loads(response.data) + assert len(data["data"]) == 180, data + assert ( + data["totals"]["aggregate_value"] == 4.0 + ) # Should be more than the number of data points + def test_multi_entity_formula(self) -> None: query = MetricsQuery( query=Formula( diff --git a/tests/web/test_db_query.py b/tests/web/test_db_query.py index 0f3f0e6598..8bb7859b11 100644 --- a/tests/web/test_db_query.py +++ b/tests/web/test_db_query.py @@ -312,7 +312,7 @@ def test_db_query_success() -> None: "policy": "BytesScannedRejectingPolicy", "quota_used": 1560000000000, "quota_unit": "bytes", - "suggestion": "scan less bytes", + "suggestion": "The feature, organization/project is scanning too many bytes, this usually means they are abusing that API", "throttle_threshold": 1280000000000, }, }, @@ -360,7 +360,7 @@ def test_db_query_success() -> None: "rejection_threshold": 2560000000000, "quota_used": 1560000000000, "quota_unit": "bytes", - "suggestion": "scan less bytes", + "suggestion": "The feature, organization/project is scanning too many bytes, this usually means they are abusing that API", }, "CrossOrgQueryAllocationPolicy": { "can_run": True, @@ -385,7 +385,7 @@ def test_db_query_success() -> None: "rejection_threshold": MAX_THRESHOLD, "quota_used": 0, "quota_unit": "bytes", - "suggestion": "scan less bytes", + "suggestion": "The feature, organization/project is scanning too many bytes, this usually means they are abusing that API", }, }, } diff --git a/tests/web/test_parse_and_run_query.py b/tests/web/test_parse_and_run_query.py new file mode 100644 index 0000000000..da3d50e9ad --- /dev/null +++ b/tests/web/test_parse_and_run_query.py @@ -0,0 +1,65 @@ +import pytest + +from snuba.utils.metrics.timer import Timer +from snuba.web.query import parse_and_run_query + + +@pytest.mark.clickhouse_db 
+@pytest.mark.redis_db +def test_basic_snql() -> None: + request, result = parse_and_run_query( + body={ + "query": "MATCH (events) SELECT event_id, group_id, project_id, timestamp WHERE timestamp >= toDateTime('2024-07-17T21:04:34') AND timestamp < toDateTime('2024-07-17T21:10:34') AND project_id = 1", + "tenant_ids": {"organization_id": 319976, "referrer": "Group.get_helpful"}, + }, + timer=Timer("parse_and_run_query"), + is_mql=False, + ) + assert result.result["meta"] == [ + {"name": "event_id", "type": "String"}, + {"name": "group_id", "type": "UInt64"}, + {"name": "project_id", "type": "UInt64"}, + {"name": "timestamp", "type": "DateTime"}, + ] + + +@pytest.mark.clickhouse_db +@pytest.mark.redis_db +def test_basic_mql() -> None: + # body = {'debug': True, 'query': 'sum(c:transactions/count_per_root_project@none){transaction:"t1"} by (status_code)', 'dataset': 'generic_metrics', 'app_id': 'test', 'tenant_ids': {'referrer': 'tests', 'organization_id': 101}, 'parent_api': '', 'mql_context': {'start': '2024-07-16T09:15:00+00:00', 'end': '2024-07-16T15:15:00+00:00', 'rollup': {'orderby': None, 'granularity': 60, 'interval': 60, 'with_totals': None}, 'scope': {'org_ids': [101], 'project_ids': [1, 2], 'use_case_id': 'performance'}, 'indexer_mappings': {'transaction.duration': 'c:transactions/count_per_root_project@none', 'c:transactions/count_per_root_project@none': 1067, 'transaction': 65546, 'status_code': 9223372036854776010}, 'limit': None, 'offset': None}} + body = { + "query": 'sum(c:transactions/count_per_root_project@none){transaction:"t1"} by (status_code)', + "dataset": "generic_metrics", + "app_id": "test", + "tenant_ids": {"referrer": "tests", "organization_id": 101}, + "parent_api": "", + "mql_context": { + "start": "2024-07-16T09:15:00+00:00", + "end": "2024-07-16T15:15:00+00:00", + "rollup": { + "orderby": None, + "granularity": 60, + "interval": 60, + "with_totals": None, + }, + "scope": { + "org_ids": [101], + "project_ids": [1, 2], + "use_case_id": "performance", + }, + "indexer_mappings": { + "transaction.duration": "c:transactions/count_per_root_project@none", + "c:transactions/count_per_root_project@none": 1067, + "transaction": 65546, + "status_code": 9223372036854776010, + }, + "limit": None, + "offset": None, + }, + } + request, result = parse_and_run_query( + body=body, + timer=Timer("parse_and_run_query"), + is_mql=True, + dataset_name="generic_metrics", + )
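A closing note on the `_block_on_mutations` helper introduced in `snuba/migrations/operations.py` above: it polls `system.mutations` every `poll_seconds` and raises `TimeoutError` once `timeout_seconds` has elapsed with mutations still running. The sketch below shows one way that behaviour could be exercised with a fake connection. It is not part of this PR: `FakePool`, `FakeResult`, and both test functions are hypothetical, and it assumes the method lives on `SqlOperation` (the class whose `execute` appears in the hunk context).

```python
# Hypothetical sketch (not part of this PR): exercising the polling and
# timeout behaviour of _block_on_mutations with a fake ClickHouse pool.
import time
from unittest import mock

import pytest

from snuba.migrations.operations import SqlOperation  # assumed location


class FakeResult:
    def __init__(self, rows: list[tuple[int]]) -> None:
        self.results = rows


class FakePool:
    host = "localhost"
    port = 9000

    def __init__(self, pending_polls: int) -> None:
        # Pretend mutations are still running for the first N polls.
        self.pending_polls = pending_polls

    def execute(self, query: str) -> FakeResult:
        if self.pending_polls > 0:
            self.pending_polls -= 1
            return FakeResult([(3,)])  # 3 mutations with is_done=0
        return FakeResult([(0,)])  # nothing left to wait for


def test_returns_once_mutations_finish() -> None:
    conn = FakePool(pending_polls=2)
    with mock.patch.object(time, "sleep"):  # skip real sleeping
        SqlOperation._block_on_mutations(mock.Mock(), conn, poll_seconds=1)


def test_raises_after_timeout() -> None:
    conn = FakePool(pending_polls=10_000)
    with mock.patch.object(time, "sleep"), pytest.raises(TimeoutError):
        SqlOperation._block_on_mutations(
            mock.Mock(), conn, poll_seconds=1, timeout_seconds=3
        )
```

Because the helper only touches `conn.execute(...).results`, `conn.host`, and `conn.port`, a duck-typed fake like this is enough to illustrate both the "returns once done" and the timeout paths without a real ClickHouse cluster.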