Skip to content

Commit

Permalink
feat(spans): Set origin_timestamp for spans (#5367)
Browse files Browse the repository at this point in the history
  • Loading branch information
phacops authored Jan 10, 2024
1 parent 7d07e32 commit 0fa6894
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 15 deletions.
3 changes: 2 additions & 1 deletion rust_snuba/src/processors/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub fn process_message(
) -> anyhow::Result<InsertBatch> {
let payload_bytes = payload.payload().context("Expected payload")?;
let msg: InputMessage = serde_json::from_slice(payload_bytes)?;

let functions = msg.functions.iter().map(|from| {
Function {
profile_id: msg.profile_id,
Expand All @@ -39,8 +40,8 @@ pub fn process_message(
});

Ok(InsertBatch {
rows: RowData::from_rows(functions)?,
origin_timestamp: DateTime::from_timestamp(msg.received, 0),
rows: RowData::from_rows(functions)?,
sentry_received_timestamp: None,
})
}
Expand Down
8 changes: 5 additions & 3 deletions rust_snuba/src/processors/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,26 @@ use rust_arroyo::backends::kafka::types::KafkaPayload;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::processors::utils::enforce_retention;
use crate::types::{InsertBatch, KafkaMessageMetadata, RowData};

pub fn process_message(
payload: KafkaPayload,
metadata: KafkaMessageMetadata,
_config: &ProcessorConfig,
config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
let payload_bytes = payload.payload().context("Expected payload")?;
let mut msg: ProfileMessage = serde_json::from_slice(payload_bytes)?;

msg.retention_days = Some(enforce_retention(msg.retention_days, &config.env_config));
msg.offset = metadata.offset;
msg.partition = metadata.partition;

let origin_timestamp = DateTime::from_timestamp(msg.received, 0);

Ok(InsertBatch {
rows: RowData::from_rows([msg])?,
origin_timestamp,
rows: RowData::from_rows([msg])?,
sentry_received_timestamp: None,
})
}
Expand Down Expand Up @@ -50,7 +52,7 @@ struct ProfileMessage {
profile_id: Uuid,
project_id: u64,
received: i64,
retention_days: u32,
retention_days: Option<u16>,
trace_id: Uuid,
transaction_id: Uuid,
transaction_name: String,
Expand Down
31 changes: 20 additions & 11 deletions rust_snuba/src/processors/spans.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::collections::BTreeMap;

use anyhow::Context;
use chrono::DateTime;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;

use crate::config::ProcessorConfig;
use crate::processors::utils::{enforce_retention, hex_to_u64};
use crate::types::{InsertBatch, KafkaMessageMetadata};
use crate::types::{InsertBatch, KafkaMessageMetadata, RowData};

pub fn process_message(
payload: KafkaPayload,
Expand All @@ -18,13 +19,18 @@ pub fn process_message(
let payload_bytes = payload.payload().context("Expected payload")?;
let msg: FromSpanMessage = serde_json::from_slice(payload_bytes)?;

let origin_timestamp = DateTime::from_timestamp(msg.received as i64, 0);
let mut span: Span = msg.try_into()?;
span.retention_days = Some(enforce_retention(span.retention_days, &config.env_config));

span.retention_days = Some(enforce_retention(span.retention_days, &config.env_config));
span.offset = metadata.offset;
span.partition = metadata.partition;

InsertBatch::from_rows([span])
Ok(InsertBatch {
origin_timestamp,
rows: RowData::from_rows([span])?,
sentry_received_timestamp: None,
})
}

#[derive(Debug, Default, Deserialize)]
Expand All @@ -43,6 +49,7 @@ struct FromSpanMessage {
parent_span_id: u64,
profile_id: Option<Uuid>,
project_id: u64,
received: f64,
retention_days: Option<u16>,
#[serde(default, deserialize_with = "hex_to_u64")]
segment_id: u64,
Expand Down Expand Up @@ -411,6 +418,7 @@ mod tests {
parent_span_id: Option<String>,
profile_id: Option<Uuid>,
project_id: Option<u64>,
received: Option<f64>,
retention_days: Option<u16>,
segment_id: Option<String>,
sentry_tags: TestSentryTags,
Expand All @@ -431,15 +439,8 @@ mod tests {
profile_id: Some(Uuid::new_v4()),
project_id: Some(1),
retention_days: Some(90),
received: Some(1691105878.720),
segment_id: Some("deadbeefdeadbeef".into()),
span_id: Some("deadbeefdeadbeef".into()),
start_timestamp_ms: Some(1691105878720),
trace_id: Some(Uuid::new_v4()),
tags: Some(BTreeMap::from([
("tag1".into(), "value1".into()),
("tag2".into(), "123".into()),
("tag3".into(), "true".into()),
])),
sentry_tags: TestSentryTags {
action: Some("GET".into()),
domain: Some("targetdomain.tld:targetport".into()),
Expand All @@ -454,6 +455,14 @@ mod tests {
transaction_method: Some("GET".into()),
transaction_op: Some("navigation".into()),
},
span_id: Some("deadbeefdeadbeef".into()),
start_timestamp_ms: Some(1691105878720),
tags: Some(BTreeMap::from([
("tag1".into(), "value1".into()),
("tag2".into(), "123".into()),
("tag3".into(), "true".into()),
])),
trace_id: Some(Uuid::new_v4()),
}
}

Expand Down

0 comments on commit 0fa6894

Please sign in to comment.