Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into ref/buffer-received-at
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Oct 30, 2024
2 parents 7165f00 + a9c0a49 commit 73c5f9e
Show file tree
Hide file tree
Showing 31 changed files with 340 additions and 309 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@
- Allow profile chunks without release. ([#4155](https://github.com/getsentry/relay/pull/4155))
- Add validation for timestamps sent from the future. ([#4163](https://github.com/getsentry/relay/pull/4163))


**Features:**

- Retain empty string values in `span.data` and `event.contexts.trace.data`. ([#4174](https://github.com/getsentry/relay/pull/4174))

**Internal:**

- Add a metric that counts span volume in the root project for dynamic sampling (`c:spans/count_per_root_project@none`). ([#4134](https://github.com/getsentry/relay/pull/4134))
- Add a tag `target_project_id` to both root project metrics for dynamic sampling (`c:transactions/count_per_root_project@none` and `c:spans/count_per_root_project@none`) which shows the flow trace traffic from root to target projects. ([#4170](https://github.com/getsentry/relay/pull/4170))
- Use `DateTime<Utc>` instead of `Instant` for tracking the received time of the `Envelope`. ([#4184](https://github.com/getsentry/relay/pull/4184))

## 24.10.0

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ for more information.

## Development

To build Relay, we require the **latest stable Rust**. The crate is split into a
To build Relay, we require the **latest stable Rust** (install via [rustup](https://rustup.rs/)). The crate is split into a
workspace with multiple features, so when running building or running tests
always make sure to pass the `--all` and `--all-features` flags.
The `processing` feature additionally requires a C compiler and CMake.
Expand Down Expand Up @@ -75,6 +75,9 @@ Depending on the configuration, you may need to have a local instance of Sentry
running.

```bash
# Update rust
rustup update

# Initialize Relay for the first time
cargo run --all-features -- config init

Expand Down
5 changes: 0 additions & 5 deletions relay-common/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ pub fn instant_to_system_time(instant: Instant) -> SystemTime {
SystemTime::now() - instant.elapsed()
}

/// Converts an `Instant` into a `DateTime`.
pub fn instant_to_date_time(instant: Instant) -> chrono::DateTime<chrono::Utc> {
instant_to_system_time(instant).into()
}

/// Returns the number of milliseconds contained by this `Duration` as `f64`.
///
/// The returned value does include the fractional (nanosecond) part of the duration.
Expand Down
21 changes: 11 additions & 10 deletions relay-event-schema/src/protocol/contexts/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub struct TraceContext {
pub sampled: Annotated<bool>,

/// Data of the trace's root span.
#[metastructure(pii = "maybe", skip_serialization = "empty")]
#[metastructure(pii = "maybe", skip_serialization = "null")]
pub data: Annotated<SpanData>,

/// Additional arbitrary fields for forwards compatibility.
Expand Down Expand Up @@ -197,7 +197,8 @@ mod tests {
"tok": "test"
},
"custom_field": "something"
}
},
"custom_field_empty": ""
},
"other": "value",
"type": "trace"
Expand All @@ -222,15 +223,15 @@ mod tests {
);
map
}),
other: {
let mut map = Object::new();
map.insert(
"custom_field".into(),
Annotated::new(Value::String("something".into())),
);
map
},
other: Object::from([(
"custom_field".into(),
Annotated::new(Value::String("something".into())),
)]),
}),
other: Object::from([(
"custom_field_empty".into(),
Annotated::new(Value::String("".into())),
)]),
..Default::default()
}),
other: {
Expand Down
36 changes: 35 additions & 1 deletion relay-event-schema/src/protocol/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ pub struct SpanData {
additional_properties,
pii = "true",
retain = "true",
skip_serialization = "empty"
skip_serialization = "null" // applies to child elements
)]
pub other: Object<Value>,
}
Expand Down Expand Up @@ -931,4 +931,38 @@ mod tests {
assert_eq!(data.get_value("code\\.namespace"), Some(Val::String("ns")));
assert_eq!(data.get_value("unknown"), None);
}

#[test]
fn test_span_data_empty_well_known_field() {
let span = r#"{
"data": {
"lcp.url": ""
}
}"#;
let span: Annotated<Span> = Annotated::from_json(span).unwrap();
assert_eq!(span.to_json().unwrap(), r#"{"data":{"lcp.url":""}}"#);
}

#[test]
fn test_span_data_empty_custom_field() {
let span = r#"{
"data": {
"custom_field_empty": ""
}
}"#;
let span: Annotated<Span> = Annotated::from_json(span).unwrap();
assert_eq!(
span.to_json().unwrap(),
r#"{"data":{"custom_field_empty":""}}"#
);
}

#[test]
fn test_span_data_completely_empty() {
let span = r#"{
"data": {}
}"#;
let span: Annotated<Span> = Annotated::from_json(span).unwrap();
assert_eq!(span.to_json().unwrap(), r#"{"data":{}}"#);
}
}
5 changes: 3 additions & 2 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use bytes::Bytes;
use chrono::Utc;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use relay_config::Config;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{Pool, Sqlite};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use tempfile::TempDir;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -66,7 +67,7 @@ fn mock_envelope_with_project_key(project_key: &ProjectKey, size: &str) -> Box<E
));

let mut envelope = Envelope::parse_bytes(bytes).unwrap();
envelope.set_start_time(Instant::now());
envelope.set_received_at(Utc::now());
envelope
}

Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/endpoints/batch_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use axum::http::StatusCode;
use axum::response::IntoResponse;
use serde::{Deserialize, Serialize};

use crate::extractors::{SignedBytes, StartTime};
use crate::extractors::{ReceivedAt, SignedBytes};
use crate::service::ServiceState;
use crate::services::processor::ProcessBatchedMetrics;
use crate::services::projects::cache::BucketSource;
Expand All @@ -12,7 +12,7 @@ struct SendMetricsResponse {}

pub async fn handle(
state: ServiceState,
start_time: StartTime,
ReceivedAt(received_at): ReceivedAt,
body: SignedBytes,
) -> impl IntoResponse {
if !body.relay.internal {
Expand All @@ -22,7 +22,7 @@ pub async fn handle(
state.processor().send(ProcessBatchedMetrics {
payload: body.body,
source: BucketSource::Internal,
start_time: start_time.into_inner(),
received_at,
sent_at: None,
});

Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ fn queue_envelope(
relay_log::trace!("sending metrics into processing queue");
state.project_cache().send(ProcessMetrics {
data: MetricData::Raw(metric_items.into_vec()),
start_time: envelope.meta().start_time().into(),
received_at: envelope.received_at(),
sent_at: envelope.sent_at(),
project_key: envelope.meta().public_key(),
source: envelope.meta().into(),
Expand Down
21 changes: 14 additions & 7 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::collections::BTreeMap;
use std::fmt;
use std::io::{self, Write};
use std::ops::AddAssign;
use std::time::Instant;
use std::time::Duration;
use uuid::Uuid;

use bytes::Bytes;
Expand Down Expand Up @@ -1219,10 +1219,17 @@ impl Envelope {
}

/// Returns the time at which the envelope was received at this Relay.
///
/// This is the date time equivalent to [`start_time`](Self::start_time).
pub fn received_at(&self) -> DateTime<Utc> {
relay_common::time::instant_to_date_time(self.meta().start_time())
self.meta().received_at()
}

/// Returns the time elapsed in seconds since the envelope was received by this Relay.
///
/// In case the elapsed time is negative, it is assumed that no time elapsed.
pub fn age(&self) -> Duration {
(Utc::now() - self.received_at())
.to_std()
.unwrap_or(Duration::ZERO)
}

/// Sets the event id on the envelope.
Expand All @@ -1235,9 +1242,9 @@ impl Envelope {
self.headers.sent_at = Some(sent_at);
}

/// Sets the start time to the provided `Instant`.
pub fn set_start_time(&mut self, start_time: Instant) {
self.headers.meta.set_start_time(start_time)
/// Sets the received at to the provided `DateTime`.
pub fn set_received_at(&mut self, start_time: DateTime<Utc>) {
self.headers.meta.set_received_at(start_time)
}

/// Sets the data retention in days for items in this envelope.
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/extractors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
mod content_type;
mod forwarded_for;
mod mime;
mod received_at;
mod remote;
mod request_meta;
mod signed_json;
mod start_time;

pub use self::content_type::*;
pub use self::forwarded_for::*;
pub use self::mime::*;
pub use self::received_at::*;
pub use self::remote::*;
pub use self::request_meta::*;
pub use self::signed_json::*;
pub use self::start_time::*;
32 changes: 32 additions & 0 deletions relay-server/src/extractors/received_at.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::convert::Infallible;

use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use axum::Extension;
use chrono::{DateTime, Utc};

/// The time at which the request started.
#[derive(Clone, Copy, Debug)]
pub struct ReceivedAt(pub DateTime<Utc>);

impl ReceivedAt {
pub fn now() -> Self {
Self(Utc::now())
}
}

#[axum::async_trait]
impl<S> FromRequestParts<S> for ReceivedAt
where
S: Send + Sync,
{
type Rejection = Infallible;

async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
let Extension(start_time) = Extension::from_request_parts(parts, state)
.await
.expect("ReceivedAt middleware is not configured");

Ok(start_time)
}
}
Loading

0 comments on commit 73c5f9e

Please sign in to comment.