@dataclass
class AddIndicesData:
    """Specification for one data-skipping index added by AddIndices."""

    name: str  # e.g.: bf_index
    expression: str  # e.g.: mapKeys(my_map)
    type: str  # e.g.: bloom_filter(0.1)
    granularity: int  # e.g.: 4


class AddIndices(SqlOperation):
    """
    Adds multiple indices in a single ALTER TABLE statement.

    Only works with the MergeTree family of tables.

    In ClickHouse versions prior to 20.1.2.4, this requires setting
    allow_experimental_data_skipping_indices = 1
    """

    def __init__(
        self,
        storage_set: StorageSetKey,
        table_name: str,
        indices: Sequence[AddIndicesData],
        target: OperationTarget = OperationTarget.UNSET,
    ):
        super().__init__(storage_set, target=target)
        self.__table_name = table_name
        self.__indices = indices

    def format_sql(self) -> str:
        """Render one ALTER TABLE with a comma-joined ADD INDEX clause per index.

        BUG FIX: AddIndicesData is a dataclass, not a mapping — the original
        used subscripting (idx["name"], ...), which raises
        ``TypeError: 'AddIndicesData' object is not subscriptable`` at runtime.
        Attribute access is required.
        """
        statements = [
            f"ADD INDEX IF NOT EXISTS {idx.name} {idx.expression} TYPE {idx.type} GRANULARITY {idx.granularity}"
            for idx in self.__indices
        ]

        return f"ALTER TABLE {self.__table_name} {', '.join(statements)};"
# Per-bucket index specs: (name template, expression template, index type).
# Order matters — groups are emitted in the same order the migration has
# always used: all attr_str key indices, then attr_str value indices, then
# attr_num, then attr_bool.
_bucket_index_specs = [
    ("bf_attr_str_{}", "mapKeys(attr_str_{})", "bloom_filter"),
    ("bf_attr_str_val_{}", "mapValues(attr_str_{})", "ngrambf_v1(4, 1024, 10, 1)"),
    ("bf_attr_num_{}", "mapKeys(attr_num_{})", "bloom_filter"),
    ("bf_attr_bool_{}", "mapKeys(attr_bool_{})", "bloom_filter"),
]

# The trace_id index comes first, followed by one index per spec per bucket.
indices: Sequence[AddIndicesData] = [
    AddIndicesData(
        name="bf_trace_id",
        expression="trace_id",
        type="bloom_filter",
        granularity=1,
    )
] + [
    AddIndicesData(
        name=name_template.format(bucket),
        expression=expression_template.format(bucket),
        type=index_type,
        granularity=1,
    )
    for name_template, expression_template, index_type in _bucket_index_specs
    for bucket in range(num_attr_buckets)
]