Skip to content

Commit

Permalink
Fix test?
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-sentry committed Jul 23, 2024
1 parent 63b7fe7 commit dd24259
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 39 deletions.
39 changes: 39 additions & 0 deletions snuba/migrations/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union

Expand Down Expand Up @@ -467,6 +468,44 @@ def format_sql(self) -> str:
return f"ALTER TABLE {self.__table_name} ADD INDEX IF NOT EXISTS {self.__index_name} {self.__index_expression} TYPE {self.__index_type} GRANULARITY {self.__granularity}{optional_after_clause};"


@dataclass
class AddIndicesData:
    """
    Describes a single data-skipping index to be created by ``AddIndices``.

    Attributes:
        name: identifier of the index, e.g. ``bf_index``
        expression: expression being indexed, e.g. ``mapKeys(my_map)``
        type: index type (with parameters), e.g. ``bloom_filter(0.1)``
        granularity: number of granules per index block, e.g. ``4``
    """

    name: str
    expression: str
    type: str
    granularity: int


class AddIndices(SqlOperation):
    """
    Adds one or more data-skipping indices in a single ALTER TABLE statement.

    Only works with the MergeTree family of tables.
    In ClickHouse versions prior to 20.1.2.4, this requires setting
    allow_experimental_data_skipping_indices = 1
    """

    def __init__(
        self,
        storage_set: StorageSetKey,
        table_name: str,
        indices: Sequence[AddIndicesData],
        target: OperationTarget = OperationTarget.UNSET,
    ) -> None:
        super().__init__(storage_set, target=target)
        self.__table_name = table_name
        self.__indices = indices

    def format_sql(self) -> str:
        """Render a single ALTER TABLE statement covering all indices."""
        # AddIndicesData is a dataclass, so its fields must be read as
        # attributes. The previous subscript access (idx["name"], ...) raised
        # TypeError at runtime because dataclass instances are not
        # subscriptable.
        statements = [
            f"ADD INDEX IF NOT EXISTS {idx.name} {idx.expression} TYPE {idx.type} GRANULARITY {idx.granularity}"
            for idx in self.__indices
        ]

        return f"ALTER TABLE {self.__table_name} {', '.join(statements)};"


class DropIndex(SqlOperation):
"""
Drops an index.
Expand Down
68 changes: 29 additions & 39 deletions snuba/snuba_migrations/events_analytics_platform/0001_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget, SqlOperation
from snuba.migrations.operations import AddIndicesData, OperationTarget, SqlOperation
from snuba.utils.schemas import (
UUID,
Bool,
Expand Down Expand Up @@ -83,63 +83,48 @@
]
)

index_create_ops: Sequence[SqlOperation] = (
indices: Sequence[AddIndicesData] = (
[
operations.AddIndex(
storage_set=storage_set_name,
table_name=local_table_name,
index_name="bf_trace_id",
index_expression="trace_id",
index_type="bloom_filter",
AddIndicesData(
name="bf_trace_id",
expression="trace_id",
type="bloom_filter",
granularity=1,
target=OperationTarget.LOCAL,
),
)
]
+ [
operations.AddIndex(
storage_set=storage_set_name,
table_name=local_table_name,
index_name=f"bf_attr_str_{i}",
index_expression=f"mapKeys(attr_str_{i})",
index_type="bloom_filter",
AddIndicesData(
name=f"bf_attr_str_{i}",
expression=f"mapKeys(attr_str_{i})",
type="bloom_filter",
granularity=1,
target=OperationTarget.LOCAL,
)
for i in range(num_attr_buckets)
]
+ [
operations.AddIndex(
storage_set=storage_set_name,
table_name=local_table_name,
index_name=f"bf_attr_str_val_{i}",
index_expression=f"mapValues(attr_str_{i})",
index_type="ngrambf_v1(4, 1024, 10, 1)",
AddIndicesData(
name=f"bf_attr_str_val_{i}",
expression=f"mapValues(attr_str_{i})",
type="ngrambf_v1(4, 1024, 10, 1)",
granularity=1,
target=OperationTarget.LOCAL,
)
for i in range(num_attr_buckets)
]
+ [
operations.AddIndex(
storage_set=storage_set_name,
table_name=local_table_name,
index_name=f"bf_attr_num_{i}",
index_expression=f"mapKeys(attr_num_{i})",
index_type="bloom_filter",
AddIndicesData(
name=f"bf_attr_num_{i}",
expression=f"mapKeys(attr_num_{i})",
type="bloom_filter",
granularity=1,
target=OperationTarget.LOCAL,
)
for i in range(num_attr_buckets)
]
+ [
operations.AddIndex(
storage_set=storage_set_name,
table_name=local_table_name,
index_name=f"bf_attr_bool_{i}",
index_expression=f"mapKeys(attr_bool_{i})",
index_type="bloom_filter",
AddIndicesData(
name=f"bf_attr_bool_{i}",
expression=f"mapKeys(attr_bool_{i})",
type="bloom_filter",
granularity=1,
target=OperationTarget.LOCAL,
)
for i in range(num_attr_buckets)
]
Expand Down Expand Up @@ -176,8 +161,13 @@ def forwards_ops(self) -> Sequence[SqlOperation]:
),
target=OperationTarget.DISTRIBUTED,
),
operations.AddIndices(
storage_set=storage_set_name,
table_name=local_table_name,
indices=indices,
target=OperationTarget.LOCAL,
),
]
res.extend(index_create_ops)
return res

def backwards_ops(self) -> Sequence[SqlOperation]:
Expand Down

0 comments on commit dd24259

Please sign in to comment.