Merge branch 'master' into spans_v2_migrations
colin-sentry authored Jul 22, 2024
2 parents 3d32ca8 + 7ce112c commit 1f42e0c
Showing 29 changed files with 589 additions and 121 deletions.
13 changes: 12 additions & 1 deletion MIGRATIONS.md
@@ -105,7 +105,18 @@ The `snuba migrations` CLI tool should be used to manage migrations.

## Writing migrations

In order to add a new migration, first determine which migration group the new migration should be added to, and add an entry to that group in `migrations/groups.py` with the new migration identifier you have chosen. By convention we prefix migration IDs with a number matching the position of the migration in the group, i.e. the 4th migration in that group will be prefixed with `0004_`. Add a file which will contain the new migration at `/migrations/snuba_migrations/<group>/<migration_id>.py`.
### Auto-generating migrations
If you want to write a migration for the following common use cases, you can skip the rest of this document and have it autogenerated for you:
* Add a column to an existing table

Instructions:
1. First, locate the relevant `storage.yaml` file for the table you want to update. If you aren't familiar with it, this file is essentially the schema definition for the table. It lives at `snuba/datasets/configuration/<dataset>/storages/<storage>.yaml`, e.g. `.../group_attributes/storages/group_attributes.yaml`.
2. Make the desired modifications to the storage.yaml file. This will typically mean adding a new column to `schema.columns`.
3. Run `snuba migrations generate path/to/storage.yaml`. This generates the migration based on the modifications to the storage.yaml file.
4. You're basically done: now just commit, push, and merge your migration!

### Overview
In order to add a new migration, first determine which migration group the new migration should be added to. You must prefix migration IDs with a number matching the position of the migration in the group, i.e. the 4th migration in that group will be prefixed with `0004_`. Add a file which will contain the new migration at `/migrations/snuba_migrations/<group>/<migration_id>.py`.
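
For illustration, here is a minimal sketch of what such a migration file might look like for the common add-a-column case (it is also roughly the shape of migration that the auto-generation command above produces). The column name, table names, and storage set below are placeholders, and the operation classes and arguments follow existing migrations under `snuba/snuba_migrations/`; check `snuba/migrations/operations.py` for the exact signatures before relying on them.

```python
from typing import Sequence

from snuba.clickhouse.columns import UUID, Column
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations
from snuba.migrations.columns import MigrationModifiers as Modifiers


class Migration(migration.ClickhouseNodeMigration):
    """Illustrative only: adds a nullable `profiler_id` column to the transactions tables."""

    blocking = False

    def forwards_ops(self) -> Sequence[operations.SqlOperation]:
        # Apply to local nodes first, then to the distributed table.
        return [
            operations.AddColumn(
                storage_set=StorageSetKey.TRANSACTIONS,
                table_name="transactions_local",
                column=Column("profiler_id", UUID(Modifiers(nullable=True))),
                after="profile_id",
                target=operations.OperationTarget.LOCAL,
            ),
            operations.AddColumn(
                storage_set=StorageSetKey.TRANSACTIONS,
                table_name="transactions_dist",
                column=Column("profiler_id", UUID(Modifiers(nullable=True))),
                after="profile_id",
                target=operations.OperationTarget.DISTRIBUTED,
            ),
        ]

    def backwards_ops(self) -> Sequence[operations.SqlOperation]:
        # Reverse order: drop from the distributed table before the local one.
        return [
            operations.DropColumn(
                storage_set=StorageSetKey.TRANSACTIONS,
                table_name="transactions_dist",
                column_name="profiler_id",
                target=operations.OperationTarget.DISTRIBUTED,
            ),
            operations.DropColumn(
                storage_set=StorageSetKey.TRANSACTIONS,
                table_name="transactions_local",
                column_name="profiler_id",
                target=operations.OperationTarget.LOCAL,
            ),
        ]
```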

If you need to create a new group, add the group to `migrations.groups.MigrationGroup` and a loader to `migrations.group_loader` for the group defining the path to the directory where that group's migrations will be located. Register these to `migrations.groups._REGISTERED_MIGRATION_GROUPS` - note the position of the group in this list determines the order the migrations will be executed in. For a new MigrationGroup, the `readiness_state` should be set to `limited` which means the migrations will only be automatically executed in CI and the local environment.

3 changes: 2 additions & 1 deletion snuba/admin/iam_policy/iam_policy.json
@@ -37,7 +37,8 @@
"group:team-ingest@sentry.io",
"group:team-visibility@sentry.io",
"group:team-performance@sentry.io",
"group:team-eng-managers@sentry.io"
"group:team-eng-managers@sentry.io",
"group:team-telemetry-experience@sentry.io"
],
"role": "roles/CardinalityAnalyzer"
},
7 changes: 4 additions & 3 deletions snuba/admin/production_queries/prod_queries.py
@@ -22,15 +22,16 @@ def run_snql_query(body: Dict[str, Any], user: str) -> Response:

@audit_log
def run_query_with_audit(query: str, user: str) -> Response:
dataset = get_dataset(body.pop("dataset"))
dataset_name = body.pop("dataset")
dataset = get_dataset(dataset_name)
body["dry_run"] = True
response = dataset_query(dataset, body, Timer("admin"))
response = dataset_query(dataset_name, body, Timer("admin"))
if response.status_code != 200:
return response

body["dry_run"] = False
_validate_projects_in_query(body, dataset)
return dataset_query(dataset, body, Timer("admin"))
return dataset_query(dataset_name, body, Timer("admin"))

return run_query_with_audit(body["query"], user)

13 changes: 4 additions & 9 deletions snuba/admin/views.py
@@ -58,11 +58,7 @@
load_instruction,
store_instruction,
)
from snuba.datasets.factory import (
InvalidDatasetError,
get_dataset,
get_enabled_dataset_names,
)
from snuba.datasets.factory import InvalidDatasetError, get_enabled_dataset_names
from snuba.datasets.storages.factory import get_all_storage_keys, get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.migrations.connect import check_for_inactive_replicas
@@ -780,9 +776,9 @@ def snuba_debug() -> Response:
body = json.loads(request.data)
body["debug"] = True
body["dry_run"] = True
dataset_name = body.pop("dataset")
try:
dataset = get_dataset(body.pop("dataset"))
response = dataset_query(dataset, body, Timer("admin"))
response = dataset_query(dataset_name, body, Timer("admin"))
data = response.get_json()
assert isinstance(data, dict)

@@ -1026,8 +1022,7 @@ def production_snql_query() -> Response:
body = json.loads(request.data)
body["tenant_ids"] = {"referrer": request.referrer, "organization_id": ORG_ID}
try:
ret = run_snql_query(body, g.user.email)
return ret
return run_snql_query(body, g.user.email)
except InvalidQueryException as exception:
return Response(
json.dumps({"error": {"message": str(exception)}}, indent=4),
4 changes: 3 additions & 1 deletion snuba/cli/migrations.py
@@ -387,7 +387,7 @@ def add_node(
@click.option("--name", type=str, help="optional name for the migration")
def generate(storage_path: str, name: Optional[str] = None) -> None:
"""
Given a path to user-modified storage.yaml definition (inside snuba/datasets/configuration/*/storages/*.py),
Given a path to user-modified storage.yaml definition (inside snuba/datasets/configuration/*/storages/*.yaml),
and an optional name for the migration,
generates a snuba migration based on the schema modifications to the storage.yaml.
@@ -398,6 +398,8 @@ def generate(storage_path: str, name: Optional[str] = None) -> None:
The generated migration will be written into the local directory. The user is responsible for making
the commit, PR, and merging.
See MIGRATIONS.md in the root folder for more info.
"""
expected_pattern = r"(.+/)?snuba/datasets/configuration/.*/storages/.*\.(yml|yaml)"
if not re.fullmatch(expected_pattern, storage_path):
2 changes: 2 additions & 0 deletions snuba/datasets/configuration/discover/entities/discover.yaml
@@ -256,6 +256,7 @@ schema:
args: { schema_modifiers: [nullable] },
},
{ name: profile_id, type: UUID, args: { schema_modifiers: [nullable] } },
{ name: profiler_id, type: UUID, args: { schema_modifiers: [nullable] } },
{
name: replay_id,
type: UUID,
@@ -297,6 +298,7 @@ storages:
- group_ids
- app_start_type
- profile_id
- profiler_id
- mapper: ColumnToLiteral
args:
from_table_name: null
@@ -309,6 +309,7 @@ storages:
- group_ids
- app_start_type
- profile_id
- profiler_id
- mapper: ColumnToFunctionOnColumn
args:
from_table_name: null
@@ -465,6 +466,7 @@ storages:
- group_ids
- app_start_type
- profile_id
- profiler_id
subscriptables:
- mapper: SubscriptableMapper
args:
@@ -126,6 +126,7 @@ schema:
},
{ name: app_start_type, type: String },
{ name: profile_id, type: UUID, args: { schema_modifiers: [nullable] } },
{ name: profiler_id, type: UUID, args: { schema_modifiers: [nullable] } },
{ name: replay_id, type: UUID, args: { schema_modifiers: [nullable] } }
]
required_time_column: finish_ts
@@ -127,6 +127,7 @@ schema:
},
{ name: app_start_type, type: String },
{ name: profile_id, type: UUID, args: { schema_modifiers: [nullable] } },
{ name: profiler_id, type: UUID, args: { schema_modifiers: [nullable] } },
{ name: replay_id, type: UUID, args: { schema_modifiers: [nullable] } },
]

@@ -157,6 +157,7 @@ schema:
},
{ name: app_start_type, type: String },
{ name: profile_id, type: UUID, args: { schema_modifiers: [nullable] } },
{ name: profiler_id, type: UUID, args: { schema_modifiers: [nullable] } },
{ name: replay_id, type: UUID, args: { schema_modifiers: [nullable] } },
]
local_table_name: transactions_local
@@ -203,7 +204,7 @@ query_processors:
trace.span_id: span_id
- processor: UUIDColumnProcessor
args:
columns: [event_id, trace_id, profile_id, replay_id]
columns: [event_id, trace_id, profile_id, profiler_id, replay_id]
- processor: HexIntColumnProcessor
args:
columns: [span_id]
5 changes: 5 additions & 0 deletions snuba/datasets/processors/transactions_processor.py
@@ -272,6 +272,10 @@ def _process_contexts_and_user(
if profile_id is not None:
processed["profile_id"] = str(uuid.UUID(profile_id))

profiler_id = profile_context.get("profiler_id")
if profiler_id is not None:
processed["profiler_id"] = str(uuid.UUID(profiler_id))

replay_context = contexts.get("replay")
if replay_context is not None:
replay_id = replay_context.get("replay_id")
@@ -432,6 +436,7 @@ def _sanitize_contexts(
# again in the context array
profile_ctx = sanitized_context.get("profile", {})
profile_ctx.pop("profile_id", None)
profile_ctx.pop("profiler_id", None)
replay_ctx = sanitized_context.get("replay", {})
replay_ctx.pop("replay_id", None)

5 changes: 5 additions & 0 deletions snuba/migrations/group_loader.py
@@ -62,6 +62,11 @@ def get_migrations(self) -> Sequence[str]:
last = None
for fname in migration_filenames:
if last is not None and fname[:4] == last[:4]:
"""
if this is failing in CI when u think the files dont exist
i think its a github cache or something u might have to
remake the PR or branch or something
"""
raise ValueError(
f"""Duplicate migration number for the following files:
{os.path.join(migration_folder,last)}.py
25 changes: 25 additions & 0 deletions snuba/migrations/operations.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import time
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union
@@ -77,12 +78,36 @@ def execute(self) -> None:
logger.info(f"Executing on {self.target.value} node: {node}")
try:
connection.execute(self.format_sql(), settings=self._settings)
self._block_on_mutations(connection)
except Exception:
logger.exception(
f"Failed to execute operation on {self.storage_set}, target: {self.target}\n{self.format_sql()}\n{self._settings}"
)
raise

def _block_on_mutations(
self, conn: ClickhousePool, poll_seconds: int = 5, timeout_seconds: int = 300
) -> None:
"""
This function blocks until all entries of system.mutations
have is_done=1. Polls system.mutations every poll_seconds.
Raises error if not unblocked after timeout_seconds.
"""
slept_so_far = 0
while True:
is_mutating = conn.execute(
"select count(*) from system.mutations where is_done=0"
).results != [(0,)]
if not is_mutating:
return
elif slept_so_far >= timeout_seconds:
raise TimeoutError(
f"{conn.host}:{conn.port} not finished mutating after {timeout_seconds} seconds"
)
else:
time.sleep(poll_seconds)
slept_so_far += poll_seconds

@abstractmethod
def format_sql(self) -> str:
raise NotImplementedError
@@ -48,7 +48,7 @@
DEFAULT_BYTES_THROTTLE_DIVIDER = 2
DEFAULT_THREADS_THROTTLE_DIVIDER = 2
QUOTA_UNIT = "bytes"
SUGGESTION = "scan less bytes"
SUGGESTION = "The feature, organization/project is scanning too many bytes, this usually means they are abusing that API"


class BytesScannedRejectingPolicy(AllocationPolicy):
@@ -87,7 +87,7 @@
DEFAULT_OVERRIDE_LIMIT = -1
DEFAULT_BYTES_SCANNED_LIMIT = 10000000
QUOTA_UNIT = "bytes"
SUGGESTION = "scan less bytes"
SUGGESTION = "The feature, organization/project is scanning too many bytes, this usually means they are abusing that API"


class BytesScannedWindowAllocationPolicy(AllocationPolicy):
2 changes: 1 addition & 1 deletion snuba/query/allocation_policies/concurrent_rate_limit.py
@@ -38,7 +38,7 @@
from snuba.query.allocation_policies import MAX_THRESHOLD, NO_SUGGESTION

QUOTA_UNIT = "concurrent_queries"
SUGGESTION = "scan less concurrent queries"
SUGGESTION = "A customer is sending too many queries to snuba. The customer may be abusing an API or the queries may be innefficient"
import typing


2 changes: 1 addition & 1 deletion snuba/query/allocation_policies/per_referrer.py
@@ -27,7 +27,7 @@
_THREADS_THROTTLE_DIVIDER = 2

QUOTA_UNIT = "concurrent_queries"
SUGGESTION = "scan less concurrent queries"
SUGGESTION = "This feature is doing too many concurrent queries. Customers are being affected arbitrarily. Either means the feature is not being appropriately rate limited on the sentry side or that the queries are inefficient"


class ReferrerGuardRailPolicy(BaseConcurrentRateLimitAllocationPolicy):
35 changes: 18 additions & 17 deletions snuba/query/mql/context_population.py
@@ -92,7 +92,7 @@ def scope_conditions(

def rollup_expressions(
mql_context: MQLContext, table_name: str | None = None
) -> tuple[Expression, bool, OrderBy | None, SelectedExpression | None]:
) -> tuple[Expression, bool, OrderBy | None, SelectedExpression]:
"""
This function returns four values based on the rollup field in the MQL context:
- granularity_condition: an expression that filters the granularity column based on the granularity in the MQL context
@@ -130,26 +130,27 @@ def rollup_expressions(
)

with_totals = rollup.with_totals == "True"
selected_time = None
orderby = None

prefix = "" if not table_name else f"{table_name}."
time_expression = FunctionCall(
f"{prefix}time",
"toStartOfInterval",
parameters=(
Column(None, table_name, "timestamp"),
FunctionCall(
None,
"toIntervalSecond",
(Literal(None, rollup.interval),),
),
Literal(None, "Universal"),
),
)
selected_time = SelectedExpression("time", time_expression)

if rollup.interval:
# If an interval is specified, then we need to group the time by that interval,
# return the time in the select, and order the results by that time.
prefix = "" if not table_name else f"{table_name}."
time_expression = FunctionCall(
f"{prefix}time",
"toStartOfInterval",
parameters=(
Column(None, table_name, "timestamp"),
FunctionCall(
None,
"toIntervalSecond",
(Literal(None, rollup.interval),),
),
Literal(None, "Universal"),
),
)
selected_time = SelectedExpression("time", time_expression)
orderby = OrderBy(OrderByDirection.ASC, time_expression)
elif rollup.orderby is not None:
direction = (
22 changes: 9 additions & 13 deletions snuba/query/mql/parser_supported_join.py
@@ -1289,7 +1289,6 @@ def populate_query_from_mql_context(
assert isinstance(data_source, QueryEntity)
entity_data.append((data_source.key, alias))

selected_time_found = False
for entity_key, table_alias in entity_data:
time_condition = start_end_time_condition(mql_context, entity_key, table_alias)
scope_condition = scope_conditions(mql_context, table_alias)
@@ -1305,20 +1304,17 @@
query.set_totals(with_totals)
if orderby:
query.set_ast_orderby([orderby])
query.set_ast_selected_columns(
list(query.get_selected_columns()) + [selected_time]
)

if selected_time:
selected_time_found = True
query.set_ast_selected_columns(
list(query.get_selected_columns()) + [selected_time]
)

groupby = query.get_groupby()
if groupby:
query.set_ast_groupby(list(groupby) + [selected_time.expression])
else:
query.set_ast_groupby([selected_time.expression])
groupby = query.get_groupby()
if groupby:
query.set_ast_groupby(list(groupby) + [selected_time.expression])
else:
query.set_ast_groupby([selected_time.expression])

if isinstance(query, CompositeQuery) and selected_time_found:
if isinstance(query, CompositeQuery):
# If the query is grouping by time, that needs to be added to the JoinClause keys to
# ensure we correctly join the subqueries. The column names will be the same for all the
# subqueries, so we just need to map all the table aliases.
9 changes: 9 additions & 0 deletions snuba/snuba_migrations/README.md
@@ -0,0 +1,9 @@
If you don't know anything about Snuba migrations, see `MIGRATIONS.md` in the root folder.

Each folder in here represents a migration group; see `snuba/migrations/group_loader.py`.

Each migration (e.g. `events/0001_events_initial.py`) needs to follow the naming scheme `xxxx_migration_name.py`,
where `xxxx` is the zero-padded migration number. Migrations are applied in order of migration number. See `snuba/migrations/group_loader.py` for more info.

## Migration Auto-Generation
Who wants to write their own migrations by hand? Certainly not me! See `MIGRATIONS.md` to learn how you can have them generated for you.