Merge branch 'master' into dbanda/replayer-match-ops-labels
dbanda authored Mar 21, 2024
2 parents 95e613f + f66a9fa commit 226c29a
Showing 11 changed files with 192 additions and 17 deletions.
31 changes: 31 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,36 @@
# Changelog

## 24.3.0

### Various fixes & improvements

- Unrevert: feat: Remove query splitters from the API (#5581) by @evanh
- feat: Add use_case_id index to generic metrics (#5655) by @evanh
- ref(ci): Remove deleted test file (#5656) by @evanh
- fix vscode debugger (#5652) by @kylemumma
- chore: Upgrade snuba-sdk to 2.0.31 (#5647) by @iambriccardo
- fix(gocd): put snuba cmd into $SNUBA_CMD (#5654) by @MeredithAnya
- enable canary health check (#5649) by @enochtangg
- Revert "fix(CapMan): Allocation Policies causing potentially timeout errors on ST (#4403)" (703042e1) by @getsentry-bot
- fix(gocd): add SNUBA_CMD_TYPE (#5648) by @MeredithAnya
- Allows empty `trace_id` (#5637) by @xurui-c
- fix: Fix bump version for rust (#5643) by @lynnagara
- feat(generic-metrics): Add metrics around encoding format type in processor (#5627) by @ayirr7
- feat: filter by metric_id in select logical query optimizer (#5610) by @kylemumma
- fix(gocd): fix unbound variable (#5641) by @MeredithAnya
- ref: bump sentry-kafka-schemas to 0.1.60 (#5642) by @getsentry-bot
- add canary health check to gocd pipeline (#5638) by @enochtangg
- ref(codecov) Try out the failed test feature in Codecov (#5635) by @evanh
- feat(spans): Enable spans storage in ST and self-hosted (#5629) by @phacops
- fix: Fix a bug in HexIntColumnProcessor that skipped array conditions (#5640) by @evanh
- ref(gocd): use shared script query-fetcher (#5639) by @MeredithAnya
- ref(gocd): add comparer pipeline, consolidate script? (#5636) by @MeredithAnya
- feat(spans): Set the migration group as complete to run migrations everywhere (#5634) by @phacops
- feat(admin): Absolute imports in snuba-admin (#5630) by @volokluev
- the default value of trace_id will be a randomly generated uuid inste… (#5628) by @xurui-c

_Plus 72 more_

## 24.2.0

### Various fixes & improvements
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -7,7 +7,7 @@
copyright = "2021, Sentry Team and Contributors"
author = "Sentry Team and Contributors"

release = "24.3.0.dev0"
release = "24.4.0.dev0"


# -- General configuration ---------------------------------------------------
2 changes: 1 addition & 1 deletion setup.py
@@ -2,7 +2,7 @@

from setuptools import find_packages, setup

VERSION = "24.3.0.dev0"
VERSION = "24.4.0.dev0"


def get_requirements() -> Sequence[str]:
6 changes: 4 additions & 2 deletions snuba/admin/iam_policy/iam_policy.json
@@ -34,8 +34,10 @@
},
{
"members": [
"group:team-starfish@sentry.io",
"group:team-ingest@sentry.io"
"group:team-ingest@sentry.io",
"group:team-visibility@sentry.io",
"group:team-performance@sentry.io",
"group:team-eng-managers@sentry.io"
],
"role": "roles/CardinalityAnalyzer"
},
30 changes: 19 additions & 11 deletions snuba/admin/views.py
@@ -80,6 +80,8 @@
runner = Runner()
audit_log = AuditLog()

ORG_ID = 1


@application.errorhandler(UnauthorizedException)
def handle_invalid_json(exception: UnauthorizedException) -> Response:
@@ -237,9 +239,11 @@ def do_action() -> None:
if not dry_run:
audit_log.record(
user or "",
-AuditLogAction.RAN_MIGRATION_STARTED
-if action == "run"
-else AuditLogAction.REVERSED_MIGRATION_STARTED,
+(
+    AuditLogAction.RAN_MIGRATION_STARTED
+    if action == "run"
+    else AuditLogAction.REVERSED_MIGRATION_STARTED
+),
{"migration": str(migration_key), "force": force, "fake": fake},
)
check_for_inactive_replicas(
@@ -256,19 +260,23 @@ def do_action() -> None:
if not dry_run:
audit_log.record(
user or "",
-AuditLogAction.RAN_MIGRATION_COMPLETED
-if action == "run"
-else AuditLogAction.REVERSED_MIGRATION_COMPLETED,
+(
+    AuditLogAction.RAN_MIGRATION_COMPLETED
+    if action == "run"
+    else AuditLogAction.REVERSED_MIGRATION_COMPLETED
+),
{"migration": str(migration_key), "force": force, "fake": fake},
notify=True,
)

def notify_error() -> None:
audit_log.record(
user or "",
-AuditLogAction.RAN_MIGRATION_FAILED
-if action == "run"
-else AuditLogAction.REVERSED_MIGRATION_FAILED,
+(
+    AuditLogAction.RAN_MIGRATION_FAILED
+    if action == "run"
+    else AuditLogAction.REVERSED_MIGRATION_FAILED
+),
{"migration": str(migration_key), "force": force, "fake": fake},
notify=True,
)
@@ -667,7 +675,7 @@ def config(config_key: str) -> Response:
400,
{"Content-Type": "application/json"},
)
-except (state.MismatchedTypeException):
+except state.MismatchedTypeException:
return Response(
json.dumps({"error": "Mismatched type"}),
400,
@@ -997,7 +1005,7 @@ def dlq_replay() -> Response:
@check_tool_perms(tools=[AdminTools.PRODUCTION_QUERIES])
def production_snql_query() -> Response:
body = json.loads(request.data)
body["tenant_ids"] = {"referrer": request.referrer}
body["tenant_ids"] = {"referrer": request.referrer, "organization_id": ORG_ID}
try:
ret = run_snql_query(body, g.user.email)
return ret
1 change: 1 addition & 0 deletions snuba/cli/subscriptions_executor.py
@@ -110,6 +110,7 @@ def subscriptions_executor(

metrics_tags = {
"dataset": dataset_name,
"entity": entity_names[0],
}

if slice_id:
66 changes: 65 additions & 1 deletion snuba/pipeline/query_pipeline.py
@@ -1,5 +1,8 @@
from __future__ import annotations

from abc import ABC, abstractmethod
-from typing import Generic, Sequence, TypeVar, Union
+from dataclasses import dataclass
+from typing import Generic, Optional, Sequence, TypeVar, Union

from snuba.datasets.plans.query_plan import (
ClickhouseQueryPlan,
@@ -12,6 +15,9 @@
from snuba.web import QueryResult

TPlan = TypeVar("TPlan", bound=Union[ClickhouseQueryPlan, CompositeQueryPlan])
T = TypeVar("T")
Tin = TypeVar("Tin")
Tout = TypeVar("Tout")


class QueryPlanner(ABC, Generic[TPlan]):
@@ -76,3 +82,61 @@ def build_planner(
self, query: Query, settings: QuerySettings
) -> QueryPlanner[ClickhouseQueryPlan]:
raise NotImplementedError


class QueryPipelineStage(Generic[Tin, Tout]):
"""
This class represents a single stage in the snuba query execution pipeline.
The purpose of this class is to provide an organized and transparent interface to
execute specific processing steps on the query with clearly defined inputs and outputs.
These stages are designed to be composed and/or swapped among each other to form
a flexible query pipeline.
Some examples of a query pipeline stage may include:
* Execute all entity query processors defined on the entity yaml
* Apply query transformation from logical representation to Clickhouse representation
* Execute all storage processors defined on the storage yaml
* Run query execution
* Query reporting
To create a Query Pipeline Stage, the main components to specify are:
an input type, an output type, and an execution function which returns the output wrapped in a QueryPipelineResult.
==============================================
>>> class MyQueryPipelineStage(QueryPipelineStage[LogicalQuery, LogicalQuery]):
>>> def _execute(self, input: QueryPipelineResult[LogicalQuery]) -> QueryPipelineResult[LogicalQuery]:
>>> try:
>>> result = my_complex_processing_function(input.data)
>>> return QueryPipelineResult(result, None)
>>> except Exception as e:
>>> return QueryPipelineResult(None, e)
"""

@abstractmethod
def _execute(self, input: QueryPipelineResult[Tin]) -> QueryPipelineResult[Tout]:
raise NotImplementedError

def execute(self, input: QueryPipelineResult[Tin]) -> QueryPipelineResult[Tout]:
if input.error:
# Forward the error to next stage of pipeline
return QueryPipelineResult(None, input.error)
return self._execute(input)


class InvalidQueryPipelineResult(Exception):
pass


@dataclass
class QueryPipelineResult(ABC, Generic[T]):
"""
A container to represent the result of a query pipeline stage.
"""

data: Optional[T]
error: Optional[Exception]

def __post_init__(self) -> None:
if self.data is None and self.error is None:
raise InvalidQueryPipelineResult(
"QueryPipelineResult must have either data or error set"
)
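
As a usage note, here is a minimal sketch of how two such stages might be chained (the stage names and logic are illustrative assumptions, not part of this commit). Because execute forwards an upstream error instead of calling _execute, a failure in an early stage short-circuits the rest of the pipeline:

# Hypothetical two-stage chain; ParseStage and DoubleStage are invented names.
from snuba.pipeline.query_pipeline import QueryPipelineResult, QueryPipelineStage

class ParseStage(QueryPipelineStage[str, int]):
    def _execute(self, input: QueryPipelineResult[str]) -> QueryPipelineResult[int]:
        try:
            return QueryPipelineResult(int(input.data or ""), None)
        except Exception as e:
            return QueryPipelineResult(None, e)

class DoubleStage(QueryPipelineStage[int, int]):
    def _execute(self, input: QueryPipelineResult[int]) -> QueryPipelineResult[int]:
        assert input.data is not None
        return QueryPipelineResult(input.data * 2, None)

result = DoubleStage().execute(ParseStage().execute(QueryPipelineResult("21", None)))
assert result.data == 42

# A failing first stage short-circuits the second: the ValueError raised by
# int("oops") is forwarded by DoubleStage.execute without running its _execute.
result = DoubleStage().execute(ParseStage().execute(QueryPipelineResult("oops", None)))
assert result.data is None and isinstance(result.error, ValueError)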
12 changes: 12 additions & 0 deletions snuba/query/processors/logical/filter_in_select_optimizer.py
@@ -1,3 +1,6 @@
import logging

from snuba import environment
from snuba.query.composite import CompositeQuery
from snuba.query.conditions import binary_condition
from snuba.query.data_source.simple import Entity as QueryEntity
@@ -11,6 +14,7 @@
)
from snuba.query.logical import Query as LogicalQuery
from snuba.state import get_int_config
from snuba.utils.metrics.wrapper import MetricsWrapper

"""
Domain maps from a property to the specific values that are being filtered for. Ex:
@@ -24,6 +28,10 @@
"""
Domain = dict[Column | SubscriptableReference, set[Literal]]

metrics = MetricsWrapper(environment.metrics, "api")

logger = logging.getLogger(__name__)


class FilterInSelectOptimizer:
"""
@@ -48,6 +56,9 @@ def process_mql_query(
try:
domain = self.get_domain_of_mql_query(query)
except ValueError:
logger.warning(
"Failed getting domain", exc_info=True, extra={"query": query}
)
domain = {}

if domain:
@@ -73,6 +84,7 @@
)
assert domain_filter is not None
query.add_condition_to_ast(domain_filter)
metrics.increment("kyles_optimizer_optimized")

def get_domain_of_mql_query(
self, query: LogicalQuery | CompositeQuery[QueryEntity]
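
For reference, a minimal sketch of what a value of the Domain alias used by this optimizer might look like (the column name and literal values are made-up assumptions; Column and Literal are snuba AST expressions):

# Hypothetical Domain value: one filtered property mapped to the set of
# literal values being selected for (values invented for illustration).
from snuba.query.expressions import Column, Literal

domain: Domain = {
    Column(None, None, "metric_id"): {Literal(None, 123), Literal(None, 456)},
}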
2 changes: 1 addition & 1 deletion snuba/settings/settings_test.py
@@ -44,7 +44,7 @@

OPTIMIZE_PARALLEL_MAX_JITTER_MINUTES = 0

-ADMIN_ALLOWED_PROD_PROJECTS = [1]
+ADMIN_ALLOWED_PROD_PROJECTS = [1, 11276]

REDIS_CLUSTERS = {
key: {
20 changes: 20 additions & 0 deletions tests/admin/test_api.py
@@ -599,6 +599,26 @@ def test_prod_snql_query_valid_query(admin_api: FlaskClient) -> None:
assert "data" in data


@pytest.mark.redis_db
@pytest.mark.clickhouse_db
def test_prod_snql_query_multiple_allowed_projects(admin_api: FlaskClient) -> None:
snql_query = """
MATCH (transactions)
SELECT title
WHERE project_id IN array(1, 11276)
AND finish_ts >= toDateTime('2023-01-01 00:00:00')
AND finish_ts < toDateTime('2023-02-01 00:00:00')
"""
response = admin_api.post(
"/production_snql_query",
data=json.dumps({"dataset": "transactions", "query": snql_query}),
headers={"Referer": "https://snuba-admin.getsentry.net/"},
)
assert response.status_code == 200
data = json.loads(response.data)
assert "data" in data


@pytest.mark.redis_db
@pytest.mark.clickhouse_db
def test_prod_snql_query_invalid_project_query(admin_api: FlaskClient) -> None:
37 changes: 37 additions & 0 deletions tests/pipeline/test_pipeline_stage.py
@@ -0,0 +1,37 @@
from typing import Optional

import pytest

from snuba.pipeline.query_pipeline import (
InvalidQueryPipelineResult,
QueryPipelineResult,
QueryPipelineStage,
)


class TestQueryPipelineStage(QueryPipelineStage[int, int]):
def _execute(self, input: QueryPipelineResult[int]) -> QueryPipelineResult[int]:
try:
result = check_input_and_multiply(input.data)
return QueryPipelineResult(result, None)
except Exception as e:
return QueryPipelineResult(None, e)


def check_input_and_multiply(num: Optional[int]) -> int:
if num == 0 or num is None:
raise Exception("Input cannot be zero")
return num * 2


def test_query_pipeline_stage() -> None:
input = QueryPipelineResult(data=1, error=None)
result = TestQueryPipelineStage().execute(input)
assert result.data == 2

input = QueryPipelineResult(data=0, error=None)
result = TestQueryPipelineStage().execute(input)
assert str(result.error) == "Input cannot be zero"

with pytest.raises(InvalidQueryPipelineResult):
input = QueryPipelineResult(data=None, error=None)
