feat(profiles): Add a dataset to store profile chunks #5923

Merged 21 commits into master from pierre/continuous-profiling-dataset-migration on May 16, 2024

Commits (21):
b813c29  feat: Add DateTime64 column type (phacops, May 12, 2024)
c08623f  Add tests and validation (phacops, May 12, 2024)
9f5d4e1  Add support for precision and timezone (phacops, May 12, 2024)
41b5051  Fix typing (phacops, May 12, 2024)
efb88ce  Fix schema validation (phacops, May 12, 2024)
e0f5dd8  feat(profiles): Add a dataset to store profile chunks (phacops, May 12, 2024)
3da03e1  Fix typo in column name (phacops, May 12, 2024)
b9b425c  Use microsecond precision (phacops, May 12, 2024)
95dac57  Fix DateTime64 argument name and convert end_timestamp to DateTime in… (phacops, May 12, 2024)
dbba7e4  Hash profiler_id in sort key as it's a UUID and we need integers (phacops, May 12, 2024)
e4112c9  Use PROFILES storage set since it's going to be on the same cluster (phacops, May 13, 2024)
e09551c  Switch sample key to be based on chunk_id (phacops, May 13, 2024)
aaccc60  Remove unneeded constant (phacops, May 13, 2024)
0a5af69  Fix wrong storage set for migration group (phacops, May 13, 2024)
fb419d4  Revert "Fix wrong storage set for migration group" (phacops, May 14, 2024)
6520246  Revert "Use PROFILES storage set since it's going to be on the same cluster" (phacops, May 14, 2024)
82964dd  Remove version column (phacops, May 14, 2024)
3849033  Merge branch 'master' into pierre/continuous-profiling-dataset-migration (phacops, May 14, 2024)
5ad30f7  Remove end_timestamp from the sort key (phacops, May 14, 2024)
5f3f490  Merge branch 'master' into pierre/continuous-profiling-dataset-migration (phacops, May 14, 2024)
2f8f5eb  Merge branch 'master' into pierre/continuous-profiling-dataset-migration (phacops, May 16, 2024)
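
Two of the commits above encode a ClickHouse constraint worth spelling out: sort and sample key expressions must be integer-typed, so the UUID columns are wrapped in cityHash64 (dbba7e4, e09551c). A minimal Python sketch of the idea, using a stand-in hash since cityHash64 is evaluated ClickHouse-side; the helper below is illustrative, not part of the PR:

import hashlib
import uuid


def uuid_to_uint64(value: uuid.UUID) -> int:
    """Map a UUID to a deterministic 64-bit integer. Stand-in for
    ClickHouse's cityHash64; the actual hash function differs."""
    digest = hashlib.sha1(value.bytes).digest()
    return int.from_bytes(digest[:8], "little")


profiler_id = uuid.uuid4()
chunk_id = uuid.uuid4()

# Conceptually this is what cityHash64(chunk_id) provides: a stable
# integer derived from the UUID, usable in ORDER BY and SAMPLE BY.
print(uuid_to_uint64(profiler_id), uuid_to_uint64(chunk_id))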
1 change: 1 addition & 0 deletions snuba/clusters/storage_sets.py

@@ -19,6 +19,7 @@
     "SPANS": "spans",
     "GROUP_ATTRIBUTES": "group_attributes",
     "METRICS_SUMMARIES": "metrics_summaries",
+    "PROFILE_CHUNKS": "profile_chunks",
 }
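
Registering "PROFILE_CHUNKS" here is what makes StorageSetKey.PROFILE_CHUNKS referencable in groups.py and in the migration below. As a rough sketch of the general pattern, not snuba's actual implementation, a name-to-value mapping like this can back an enum:

from enum import Enum

# Hypothetical stand-in for how a name -> value mapping can produce a
# StorageSetKey-style enum; snuba's real construction may differ.
_KEYS = {
    "METRICS_SUMMARIES": "metrics_summaries",
    "PROFILE_CHUNKS": "profile_chunks",
}

StorageSetKey = Enum("StorageSetKey", _KEYS)

print(StorageSetKey.PROFILE_CHUNKS.value)  # "profile_chunks"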
10 changes: 10 additions & 0 deletions snuba/migrations/group_loader.py

@@ -404,3 +404,13 @@ def get_migrations(self) -> Sequence[str]:
             "0002_metrics_summaries_add_tags_hashmap",
             "0003_metrics_summaries_add_segment_id_duration_group_columns",
         ]
+
+
+class ProfileChunksLoader(DirectoryLoader):
+    def __init__(self) -> None:
+        super().__init__("snuba.snuba_migrations.profile_chunks")
+
+    def get_migrations(self) -> Sequence[str]:
+        return [
+            "0001_create_profile_chunks_table",
+        ]
8 changes: 8 additions & 0 deletions snuba/migrations/groups.py

@@ -13,6 +13,7 @@
     MetricsLoader,
     MetricsSummariesLoader,
     OutcomesLoader,
+    ProfileChunksLoader,
     ProfilesLoader,
     QuerylogLoader,
     ReplaysLoader,
@@ -43,6 +44,7 @@ class MigrationGroup(Enum):
     SPANS = "spans"
     GROUP_ATTRIBUTES = "group_attributes"
     METRICS_SUMMARIES = "metrics_summaries"
+    PROFILE_CHUNKS = "profile_chunks"


 # Migration groups are mandatory by default. Specific groups can
@@ -59,6 +61,7 @@ class MigrationGroup(Enum):
     MigrationGroup.SEARCH_ISSUES,
     MigrationGroup.GROUP_ATTRIBUTES,
     MigrationGroup.METRICS_SUMMARIES,
+    MigrationGroup.PROFILE_CHUNKS,
 }
@@ -169,6 +172,11 @@ def __init__(
         storage_sets_keys={StorageSetKey.METRICS_SUMMARIES},
         readiness_state=ReadinessState.PARTIAL,
     ),
+    MigrationGroup.PROFILE_CHUNKS: _MigrationGroup(
+        loader=ProfileChunksLoader(),
+        storage_sets_keys={StorageSetKey.PROFILE_CHUNKS},
+        readiness_state=ReadinessState.PARTIAL,
+    ),
 }
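
With the group registered, and marked optional with ReadinessState.PARTIAL, the migration runner can resolve PROFILE_CHUNKS to its loader and storage set. A quick usage sketch, assuming the get_group_loader accessor in this module keeps its usual shape:

from snuba.migrations.groups import MigrationGroup, get_group_loader

# Resolve the loader registered for the new group and list its
# migration IDs; for this PR that should yield
# ["0001_create_profile_chunks_table"].
loader = get_group_loader(MigrationGroup.PROFILE_CHUNKS)
print(loader.get_migrations())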
1 change: 1 addition & 0 deletions snuba/settings/__init__.py

@@ -114,6 +114,7 @@
             "group_attributes",
             "generic_metrics_gauges",
             "metrics_summaries",
+            "profile_chunks",
         },
         "single_node": True,
     },

1 change: 1 addition & 0 deletions snuba/settings/settings_distributed.py

@@ -34,6 +34,7 @@
             "group_attributes",
             "generic_metrics_gauges",
             "metrics_summaries",
+            "profile_chunks",
         },
         "single_node": False,
         "cluster_name": "cluster_one_sh",

1 change: 1 addition & 0 deletions snuba/settings/settings_test_distributed_migrations.py

@@ -58,6 +58,7 @@
             "group_attributes",
             "generic_metrics_gauges",
             "metrics_summaries",
+            "profile_chunks",
         },
         "single_node": False,
         "cluster_name": "storage_cluster",
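
All three settings files attach the new storage set to a cluster, which is how snuba knows where profile_chunks tables live. A minimal sketch of the shape of one CLUSTERS entry; connection fields beyond those visible in the diff are assumptions:

# Hypothetical single-node CLUSTERS entry; "host" and "port" are shown
# only to illustrate where "profile_chunks" is registered.
CLUSTERS = [
    {
        "host": "localhost",
        "port": 9000,
        "storage_sets": {
            "metrics_summaries",
            "profile_chunks",  # the storage set added by this PR
        },
        "single_node": True,
    },
]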
81 changes: 81 additions & 0 deletions snuba/snuba_migrations/profile_chunks/0001_create_profile_chunks_table.py

@@ -0,0 +1,81 @@
from typing import List, Sequence

from snuba.clickhouse.columns import UUID, Column, DateTime64, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget, SqlOperation

storage_set = StorageSetKey.PROFILE_CHUNKS
table_prefix = "profile_chunks"
local_table_name = f"{table_prefix}_local"
dist_table_name = f"{table_prefix}_dist"

columns: List[Column[Modifiers]] = [
    Column("project_id", UInt(64)),
    Column("profiler_id", UUID()),
    Column("chunk_id", UUID()),
    Column(
        "start_timestamp",
        DateTime64(
            precision=6,
            modifiers=Modifiers(codecs=["DoubleDelta"]),
        ),
    ),
    Column(
        "end_timestamp",
        DateTime64(
            precision=6,
            modifiers=Modifiers(codecs=["DoubleDelta"]),
        ),
    ),
    Column("retention_days", UInt(16)),
    Column("partition", UInt(16)),
    Column("offset", UInt(64)),
]


class Migration(migration.ClickhouseNodeMigration):
    blocking = False

    def forwards_ops(self) -> Sequence[SqlOperation]:
        return [
            operations.CreateTable(
                storage_set=storage_set,
                table_name=local_table_name,
                columns=columns,
                engine=table_engines.ReplacingMergeTree(
                    order_by="(project_id, profiler_id, start_timestamp, cityHash64(chunk_id))",
                    partition_by="(retention_days, toStartOfDay(start_timestamp))",
                    sample_by="cityHash64(chunk_id)",
                    settings={"index_granularity": "8192"},
                    storage_set=storage_set,
                    ttl="toDateTime(end_timestamp) + toIntervalDay(retention_days)",
                ),
                target=OperationTarget.LOCAL,
            ),
            operations.CreateTable(
                storage_set=storage_set,
                table_name=dist_table_name,
                columns=columns,
                engine=table_engines.Distributed(
                    local_table_name=local_table_name,
                    sharding_key="cityHash64(profiler_id)",
                ),
                target=OperationTarget.DISTRIBUTED,
            ),
        ]

    def backwards_ops(self) -> Sequence[SqlOperation]:
        return [
            operations.DropTable(
                storage_set=storage_set,
                table_name=dist_table_name,
                target=OperationTarget.DISTRIBUTED,
            ),
            operations.DropTable(
                storage_set=storage_set,
                table_name=local_table_name,
                target=OperationTarget.LOCAL,
            ),
        ]
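
forwards_ops creates the local table first and the Distributed wrapper second, and backwards_ops drops them in reverse order, so the dist table never points at a missing local table. A quick, hedged way to sanity-check the operation list from a snuba checkout; importlib is needed because the module name starts with a digit, and the snippet is illustrative, not part of the PR:

import importlib

migration_module = importlib.import_module(
    "snuba.snuba_migrations.profile_chunks.0001_create_profile_chunks_table"
)
m = migration_module.Migration()

# Expect two CreateTable ops forwards (local, then distributed) and two
# DropTable ops backwards (distributed, then local).
for op in m.forwards_ops():
    print(type(op).__name__)
for op in m.backwards_ops():
    print(type(op).__name__)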