Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(profiles): Add dataset configurations and processor for profile chunks #5897

Merged
merged 35 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b813c29
feat: Add DateTime64 column type
phacops May 12, 2024
c08623f
Add tests and validation
phacops May 12, 2024
9f5d4e1
Add support for precision and timezone
phacops May 12, 2024
41b5051
Fix typing
phacops May 12, 2024
efb88ce
Fix schema validation
phacops May 12, 2024
e0f5dd8
feat(profiles): Add a dataset to store profile chunks
phacops May 12, 2024
3da03e1
Fix typo in column name
phacops May 12, 2024
b9b425c
Use microsecond precision
phacops May 12, 2024
95dac57
Fix DateTime64 argument name and convert end_timestamp to DateTime in…
phacops May 12, 2024
dbba7e4
Hash profiler_id in sort key as it's a UUID and we need integers
phacops May 12, 2024
e4112c9
Use PROFILES storage set since it's going to be on the same cluster
phacops May 13, 2024
e09551c
Switch sample key to be based on chunk_id
phacops May 13, 2024
aaccc60
Remove unneeded constant
phacops May 13, 2024
0a5af69
Fix wrong storage set for migration group
phacops May 13, 2024
fb419d4
Revert "Fix wrong storage set for migration group"
phacops May 14, 2024
6520246
Revert "Use PROFILES storage set since it's going to be on the same c…
phacops May 14, 2024
82964dd
Remove version column
phacops May 14, 2024
3849033
Merge branch 'master' into pierre/continuous-profiling-dataset-migration
phacops May 14, 2024
5ad30f7
Remove end_timestamp from the sort key
phacops May 14, 2024
5f3f490
Merge branch 'master' into pierre/continuous-profiling-dataset-migration
phacops May 14, 2024
a4d65b3
feat(profiles): Add configuration and processor for profile chunks
phacops May 12, 2024
034e065
Fix tests
phacops May 12, 2024
afebdba
Fix DateTime64 columns configuration
phacops May 12, 2024
427d612
Set no query processor
phacops May 12, 2024
5cf460a
Bump sentry-kafka-schemas to v0.1.82
phacops May 14, 2024
efc9ef7
Enforce all allocation policies
phacops May 14, 2024
f545710
Remove entity configuration as it's not needed by default
phacops May 14, 2024
2f2c532
Merge branch 'master' into pierre/continuous-profiling-dataset-config…
phacops May 16, 2024
11b7d12
Merge branch 'master' into pierre/continuous-profiling-dataset-config…
phacops May 16, 2024
5624875
Merge branch 'master' into pierre/continuous-profiling-dataset-config…
phacops May 16, 2024
e1a791d
Remove entity from dataset configuration since it's not defined
phacops May 16, 2024
6fa093b
Merge branch 'master' into pierre/continuous-profiling-dataset-config…
phacops May 16, 2024
30e2986
Fix Cargo.toml after bad merge
phacops May 16, 2024
8e64a40
Revert "Fix Cargo.toml after bad merge"
phacops May 16, 2024
45bb330
Revert change to Cargo.toml
phacops May 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust_snuba/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pyo3 = { version = "0.18.1", features = ["chrono"] }
reqwest = { version = "0.11.11", features = ["stream"] }
rust_arroyo = { version = "*", git = "https://github.com/getsentry/arroyo" }
sentry = { version = "0.32.0", features = ["anyhow", "tracing"] }
sentry-kafka-schemas = "0.1.81"
sentry-kafka-schemas = "0.1.82"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
thiserror = "1.0"
Expand Down
2 changes: 2 additions & 0 deletions rust_snuba/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod functions;
mod generic_metrics;
mod metrics_summaries;
mod outcomes;
mod profile_chunks;
mod profiles;
mod querylog;
mod release_health_metrics;
Expand Down Expand Up @@ -62,6 +63,7 @@ define_processing_functions! {
("GenericGaugesMetricsProcessor", "snuba-generic-metrics", ProcessingFunctionType::ProcessingFunction(generic_metrics::process_gauge_message)),
("PolymorphicMetricsProcessor", "snuba-metrics", ProcessingFunctionType::ProcessingFunction(release_health_metrics::process_metrics_message)),
("ErrorsProcessor", "events", ProcessingFunctionType::ProcessingFunctionWithReplacements(errors::process_message_with_replacement)),
("ProfileChunksProcessor", "snuba-profile-chunks", ProcessingFunctionType::ProcessingFunction(profile_chunks::process_message)),
}

// COGS is recorded for these processors
Expand Down
91 changes: 91 additions & 0 deletions rust_snuba/src/processors/profile_chunks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use crate::config::ProcessorConfig;
use anyhow::Context;
use chrono::DateTime;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::processors::utils::enforce_retention;
use crate::types::{InsertBatch, KafkaMessageMetadata};

/// Convert a Kafka `snuba-profile-chunks` message into a ClickHouse insert batch.
///
/// Deserializes the JSON payload into an [`InputMessage`], clamps
/// `retention_days` via `enforce_retention`, attaches the Kafka
/// partition/offset of the source message, and builds the batch with the
/// message's `received` time as the origin timestamp.
pub fn process_message(
    payload: KafkaPayload,
    metadata: KafkaMessageMetadata,
    config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
    let bytes = payload.payload().context("Expected payload")?;
    let mut chunk: InputMessage = serde_json::from_slice(bytes)?;

    // Normalize retention before the row is materialized.
    chunk.retention_days = Some(enforce_retention(chunk.retention_days, &config.env_config));

    // `received` is a unix timestamp in whole seconds; `from_timestamp`
    // yields `None` if it is out of range, which `from_rows` tolerates.
    let origin_timestamp = DateTime::from_timestamp(chunk.received, 0);

    let row = Chunk {
        chunk,
        offset: metadata.offset,
        partition: metadata.partition,
    };

    InsertBatch::from_rows([row], origin_timestamp)
}

// JSON payload of a `snuba-profile-chunks` Kafka message. Field set is kept
// in sync with the published sentry-kafka-schemas definition (see the
// `schema` test below). Plain `//` comments are used deliberately: `///` doc
// comments would be embedded into the derived JsonSchema as descriptions.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
struct InputMessage {
    project_id: u64,
    // Id of the profiler session this chunk belongs to.
    profiler_id: Uuid,
    // Unique id of this individual chunk.
    chunk_id: Uuid,
    // Chunk window boundaries, unix timestamps in fractional seconds.
    start_timestamp: f64,
    end_timestamp: f64,
    // Unix timestamp (whole seconds) when the message was received.
    received: i64,
    // Optional on the wire; replaced with an enforced value during processing.
    retention_days: Option<u16>,
}

// Row written to the profile_chunks table: the input message plus the Kafka
// coordinates (partition/offset) of the message it was produced from.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
struct Chunk {
    // Flattened so the message fields serialize at the top level of the row.
    #[serde(flatten)]
    chunk: InputMessage,

    // Kafka coordinates; defaulted so the type also deserializes from
    // payloads that lack them.
    #[serde(default)]
    offset: u64,
    #[serde(default)]
    partition: u16,
}

#[cfg(test)]
mod tests {
    use std::time::SystemTime;

    use crate::processors::tests::run_schema_type_test;

    use super::*;

    // A well-formed chunk message must be accepted end-to-end by the processor.
    #[test]
    fn test_chunk() {
        let raw = r#"{
            "chunk_id": "0432a0a4c25f4697bf9f0a2fcbe6a814",
            "end_timestamp": 1710805689.1234567,
            "profiler_id": "4d229f1d3807421ba62a5f8bc295d836",
            "project_id": 1,
            "received": 1694357860,
            "retention_days": 30,
            "start_timestamp": 1710805688.1234567
        }"#;
        let meta = KafkaMessageMetadata {
            partition: 0,
            offset: 1,
            timestamp: DateTime::from(SystemTime::now()),
        };
        let payload = KafkaPayload::new(None, None, Some(Vec::from(raw.as_bytes())));
        let result = process_message(payload, meta, &ProcessorConfig::default());
        result.expect("The message should be processed");
    }

    // The Rust-side input type must stay compatible with the published
    // sentry-kafka-schemas definition for this topic.
    #[test]
    fn schema() {
        run_schema_type_test::<InputMessage>("snuba-profile-chunks", None);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
source: src/processors/mod.rs
description: "{\n \"project_id\": 1,\n \"profiler_id\": \"4d229f1d3807421ba62a5f8bc295d836\",\n \"chunk_id\": \"0432a0a4c25f4697bf9f0a2fcbe6a814\",\n \"start_timestamp\": 1710805688.1234567,\n \"end_timestamp\": 1710805689.1234567,\n \"received\": 1694357860,\n \"retention_days\": 30\n}\n"
expression: snapshot_payload
---
[
{
"chunk_id": "0432a0a4-c25f-4697-bf9f-0a2fcbe6a814",
"end_timestamp": 1710805689.1234567,
"offset": 1,
"partition": 0,
"profiler_id": "4d229f1d-3807-421b-a62a-5f8bc295d836",
"project_id": 1,
"received": 1694357860,
"retention_days": 30,
"start_timestamp": 1710805688.1234567
}
]
55 changes: 55 additions & 0 deletions snuba/datasets/configuration/profiles/storages/chunks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
version: v1
kind: writable_storage
name: profile_chunks
storage:
  key: profile_chunks
  set_key: profile_chunks
readiness_state: partial
schema:
  columns:
    [
      { name: project_id, type: UInt, args: { size: 64 } },
      { name: profiler_id, type: UUID },
      { name: chunk_id, type: UUID },
      # DateTime64 with precision 6 keeps microsecond resolution for the
      # chunk window boundaries.
      { name: start_timestamp, type: DateTime64, args: { precision: 6 } },
      { name: end_timestamp, type: DateTime64, args: { precision: 6 } },
      { name: retention_days, type: UInt, args: { size: 16 } },
      # Kafka coordinates of the message each row was produced from.
      { name: partition, type: UInt, args: { size: 16 } },
      { name: offset, type: UInt, args: { size: 64 } },
    ]
  local_table_name: profile_chunks_local
  dist_table_name: profile_chunks_dist
query_processors:
  # Translate UUID literals in queries for these columns.
  - processor: UUIDColumnProcessor
    args:
      columns: !!set
        profiler_id: null
        chunk_id: null
  - processor: TableRateLimit
allocation_policies:
  - name: ConcurrentRateLimitAllocationPolicy
    args:
      required_tenant_types:
        - referrer
        - project_id
      default_config_overrides:
        is_enforced: 1
  - name: BytesScannedWindowAllocationPolicy
    args:
      required_tenant_types:
        - referrer
      default_config_overrides:
        is_enforced: 1
        throttled_thread_number: 1
        org_limit_bytes_scanned: 100000
  - name: ReferrerGuardRailPolicy
    args:
      required_tenant_types:
        - referrer
      default_config_overrides:
        is_enforced: 1
# Queries against this storage must filter by project_id.
mandatory_condition_checkers:
  - condition: ProjectIdEnforcer
stream_loader:
  # Rust processor registered in rust_snuba/src/processors/mod.rs.
  processor: ProfileChunksProcessor
  default_topic: snuba-profile-chunks
6 changes: 6 additions & 0 deletions snuba/datasets/processors/profile_chunks_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor


class ProfileChunksProcessor(RustCompatProcessor):
    """Python-side shim for profile chunk ingestion.

    Delegates all message processing to the Rust processor registered under
    the name ``ProfileChunksProcessor`` in rust_snuba.
    """

    def __init__(self) -> None:
        super().__init__("ProfileChunksProcessor")
3 changes: 3 additions & 0 deletions snuba/utils/streams/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ class Topic(Enum):
SUBSCRIPTION_RESULTS_GENERIC_METRICS = "generic-metrics-subscription-results"

QUERYLOG = "snuba-queries"

PROFILES = "processed-profiles"
PROFILES_FUNCTIONS = "profiles-call-tree"
PROFILE_CHUNKS = "snuba-profile-chunks"

REPLAYEVENTS = "ingest-replay-events"
GENERIC_METRICS = "snuba-generic-metrics"
GENERIC_METRICS_SETS_COMMIT_LOG = "snuba-generic-metrics-sets-commit-log"
Expand Down
Loading