diff --git a/rust_snuba/src/processors/mod.rs b/rust_snuba/src/processors/mod.rs
index 5c88e5e0b9..ce0e30328a 100644
--- a/rust_snuba/src/processors/mod.rs
+++ b/rust_snuba/src/processors/mod.rs
@@ -3,6 +3,7 @@ mod functions;
 mod generic_metrics;
 mod metrics_summaries;
 mod outcomes;
+mod profile_chunks;
 mod profiles;
 mod querylog;
 mod release_health_metrics;
@@ -62,6 +63,7 @@ define_processing_functions! {
     ("GenericGaugesMetricsProcessor", "snuba-generic-metrics", ProcessingFunctionType::ProcessingFunction(generic_metrics::process_gauge_message)),
     ("PolymorphicMetricsProcessor", "snuba-metrics", ProcessingFunctionType::ProcessingFunction(release_health_metrics::process_metrics_message)),
     ("ErrorsProcessor", "events", ProcessingFunctionType::ProcessingFunctionWithReplacements(errors::process_message_with_replacement)),
+    ("ProfileChunksProcessor", "snuba-profile-chunks", ProcessingFunctionType::ProcessingFunction(profile_chunks::process_message)),
 }
 
 // COGS is recorded for these processors
diff --git a/rust_snuba/src/processors/profile_chunks.rs b/rust_snuba/src/processors/profile_chunks.rs
new file mode 100644
index 0000000000..de8aaf81f6
--- /dev/null
+++ b/rust_snuba/src/processors/profile_chunks.rs
@@ -0,0 +1,91 @@
+use crate::config::ProcessorConfig;
+use anyhow::Context;
+use chrono::DateTime;
+use rust_arroyo::backends::kafka::types::KafkaPayload;
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+use crate::processors::utils::enforce_retention;
+use crate::types::{InsertBatch, KafkaMessageMetadata};
+
+pub fn process_message(
+    payload: KafkaPayload,
+    metadata: KafkaMessageMetadata,
+    config: &ProcessorConfig,
+) -> anyhow::Result<InsertBatch> {
+    let payload_bytes = payload.payload().context("Expected payload")?;
+    let msg: InputMessage = serde_json::from_slice(payload_bytes)?;
+
+    let mut row = Chunk {
+        chunk: msg,
+        offset: metadata.offset,
+        partition: metadata.partition,
+    };
+
+    row.chunk.retention_days = Some(enforce_retention(
+        row.chunk.retention_days,
+        &config.env_config,
+    ));
+
+    let origin_timestamp = DateTime::from_timestamp(row.chunk.received, 0);
+
+    InsertBatch::from_rows([row], origin_timestamp)
+}
+
+#[derive(Debug, Deserialize, Serialize, JsonSchema)]
+struct InputMessage {
+    project_id: u64,
+    profiler_id: Uuid,
+    chunk_id: Uuid,
+    start_timestamp: f64,
+    end_timestamp: f64,
+    received: i64,
+    retention_days: Option<u16>,
+}
+
+#[derive(Debug, Deserialize, Serialize, JsonSchema)]
+struct Chunk {
+    #[serde(flatten)]
+    chunk: InputMessage,
+
+    #[serde(default)]
+    offset: u64,
+    #[serde(default)]
+    partition: u16,
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::SystemTime;
+
+    use crate::processors::tests::run_schema_type_test;
+
+    use super::*;
+
+    #[test]
+    fn test_chunk() {
+        let data = r#"{
+            "chunk_id": "0432a0a4c25f4697bf9f0a2fcbe6a814",
+            "end_timestamp": 1710805689.1234567,
+            "profiler_id": "4d229f1d3807421ba62a5f8bc295d836",
+            "project_id": 1,
+            "received": 1694357860,
+            "retention_days": 30,
+            "start_timestamp": 1710805688.1234567
+        }"#;
+        let payload = KafkaPayload::new(None, None, Some(data.as_bytes().to_vec()));
+        let meta = KafkaMessageMetadata {
+            partition: 0,
+            offset: 1,
+            timestamp: DateTime::from(SystemTime::now()),
+        };
+        process_message(payload, meta, &ProcessorConfig::default())
+            .expect("The message should be processed");
+    }
+
+    #[test]
+    fn schema() {
+        run_schema_type_test::<InputMessage>("snuba-profile-chunks", None);
+    }
+}
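Note (not part of the patch): the row the processor writes is the input message flattened together with the Kafka coordinates via `#[serde(flatten)]` on `Chunk`. A minimal standalone sketch of that mechanism, with the structs trimmed to two fields and an illustrative `main` harness:

```rust
use serde::Serialize;

// Trimmed-down mirror of the patch's InputMessage/Chunk pair.
#[derive(Serialize)]
struct InputMessage {
    project_id: u64,
    received: i64,
}

#[derive(Serialize)]
struct Chunk {
    // flatten hoists InputMessage's fields to the top level of the row.
    #[serde(flatten)]
    chunk: InputMessage,
    offset: u64,
    partition: u16,
}

fn main() {
    let row = Chunk {
        chunk: InputMessage { project_id: 1, received: 1694357860 },
        offset: 1,
        partition: 0,
    };
    // Prints {"project_id":1,"received":1694357860,"offset":1,"partition":0} —
    // the same flat shape the snapshot below asserts for the full message.
    println!("{}", serde_json::to_string(&row).unwrap());
}
```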
diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-profile-chunks-ProfileChunksProcessor-snuba-profile-chunks__1__valid.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-profile-chunks-ProfileChunksProcessor-snuba-profile-chunks__1__valid.json.snap
new file mode 100644
index 0000000000..11056110ae
--- /dev/null
+++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-profile-chunks-ProfileChunksProcessor-snuba-profile-chunks__1__valid.json.snap
@@ -0,0 +1,18 @@
+---
+source: src/processors/mod.rs
+description: "{\n  \"project_id\": 1,\n  \"profiler_id\": \"4d229f1d3807421ba62a5f8bc295d836\",\n  \"chunk_id\": \"0432a0a4c25f4697bf9f0a2fcbe6a814\",\n  \"start_timestamp\": 1710805688.1234567,\n  \"end_timestamp\": 1710805689.1234567,\n  \"received\": 1694357860,\n  \"retention_days\": 30\n}\n"
+expression: snapshot_payload
+---
+[
+  {
+    "chunk_id": "0432a0a4-c25f-4697-bf9f-0a2fcbe6a814",
+    "end_timestamp": 1710805689.1234567,
+    "offset": 1,
+    "partition": 0,
+    "profiler_id": "4d229f1d-3807-421b-a62a-5f8bc295d836",
+    "project_id": 1,
+    "received": 1694357860,
+    "retention_days": 30,
+    "start_timestamp": 1710805688.1234567
+  }
+]
diff --git a/snuba/datasets/configuration/profiles/storages/chunks.yaml b/snuba/datasets/configuration/profiles/storages/chunks.yaml
new file mode 100644
index 0000000000..ecc28404a9
--- /dev/null
+++ b/snuba/datasets/configuration/profiles/storages/chunks.yaml
@@ -0,0 +1,55 @@
+version: v1
+kind: writable_storage
+name: profile_chunks
+storage:
+  key: profile_chunks
+  set_key: profile_chunks
+readiness_state: partial
+schema:
+  columns:
+    [
+      { name: project_id, type: UInt, args: { size: 64 } },
+      { name: profiler_id, type: UUID },
+      { name: chunk_id, type: UUID },
+      { name: start_timestamp, type: DateTime64, args: { precision: 6 } },
+      { name: end_timestamp, type: DateTime64, args: { precision: 6 } },
+      { name: retention_days, type: UInt, args: { size: 16 } },
+      { name: partition, type: UInt, args: { size: 16 } },
+      { name: offset, type: UInt, args: { size: 64 } },
+    ]
+  local_table_name: profile_chunks_local
+  dist_table_name: profile_chunks_dist
+query_processors:
+  - processor: UUIDColumnProcessor
+    args:
+      columns: !!set
+        profiler_id: null
+        chunk_id: null
+  - processor: TableRateLimit
+allocation_policies:
+  - name: ConcurrentRateLimitAllocationPolicy
+    args:
+      required_tenant_types:
+        - referrer
+        - project_id
+      default_config_overrides:
+        is_enforced: 1
+  - name: BytesScannedWindowAllocationPolicy
+    args:
+      required_tenant_types:
+        - referrer
+      default_config_overrides:
+        is_enforced: 1
+        throttled_thread_number: 1
+        org_limit_bytes_scanned: 100000
+  - name: ReferrerGuardRailPolicy
+    args:
+      required_tenant_types:
+        - referrer
+      default_config_overrides:
+        is_enforced: 1
+mandatory_condition_checkers:
+  - condition: ProjectIdEnforcer
+stream_loader:
+  processor: ProfileChunksProcessor
+  default_topic: snuba-profile-chunks
diff --git a/snuba/datasets/processors/profile_chunks_processor.py b/snuba/datasets/processors/profile_chunks_processor.py
new file mode 100644
index 0000000000..6513d80caa
--- /dev/null
+++ b/snuba/datasets/processors/profile_chunks_processor.py
@@ -0,0 +1,6 @@
+from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor
+
+
+class ProfileChunksProcessor(RustCompatProcessor):
+    def __init__(self) -> None:
+        super().__init__("ProfileChunksProcessor")
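Also worth noting (not part of the patch): the test payload carries UUIDs in hyphenless "simple" form, while the snapshot above shows them hyphenated. That is standard `uuid::Uuid` behavior — parsing accepts both encodings and formatting emits the hyphenated one — which this sketch checks:

```rust
use uuid::Uuid;

fn main() {
    // parse_str accepts the "simple" (hyphenless) encoding...
    let id = Uuid::parse_str("0432a0a4c25f4697bf9f0a2fcbe6a814").unwrap();
    // ...while Display (and serde serialization) emit the hyphenated form,
    // which is why the snapshot's chunk_id differs from the input's.
    assert_eq!(id.to_string(), "0432a0a4-c25f-4697-bf9f-0a2fcbe6a814");
}
```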
diff --git a/snuba/utils/streams/topics.py b/snuba/utils/streams/topics.py
index 7bb26411e2..1d985a1ac1 100644
--- a/snuba/utils/streams/topics.py
+++ b/snuba/utils/streams/topics.py
@@ -39,8 +39,11 @@ class Topic(Enum):
     SUBSCRIPTION_RESULTS_GENERIC_METRICS = "generic-metrics-subscription-results"
     QUERYLOG = "snuba-queries"
+
     PROFILES = "processed-profiles"
     PROFILES_FUNCTIONS = "profiles-call-tree"
+    PROFILE_CHUNKS = "snuba-profile-chunks"
+
     REPLAYEVENTS = "ingest-replay-events"
 
     GENERIC_METRICS = "snuba-generic-metrics"
     GENERIC_METRICS_SETS_COMMIT_LOG = "snuba-generic-metrics-sets-commit-log"
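One last standalone note (not part of the patch): `process_message` derives the batch's origin timestamp from the integer `received` field with chrono's `DateTime::from_timestamp`, which returns an `Option` (None for out-of-range seconds) that `InsertBatch::from_rows` accepts as-is. A small sketch using the test payload's `received` value:

```rust
use chrono::{DateTime, Utc};

fn main() {
    // Seconds since the Unix epoch from the test payload; 0 nanoseconds.
    let origin: Option<DateTime<Utc>> = DateTime::from_timestamp(1694357860, 0);
    assert_eq!(origin.unwrap().to_rfc3339(), "2023-09-10T14:57:40+00:00");
}
```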