diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9d483d1b07..9257781f3d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,36 @@
 # Changelog
 
+## 24.3.0
+
+### Various fixes & improvements
+
+- Unrevert: feat: Remove query splitters from the API (#5581) by @evanh
+- feat: Add use_case_id index to generic metrics (#5655) by @evanh
+- ref(ci): Remove deleted test file (#5656) by @evanh
+- fix vscode debugger (#5652) by @kylemumma
+- chore: Upgrade snuba-sdk to 2.0.31 (#5647) by @iambriccardo
+- fix(gocd): put snuba cmd into $SNUBA_CMD (#5654) by @MeredithAnya
+- enable canary health check (#5649) by @enochtangg
+- Revert "fix(CapMan): Allocation Policies causing potentially timeout errors on ST (#4403)" (703042e1) by @getsentry-bot
+- fix(gocd): add SNUBA_CMD_TYPE (#5648) by @MeredithAnya
+- Allows empty `trace_id` (#5637) by @xurui-c
+- fix: Fix bump version for rust (#5643) by @lynnagara
+- feat(generic-metrics): Add metrics around encoding format type in processor (#5627) by @ayirr7
+- feat: filter by metric_id in select logical query optimizer (#5610) by @kylemumma
+- fix(gocd): fix unbound variable (#5641) by @MeredithAnya
+- ref: bump sentry-kafka-schemas to 0.1.60 (#5642) by @getsentry-bot
+- add canary health check to gocd pipeline (#5638) by @enochtangg
+- ref(codecov) Try out the failed test feature in Codecov (#5635) by @evanh
+- feat(spans): Enable spans storage in ST and self-hosted (#5629) by @phacops
+- fix: Fix a bug in HexIntColumnProcessor that skipped array conditions (#5640) by @evanh
+- ref(gocd): use shared script query-fetcher (#5639) by @MeredithAnya
+- ref(gocd): add comparer pipeline, consolidate script? (#5636) by @MeredithAnya
+- feat(spans): Set the migration group as complete to run migrations everywhere (#5634) by @phacops
+- feat(admin): Absolute imports in snuba-admin (#5630) by @volokluev
+- the default value of trace_id will be a randomly generated uuid inste… (#5628) by @xurui-c
+
+_Plus 72 more_
+
 ## 24.2.0
 
 ### Various fixes & improvements
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 27618ad943..eea908078d 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -7,7 +7,7 @@
 copyright = "2021, Sentry Team and Contributors"
 author = "Sentry Team and Contributors"
 
-release = "24.3.0.dev0"
+release = "24.4.0.dev0"
 
 # -- General configuration ---------------------------------------------------
 
diff --git a/setup.py b/setup.py
index e44606b460..3825fd5528 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 
 from setuptools import find_packages, setup
 
-VERSION = "24.3.0.dev0"
+VERSION = "24.4.0.dev0"
 
 
 def get_requirements() -> Sequence[str]:
diff --git a/snuba/admin/iam_policy/iam_policy.json b/snuba/admin/iam_policy/iam_policy.json
index 3841804209..33568ef0a7 100644
--- a/snuba/admin/iam_policy/iam_policy.json
+++ b/snuba/admin/iam_policy/iam_policy.json
@@ -34,8 +34,10 @@
     },
     {
       "members": [
-        "group:team-starfish@sentry.io",
-        "group:team-ingest@sentry.io"
+        "group:team-ingest@sentry.io",
+        "group:team-visibility@sentry.io",
+        "group:team-performance@sentry.io",
+        "group:team-eng-managers@sentry.io"
       ],
      "role": "roles/CardinalityAnalyzer"
     },
diff --git a/snuba/admin/views.py b/snuba/admin/views.py
index b891c04baa..c469b535ea 100644
--- a/snuba/admin/views.py
+++ b/snuba/admin/views.py
@@ -80,6 +80,8 @@
 runner = Runner()
 audit_log = AuditLog()
 
+ORG_ID = 1
+
 
 @application.errorhandler(UnauthorizedException)
 def handle_invalid_json(exception: UnauthorizedException) -> Response:
@@ -237,9 +239,11 @@ def do_action() -> None:
         if not dry_run:
             audit_log.record(
                 user or "",
-                AuditLogAction.RAN_MIGRATION_STARTED
-                if action == "run"
-                else AuditLogAction.REVERSED_MIGRATION_STARTED,
+                (
+                    AuditLogAction.RAN_MIGRATION_STARTED
+                    if action == "run"
+                    else AuditLogAction.REVERSED_MIGRATION_STARTED
+                ),
                 {"migration": str(migration_key), "force": force, "fake": fake},
             )
         check_for_inactive_replicas(
@@ -256,9 +260,11 @@ def do_action() -> None:
         if not dry_run:
             audit_log.record(
                 user or "",
-                AuditLogAction.RAN_MIGRATION_COMPLETED
-                if action == "run"
-                else AuditLogAction.REVERSED_MIGRATION_COMPLETED,
+                (
+                    AuditLogAction.RAN_MIGRATION_COMPLETED
+                    if action == "run"
+                    else AuditLogAction.REVERSED_MIGRATION_COMPLETED
+                ),
                 {"migration": str(migration_key), "force": force, "fake": fake},
                 notify=True,
             )
@@ -266,9 +272,11 @@ def notify_error() -> None:
         audit_log.record(
             user or "",
-            AuditLogAction.RAN_MIGRATION_FAILED
-            if action == "run"
-            else AuditLogAction.REVERSED_MIGRATION_FAILED,
+            (
+                AuditLogAction.RAN_MIGRATION_FAILED
+                if action == "run"
+                else AuditLogAction.REVERSED_MIGRATION_FAILED
+            ),
             {"migration": str(migration_key), "force": force, "fake": fake},
             notify=True,
         )
@@ -667,7 +675,7 @@ def config(config_key: str) -> Response:
             400,
             {"Content-Type": "application/json"},
         )
-    except (state.MismatchedTypeException):
+    except state.MismatchedTypeException:
         return Response(
             json.dumps({"error": "Mismatched type"}),
             400,
@@ -997,7 +1005,7 @@ def dlq_replay() -> Response:
 @check_tool_perms(tools=[AdminTools.PRODUCTION_QUERIES])
 def production_snql_query() -> Response:
     body = json.loads(request.data)
-    body["tenant_ids"] = {"referrer": request.referrer}
+    body["tenant_ids"] = {"referrer": request.referrer, "organization_id": ORG_ID}
     try:
         ret = run_snql_query(body, g.user.email)
         return ret
diff --git a/snuba/cli/subscriptions_executor.py b/snuba/cli/subscriptions_executor.py
index 9507786b41..65e286a43c 100644
--- a/snuba/cli/subscriptions_executor.py
+++ b/snuba/cli/subscriptions_executor.py
@@ -110,6 +110,7 @@ def subscriptions_executor(
 
     metrics_tags = {
         "dataset": dataset_name,
+        "entity": entity_names[0],
     }
 
     if slice_id:
diff --git a/snuba/pipeline/query_pipeline.py b/snuba/pipeline/query_pipeline.py
index 5221fd3ccb..f57114608d 100644
--- a/snuba/pipeline/query_pipeline.py
+++ b/snuba/pipeline/query_pipeline.py
@@ -1,5 +1,8 @@
+from __future__ import annotations
+
 from abc import ABC, abstractmethod
-from typing import Generic, Sequence, TypeVar, Union
+from dataclasses import dataclass
+from typing import Generic, Optional, Sequence, TypeVar, Union
 
 from snuba.datasets.plans.query_plan import (
     ClickhouseQueryPlan,
@@ -12,6 +15,9 @@ from snuba.web import QueryResult
 
 TPlan = TypeVar("TPlan", bound=Union[ClickhouseQueryPlan, CompositeQueryPlan])
 
+T = TypeVar("T")
+Tin = TypeVar("Tin")
+Tout = TypeVar("Tout")
 
 
 class QueryPlanner(ABC, Generic[TPlan]):
@@ -76,3 +82,61 @@ def build_planner(
         self, query: Query, settings: QuerySettings
     ) -> QueryPlanner[ClickhouseQueryPlan]:
         raise NotImplementedError
+
+
+class QueryPipelineStage(Generic[Tin, Tout]):
+    """
+    This class represents a single stage in the snuba query execution pipeline.
+    The purpose of this class is to provide an organized and transparent interface to
+    execute specific processing steps on the query with clearly defined inputs and outputs.
+    These stages are designed to be composed and/or swapped among each other to form
+    a flexible query pipeline.
+
+    Some examples of a query pipeline stage may include:
+    * Execute all entity query processors defined on the entity yaml
+    * Apply query transformation from logical representation to Clickhouse representation
+    * Execute all storage processors defined on the storage yaml
+    * Run query execution
+    * Query reporting
+
+    To create a Query Pipeline Stage, the main components to specify are:
+    an input type, an output type, and an execution function which returns the output wrapped in a QueryPipelineResult.
+    ==============================================
+    >>> class MyQueryPipelineStage(QueryPipelineStage[LogicalQuery, LogicalQuery]):
+    >>>     def _execute(self, input: QueryPipelineResult[LogicalQuery]) -> QueryPipelineResult[LogicalQuery]:
+    >>>         try:
+    >>>             result = my_complex_processing_function(input.data)
+    >>>             return QueryPipelineResult(result, None)
+    >>>         except Exception as e:
+    >>>             return QueryPipelineResult(None, e)
+    """
+
+    @abstractmethod
+    def _execute(self, input: QueryPipelineResult[Tin]) -> QueryPipelineResult[Tout]:
+        raise NotImplementedError
+
+    def execute(self, input: QueryPipelineResult[Tin]) -> QueryPipelineResult[Tout]:
+        if input.error:
+            # Forward the error to the next stage of the pipeline
+            return QueryPipelineResult(None, input.error)
+        return self._execute(input)
+
+
+class InvalidQueryPipelineResult(Exception):
+    pass
+
+
+@dataclass
+class QueryPipelineResult(ABC, Generic[T]):
+    """
+    A container to represent the result of a query pipeline stage.
+    """
+
+    data: Optional[T]
+    error: Optional[Exception]
+
+    def __post_init__(self) -> None:
+        if self.data is None and self.error is None:
+            raise InvalidQueryPipelineResult(
+                "QueryPipelineResult must have either data or error set"
+            )
diff --git a/snuba/query/processors/logical/filter_in_select_optimizer.py b/snuba/query/processors/logical/filter_in_select_optimizer.py
index 459c940026..02794742eb 100644
--- a/snuba/query/processors/logical/filter_in_select_optimizer.py
+++ b/snuba/query/processors/logical/filter_in_select_optimizer.py
@@ -1,3 +1,6 @@
+import logging
+
+from snuba import environment
 from snuba.query.composite import CompositeQuery
 from snuba.query.conditions import binary_condition
 from snuba.query.data_source.simple import Entity as QueryEntity
@@ -11,6 +14,7 @@
 )
 from snuba.query.logical import Query as LogicalQuery
 from snuba.state import get_int_config
+from snuba.utils.metrics.wrapper import MetricsWrapper
 
 """
 Domain maps from a property to the specific values that are being filtered for.
 Ex:
@@ -24,6 +28,10 @@
 """
 Domain = dict[Column | SubscriptableReference, set[Literal]]
 
+metrics = MetricsWrapper(environment.metrics, "api")
+
+logger = logging.getLogger(__name__)
+
 
 class FilterInSelectOptimizer:
     """
@@ -48,6 +56,9 @@ def process_mql_query(
         try:
             domain = self.get_domain_of_mql_query(query)
         except ValueError:
+            logger.warning(
+                "Failed getting domain", exc_info=True, extra={"query": query}
+            )
             domain = {}
 
         if domain:
@@ -73,6 +84,7 @@ def process_mql_query(
             )
             assert domain_filter is not None
             query.add_condition_to_ast(domain_filter)
+            metrics.increment("kyles_optimizer_optimized")
 
     def get_domain_of_mql_query(
         self, query: LogicalQuery | CompositeQuery[QueryEntity]
diff --git a/snuba/settings/settings_test.py b/snuba/settings/settings_test.py
index 01673da983..c59bb3b37e 100644
--- a/snuba/settings/settings_test.py
+++ b/snuba/settings/settings_test.py
@@ -44,7 +44,7 @@
 
 OPTIMIZE_PARALLEL_MAX_JITTER_MINUTES = 0
 
-ADMIN_ALLOWED_PROD_PROJECTS = [1]
+ADMIN_ALLOWED_PROD_PROJECTS = [1, 11276]
 
 REDIS_CLUSTERS = {
     key: {
diff --git a/tests/admin/test_api.py b/tests/admin/test_api.py
index aac9582f5b..d465e9d7e2 100644
--- a/tests/admin/test_api.py
+++ b/tests/admin/test_api.py
@@ -599,6 +599,26 @@ def test_prod_snql_query_valid_query(admin_api: FlaskClient) -> None:
     assert "data" in data
 
 
+@pytest.mark.redis_db
+@pytest.mark.clickhouse_db
+def test_prod_snql_query_multiple_allowed_projects(admin_api: FlaskClient) -> None:
+    snql_query = """
+    MATCH (transactions)
+    SELECT title
+    WHERE project_id IN array(1, 11276)
+    AND finish_ts >= toDateTime('2023-01-01 00:00:00')
+    AND finish_ts < toDateTime('2023-02-01 00:00:00')
+    """
+    response = admin_api.post(
+        "/production_snql_query",
+        data=json.dumps({"dataset": "transactions", "query": snql_query}),
+        headers={"Referer": "https://snuba-admin.getsentry.net/"},
+    )
+    assert response.status_code == 200
+    data = json.loads(response.data)
+    assert "data" in data
+
+
 @pytest.mark.redis_db
 @pytest.mark.clickhouse_db
 def test_prod_snql_query_invalid_project_query(admin_api: FlaskClient) -> None:
diff --git a/tests/pipeline/test_pipeline_stage.py b/tests/pipeline/test_pipeline_stage.py
new file mode 100644
index 0000000000..2b9181594f
--- /dev/null
+++ b/tests/pipeline/test_pipeline_stage.py
@@ -0,0 +1,37 @@
+from typing import Optional
+
+import pytest
+
+from snuba.pipeline.query_pipeline import (
+    InvalidQueryPipelineResult,
+    QueryPipelineResult,
+    QueryPipelineStage,
+)
+
+
+class TestQueryPipelineStage(QueryPipelineStage[int, int]):
+    def _execute(self, input: QueryPipelineResult[int]) -> QueryPipelineResult[int]:
+        try:
+            result = check_input_and_multiply(input.data)
+            return QueryPipelineResult(result, None)
+        except Exception as e:
+            return QueryPipelineResult(None, e)
+
+
+def check_input_and_multiply(num: Optional[int]) -> int:
+    if num == 0 or num is None:
+        raise Exception("Input cannot be zero")
+    return num * 2
+
+
+def test_query_pipeline_stage() -> None:
+    input = QueryPipelineResult(data=1, error=None)
+    result = TestQueryPipelineStage().execute(input)
+    assert result.data == 2
+
+    input = QueryPipelineResult(data=0, error=None)
+    result = TestQueryPipelineStage().execute(input)
+    assert str(result.error) == "Input cannot be zero"
+
+    with pytest.raises(InvalidQueryPipelineResult):
+        input = QueryPipelineResult(data=None, error=None)
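
A minimal usage sketch of the `QueryPipelineStage` / `QueryPipelineResult` API introduced in `snuba/pipeline/query_pipeline.py` above, showing how stages compose and how `execute` forwards an error past later stages without running their `_execute` bodies. `ParseStage` and `DoubleStage` are hypothetical names invented for illustration; only the imported API comes from this changeset.

```python
from snuba.pipeline.query_pipeline import QueryPipelineResult, QueryPipelineStage


class ParseStage(QueryPipelineStage[str, int]):
    """Hypothetical stage: parse a string payload into an int."""

    def _execute(self, input: QueryPipelineResult[str]) -> QueryPipelineResult[int]:
        try:
            return QueryPipelineResult(int(input.data or ""), None)
        except Exception as e:
            # A failed stage returns its exception instead of raising it.
            return QueryPipelineResult(None, e)


class DoubleStage(QueryPipelineStage[int, int]):
    """Hypothetical stage: double the parsed value."""

    def _execute(self, input: QueryPipelineResult[int]) -> QueryPipelineResult[int]:
        # execute() has already short-circuited error results, so data is set here.
        assert input.data is not None
        return QueryPipelineResult(input.data * 2, None)


# Happy path: each stage's result feeds the next stage.
ok = DoubleStage().execute(ParseStage().execute(QueryPipelineResult("21", None)))
assert ok.data == 42

# Error path: ParseStage fails, and DoubleStage.execute forwards the error
# without invoking its own _execute.
bad = DoubleStage().execute(ParseStage().execute(QueryPipelineResult("oops", None)))
assert isinstance(bad.error, ValueError)
```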