Skip to content

Commit

Permalink
Merge branch 'master' into undo_xit
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-sentry authored Sep 18, 2024
2 parents 15810d1 + 157bd8e commit 2d32ae6
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,25 @@ schema:
[
{ name: organization_id, type: UInt, args: { size: 64 } },
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: trace_id, type: UUID },
{ name: attr_key, type: String },
{ name: attr_value, type: Float, args: { size: 64 } },
{
name: attr_min_value,
type: SimpleAggregateFunction,
args:
{
func: "min",
arg_types: [ { type: Float, args: { size: 64 } } ],
},
},
{
name: attr_max_value,
type: SimpleAggregateFunction,
args:
{
func: "max",
arg_types: [ { type: Float, args: { size: 64 } } ],
},
},
{ name: timestamp, type: DateTime },
{ name: retention_days, type: UInt, args: { size: 16 } },
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ schema:
[
{ name: organization_id, type: UInt, args: { size: 64 } },
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: trace_id, type: UUID },
{ name: attr_key, type: String },
{ name: attr_value, type: String },
{ name: timestamp, type: DateTime },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,25 @@ schema:
[
{ name: organization_id, type: UInt, args: { size: 64 } },
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: trace_id, type: UUID },
{ name: attr_key, type: String },
{ name: attr_value, type: Float, args: { size: 64 } },
{
name: attr_min_value,
type: SimpleAggregateFunction,
args:
{
func: "min",
arg_types: [ { type: Float, args: { size: 64 } } ],
},
},
{
name: attr_max_value,
type: SimpleAggregateFunction,
args:
{
func: "max",
arg_types: [ { type: Float, args: { size: 64 } } ],
},
},
{ name: timestamp, type: DateTime },
{ name: retention_days, type: UInt, args: { size: 16 } },
{
Expand All @@ -28,8 +44,8 @@ schema:
},
},
]
local_table_name: spans_num_attrs_local
dist_table_name: spans_num_attrs_dist
local_table_name: spans_num_attrs_2_local
dist_table_name: spans_num_attrs_2_dist
allocation_policies:
- name: ConcurrentRateLimitAllocationPolicy
args:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ schema:
[
{ name: organization_id, type: UInt, args: { size: 64 } },
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: trace_id, type: UUID },
{ name: attr_key, type: String },
{ name: attr_value, type: String },
{ name: timestamp, type: DateTime },
Expand All @@ -28,8 +27,8 @@ schema:
},
}
]
local_table_name: spans_str_attrs_local
dist_table_name: spans_str_attrs_dist
local_table_name: spans_str_attrs_2_local
dist_table_name: spans_str_attrs_2_dist
allocation_policies:
- name: ConcurrentRateLimitAllocationPolicy
args:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
from __future__ import annotations

from typing import Sequence

from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget, SqlOperation
from snuba.utils.constants import ATTRIBUTE_BUCKETS
from snuba.utils.schemas import (
Column,
DateTime,
Float,
SimpleAggregateFunction,
String,
UInt,
)


class Migration(migration.ClickhouseNodeMigration):
"""
This migration creates a table meant to store just the attributes seen in a particular org.
The table is populated by a separate materialized view for each type of attribute.
"""

blocking = False
storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM
granularity = "8192"

old_str_mv = "spans_str_attrs_mv"
old_str_local_table = "spans_str_attrs_local"
old_str_dist_table = "spans_str_attrs_dist"
new_str_mv = "spans_str_attrs_2_mv"
new_str_local_table = "spans_str_attrs_2_local"
new_str_dist_table = "spans_str_attrs_2_dist"
str_columns: Sequence[Column[Modifiers]] = [
Column("organization_id", UInt(64)),
Column("project_id", UInt(64)),
Column("attr_key", String(modifiers=Modifiers(codecs=["ZSTD(1)"]))),
Column("attr_value", String(modifiers=Modifiers(codecs=["ZSTD(1)"]))),
Column("timestamp", DateTime(modifiers=Modifiers(codecs=["ZSTD(1)"]))),
Column("retention_days", UInt(16)),
Column("count", SimpleAggregateFunction("sum", [UInt(64)])),
]

old_num_mv = "spans_num_attrs_mv"
old_num_local_table = "spans_num_attrs_local"
old_num_dist_table = "spans_num_attrs_dist"
new_num_mv = "spans_num_attrs_2_mv"
new_num_local_table = "spans_num_attrs_2_local"
new_num_dist_table = "spans_num_attrs_2_dist"
num_columns: Sequence[Column[Modifiers]] = [
Column("organization_id", UInt(64)),
Column("project_id", UInt(64)),
Column("attr_key", String()),
Column("attr_min_value", SimpleAggregateFunction("min", [Float(64)])),
Column("attr_max_value", SimpleAggregateFunction("max", [Float(64)])),
Column("timestamp", DateTime(modifiers=Modifiers(codecs=["ZSTD(1)"]))),
Column("retention_days", UInt(16)),
Column("count", SimpleAggregateFunction("sum", [UInt(64)])),
]

def forwards_ops(self) -> Sequence[SqlOperation]:
return [
# first, drop the old MVs
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.old_str_mv,
target=OperationTarget.LOCAL,
),
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.old_num_mv,
target=OperationTarget.LOCAL,
),
# next, drop the old dist tables
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.old_str_dist_table,
target=OperationTarget.DISTRIBUTED,
),
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.old_num_dist_table,
target=OperationTarget.DISTRIBUTED,
),
# next, drop the old local tables
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.old_str_local_table,
target=OperationTarget.LOCAL,
),
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.old_num_local_table,
target=OperationTarget.LOCAL,
),
# now, create new versions of the tables & MVs
operations.CreateTable(
storage_set=self.storage_set_key,
table_name=self.new_str_local_table,
engine=table_engines.AggregatingMergeTree(
storage_set=self.storage_set_key,
primary_key="(organization_id, attr_key)",
order_by="(organization_id, attr_key, attr_value, project_id, timestamp, retention_days)",
partition_by="toMonday(timestamp)",
settings={
"index_granularity": self.granularity,
},
ttl="timestamp + toIntervalDay(retention_days)",
),
columns=self.str_columns,
target=OperationTarget.LOCAL,
),
operations.CreateTable(
storage_set=self.storage_set_key,
table_name=self.new_str_dist_table,
engine=table_engines.Distributed(
local_table_name=self.new_str_local_table, sharding_key=None
),
columns=self.str_columns,
target=OperationTarget.DISTRIBUTED,
),
operations.CreateMaterializedView(
storage_set=self.storage_set_key,
view_name=self.new_str_mv,
columns=self.str_columns,
destination_table_name=self.new_str_local_table,
target=OperationTarget.LOCAL,
query=f"""
SELECT
organization_id,
project_id,
attrs.1 as attr_key,
attrs.2 as attr_value,
toStartOfDay(_sort_timestamp) AS timestamp,
retention_days,
1 AS count
FROM eap_spans_local
LEFT ARRAY JOIN
arrayConcat(
{", ".join(f"CAST(attr_str_{n}, 'Array(Tuple(String, String))')" for n in range(ATTRIBUTE_BUCKETS))},
array(
tuple('service', `service`),
tuple('segment_name', `segment_name`),
tuple('name', `name`)
)
) AS attrs
GROUP BY
organization_id,
project_id,
attr_key,
attr_value,
timestamp,
retention_days
""",
),
operations.CreateTable(
storage_set=self.storage_set_key,
table_name=self.new_num_local_table,
engine=table_engines.AggregatingMergeTree(
storage_set=self.storage_set_key,
primary_key="(organization_id, attr_key)",
order_by="(organization_id, attr_key, attr_min_value, attr_max_value, timestamp, project_id, retention_days)",
partition_by="toMonday(timestamp)",
settings={
"index_granularity": self.granularity,
},
ttl="timestamp + toIntervalDay(retention_days)",
),
columns=self.num_columns,
target=OperationTarget.LOCAL,
),
operations.CreateTable(
storage_set=self.storage_set_key,
table_name=self.new_num_dist_table,
engine=table_engines.Distributed(
local_table_name=self.new_num_local_table, sharding_key=None
),
columns=self.num_columns,
target=OperationTarget.DISTRIBUTED,
),
operations.CreateMaterializedView(
storage_set=self.storage_set_key,
view_name=self.new_num_mv,
columns=self.num_columns,
destination_table_name=self.new_num_local_table,
target=OperationTarget.LOCAL,
query=f"""
SELECT
organization_id,
project_id,
attrs.1 as attr_key,
attrs.2 as attr_min_value,
attrs.2 as attr_max_value,
toStartOfDay(_sort_timestamp) AS timestamp,
retention_days,
1 AS count
FROM eap_spans_local
LEFT ARRAY JOIN
arrayConcat(
{",".join(f"CAST(attr_num_{n}, 'Array(Tuple(String, Float64))')" for n in range(ATTRIBUTE_BUCKETS))},
array(
tuple('duration_ms', `duration_ms`::Float64)
)
) AS attrs
GROUP BY
organization_id,
project_id,
attrs.1,
attrs.2,
timestamp,
retention_days
""",
),
]

def backwards_ops(self) -> Sequence[SqlOperation]:
return [
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.new_str_mv,
target=OperationTarget.LOCAL,
),
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.new_str_local_table,
target=OperationTarget.LOCAL,
),
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.new_str_dist_table,
target=OperationTarget.DISTRIBUTED,
),
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.new_num_mv,
target=OperationTarget.LOCAL,
),
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.new_num_local_table,
target=OperationTarget.LOCAL,
),
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.new_num_dist_table,
target=OperationTarget.DISTRIBUTED,
),
]

0 comments on commit 2d32ae6

Please sign in to comment.