Skip to content

Commit

Permalink
ref(processor): Introduce processing groups pipelines (#2862)
Browse files Browse the repository at this point in the history
This change makes another step forward to typed processing pipeline:
* split the envelope when possible into different processing groups
  * if transaction, all the item kept together in the same envelope
  * all the events which create event also kept together
* the rest of the items are move into separate groups, depending on the target
  • Loading branch information
olksdr committed Jan 12, 2024
1 parent aa272ed commit 29985ca
Show file tree
Hide file tree
Showing 9 changed files with 437 additions and 96 deletions.
363 changes: 349 additions & 14 deletions relay-server/src/actors/processor.rs

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions relay-server/src/actors/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ fn compute_sampling_decision(
///
/// This execution of dynamic sampling is technically a "simulation" since we will use the result
/// only for tagging errors and not for actually sampling incoming events.
fn tag_error_with_sampling_decision(state: &mut ProcessEnvelopeState, config: &Config) {
pub fn tag_error_with_sampling_decision(state: &mut ProcessEnvelopeState, config: &Config) {
let (Some(dsc), Some(event)) = (
state.managed_envelope.envelope().dsc(),
state.event.value_mut(),
Expand Down Expand Up @@ -240,7 +240,7 @@ mod tests {
use relay_sampling::evaluation::{ReservoirCounters, SamplingMatch};
use uuid::Uuid;

use crate::actors::processor::ProcessEnvelope;
use crate::actors::processor::{ProcessEnvelope, ProcessingGroup};
use crate::actors::project::ProjectState;
use crate::envelope::{ContentType, Envelope, Item, ItemType};
use crate::extractors::RequestMeta;
Expand Down Expand Up @@ -274,6 +274,7 @@ mod tests {
}
}

/// Always sets the processing item type to event.
fn process_envelope_with_root_project_state(
envelope: Box<Envelope>,
sampling_project_state: Option<Arc<ProjectState>>,
Expand Down Expand Up @@ -428,6 +429,7 @@ mod tests {
TestSemaphore::new(42).try_acquire().unwrap(),
outcome_aggregator.clone(),
test_store.clone(),
ProcessingGroup::Ungrouped,
),
profile_id: None,
event_metrics_extracted: false,
Expand Down
2 changes: 2 additions & 0 deletions relay-server/src/actors/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,7 @@ mod tests {
use relay_test::mock_service;
use uuid::Uuid;

use crate::actors::processor::ProcessingGroup;
use crate::testutils::empty_envelope;

use super::*;
Expand Down Expand Up @@ -1177,6 +1178,7 @@ mod tests {
empty_envelope(),
services.outcome_aggregator.clone(),
services.test_store.clone(),
ProcessingGroup::Ungrouped,
)
.unwrap();
let message = ValidateEnvelope { envelope };
Expand Down
28 changes: 19 additions & 9 deletions relay-server/src/actors/spooler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use tokio::fs::DirBuilder;
use tokio::sync::mpsc;

use crate::actors::outcome::TrackOutcome;
use crate::actors::processor::ProcessingGroup;
use crate::actors::project_cache::{ProjectCache, UpdateBufferIndex};
use crate::actors::test_store::TestStore;
use crate::envelope::{Envelope, EnvelopeError};
Expand Down Expand Up @@ -386,7 +387,7 @@ impl OnDisk {
&self,
row: SqliteRow,
services: &Services,
) -> Result<ManagedEnvelope, BufferError> {
) -> Result<Vec<ManagedEnvelope>, BufferError> {
let envelope_row: Vec<u8> = row.try_get("envelope").map_err(BufferError::FetchFailed)?;
let envelope_bytes = bytes::Bytes::from(envelope_row);
let mut envelope = Envelope::parse_bytes(envelope_bytes)?;
Expand All @@ -398,12 +399,18 @@ impl OnDisk {

envelope.set_start_time(start_time.into_inner());

let managed_envelope = self.buffer_guard.enter(
envelope,
services.outcome_aggregator.clone(),
services.test_store.clone(),
)?;
Ok(managed_envelope)
ProcessingGroup::split_envelope(*envelope)
.into_iter()
.map(|(group, envelope)| {
let managed_envelope = self.buffer_guard.enter(
envelope,
services.outcome_aggregator.clone(),
services.test_store.clone(),
group,
)?;
Ok(managed_envelope)
})
.collect()
}

/// Tries to delete the envelopes from the persistent buffer in batches,
Expand Down Expand Up @@ -467,8 +474,10 @@ impl OnDisk {
};

match self.extract_envelope(envelope, services) {
Ok(managed_envelope) => {
sender.send(managed_envelope).ok();
Ok(managed_envelopes) => {
for managed_envelope in managed_envelopes {
sender.send(managed_envelope).ok();
}
}
Err(err) => relay_log::error!(
error = &err as &dyn Error,
Expand Down Expand Up @@ -1139,6 +1148,7 @@ mod tests {
empty_envelope(),
services.outcome_aggregator,
services.test_store,
ProcessingGroup::Ungrouped,
)
.unwrap();

Expand Down
43 changes: 11 additions & 32 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use relay_statsd::metric;
use serde::Deserialize;

use crate::actors::outcome::{DiscardReason, Outcome};
use crate::actors::processor::{ProcessMetricMeta, ProcessMetrics};
use crate::actors::processor::{ProcessMetricMeta, ProcessMetrics, ProcessingGroup};
use crate::actors::project_cache::{CheckEnvelope, ValidateEnvelope};
use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items};
use crate::service::ServiceState;
Expand Down Expand Up @@ -286,44 +286,20 @@ fn queue_envelope(
})
}

// Take all NEL reports and split them up into the separate envelopes with 1 item per
// envelope.
for nel_envelope in envelope.split_all_by(|item| matches!(item.ty(), ItemType::Nel)) {
relay_log::trace!("queueing separate envelopes for NEL report");
let buffer_guard = state.buffer_guard();
let nel_envelope = buffer_guard
// Split off the envelopes by item type.
let envelopes = ProcessingGroup::split_envelope(*managed_envelope.take_envelope());
for (group, envelope) in envelopes {
let envelope = buffer_guard
.enter(
nel_envelope,
envelope,
state.outcome_aggregator().clone(),
state.test_store().clone(),
group,
)
.map_err(BadStoreRequest::QueueFailed)?;
state
.project_cache()
.send(ValidateEnvelope::new(nel_envelope));
}

// Split the envelope into event-related items and other items. This allows to fast-track:
// 1. Envelopes with only session items. They only require rate limiting.
// 2. Event envelope processing can bail out if the event is filtered or rate limited,
// since all items depend on this event.
if let Some(event_envelope) = envelope.split_by(Item::requires_event) {
relay_log::trace!("queueing separate envelope for non-event items");

// The envelope has been split, so we need to fork the context.
let event_context = buffer_guard.enter(
event_envelope,
state.outcome_aggregator().clone(),
state.test_store().clone(),
)?;
state
.project_cache()
.send(ValidateEnvelope::new(event_context));
state.project_cache().send(ValidateEnvelope::new(envelope));
}

// Update the old context before continuing with the source envelope.
managed_envelope.update();

if managed_envelope.envelope().is_empty() {
// The envelope can be empty here if it contained only metrics items which were removed
// above. In this case, the envelope was accepted and needs no further queueing.
Expand Down Expand Up @@ -356,6 +332,9 @@ pub async fn handle_envelope(
envelope,
state.outcome_aggregator().clone(),
state.test_store().clone(),
// It's not clear at this point which group this envelope belongs to.
// The decission will be made while queueing in `queue_envelope` function.
ProcessingGroup::Ungrouped,
)
.map_err(BadStoreRequest::QueueFailed)?;

Expand Down
56 changes: 21 additions & 35 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use relay_quotas::DataCategory;
use relay_sampling::DynamicSamplingContext;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use smallvec::SmallVec;

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::extractors::{PartialMeta, RequestMeta};
Expand Down Expand Up @@ -945,6 +945,11 @@ pub struct Envelope {
}

impl Envelope {
/// Creates an envelope from the provided parts.
pub fn from_parts(headers: EnvelopeHeaders, items: Items) -> Box<Self> {
Box::new(Self { items, headers })
}

/// Creates an envelope from request information.
pub fn from_request(event_id: Option<EventId>, meta: RequestMeta) -> Box<Self> {
Box::new(Self {
Expand Down Expand Up @@ -1000,6 +1005,11 @@ impl Envelope {
}
}

/// Returns reference to the [`EnvelopeHeaders`].
pub fn headers(&self) -> &EnvelopeHeaders {
&self.headers
}

/// Returns the number of items in this envelope.
#[allow(dead_code)]
pub fn len(&self) -> usize {
Expand Down Expand Up @@ -1045,6 +1055,11 @@ impl Envelope {
self.headers.sent_at
}

/// Sets the event id on the envelope.
pub fn set_event_id(&mut self, event_id: EventId) {
self.headers.event_id = Some(event_id);
}

/// Sets the timestamp at which an envelope is sent to the upstream.
pub fn set_sent_at(&mut self, sent_at: DateTime<Utc>) {
self.headers.sent_at = Some(sent_at);
Expand Down Expand Up @@ -1110,7 +1125,7 @@ impl Envelope {
self.items.iter_mut()
}

/// Returns the an option with a reference to the first item that matches
/// Returns an option with a reference to the first item that matches
/// the predicate, or None if the predicate is not matched by any item.
pub fn get_item_by<F>(&self, mut pred: F) -> Option<&Item>
where
Expand All @@ -1119,7 +1134,7 @@ impl Envelope {
self.items().find(|item| pred(item))
}

/// Returns the an option with a mutable reference to the first item that matches
/// Returns an option with a mutable reference to the first item that matches
/// the predicate, or None if the predicate is not matched by any item.
pub fn get_item_by_mut<F>(&mut self, mut pred: F) -> Option<&mut Item>
where
Expand Down Expand Up @@ -1152,8 +1167,9 @@ impl Envelope {

/// Splits off the items from the envelope using provided predicates.
///
/// First predicate is the the additional condition on the count of found items by second
/// First predicate is the additional condition on the count of found items by second
/// predicate.
#[cfg(test)]
fn split_off_items<C, F>(&mut self, cond: C, mut f: F) -> Option<SmallVec<[Item; 3]>>
where
C: Fn(usize) -> bool,
Expand All @@ -1178,6 +1194,7 @@ impl Envelope {
/// with all items that return `true`. Items that return `false` remain in this envelope.
///
/// The returned envelope assumes the same headers.
#[cfg(test)]
pub fn split_by<F>(&mut self, f: F) -> Option<Box<Self>>
where
F: FnMut(&Item) -> bool,
Expand All @@ -1190,37 +1207,6 @@ impl Envelope {
}))
}

/// Splits the envelope by the given predicate.
///
/// The main differents from `split_by()` is this function returns the list of the newly
/// constracted envelopes with all the items where the predicate returns `true`. Otherwise it
/// returns an empty list.
///
/// The returned envelopes assume the same headers.
pub fn split_all_by<F>(&mut self, f: F) -> SmallVec<[Box<Self>; 3]>
where
F: FnMut(&Item) -> bool,
{
let mut envelopes = smallvec![];
let Some(split_items) = self.split_off_items(|count| count == 0, f) else {
return envelopes;
};

let headers = &mut self.headers;

for item in split_items {
// Each item should get an envelope with the new event id.
headers.event_id = Some(EventId::new());

envelopes.push(Box::new(Envelope {
items: smallvec![item],
headers: headers.clone(),
}))
}

envelopes
}

/// Retains only the items specified by the predicate.
///
/// In other words, remove all elements where `f(&item)` returns `false`. This method operates
Expand Down
4 changes: 3 additions & 1 deletion relay-server/src/testutils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ pub fn empty_envelope() -> Box<Envelope> {
.parse()
.unwrap();

Envelope::from_request(Some(EventId::new()), RequestMeta::new(dsn))
let mut envelope = Envelope::from_request(Some(EventId::new()), RequestMeta::new(dsn));
envelope.add_item(Item::new(ItemType::Event));
envelope
}

pub fn create_test_processor(config: Config) -> EnvelopeProcessorService {
Expand Down
3 changes: 3 additions & 0 deletions relay-server/src/utils/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fmt;
use relay_system::Addr;

use crate::actors::outcome::TrackOutcome;
use crate::actors::processor::ProcessingGroup;
use crate::actors::test_store::TestStore;
use crate::envelope::Envelope;
use crate::statsd::RelayHistograms;
Expand Down Expand Up @@ -92,6 +93,7 @@ impl BufferGuard {
envelope: Box<Envelope>,
outcome_aggregator: Addr<TrackOutcome>,
test_store: Addr<TestStore>,
group: ProcessingGroup,
) -> Result<ManagedEnvelope, BufferError> {
let permit = self.inner.try_acquire().ok_or(BufferError)?;

Expand All @@ -109,6 +111,7 @@ impl BufferGuard {
permit,
outcome_aggregator,
test_store,
group,
))
}
}
Loading

0 comments on commit 29985ca

Please sign in to comment.