Skip to content

Commit

Permalink
feat(profiles): Add a dataset to store profile chunks (#5923)
Browse files Browse the repository at this point in the history
  • Loading branch information
phacops authored May 16, 2024
1 parent d4859d4 commit b4a2ffd
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 0 deletions.
1 change: 1 addition & 0 deletions snuba/clusters/storage_sets.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"SPANS": "spans",
"GROUP_ATTRIBUTES": "group_attributes",
"METRICS_SUMMARIES": "metrics_summaries",
"PROFILE_CHUNKS": "profile_chunks",
}


Expand Down
10 changes: 10 additions & 0 deletions snuba/migrations/group_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,3 +404,13 @@ def get_migrations(self) -> Sequence[str]:
"0002_metrics_summaries_add_tags_hashmap",
"0003_metrics_summaries_add_segment_id_duration_group_columns",
]


class ProfileChunksLoader(DirectoryLoader):
    """Migration loader for the profile_chunks migration group."""

    def __init__(self) -> None:
        # Migrations live under snuba/snuba_migrations/profile_chunks/.
        super().__init__("snuba.snuba_migrations.profile_chunks")

    def get_migrations(self) -> Sequence[str]:
        """Return migration module names in the order they must run."""
        return ["0001_create_profile_chunks_table"]
8 changes: 8 additions & 0 deletions snuba/migrations/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
MetricsLoader,
MetricsSummariesLoader,
OutcomesLoader,
ProfileChunksLoader,
ProfilesLoader,
QuerylogLoader,
ReplaysLoader,
Expand Down Expand Up @@ -43,6 +44,7 @@ class MigrationGroup(Enum):
SPANS = "spans"
GROUP_ATTRIBUTES = "group_attributes"
METRICS_SUMMARIES = "metrics_summaries"
PROFILE_CHUNKS = "profile_chunks"


# Migration groups are mandatory by default. Specific groups can
Expand All @@ -59,6 +61,7 @@ class MigrationGroup(Enum):
MigrationGroup.SEARCH_ISSUES,
MigrationGroup.GROUP_ATTRIBUTES,
MigrationGroup.METRICS_SUMMARIES,
MigrationGroup.PROFILE_CHUNKS,
}


Expand Down Expand Up @@ -169,6 +172,11 @@ def __init__(
storage_sets_keys={StorageSetKey.METRICS_SUMMARIES},
readiness_state=ReadinessState.PARTIAL,
),
MigrationGroup.PROFILE_CHUNKS: _MigrationGroup(
loader=ProfileChunksLoader(),
storage_sets_keys={StorageSetKey.PROFILE_CHUNKS},
readiness_state=ReadinessState.PARTIAL,
),
}


Expand Down
1 change: 1 addition & 0 deletions snuba/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
"group_attributes",
"generic_metrics_gauges",
"metrics_summaries",
"profile_chunks",
},
"single_node": True,
},
Expand Down
1 change: 1 addition & 0 deletions snuba/settings/settings_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"group_attributes",
"generic_metrics_gauges",
"metrics_summaries",
"profile_chunks",
},
"single_node": False,
"cluster_name": "cluster_one_sh",
Expand Down
1 change: 1 addition & 0 deletions snuba/settings/settings_test_distributed_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"group_attributes",
"generic_metrics_gauges",
"metrics_summaries",
"profile_chunks",
},
"single_node": False,
"cluster_name": "storage_cluster",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from typing import List, Sequence

from snuba.clickhouse.columns import UUID, Column, DateTime64, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget, SqlOperation

# Storage set and physical table names for the profile chunks dataset.
storage_set = StorageSetKey.PROFILE_CHUNKS
table_prefix = "profile_chunks"
local_table_name = f"{table_prefix}_local"
dist_table_name = f"{table_prefix}_dist"


def _timestamp_column(name: str) -> Column[Modifiers]:
    """Microsecond-precision timestamp with DoubleDelta compression."""
    return Column(
        name,
        DateTime64(
            precision=6,
            modifiers=Modifiers(codecs=["DoubleDelta"]),
        ),
    )


# Schema shared by the local and distributed tables.
columns: List[Column[Modifiers]] = [
    Column("project_id", UInt(64)),
    Column("profiler_id", UUID()),
    Column("chunk_id", UUID()),
    _timestamp_column("start_timestamp"),
    _timestamp_column("end_timestamp"),
    Column("retention_days", UInt(16)),
    Column("partition", UInt(16)),
    Column("offset", UInt(64)),
]


class Migration(migration.ClickhouseNodeMigration):
    """Creates the profile_chunks local and distributed tables."""

    blocking = False

    def forwards_ops(self) -> Sequence[SqlOperation]:
        """Create the local table first, then the distributed wrapper."""
        create_local = operations.CreateTable(
            storage_set=storage_set,
            table_name=local_table_name,
            columns=columns,
            engine=table_engines.ReplacingMergeTree(
                order_by="(project_id, profiler_id, start_timestamp, cityHash64(chunk_id))",
                partition_by="(retention_days, toStartOfDay(start_timestamp))",
                sample_by="cityHash64(chunk_id)",
                settings={"index_granularity": "8192"},
                storage_set=storage_set,
                # Rows expire retention_days after the chunk's end timestamp.
                ttl="toDateTime(end_timestamp) + toIntervalDay(retention_days)",
            ),
            target=OperationTarget.LOCAL,
        )
        create_dist = operations.CreateTable(
            storage_set=storage_set,
            table_name=dist_table_name,
            columns=columns,
            engine=table_engines.Distributed(
                local_table_name=local_table_name,
                sharding_key="cityHash64(profiler_id)",
            ),
            target=OperationTarget.DISTRIBUTED,
        )
        return [create_local, create_dist]

    def backwards_ops(self) -> Sequence[SqlOperation]:
        """Drop the distributed table before the local one."""
        drop_table = operations.DropTable
        return [
            drop_table(
                storage_set=storage_set,
                table_name=dist_table_name,
                target=OperationTarget.DISTRIBUTED,
            ),
            drop_table(
                storage_set=storage_set,
                table_name=local_table_name,
                target=OperationTarget.LOCAL,
            ),
        ]

0 comments on commit b4a2ffd

Please sign in to comment.